
Streaming Spatial Data into Wherobots with Spark Structured Streaming


Real-time Spatial Pipelines Shouldn’t Be This Hard (But They Were)

I’ve been doing geospatial work for over twenty years now. I’ve hand-rolled ETL pipelines, babysat cron jobs, and debugged more coordinate system mismatches than a person should reasonably endure in one lifetime. So when someone says “streaming spatial data,” my first reaction used to be something between a deep sigh and a nervous laugh.

Here’s the thing: streaming tabular data with Spark Structured Streaming is a well-trodden path. There are tutorials everywhere. But streaming spatial data, where you need to create geometry objects, apply spatial business rules, and land the results in a format that actually understands what a Point is (pun definitely intended)? That’s where most guides quietly end.

This post walks through a complete, working pipeline that takes raw telemetry data (fleet tracking, asset monitoring, IoT sensors, etc.) and streams it into Wherobots-managed Iceberg tables with full Sedona geometry support. No hand-waving. Real code. The kind of thing you can run on Wherobots Cloud this afternoon.


What We’re Building

The architecture is intentionally straightforward. Four components, one direction, no backflips required:

  1. Data Source — Parquet files landing in an S3 bucket. Each file contains asset tracking events: lat/lon/altitude, speed, heading, operator, asset ID, and status. Think of this as the output from any fleet management system, IoT gateway, or sensor network.
  2. Streaming Ingest Job — A Spark Structured Streaming job running on Wherobots that watches the S3 path, picks up new Parquet files as they arrive, converts raw coordinates into Sedona geometry objects, applies business rules (like a speeding flag), and writes each micro-batch to an Iceberg table in the Wherobots catalog.
  3. Viewer Notebook — A Wherobots notebook that reads the catalog table and gives you stats, batch-level detail, a bar chart, and an interactive map colored by speeding status. The “so what” layer.
  4. Downstream Application Layer — Once the data is prepared and ready for consumption, it can feed a downstream application or database that serves other consumers: a map client, a dashboard, or another data processing system.

The generator that produces the Parquet files is a separate concern; any system that drops Parquet into S3 works. We’re focused on the Wherobots side: how you consume, transform, and catalog streaming spatial data.


The Source Schema

The source data uses a flat schema — no geometry column yet. That’s deliberate. Most real-world telemetry arrives as plain numbers (latitude, longitude, altitude), not as WKB or WKT geometry strings. The spatial enrichment happens during ingest.

| Column | Type | Description |
| --- | --- | --- |
| event_id | STRING | Unique event identifier |
| timestamp | TIMESTAMP | Event time |
| x | DOUBLE | Longitude |
| y | DOUBLE | Latitude |
| z | DOUBLE | Altitude (meters) |
| speed_mph | DOUBLE | Speed in miles per hour |
| heading | DOUBLE | Compass heading (degrees) |
| operator_name | STRING | Fleet operator |
| asset_id | STRING | Individual asset identifier |
| asset_type | STRING | Asset category (e.g., truck, drone) |
| status | STRING | Operational status |

Eleven columns. No geometry. That’s what Sedona is for.


Setting Up the Streaming Ingest

SedonaContext Initialization

The first thing the ingest job does is create a SedonaContext. On Wherobots Cloud, Spark is already pre-initialized — you just need to wrap it with Sedona capabilities:

from sedona.spark import *

config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(config)
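The transform later in the post operates on a streaming DataFrame called `raw`, whose construction isn't shown. Here's one hedged way to build it; `SOURCE_PATH`, the `maxFilesPerTrigger` value, and the DDL schema string are assumptions, not values from the original pipeline (file-based streaming sources require an explicit schema up front):

```python
# Hypothetical readStream setup -- SOURCE_PATH is a placeholder bucket.
SOURCE_PATH = "s3://your-bucket/asset-telemetry/"

# Mirrors the 11-column source table described below.
TELEMETRY_SCHEMA = (
    "event_id STRING, timestamp TIMESTAMP, "
    "x DOUBLE, y DOUBLE, z DOUBLE, "
    "speed_mph DOUBLE, heading DOUBLE, "
    "operator_name STRING, asset_id STRING, "
    "asset_type STRING, status STRING"
)

def read_telemetry_stream(sedona):
    """Build the streaming reader against the live SedonaContext."""
    return (
        sedona.readStream
        .schema(TELEMETRY_SCHEMA)
        .option("maxFilesPerTrigger", 10)  # cap files per micro-batch
        .parquet(SOURCE_PATH)
    )
```

Calling `raw = read_telemetry_stream(sedona)` would produce the streaming DataFrame the transform consumes.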

Ensuring the Catalog Schema and Table Exist

Before the stream starts, we make sure the target schema and table exist. This is idempotent — safe to run every time the job starts:

sedona.sql("CREATE SCHEMA IF NOT EXISTS wherobots.streaming_demo")

sedona.sql("""
    CREATE TABLE IF NOT EXISTS wherobots.streaming_demo.asset_tracks (
        event_id STRING,
        timestamp TIMESTAMP,
        x DOUBLE,
        y DOUBLE,
        z DOUBLE,
        speed_mph DOUBLE,
        heading DOUBLE,
        operator_name STRING,
        asset_id STRING,
        asset_type STRING,
        status STRING,
        geometry GEOMETRY,
        is_speeding BOOLEAN,
        source_file STRING,
        batch_id LONG,
        job_run_id STRING
    ) 
""")

One thing to call out here:

The table has more columns than the source. The source Parquet has 11 fields. The table has 16. The extra five (geometry, is_speeding, source_file, batch_id, job_run_id) are all derived during the ingest process. This is the whole point; we’re enriching raw telemetry into spatially-aware, business-rule-tagged, lineage-tracked records.


The Spatial Transform

This is where the real work happens, and honestly it’s surprisingly compact. Here’s the transform that runs on every micro-batch:

transformed = raw.selectExpr(
    "event_id",
    "timestamp",
    "x", "y", "z",
    "speed_mph",
    "heading",
    "operator_name",
    "asset_id",
    "asset_type",
    "status",
    "ST_SetSRID(ST_PointZM(x, y, z, unix_timestamp(timestamp)), 4326) AS geometry",
    "CASE WHEN speed_mph > 65 THEN true ELSE false END AS is_speeding",
    "input_file_name() AS source_file",
)

Three derived columns in one selectExpr. Let’s break them down.

Creating Geometry with ST_PointZM

ST_SetSRID(ST_PointZM(x, y, z, unix_timestamp(timestamp)), 4326) AS geometry

ST_PointZM creates a 4D point geometry:

  • X = longitude
  • Y = latitude
  • Z = altitude in meters
  • M = the “measure” dimension — here we encode the unix timestamp

This is a pattern I’ve come to appreciate. By encoding time as the M value, each point carries its full spatiotemporal context in the geometry itself. If you later need to compute distances or trajectories, the temporal ordering is baked right into the geometry. We wrap the whole thing in ST_SetSRID(..., 4326) to declare it’s in WGS 84.

Business Rule: The Speeding Flag

CASE WHEN speed_mph > 65 THEN true ELSE false END AS is_speeding

Nothing fancy. Speed over 65 mph? You’re speeding. This is computed once, during ingest, and stored in the catalog. Downstream consumers (dashboards, notebooks, APIs) read it directly — no need to recompute. This is the “silver layer” principle applied to streaming: apply your business rules at ingest time so the data is analysis-ready when it lands.
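To make the point concrete, here's a hedged sketch of what a downstream consumer looks like: no speed threshold anywhere, just a filter on the precomputed flag. The table name matches the post; the query itself is illustrative.

```python
# Downstream consumers never re-derive the rule -- they filter the flag.
SPEEDING_QUERY = """
    SELECT asset_id, operator_name, speed_mph, timestamp
    FROM wherobots.streaming_demo.asset_tracks
    WHERE is_speeding
    ORDER BY speed_mph DESC
    LIMIT 20
"""

def top_speeders(sedona):
    # Run against a live SedonaContext
    return sedona.sql(SPEEDING_QUERY)
```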

Business Rule: Geofence Violation Detection

Here’s where things get spatial. The is_speeding flag is a simple column-level rule — no external data needed. But what about spatial business rules that depend on other geometry? Things like: “is this asset inside a school zone buffer?” or “did this truck enter a restricted area?”

That’s where the in_fence column comes in. During each micro-batch, we left-join the asset points against a table of geofence buffer polygons using ST_Intersects. If a point intersects any buffer, in_fence = true.

Why This Has to Happen Inside foreachBatch

Your first instinct might be to add the geofence check in the selectExpr alongside the spatial transform. I tried that. Spark won’t let you join a streaming DataFrame against a catalog table directly — the streaming DF is “unresolved” and the join planner doesn’t know what to do with it. The foreachBatch callback is the escape hatch: by the time your function is called, the DataFrame is fully materialized (static), so you can join it against anything.

You might also try createOrReplaceTempView so you can write the join as SQL. That works locally, but on Wherobots Cloud it fails with TABLE_OR_VIEW_NOT_FOUND because the internal DROP that precedes the CREATE hits the catalog resolver. DataFrame API join avoids this entirely.

Loading the Geofence Table: Broadcast + Cache

The geofence table is small (typically dozens to a few hundred buffer polygons) and it rarely changes while the stream is running. Re-reading it from the catalog on every micro-batch would be wasteful. Instead, we load it once at startup, broadcast it so every executor gets a local copy, and cache it so Spark doesn’t re-evaluate it:

geofence_df = (
    spark.table(GEOFENCE_TABLE)
    .select(F.col("geometry").alias("fence_geom"))
    .cache()
)
geofence_df.count()  # materialize the cache

F.broadcast() tells Spark to use a broadcast hash join instead of a shuffle join: the small geofence DataFrame gets shipped to every executor and held in memory, so the join against each micro-batch is local and fast. The .cache() ensures the table read only happens once. The .count() call forces materialization so the cache is warm before the first batch arrives.

One tradeoff: if you update the geofence table while the stream is running, the stream won’t see the changes until you restart. For a table that changes rarely (new school zones don’t appear every 30 seconds), that’s a fine deal.
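The geofence table itself is assumed to exist before the stream starts. One hedged way to build it is from a table of zone centroids with ST_Buffer; the `zone_centroids` table and its columns are assumptions, and the third ST_Buffer argument (spheroid buffering, so the distance reads as meters) is available in recent Sedona releases, so check your version:

```python
# Hypothetical one-time setup for the geofence table. Source table and
# column names are assumptions; 500.0 is interpreted as meters when the
# spheroid flag is true (recent Sedona releases).
GEOFENCE_DDL = """
    CREATE TABLE IF NOT EXISTS wherobots.streaming_demo.geofences AS
    SELECT
        zone_id,
        zone_name,
        ST_Buffer(ST_SetSRID(ST_Point(lon, lat), 4326), 500.0, true) AS geometry
    FROM wherobots.streaming_demo.zone_centroids
"""

def create_geofences(sedona):
    # Idempotent setup, run once before the stream starts.
    sedona.sql(GEOFENCE_DDL)
```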

The Spatial Join Pattern

# geofence_df is the cached DataFrame loaded at startup.
# F.broadcast() here — at the join site — so Spark sees it as
# part of the join plan and uses a broadcast hash join (no shuffle).
df = (
    df.alias("a")
    .join(
        F.broadcast(geofence_df).alias("b"),
        F.expr("ST_Intersects(a.geometry, b.fence_geom)"),
        "left",
    )
    .select("a.*", F.expr("b.fence_geom IS NOT NULL AS in_fence"))
)

A few things to note:

  • Alias the geometry column on the geofence side to fence_geom. Both tables have a geometry column; without the alias, Spark can’t resolve the join condition.
  • Left join ensures every asset point survives, even if it doesn’t intersect any fence. Non-matching points get in_fence = false.
  • The IS NOT NULL check on fence_geom is what converts the join result into a boolean flag.

File-Level Lineage with input_file_name()

input_file_name() AS source_file

input_file_name() is a Spark SQL function that resolves to the path of the source file for each row. It has to be called in the selectExpr during the read phase — before foreachBatch materializes the DataFrame — because the file metadata is lost after materialization. This gives you row-level lineage back to the exact Parquet file that produced each record. Useful for debugging, auditing, and answering the question “where did this data come from?”


The foreachBatch Writer

The write side uses Spark’s foreachBatch pattern, which gives you a regular DataFrame and a batch ID for each micro-batch:

def write_batch(df, batch_id):
    count = df.count()
    if count > 0:
        df = (
            df.withColumn("batch_id", F.lit(batch_id).cast("long"))
              .withColumn("job_run_id", F.lit(JOB_RUN_ID))
        )
        df.writeTo(CATALOG_TABLE).append()

Adding Batch and Job Run Lineage

Two columns are added inside foreachBatch because they can’t be computed earlier:

  • batch_id — Spark’s micro-batch identifier. Resets to 0 on each job run.
  • job_run_id — Pulled from the Wherobots environment variable WBC_LABEL_product_instance_id. This is the unique ID for each Wherobots Job Run, which means you can trace every record back to the specific job execution that produced it.

Together, batch_id + job_run_id give you globally unique batch identification across job runs. Since batch_id resets to 0 on every restart, job_run_id is what makes cross-run analysis possible.

The Wherobots-Idiomatic Write Pattern

df.writeTo(CATALOG_TABLE).append()

Not df.write.format("iceberg").mode("append").save(path). The .writeTo().append() pattern uses the catalog-managed table reference, which means Wherobots handles the underlying storage location, metadata, and Iceberg commit lifecycle. It’s cleaner, and it’s how catalog-native writes are meant to work.


Wiring It All Together

The streaming query connects the read, transform, and write stages:

query = (
    transformed.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", CHECKPOINT_PATH)
    .trigger(processingTime="30 seconds")
    .start()
)

The checkpoint location (an S3 path) tracks which files have already been processed. If the job restarts, it picks up exactly where it left off — no duplicate processing, no missed files. The trigger interval of 30 seconds means Spark checks for new files twice a minute.
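Once started, the query runs until it's stopped or fails. A hedged sketch of keeping the driver alive and peeking at throughput, using the standard StreamingQuery API:

```python
import json

def run_until_stopped(query):
    # Block the driver so the job keeps processing micro-batches;
    # returns only when the stream is stopped or errors out.
    query.awaitTermination()

def show_last_progress(query):
    # lastProgress is a plain dict describing the most recent micro-batch:
    # rows read, batch durations, source and sink state.
    if query.lastProgress is not None:
        print(json.dumps(query.lastProgress, indent=2))
```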


Viewing the Results

Once data starts landing in the catalog, a Wherobots notebook can read it directly:

df = sedona.table("wherobots.streaming_demo.asset_tracks")

That’s it. No path management, no format specification, no schema declaration. The catalog knows what the table is, where it lives, and what the schema looks like.

Ingest Stats and Lineage

The viewer notebook computes summary statistics including lineage metrics:

stats = df.agg(
    F.count("*").alias("total_records"),
    F.countDistinct("asset_id").alias("unique_assets"),
    F.countDistinct("batch_id").alias("unique_batches"),
    F.countDistinct("job_run_id").alias("unique_job_runs"),
    F.countDistinct("source_file").alias("unique_source_files"),
    F.min("timestamp").alias("earliest_event"),
    F.max("timestamp").alias("latest_event"),
    F.round(F.avg("speed_mph"), 1).alias("avg_speed_mph"),
)

Batch-Level Detail

Because we captured batch_id, job_run_id, and source_file during ingest, we can inspect exactly what landed in each batch:

df.groupBy("job_run_id", "batch_id").agg(
    F.count("*").alias("records"),
    F.countDistinct("source_file").alias("files_in_batch"),
    F.min("timestamp").alias("first_event"),
    F.max("timestamp").alias("last_event"),
).orderBy("job_run_id", "batch_id").show(50, truncate=False)

Records per Batch — Bar Chart

A quick matplotlib chart shows the volume of each micro-batch, which is useful for understanding whether your trigger interval and maxFilesPerTrigger settings are well-tuned:

import matplotlib.pyplot as plt

batch_pdf = (
    df.groupBy("batch_id")
    .agg(F.count("*").alias("records"))
    .orderBy("batch_id")
    .toPandas()
)

fig, ax = plt.subplots(figsize=(12, 4))
ax.bar(batch_pdf["batch_id"], batch_pdf["records"], color="steelblue")
ax.set_xlabel("Batch ID")
ax.set_ylabel("Records")
ax.set_title("Records per Batch")
plt.tight_layout()
plt.show()

The Map: PyDeck with Speeding Visualization

The payoff. A PyDeck map over Iowa with CARTO’s dark-matter basemap, two stacked layers, and a four-color scheme that encodes both is_speeding and in_fence at a glance:

  • Blue — normal: not speeding, not in a geofence
  • Red — speeding only
  • Orange — geofence violation only
  • Magenta — speeding and in a geofence

Points that hit a geofence get a larger radius (24px, or 30px if also speeding) so they pop out of the sea of blue dots.
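The plotting code below reads from a pandas DataFrame named sample_pdf, which the post doesn't construct. One hedged way to build it: ST_X and ST_Y pull plain doubles back out of the geometry, and the cap keeps rendering responsive.

```python
# Hypothetical construction of sample_pdf; column names mirror the
# plotting code, and the cap matches the 50,000-point rendering limit.
def build_sample(df, cap=50_000):
    return (
        df.selectExpr(
            "asset_id", "asset_type", "operator_name", "speed_mph",
            "is_speeding", "in_fence", "batch_id",
            "ST_X(geometry) AS x",
            "ST_Y(geometry) AS y",
        )
        .limit(cap)
        .toPandas()
    )
```

Calling `sample_pdf = build_sample(df)` against the catalog table yields the DataFrame the color and radius assignments operate on.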

def assign_color(row):
    if row["is_speeding"] and row["in_fence"]:
        return (200, 30, 200)   # magenta — both violations
    elif row["in_fence"]:
        return (255, 140, 0)    # orange — geofence only
    elif row["is_speeding"]:
        return (220, 50, 50)    # red — speeding only
    else:
        return (30, 100, 220)   # blue — normal

# Split the RGB tuples into the color_r/g/b columns the layer reads.
colors = sample_pdf.apply(assign_color, axis=1)
sample_pdf["color_r"] = colors.str[0]
sample_pdf["color_g"] = colors.str[1]
sample_pdf["color_b"] = colors.str[2]

sample_pdf["radius"] = sample_pdf.apply(
    lambda r: 30 if (r["is_speeding"] and r["in_fence"])
              else 24 if r["in_fence"]
              else 12,
    axis=1,
)

The map itself uses two layers stacked together. Underneath, a PolygonLayer renders the geofence buffer polygons as semi-transparent orange regions, so you can see the zones that triggered violations. On top, the ScatterplotLayer plots every asset point with the four-color scheme:

import pydeck as pdk

geofence_layer = pdk.Layer(
    "PolygonLayer",
    data=geofence_pdf,
    get_polygon="coordinates",
    get_fill_color=[255, 140, 0, 40],
    get_line_color=[255, 140, 0, 160],
    get_line_width=2,
    pickable=True,
)

points_layer = pdk.Layer(
    "ScatterplotLayer",
    data=sample_pdf,
    get_position=["x", "y"],
    get_fill_color=["color_r", "color_g", "color_b", 180],
    get_radius="radius",
    pickable=True,
    auto_highlight=True,
)

deck = pdk.Deck(
    layers=[geofence_layer, points_layer],
    initial_view_state=pdk.ViewState(latitude=41.9, longitude=-93.4, zoom=6.5),
    tooltip={
        "text": "{asset_id} ({asset_type})\\n{operator_name}\\nSpeed: {speed_mph} mph\\nSpeeding: {is_speeding}\\nIn Fence: {in_fence}\\nBatch: {batch_id}"
    },
    map_style="<https://basemaps.cartocdn.com/gl/dark-matter-gl-style/style.json>",
)
deck.show()

The sample is capped at 50,000 points for rendering performance. And because both is_speeding and in_fence are already in the catalog, we’re just reading them — not recomputing anything in the notebook. That’s the whole point of doing the enrichment at ingest time.


Why This Pattern Matters

Let me step back and talk about why this approach is worth your time, beyond the “hey, cool map” factor.

Spatial Enrichment at Ingest, Not at Query Time

In a lot of organizations, raw coordinates sit in tables as plain doubles, and every analyst who needs geometry has to create it themselves. That means everyone is writing their own ST_Point calls, hoping they got the coordinate order right, and probably not setting the SRID. Baking the geometry creation into the ingest pipeline means it’s done once, correctly, and everyone downstream gets a proper GEOMETRY column with SRID 4326.

Business Rules as First-Class Columns

The is_speeding flag is one example, but we went further. The in_fence column demonstrates the same principle with a spatial join: during each micro-batch, every point is checked against a table of geofence buffer polygons using ST_Intersects. Speed violations, geofence containment, and proximity to restricted areas are all computed during ingest using Sedona on Wherobots and stored as columns in the Iceberg table. Downstream consumers don’t need to know the logic. They just filter on a boolean.

Lineage Without Extra Infrastructure

source_file, batch_id, and job_run_id give you three levels of traceability — file, batch, and job run — without deploying a separate lineage system. When someone asks “where did this record come from?”, you can answer with a SQL query against the same table.
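That "where did this record come from?" query really is just SQL. A hedged example against the catalog table; 'evt-0001' is a placeholder event id:

```python
# Lineage lookup: file, batch, and job run for a single record.
LINEAGE_QUERY = """
    SELECT event_id, source_file, batch_id, job_run_id
    FROM wherobots.streaming_demo.asset_tracks
    WHERE event_id = 'evt-0001'
"""

def trace_record(sedona):
    return sedona.sql(LINEAGE_QUERY)
```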

Iceberg: Geometry as a Native Type

This is the part that would have made 2010-me cry tears of joy. The GEOMETRY column in an Iceberg table is a native type (as of Iceberg v3), not a serialized blob or a WKT string. Spatial predicates can push down to the storage layer. You can query the table with ST_Contains, ST_DWithin, ST_Intersects — the full Sedona function catalog — and the Iceberg metadata helps prune partitions that can’t possibly match. That’s not something you get from storing WKT in a STRING column in a regular Parquet table.
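For illustration, here's a hedged radius query against the catalog table. The point coordinates are made up, and the fourth ST_DWithin argument (spheroid distance, so 1000.0 reads as meters) is available in recent Sedona releases:

```python
# Hypothetical "assets within ~1 km of a point of interest" query.
NEARBY_QUERY = """
    SELECT asset_id, speed_mph, status
    FROM wherobots.streaming_demo.asset_tracks
    WHERE ST_DWithin(
        geometry,
        ST_SetSRID(ST_Point(-93.6, 41.6), 4326),
        1000.0,
        true
    )
"""

def assets_near_point(sedona):
    return sedona.sql(NEARBY_QUERY)
```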

Catalog-Managed Tables, Not Path-Managed Files

sedona.table("wherobots.streaming_demo.asset_tracks") is all a downstream consumer needs. No S3 paths, no Parquet glob patterns, no format hints. The Wherobots catalog manages the table lifecycle: schema evolution, ACID transactions, snapshot isolation, time travel. If you’ve ever debugged a pipeline that broke because someone renamed an S3 prefix, you understand why this matters.


Key Takeaways

  • Spark Structured Streaming works natively on Wherobots. You can use readStream / writeStream with foreachBatch just like you would in any Spark environment — but with Sedona spatial functions and Iceberg tables available out of the box.
  • Convert coordinates to geometry at ingest time. Use ST_PointZM (or ST_Point, ST_PointZ depending on your data) with ST_SetSRID to create proper geometry objects. Don’t make every downstream consumer do this.
  • Apply business rules during ingest. Flags like is_speeding, geofence containment, proximity alerts — compute them once and store them as columns. The silver layer principle applied to streaming.
  • Capture lineage in the stream. input_file_name() for file-level tracing, batch_id from foreachBatch, and WBC_LABEL_product_instance_id for job-run-level tracing. Three columns, zero additional infrastructure.
  • Write with .writeTo().append(), not .write.format().save(). Use the catalog-managed write pattern. Let Wherobots handle the storage details.

About the Author

Daniel Smith is a Solution Architect at Wherobots with over two decades of experience in geospatial technology. He builds demos, helps customers, breaks things, fixes them, and writes about what he learns from time to time. When he’s not wrestling with coordinate reference systems or touching things he shouldn’t, he’s probably skating or convincing his kids that geography is, in fact, the coolest subject. You can find him on LinkedIn.


Ready to stream spatial data into your own Wherobots catalog?

Get Started with Wherobots