A Hands-On Guide for Working with Large-Scale Spatial Data
I’ve been doing geospatial work for over twenty years now. I’ve hand-rolled ETL pipelines, babysat cron jobs, and debugged more coordinate system mismatches than a person should reasonably endure in one lifetime. So when someone says “streaming spatial data,” my first reaction used to be something between a deep sigh and a nervous laugh.
Here’s the thing: streaming tabular data with Spark Structured Streaming is a well-trodden path. There are tutorials everywhere. But streaming spatial data where you need to create geometry objects, apply spatial business rules, and land the results in a format that actually understands what a Point is (pun definitely intended); that’s where most guides quietly end.
This post walks through a complete, working pipeline that takes raw telemetry data (fleet tracking, asset monitoring, IoT sensors, etc.) and streams it into a Wherobots-managed Iceberg table with full Sedona geometry support. No hand-waving, just real code: the kind of thing you can run on Wherobots Cloud this afternoon.
The architecture is intentionally straightforward. Three components, one direction, no backflips required:
The generator that produces the Parquet files is a separate concern; any system that drops Parquet into S3 works. We’re focused on the Wherobots side: how you consume, transform, and catalog streaming spatial data.
The source data uses a flat schema — no geometry column yet. That’s deliberate. Most real-world telemetry arrives as plain numbers (latitude, longitude, altitude), not as WKB or WKT geometry strings. The spatial enrichment happens during ingest.
event_id
timestamp
x
y
z
speed_mph
heading
operator_name
asset_id
asset_type
status
Eleven columns. No geometry. That’s what Sedona is for.
The first thing the ingest job does is create a SedonaContext. On Wherobots Cloud, Spark is already pre-initialized — you just need to wrap it with Sedona capabilities:
from sedona.spark import *

config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(config)
Before the stream starts, we make sure the target schema and table exist. This is idempotent — safe to run every time the job starts:
sedona.sql("CREATE SCHEMA IF NOT EXISTS wherobots.streaming_demo")

sedona.sql("""
CREATE TABLE IF NOT EXISTS wherobots.streaming_demo.asset_tracks (
    event_id       STRING,
    timestamp      TIMESTAMP,
    x              DOUBLE,
    y              DOUBLE,
    z              DOUBLE,
    speed_mph      DOUBLE,
    heading        DOUBLE,
    operator_name  STRING,
    asset_id       STRING,
    asset_type     STRING,
    status         STRING,
    geometry       GEOMETRY,
    is_speeding    BOOLEAN,
    source_file    STRING,
    batch_id       LONG,
    job_run_id     STRING
)
""")
One thing to call out here:
The table has more columns than the source. The source Parquet has 11 fields. The table has 16. The extra five (geometry, is_speeding, source_file, batch_id, job_run_id) are all derived during the ingest process. This is the whole point; we’re enriching raw telemetry into spatially-aware, business-rule-tagged, lineage-tracked records.
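For context, the `raw` streaming DataFrame used in the transform below comes from a standard Structured Streaming Parquet read. The post doesn't show this step, so here's a minimal sketch under stated assumptions: the schema string mirrors the eleven columns above, and the S3 path is a hypothetical placeholder.

```python
# The eleven flat telemetry columns as a DDL schema string.
# Streaming Parquet reads require an explicit schema up front.
TELEMETRY_SCHEMA = """
    event_id STRING, timestamp TIMESTAMP,
    x DOUBLE, y DOUBLE, z DOUBLE,
    speed_mph DOUBLE, heading DOUBLE,
    operator_name STRING, asset_id STRING,
    asset_type STRING, status STRING
"""

# Hypothetical source prefix; any bucket receiving Parquet drops works.
SOURCE_PATH = "s3://my-bucket/telemetry/"

# Inside the Wherobots job, the read would look roughly like:
# raw = (
#     sedona.readStream
#     .schema(TELEMETRY_SCHEMA)            # DDL strings are accepted here
#     .option("maxFilesPerTrigger", "10")  # cap files per micro-batch
#     .parquet(SOURCE_PATH)
# )
```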
This is where the real work happens, and honestly it’s surprisingly compact. Here’s the transform that runs on every micro-batch:
transformed = raw.selectExpr(
    "event_id",
    "timestamp",
    "x",
    "y",
    "z",
    "speed_mph",
    "heading",
    "operator_name",
    "asset_id",
    "asset_type",
    "status",
    "ST_SetSRID(ST_PointZM(x, y, z, unix_timestamp(timestamp)), 4326) AS geometry",
    "CASE WHEN speed_mph > 65 THEN true ELSE false END AS is_speeding",
    "input_file_name() AS source_file",
)
Three derived columns in one selectExpr. Let’s break them down.
ST_SetSRID(ST_PointZM(x, y, z, unix_timestamp(timestamp)), 4326) AS geometry
ST_PointZM creates a 4D point geometry: X and Y carry the longitude and latitude, Z carries the altitude, and the M value stores the event's unix timestamp.
This is a pattern I’ve come to appreciate. By encoding time as the M value, each point carries its full spatiotemporal context in the geometry itself. If you later need to compute distances or trajectories, the temporal ordering is baked right into the geometry. We wrap the whole thing in ST_SetSRID(..., 4326) to declare it’s in WGS 84.
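To make the POINT ZM encoding concrete without needing Sedona, here's a tiny pure-Python illustration (the helper name is mine, not from the pipeline) that builds the equivalent WKT for one telemetry row:

```python
from datetime import datetime, timezone

def point_zm_wkt(x, y, z, ts):
    """Build the WKT for a 4D point, with unix time as the M value."""
    m = int(ts.timestamp())
    return f"POINT ZM ({x} {y} {z} {m})"

# An Iowa-ish telemetry row: lon, lat, altitude, event time.
wkt = point_zm_wkt(-93.6, 41.6, 285.0,
                   datetime(2024, 1, 1, tzinfo=timezone.utc))
print(wkt)  # POINT ZM (-93.6 41.6 285.0 1704067200)
```

Sorting points by their M value recovers the temporal ordering of a trajectory with no extra column needed.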
CASE WHEN speed_mph > 65 THEN true ELSE false END AS is_speeding
Nothing fancy. Speed over 65 mph? You’re speeding. This is computed once, during ingest, and stored in the catalog. Downstream consumers (dashboards, notebooks, APIs) read it directly — no need to recompute. This is the “silver layer” principle applied to streaming: apply your business rules at ingest time so the data is analysis-ready when it lands.
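Because the rule lives in one place, it's also easy to unit-test outside Spark. A sketch of the same logic as a pure function (my naming, mirroring the SQL CASE expression above):

```python
SPEED_LIMIT_MPH = 65.0  # the post's single flat threshold

def is_speeding(speed_mph):
    # Mirrors the SQL CASE: strictly greater than the limit counts.
    return speed_mph is not None and speed_mph > SPEED_LIMIT_MPH
```

Note the boundary behavior: exactly 65 mph is not flagged, matching `speed_mph > 65` in the ingest expression.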
Here’s where things get spatial. The is_speeding flag is a simple column-level rule — no external data needed. But what about spatial business rules that depend on other geometry? Things like: “is this asset inside a school zone buffer?” or “did this truck enter a restricted area?”
That's where the in_fence column comes in. During each micro-batch, we left-join the asset points against a table of geofence buffer polygons using ST_Intersects. If a point intersects any buffer, in_fence = true.
Your first instinct might be to add the geofence check in the selectExpr alongside the spatial transform. I tried that. Spark won’t let you join a streaming DataFrame against a catalog table directly — the streaming DF is “unresolved” and the join planner doesn’t know what to do with it. The foreachBatch callback is the escape hatch: by the time your function is called, the DataFrame is fully materialized (static), so you can join it against anything.
You might also try createOrReplaceTempView so you can write the join as SQL. That works locally, but on Wherobots Cloud it fails with TABLE_OR_VIEW_NOT_FOUND because the internal DROP that precedes the CREATE hits the catalog resolver. DataFrame API join avoids this entirely.
The geofence table is small (typically dozens to a few hundred buffer polygons) and it rarely changes while the stream is running. Re-reading it from the catalog on every micro-batch would be wasteful. Instead, we load it once at startup, broadcast it so every executor gets a local copy, and cache it so Spark doesn’t re-evaluate it:
geofence_df = (
    spark.table(GEOFENCE_TABLE)
    .select(F.col("geometry").alias("fence_geom"))
    .cache()
)
geofence_df.count()  # materialize the cache
F.broadcast() tells Spark to use a broadcast hash join instead of a shuffle join: the small geofence DataFrame gets shipped to every executor and held in memory, so the join against each micro-batch is local and fast. The .cache() ensures the table read only happens once. The .count() call forces materialization so the cache is warm before the first batch arrives.
One tradeoff: if you update the geofence table while the stream is running, the stream won’t see the changes until you restart. For a table that changes rarely (new school zones don’t appear every 30 seconds), that’s a fine deal.
# geofence_df is the cached DataFrame loaded at startup.
# F.broadcast() here — at the join site — so Spark sees it as
# part of the join plan and uses a broadcast hash join (no shuffle).
df = (
    df.alias("a")
    .join(
        F.broadcast(geofence_df).alias("b"),
        F.expr("ST_Intersects(a.geometry, b.fence_geom)"),
        "left",
    )
    .select("a.*", F.expr("b.fence_geom IS NOT NULL AS in_fence"))
)
A few things to note. Because this is a left join, a point that misses every fence still survives; it just carries a NULL fence_geom. The b.fence_geom IS NOT NULL expression then collapses that into the boolean: a NULL becomes in_fence = false, a matched fence becomes in_fence = true.
input_file_name() AS source_file
input_file_name() is a Spark SQL function that resolves to the path of the source file for each row. It has to be called in the selectExpr during the read phase — before foreachBatch materializes the DataFrame — because the file metadata is lost after materialization. This gives you row-level lineage back to the exact Parquet file that produced each record. Useful for debugging, auditing, and answering the question “where did this data come from?”
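The stored source_file is a full S3 URI, which can get noisy in reports. When you only want the file name for display, a small pure-Python helper (my addition, not part of the pipeline) tidies it up:

```python
from urllib.parse import urlparse
from pathlib import PurePosixPath

def short_source(path):
    """Trim a full S3 URI down to just its file name for display."""
    return PurePosixPath(urlparse(path).path).name

print(short_source("s3://bucket/telemetry/part-00012.parquet"))
# part-00012.parquet
```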
The write side uses Spark’s foreachBatch pattern, which gives you a regular DataFrame and a batch ID for each micro-batch:
def write_batch(df, batch_id):
    count = df.count()
    if count > 0:
        df = (
            df.withColumn("batch_id", F.lit(batch_id).cast("long"))
            .withColumn("job_run_id", F.lit(JOB_RUN_ID))
        )
        df.writeTo(CATALOG_TABLE).append()
Two columns are added inside foreachBatch because they can't be computed earlier: batch_id, the micro-batch ID that Spark passes into the callback, and job_run_id, an identifier generated once per job start.
Together, batch_id + job_run_id give you globally unique batch identification across job runs. Since batch_id resets to 0 on every restart, job_run_id is what makes cross-run analysis possible.
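The post doesn't show how JOB_RUN_ID is generated; one reasonable sketch (an assumption on my part) is a UTC timestamp plus a short random suffix, minted once when the job starts:

```python
import uuid
from datetime import datetime, timezone

# Hypothetical: one ID per job start. batch_id alone resets to 0
# on every restart, so this is what keeps runs distinguishable.
JOB_RUN_ID = (
    f"{datetime.now(timezone.utc):%Y%m%dT%H%M%SZ}"
    f"-{uuid.uuid4().hex[:8]}"
)
print(JOB_RUN_ID)
```

Any scheme works as long as it's stable for the lifetime of one job run and unique across runs.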
df.writeTo(CATALOG_TABLE).append()
Not df.write.format("iceberg").mode("append").save(path). The .writeTo().append() pattern uses the catalog-managed table reference, which means Wherobots handles the underlying storage location, metadata, and Iceberg commit lifecycle. It’s cleaner, and it’s how catalog-native writes are meant to work.
The streaming query connects the read, transform, and write stages:
query = (
    transformed.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", CHECKPOINT_PATH)
    .trigger(processingTime="30 seconds")
    .start()
)
The checkpoint location (an S3 path) tracks which files have already been processed. If the job restarts, it picks up exactly where it left off — no duplicate processing, no missed files. The trigger interval of 30 seconds means Spark checks for new files twice a minute.
Once data starts landing in the catalog, a Wherobots notebook can read it directly:
df = sedona.table("wherobots.streaming_demo.asset_tracks")
That’s it. No path management, no format specification, no schema declaration. The catalog knows what the table is, where it lives, and what the schema looks like.
The viewer notebook computes summary statistics including lineage metrics:
stats = df.agg(
    F.count("*").alias("total_records"),
    F.countDistinct("asset_id").alias("unique_assets"),
    F.countDistinct("batch_id").alias("unique_batches"),
    F.countDistinct("job_run_id").alias("unique_job_runs"),
    F.countDistinct("source_file").alias("unique_source_files"),
    F.min("timestamp").alias("earliest_event"),
    F.max("timestamp").alias("latest_event"),
    F.round(F.avg("speed_mph"), 1).alias("avg_speed_mph"),
)
Because we captured batch_id, job_run_id, and source_file during ingest, we can inspect exactly what landed in each batch:
df.groupBy("job_run_id", "batch_id").agg(
    F.count("*").alias("records"),
    F.countDistinct("source_file").alias("files_in_batch"),
    F.min("timestamp").alias("first_event"),
    F.max("timestamp").alias("last_event"),
).orderBy("job_run_id", "batch_id").show(50, truncate=False)
A quick matplotlib chart shows the volume of each micro-batch, which is useful for understanding whether your trigger interval and maxFilesPerTrigger settings are well-tuned:
import matplotlib.pyplot as plt

batch_pdf = (
    df.groupBy("batch_id")
    .agg(F.count("*").alias("records"))
    .orderBy("batch_id")
    .toPandas()
)

fig, ax = plt.subplots(figsize=(12, 4))
ax.bar(batch_pdf["batch_id"], batch_pdf["records"], color="steelblue")
ax.set_xlabel("Batch ID")
ax.set_ylabel("Records")
ax.set_title("Records per Batch")
plt.tight_layout()
plt.show()
The payoff. A PyDeck map over Iowa with CARTO’s dark-matter basemap, two stacked layers, and a four-color scheme that encodes both is_speeding and in_fence at a glance:
Points that hit a geofence get a larger radius (24px, or 30px if also speeding) so they pop out of the sea of blue dots.
def assign_color(row):
    if row["is_speeding"] and row["in_fence"]:
        return (200, 30, 200)   # magenta — both violations
    elif row["in_fence"]:
        return (255, 140, 0)    # orange — geofence only
    elif row["is_speeding"]:
        return (220, 50, 50)    # red — speeding only
    else:
        return (30, 100, 220)   # blue — normal

# Split the RGB tuple into the color_r/color_g/color_b columns
# that the ScatterplotLayer reads.
sample_pdf[["color_r", "color_g", "color_b"]] = sample_pdf.apply(
    assign_color, axis=1, result_type="expand"
)

sample_pdf["radius"] = sample_pdf.apply(
    lambda r: 30 if (r["is_speeding"] and r["in_fence"])
    else 24 if r["in_fence"]
    else 12,
    axis=1,
)
The map itself uses two layers stacked together. Underneath, a PolygonLayer renders the geofence buffer polygons as semi-transparent orange regions, so you can see the zones that triggered violations. On top, the ScatterplotLayer plots every asset point with the four-color scheme:
import pydeck as pdk

geofence_layer = pdk.Layer(
    "PolygonLayer",
    data=geofence_pdf,
    get_polygon="coordinates",
    get_fill_color=[255, 140, 0, 40],
    get_line_color=[255, 140, 0, 160],
    get_line_width=2,
    pickable=True,
)

points_layer = pdk.Layer(
    "ScatterplotLayer",
    data=sample_pdf,
    get_position=["x", "y"],
    get_fill_color=["color_r", "color_g", "color_b", 180],
    get_radius="radius",
    pickable=True,
    auto_highlight=True,
)

deck = pdk.Deck(
    layers=[geofence_layer, points_layer],
    initial_view_state=pdk.ViewState(latitude=41.9, longitude=-93.4, zoom=6.5),
    tooltip={
        "text": (
            "{asset_id} ({asset_type})\n{operator_name}\n"
            "Speed: {speed_mph} mph\nSpeeding: {is_speeding}\n"
            "In Fence: {in_fence}\nBatch: {batch_id}"
        )
    },
    map_style="https://basemaps.cartocdn.com/gl/dark-matter-gl-style/style.json",
)
deck.show()
The sample is capped at 50,000 points for rendering performance. And because both is_speeding and in_fence are already in the catalog, we’re just reading them — not recomputing anything in the notebook. That’s the whole point of doing the enrichment at ingest time.
Let me step back and talk about why this approach is worth your time, beyond the “hey, cool map” factor.
In a lot of organizations, raw coordinates sit in tables as plain doubles, and every analyst who needs geometry has to create it themselves. That means everyone is writing their own ST_Point calls, hoping they got the coordinate order right, and probably not setting the SRID. Baking the geometry creation into the ingest pipeline means it’s done once, correctly, and everyone downstream gets a proper GEOMETRY column with SRID 4326.
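The coordinate-order trap is easy to catch mechanically. A quick sanity check you can apply to any (x, y) pair before it ever reaches ST_Point (pure Python; the helper and its bounds are my illustration, using WGS 84 ranges):

```python
def looks_like_lon_lat(x, y):
    """Heuristic sanity check: x should be a longitude, y a latitude."""
    return -180.0 <= x <= 180.0 and -90.0 <= y <= 90.0

# Iowa-ish coordinates: (lon, lat) order passes...
print(looks_like_lon_lat(-93.6, 41.6))   # True
# ...the swapped order fails the latitude bound.
print(looks_like_lon_lat(41.6, -93.6))   # False
```

It's not bulletproof (many swapped pairs still fall in range), but it catches the most common mistake cheaply.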
The is_speeding flag is one example, but we went further. The in_fence column demonstrates the same principle with a spatial join: during each micro-batch, every point is checked against a table of geofence buffer polygons using ST_Intersects. Speed violations, geofence containment, proximity to restricted areas are all computed during ingest using Sedona on Wherobots and stored as columns in the Iceberg table. Downstream consumers don’t need to know the logic. They just filter on a boolean.
source_file, batch_id, and job_run_id give you three levels of traceability — file, batch, and job run — without deploying a separate lineage system. When someone asks “where did this record come from?”, you can answer with a SQL query against the same table.
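For instance, answering that question for a single record is one query against the enriched table. A sketch (the helper and event ID are hypothetical; the table name is the one created earlier):

```python
def lineage_sql(table, event_id):
    """Build the lineage query for one record: file, batch, and job run."""
    return (
        f"SELECT source_file, batch_id, job_run_id "
        f"FROM {table} WHERE event_id = '{event_id}'"
    )

sql = lineage_sql("wherobots.streaming_demo.asset_tracks", "evt-123")
# sedona.sql(sql).show()   # run inside a Wherobots notebook
```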
This is the part that would have made 2010-me cry tears of joy. The GEOMETRY column in an Iceberg table is a native type (as of Iceberg v3), not a serialized blob or a WKT string. Spatial predicates can push down to the storage layer. You can query the table with ST_Contains, ST_DWithin, ST_Intersects — the full Sedona function catalog — and the Iceberg metadata helps prune partitions that can't possibly match. That's not something you get from storing WKT in a STRING column in a regular Parquet table.
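As one example of what that unlocks, a proximity query against the live table. This is a sketch with an assumed point of interest; note that for SRID 4326 the plain ST_DWithin distance is in degrees, so the 0.009 below is only a rough ~1 km at Iowa latitudes:

```python
POI_WKT = "POINT (-93.6 41.6)"  # hypothetical point of interest

query = f"""
SELECT asset_id, speed_mph, timestamp
FROM wherobots.streaming_demo.asset_tracks
WHERE ST_DWithin(geometry, ST_GeomFromWKT('{POI_WKT}'), 0.009)
"""
# sedona.sql(query).show()   # run inside a Wherobots notebook
```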
sedona.table("wherobots.streaming_demo.asset_tracks") is all a downstream consumer needs. No S3 paths, no Parquet glob patterns, no format hints. The Wherobots catalog manages the table lifecycle: schema evolution, ACID transactions, snapshot isolation, time travel. If you’ve ever debugged a pipeline that broke because someone renamed an S3 prefix, you understand why this matters.
Daniel Smith is a Solution Architect at Wherobots with over two decades of experience in geospatial technology. He builds demos, helps customers, breaks things, fixes them, and writes about what he learns from time to time. When he’s not wrestling with coordinate reference systems or touching things he shouldn’t, he’s probably skating or convincing his kids that geography is, in fact, the coolest subject. You can find him on LinkedIn.
Ready to stream spatial data into your own Wherobots catalog?
Get Started with Wherobots