What is Apache Sedona?

Apache Sedona Overview

Apache Sedona is a cluster computing system for processing large-scale spatial data. It treats spatial data as a first-class citizen by extending the functionality of distributed compute frameworks like Apache Spark, Apache Flink, and Snowflake. Apache Sedona was created at Arizona State University under the name GeoSpark, introduced in the paper “Spatial Data Management in Apache Spark: The GeoSpark Perspective and Beyond.”

Apache Sedona introduces data types, operations, and indexing techniques optimized for spatial workloads on top of Apache Spark and other distributed compute frameworks. Let’s take a look at the workflow for analyzing spatial data with Apache Sedona.

Apache Sedona Architecture

Spatial Query Processing

The first step in spatial query processing is to ingest geospatial data into Apache Sedona. Data can be loaded from various sources such as files (Shapefiles, GeoJSON, Parquet, GeoTiff, CSV, etc.) or databases into Apache Sedona’s in-memory distributed spatial data structures (typically the Spatial DataFrame).

Next, Sedona makes use of spatial indexing techniques, such as R-trees and Quadtrees, to accelerate query processing. The spatial index is used to partition the data into smaller, manageable units, enabling efficient data retrieval during query processing.
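
As a minimal sketch of what this looks like with Sedona's Python RDD API, the snippet below partitions a dataset spatially and builds an R-tree index per partition. The data path, the SparkContext variable sc, and the grid/index choices are assumptions for the example, not taken from the text above.

from sedona.core.enums import GridType, IndexType
from sedona.core.formatMapper import GeoJsonReader

# Load a GeoJSON file into a SpatialRDD (the path is a placeholder)
spatial_rdd = GeoJsonReader.readToGeometryRDD(sc, "s3a://my-bucket/parcels.geojson")

# Compute dataset statistics, partition the data spatially, then build an R-tree on each partition
spatial_rdd.analyze()
spatial_rdd.spatialPartitioning(GridType.KDBTREE)
spatial_rdd.buildIndex(IndexType.RTREE, True)  # True = build the index on the partitioned RDD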

Once the data is loaded and indexed, spatial queries can be executed using Sedona’s query execution engine. Sedona supports a wide range of spatial operations, such as spatial joins, distance calculations, and spatial aggregations.
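
For example, a spatial join can be expressed directly in Spatial SQL. The sketch below is illustrative only: it assumes two views, zones (polygons) and pickups (points), have already been registered with the session.

result = sedona.sql("""
    SELECT z.zone_name, COUNT(*) AS pickup_count
    FROM zones z
    JOIN pickups p ON ST_Contains(z.geometry, p.geometry)
    GROUP BY z.zone_name
""")
result.show()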

Sedona optimizes spatial queries to improve performance. The query optimizer determines an efficient query plan by considering the spatial predicates, available indexes, and the distribution of data across the cluster.

Spatial queries are executed in a distributed manner using Sedona’s computational capabilities. The query execution engine distributes the query workload across the cluster, with each node processing a portion of the data. Intermediate results are combined to produce the final result set. Since spatial objects can be very complex with many coordinates and topology, Sedona implements a custom serializer for efficiently moving spatial data throughout the cluster.
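
As a sketch of how this is typically wired up in a PySpark session (class names as documented for the Sedona Python package; newer releases can achieve the same with the SedonaContext helper):

from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

spark = SparkSession.builder \
    .appName("sedona-example") \
    .config("spark.serializer", KryoSerializer.getName) \
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName) \
    .getOrCreate()
SedonaRegistrator.registerAll(spark)  # registers spatial types and ST_* SQL functions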

Common Apache Sedona Use Cases

So what exactly are users doing with Apache Sedona? Here are some common examples:

  • Creating custom weather, climate, and environmental quality assessment reports at national scale by combining vector parcel data with environmental raster data products.
  • Generating planetary scale GeoParquet files for public dissemination via cloud storage by combining, cleaning, and indexing multiple datasets.
  • Converting billions of daily point telemetry observations into routes traveled by vehicles.
  • Enriching parcel level data with demographic and environmental data at the national level to feed into a real estate investment suitability analysis.

Many of these use cases can be described as geospatial ETL operations. ETL (extract, transform, load) is a data integration process that involves retrieving data from various sources, transforming and combining these datasets, then loading the transformed data into a target system or format for reporting or further analysis. Geospatial ETL shares many of the challenges and requirements of traditional ETL, with the added complexities of managing the geospatial component of the data: working with geospatial data sources and formats, spatial data types and transformations, and the scalability and performance considerations required for spatial operations such as joins based on spatial relationships. For a more complete overview of use cases with Apache Sedona, you can read our ebook on it here.
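
As a deliberately simplified geospatial ETL sketch, the snippet below extracts a hypothetical CSV of point events, transforms the coordinates into geometries, enriches them with a parcel layer via a spatial join, and loads the result as GeoParquet. All paths and column names are made up for the example.

events = sedona.read.option("header", "true").csv("s3a://my-bucket/raw/events.csv")
events.createOrReplaceTempView("events")
parcels = sedona.read.format("geoparquet").load("s3a://my-bucket/curated/parcels.parquet")
parcels.createOrReplaceTempView("parcels")

# Join each event point to the parcel polygon that contains it
enriched = sedona.sql("""
    SELECT p.geometry, p.parcel_id, e.event_type, e.observed_at
    FROM parcels p
    JOIN events e ON ST_Contains(p.geometry, ST_Point(CAST(e.lon AS DOUBLE), CAST(e.lat AS DOUBLE)))
""")
enriched.write.format("geoparquet").save("s3a://my-bucket/curated/parcel_events.parquet")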

Community Adoption

Apache Sedona has gained significant community adoption and has become a popular geospatial analytics library within the distributed computing and big data ecosystem. As an Apache Software Foundation (ASF) incubator project, Sedona’s governance, licensing, and community participation align with ASF principles.

Sedona has an active and growing developer community, with contributors from many different types of organizations and over 100 individuals interested in advancing the state of geospatial analytics and distributed computing. Sedona has reached over 38 million downloads, with a rate of 2 million downloads per month, and usage is growing at a rate of 200% per year (as of the date this was published).

Organizations in industries including transportation, urban planning, environmental monitoring, logistics, insurance, and risk analysis have adopted Apache Sedona. These organizations leverage Sedona’s capabilities to perform large-scale geospatial analysis, extract insights from geospatial data, and build geospatial analytical applications at scale. The industry adoption of Apache Sedona showcases its practical relevance and real-world utility.

Apache Sedona has been featured in conferences, workshops, and research publications related to geospatial analytics, distributed computing, and big data processing. These presentations and publications contribute to Sedona’s awareness, visibility, and adoption both within the enterprise and within the research and academic communities.

Resources

As you get started with Apache Sedona, the following resources will be useful throughout your journey in the world of large-scale geospatial data analytics.

The best place to start learning about Apache Sedona is the authoritative book on the topic, “Cloud Native Geospatial Analytics with Apache Sedona”, recently published in early-release format. The team behind the project will continue to release chapters over the coming months until the book is complete. You can get the latest version of the book here.

Want to keep up with the latest developer news from the Wherobots and Apache Sedona community? Sign up for the This Month In Wherobots Newsletter:

Havasu: A Table Format for Spatial Attributes in a Data Lake Architecture

Introduction

In the past decade, many organizations have been using BLOB storage (e.g., AWS S3) as a primary storage platform. These organizations collect tons of data and ingest it as files into S3 for its scalability, cost efficiency, and reliability. However, there has since been a need to interact with such data using SQL, which is familiar to developers, in a similar way to querying relational databases. That led to the invention of open table formats such as Apache Iceberg, which enable users to perform wide-ranging database operations, including concurrent data updates, table metadata handling, time travel, and versioning, on files stored in object stores such as S3 without the need to load those files into a relational database.

However, when dealing with spatial data, existing open table formats such as Apache Iceberg, Apache Hudi, and Delta Lake do not provide native support for spatial attributes. This lack of native spatial support forces users to handle the spatial aspect of the data at the application layer. That approach often cannot handle the intricacies and scale of spatial analytics and falls short of meeting the demands of analytical workloads that leverage spatial data. The significance of spatial analytics underscores the pressing need for efficient spatial data management within the enterprise data stack, including support for spatial attributes in open table formats.

To remedy that, Havasu is an open table format that extends Apache Iceberg to support spatial data. Havasu introduces a range of pivotal features, including native support for manipulating and storing geometry and raster objects directly in data lake tables, and enables seamless querying and processing of spatial tables using Spatial SQL.

Key features of Havasu

ACID on Spatial Data

Havasu stands out for ensuring the ACID (Atomicity, Consistency, Isolation, and Durability) properties in spatial data transactions, a critical feature for reliable data management. This guarantees that each transaction is processed completely or not at all, maintaining data integrity even in complex environments. Furthermore, Havasu supports schema evolution, allowing for adaptable data structures without compromising existing data. This flexibility is key for efficiently managing and evolving spatial data over time, catering to the dynamic needs of spatial databases.
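
As a hedged illustration of schema evolution on a Havasu (Iceberg-based) table, adding a column is a metadata-level change that does not rewrite existing data files; the table and column names below are hypothetical.

# Add a new column to an existing Havasu table without rewriting its data files
sedona.sql("ALTER TABLE wherobots.test_db.trips ADD COLUMN tip_amount DECIMAL(10,2)")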

Native Spatial Support

Havasu supports geometry/geography and raster as primary data types, and seamlessly integrates with the compute layer. Users can easily read and write spatial data using Havasu, and process spatial data using any computation engine (e.g., WherobotsDB) as long as that computation engine implements a reader/writer for Havasu.

Efficiency

Computation engines (like WherobotsDB) that incorporate Havasu can benefit from Havasu’s spatial filter pushdown support, which significantly accelerates spatial range queries. Havasu allows storing spatial data in the object storage of the customer’s choice, and it decouples storage from computation, which makes storing vast amounts of spatial data very cost effective. Havasu also comes equipped with an efficient storage mode for raster data in Parquet files, namely out-db storage, which enables high-throughput reads of large amounts of raster data.

Openness

Havasu is a table format with an open specification. Havasu is based on Apache Iceberg, which also has an open specification and is widely adopted across the big data ecosystem. The extension to the Apache Iceberg specification is clearly specified, so any implementation adopting this extended specification is able to read and write Havasu tables. Customers can store Havasu tables in the storage of their choice, without being tightly coupled to one specific vendor or implementation.

Havasu in a Nutshell: Key Technical Insights

The open-source Havasu specification can be found in the Wherobots Havasu documentation. The Havasu table format extends the Iceberg table spec to support managing large spatial datasets as tables, in the following ways:

  • Primitive spatial data types and storage: the Havasu specification extends the Iceberg specification to support spatial data types.
  • Spatial statistics: extending the Iceberg manifest files to support spatial statistics.
  • Spatial filter push down and indexing: extending the Iceberg specification to support spatial filter pushdown and spatial indexing, which greatly reduces the data retrieval overhead.

All other aspects of the Iceberg spec are unchanged. For example, the Havasu specification does not change the fundamental organization of table files.

Primitive spatial data types and storage

Geometry

Geometry values consist of an optional spatial reference ID (abbreviated as SRID) and a geometry shape. The SRID is a 32-bit integer that identifies the coordinate system the geometry shape is using. The interpretation of the SRID is implementation dependent. For example, the SRID could be an EPSG code, or a code defined by a proprietary coordinate system. The geometry shape is one of the types defined by the OGC Simple Features for SQL specification. The geometry shape can be stored in one of the following formats in the underlying Parquet files.

Encoding | Parquet physical type | Logical type | Description
ewkb     | BINARY                |              | Extended Well-known binary (EWKB)
wkb      | BINARY                |              | Well-known binary (WKB)
wkt      | BINARY                | UTF8         | Well-known text (WKT)
geojson  | BINARY                | UTF8         | GeoJSON (https://datatracker.ietf.org/doc/html/rfc7946)

When the geometry column is at the root of the schema, and the geometry encoding is one of wkb or ewkb, the application can optionally write the GeoParquet metadata to the Parquet files. The GeoParquet metadata is defined by the GeoParquet specification.

Raster

A raster is one or more grids of cells. All the grids should have width columns and height rows. The grid cells are represented by the band field. The grids are geo-referenced using an affine transformation that maps the grid coordinates to world coordinates. The coordinate reference system (CRS) of the world coordinates is specified by the crs field. The CRS is serialized as a WKT string when stored in data files.

Havasu supports persisting raster band values in two different ways:

  • in-db: The band values are stored in the same data file as the geo-referencing information. The band values are stored in the bands field of the raster value.
  • out-db: The band values are stored in files external to Havasu tables. The raster value stored in the Havasu data file contains the geo-referencing information and the URIs of the external raster files. The URIs of the external raster files are stored in the bands field of the raster value.

Spatial statistics

Havasu collects and records the spatial statistics of data files when writing data to the table. The spatial statistics include the minimum bounding rectangle (MBR) of the geometries in each data file.

Geometry bounds

The bounds of geometry values should be derived using their minimum bounding rectangles (MBRs). The MBR of a geometry value is defined as the smallest rectangle that contains the geometry value. The SRIDs of geometry values are ignored when computing the MBRs. The MBRs of all geometry values in a data file should be unioned together into a single MBR, which is the MBR of the data file.
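
A small illustration of this rule, assuming shapely is available and using made-up geometries: the file-level MBR is simply the union of the per-geometry MBRs.

from shapely import wkt

geoms = [wkt.loads("POINT (-111.9 33.4)"), wkt.loads("LINESTRING (-112.1 33.3, -111.8 33.5)")]
bounds = [g.bounds for g in geoms]  # each is (minx, miny, maxx, maxy)
file_mbr = (
    min(b[0] for b in bounds),  # minx
    min(b[1] for b in bounds),  # miny
    max(b[2] for b in bounds),  # maxx
    max(b[3] for b in bounds),  # maxy
)
print(file_mbr)  # the MBR that would be recorded for this data file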

Raster bounds

Raster bounds are the MBRs of rasters in the WGS84 coordinate system. They are computed by transforming the envelope of the raster in its native coordinate system to WGS84. Raster bounds have a special rule for handling MBRs crossing the anti-meridian. Implementations of the Havasu specification should handle MBRs crossing the anti-meridian correctly; otherwise spatial query optimizations will produce incomplete query results.

Spatial filter push down and indexing

Database engines can take advantage of the spatial statistics of data files to optimize the query execution plan. For example, if the query predicate is a spatial range query, the engine can use the spatial statistics to prune the data files that do not contain any data satisfying the query predicate. This process is called spatial filter pushdown. How spatial query optimization is implemented in scan planning is implementation dependent. For example, in WherobotsDB, for a spatial range query ST_Within(geom, Q), where geom is the geometry field in a Havasu table and Q is a constant geometry used as the query window, WherobotsDB converts the spatial query predicate to an inclusive projection ST_Intersects(MBR[geom], Q), where MBR[geom] is the minimum bounding box of all values of geom in a data file. Sedona then evaluates the projection using the field statistics maintained in manifest files.
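
The pruning idea can be sketched as follows. This is a simplified illustration, not WherobotsDB's actual implementation, and it assumes the per-file MBRs have already been read from manifest statistics.

from shapely.geometry import box

def files_to_scan(file_mbrs, query_window):
    # file_mbrs: dict of data file path -> (minx, miny, maxx, maxy) from manifest statistics
    # query_window: a shapely geometry playing the role of Q
    return [
        path
        for path, (minx, miny, maxx, maxy) in file_mbrs.items()
        if box(minx, miny, maxx, maxy).intersects(query_window)  # ST_Intersects(MBR[geom], Q)
    ]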

Data skipping

Spatial filter pushdown works best when spatial data that are near each other are stored in the same file. Havasu provides a CREATE SPATIAL INDEX syntax for rewriting the table to sort the records by a geometry column.

CREATE SPATIAL INDEX FOR <table_name> USING hilbert(<geometry_column>, <precision>) [ WHERE <condition> ] [ OPTIONS <options> ];

This statement rewrites the data files of the table and clusters the data by the geometry column. This feature is very useful when the table contains a large amount of data and the spatial filter is very selective. For example, if the table contains 1 TB of data and the spatial filter selects only 1% of the data, ideally Havasu will only read ~10 GB of data to answer the query.

Navigating Spatial Data with Havasu-Powered WherobotsDB Tables

WherobotsDB implements a reader/writer for the Havasu spatial table format. Users can perform many interesting spatial database operations on Havasu tables using WherobotsDB in Wherobots Cloud. Here we explore some common operations using WherobotsDB; for details, please read the Wherobots documentation. To follow along, create a free Wherobots Cloud account.

Create a new table in Wherobots table catalog

First, let’s create a Havasu table in the wherobots table catalog. This catalog by default is configured to use your Wherobots Cloud S3 storage, but another storage location can also be specified. We’ll use a dataset of Taxi rides as our example.

CREATE TABLE wherobots.test_db.taxi (
  pickup GEOMETRY,
  Trip_Pickup_DateTime STRING,
  Payment_Type STRING,
  Fare_Amt DECIMAL(10,0))
USING havasu.iceberg
-- By default this table will be stored in your Wherobots Cloud S3 account
-- Optionally specify other location
-- LOCATION 's3://path/to/warehouse/test_db/taxi'

List tables in Wherobots table catalog

We can view all tables within the wherobots table catalog using SHOW TABLES:

SHOW TABLES IN wherobots.test_db

We can see the wherobots.test_db.taxi table that we just created:

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|  test_db|       taxi|      false|
+---------+-----------+-----------+

Describe a table in Wherobots table catalog

To view the columns and datatypes of each column we can describe the table:

DESCRIBE TABLE wherobots.test_db.taxi

Note here that our pickup column is of type geometry:

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|              pickup| geometry|   null|
|Trip_Pickup_DateTime|   string|   null|
|        Payment_Type|   string|   null|
|            Fare_Amt|   string|   null|
+--------------------+---------+-------+

Insert geometry data

We can insert data into our table using SQL. Here we specify the geometry value using the ST_GeomFromText function, which takes a WKT string, in this case describing the point geometry that represents the pickup location.

sedona.sql("""
INSERT INTO wherobots.test_db.taxi
VALUES(ST_GeomFromText('POINT (-73.96969 40.749244)'), '10/16/09 22:35', 'Credit', 42)
""")

We can also write spatial DataFrames to Havasu tables. Here we load a NYC taxi dataset into a Sedona DataFrame, then append the data to our wherobots.test_db.taxi Havasu table:

from pyspark.sql.functions import col

taxidf = sedona.read.format('csv').option("header","true").option("delimiter", ",").load("s3a://wherobots-examples-prod/data/nyc-taxi-data.csv")
taxidf = taxidf.selectExpr('ST_Point(CAST(Start_Lon AS Decimal(24,20)), CAST(Start_Lat AS Decimal(24,20))) AS pickup', 'Trip_Pickup_DateTime', 'Payment_Type', 'CAST(Fare_Amt AS DECIMAL)')
taxidf = taxidf.filter(col("pickup").isNotNull())

taxidf.writeTo("wherobots.test_db.taxi").append()

Create spatial index

Creating a spatial index will rewrite the table to sort records by the geometry column. Havasu supports the hilbert index strategy, which sorts the data along a Hilbert space-filling curve, an ordering that is very efficient at grouping geospatial data by proximity. We can configure the precision by specifying a value for the precision parameter, which is the number of bits used to represent the Hilbert index.

sedona.sql("CREATE SPATIAL INDEX FOR wherobots.test_db.taxi USING hilbert(pickup, 10)")

Read data from Havasu table

We can query our Havasu tables using familiar SQL; with WherobotsDB we also gain the ability to run spatial queries using Spatial SQL functions. Here we search for all taxi pickups that occurred within a certain area around a given point:

sedona.sql("""
SELECT * FROM wherobots.test_db.taxi 
WHERE ST_Intersects(pickup, ST_Buffer(ST_GeomFromText('POINT (-73.96969 40.749244)'), 0.001))
""").show(truncate=False)
+----------------------------+--------------------+------------+--------+
|pickup                      |Trip_Pickup_DateTime|Payment_Type|Fare_Amt|
+----------------------------+--------------------+------------+--------+
|POINT (-73.96969 40.749244) |1/5/09 16:29        |Credit      |9       |
|POINT (-73.969387 40.749159)|1/20/09 14:38       |CASH        |7       |
|POINT (-73.969308 40.75001) |1/8/09 17:48        |CASH        |11      |
|POINT (-73.969355 40.749315)|1/7/09 16:52        |CASH        |10      |
|POINT (-73.970238 40.749497)|1/19/09 2:42        |Credit      |45      |
|POINT (-73.969492 40.749103)|1/21/09 19:34       |Credit      |15      |
|POINT (-73.970158 40.749055)|1/15/09 14:34       |CASH        |8       |
|POINT (-73.969638 40.748663)|1/27/09 17:46       |CASH        |9       |
|POINT (-73.970167 40.749033)|1/2/09 18:49        |CASH        |8       |
|POINT (-73.97059 40.749077) |1/18/09 20:39       |Credit      |10      |
|POINT (-73.970105 40.748985)|1/12/09 11:39       |CASH        |9       |
|POINT (-73.970228 40.749027)|1/8/09 16:07        |CASH        |5       |
|POINT (-73.9697 40.748737)  |1/5/09 18:04        |Credit      |6       |
|POINT (-73.970628 40.749132)|1/27/09 18:11       |CASH        |10      |
|POINT (-73.969573 40.748677)|1/29/09 19:35       |CASH        |5       |
|POINT (-73.969783 40.749163)|1/6/09 19:48        |Cash        |8       |
|POINT (-73.969522 40.748948)|1/4/09 16:24        |CASH        |5       |
|POINT (-73.969529 40.749625)|1/7/09 23:38        |CASH        |7       |
|POINT (-73.969473 40.749072)|1/29/09 18:04       |CASH        |16      |
|POINT (-73.970575 40.749395)|1/7/09 19:36        |CASH        |8       |
+----------------------------+--------------------+------------+--------+
only showing top 20 rows

Insert out-db raster data

We can also work with raster data in Havasu tables. Here we insert raster data into a Havasu table using the out-db option. You can read more about working with raster data in Havasu tables in the documentation.

sedona.sql("SELECT RS_FromPath('s3a://XXX.tif') as rast"). \\
    writeTo("wherobots.test_db.test_table").append()

Havasu-Powered Wherobots open data catalog

Wherobots collects open datasets from various data sources, then cleans and transforms them to Havasu format to enable linking enterprise data to the real physical world. All datasets are provided for free (except AWS data transfer fees). Certain datasets are only accessible to our Pro Edition users. To learn more, please read Wherobots Open Data.

Dataset name | Availability in Wherobots | Type | Count | Description
Overture Maps buildings/building | Community edition | Polygon | 785 million | Any human-made structures with roofs or interior spaces
Overture Maps places/place | Community edition | Point | 59 million | Any business or point of interest within the world
Overture Maps admins/administrativeBoundary | Community edition | LineString | 96 thousand | Any officially defined border between two Administrative Localities
Overture Maps admins/locality | Community edition | Point | 2948 | Countries and hierarchical subdivisions of countries
Overture Maps transportation/connector | Community edition | Point | 330 million | Points of physical connection between two or more segments
Overture Maps transportation/segment | Community edition | LineString | 294 million | Center-line of a path which may be traveled
Google & Microsoft open buildings | Professional edition | Polygon | 2.5 billion | Google & Microsoft Open Buildings, combined by VIDA
LandSAT surface temperature | Professional edition | Raster (GeoTiff) | 166K images, 10 TB size | The temperature of the Earth’s surface in Kelvin, from Aug 2023 to Oct 2023
US Census ZCTA codes | Professional edition | Polygon | 33144 | ZIP Code Tabulation Areas defined in 2018
NYC TLC taxi trip records | Professional edition | Point | 200 million | NYC TLC taxi trip pickup and dropoff records per trip
Open Street Maps all nodes | Professional edition | Point | 8 billion | All the nodes of the OpenStreetMap Planet dataset
Open Street Maps postal codes | Professional edition | Polygon | 154 thousand | Boundaries of postal code areas as defined in OpenStreetMap
Weather events | Professional edition | Point | 8.6 million | Events such as rain, snow, storm, from 2016 – 2022
Wild fires | Professional edition | Point | 1.8 million | Wildfire that occurred in the United States from 1992 to 2015

The Wherobots open data catalog can be extremely useful when tables are combined, typically using spatial joins, to address real world business use cases like risk analysis, site selection, fleet vehicle optimization and answering other business intelligence questions.

Spatial join query to find zones prone to wild fires

Let’s see how we can make use of the Wherobots open data catalog using Havasu tables to perform a spatial join operation that finds the US zip code regions experiencing the most wild fires. To do this we will use the wherobots_open_data.us_census.zipcode Havasu table, which contains the polygon geometries of US zip codes, and wherobots_open_data.weather.wild_fires, which contains point geometries of wild fire events.

We perform a spatial join operation using the ST_Intersects spatial SQL function to define a predicate that joins fires to the zip codes in which they occurred.

fire_zone = sedona.sql(
    """
    SELECT
        z.geometry as zipgeom,
        z.ZCTA5CE10 as zipcode,
        f.FIRE_NAME
    FROM
        wherobots_open_data.us_census.zipcode z,
        wherobots_open_data.weather.wild_fires f
    WHERE
        ST_Intersects(z.geometry, f.geometry)
    """
)

We can then group this data by zip code to find the count of fires that occurred in each zip code and visualize the results. This type of analysis can be useful for risk analysis and insurance premium pricing.
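
A hedged sketch of that aggregation (the visualization step, for example with a choropleth map, is omitted here):

fires_by_zip = fire_zone.groupBy("zipcode").count().orderBy("count", ascending=False)
fires_by_zip.show(10)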

[Figure: wildfire risk by county]

Resources

Want to keep up with the latest developer news from the Wherobots and Apache Sedona community? Sign up for the This Month In Wherobots Newsletter:


Spatial Data, GeoParquet, and Apache Sedona

Apache Parquet is a modern columnar data file format optimized for analytical workloads, and it is widely adopted in the big data ecosystem. Spatial scientists may also want to use Parquet to store spatial data, especially when analyzing large-scale datasets on Spark or in cloud data warehouses. However, problems arise when storing spatial data in Parquet files:

  1. Parquet does not have native support for geometry objects such as points, linestrings, and polygons. These geometry objects need to be serialized to be stored as a primitive type (such as BINARY) in Parquet files. There’s no easy way to tell which column stores geometry objects by simply looking at the schema of the files. Users may adopt conventions such as storing geometry objects in a geom or geometry column, or the user has to read some metadata about the dataset to find which columns contain geometry data.
  2. There are many serialization formats, such as WKB, WKT, and GeoJSON. Users need to know which serialization format was used in order to properly read the geometry objects from the geometry columns.

GeoParquet is an incubating OGC standard that addresses these problems. It defines how to serialize geometry objects as Parquet values and the file-level metadata for describing geometry columns. Implementations that follow this standard can easily exchange Parquet files containing spatial data and enjoy the benefits of the Parquet columnar format when running analytics on GeoParquet files.

A Brief Introduction to GeoParquet Specification

The main idea of the GeoParquet specification is that it defines a standard schema for file-level metadata attached to the footer of Parquet files. The metadata defines which columns contain geometry data and the encoding of the geometry data in each column. For example, we can load a GeoParquet file using GeoPandas and see that the column storing geometry objects is interpreted as a geometry field:

>>> import geopandas as gpd
>>> df_geom = gpd.read_parquet('./sample.parquet')
>>> df_geom
  id                                                  g
0  1  POLYGON ((-111.50299 33.17965, -111.50299 33.8...
1  2  POLYGON ((-111.50299 33.17965, -111.50299 33.8...
>>> df_geom.dtypes
id      object
g     geometry
dtype: object

We can inspect the metadata of this GeoParquet file using PyArrow. The metadata keyed by geo stores information on the geometry columns:

>>> import pyarrow as pa
>>> import pyarrow.parquet
>>> import json
>>> table = pa.parquet.read_table('./sample.parquet')
>>> metadata_str = table.schema.metadata[b'geo'].decode('utf-8')
>>> metadata = json.loads(metadata_str)
>>> print(json.dumps(metadata, indent=2))
{
  "primary_column": "g",
  "columns": {
    "g": {
      "encoding": "WKB",
      "crs": null,
      "geometry_type": "Polygon",
      "bbox": [
        -112.473156,
        33.179649,
        -111.502991,
        33.868652
      ]
    }
  },
  "version": "0.4.0",
  "creator": {
    "library": "geopandas",
    "version": "0.12.2"
  }
}

The metadata designates that column g is the primary geometry column, encoded as WKB. There is some additional information about this column, such as the geometry types and the CRS of the geometries in this column. Readers can refer to the column metadata section of the specification to understand how to interpret this information.

There is an interesting piece of metadata for geometry columns keyed by bbox: the minimum bounding box of all the geometries in the column. This metadata is useful for skipping unrelated GeoParquet files when a user runs a spatial range query on a dataset made up of many GeoParquet files. For example, if the query window (shown as the red rectangle) only overlaps with the bounding boxes of part1.parquet and part2.parquet, the query engine can safely skip scanning part3.parquet and part4.parquet, reducing the IO cost and answering the query faster.
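
As an illustration of how a reader could use this footer metadata to skip files, the sketch below checks each file's bbox against a query window; the file names are hypothetical and this is not Sedona's internal code.

import json
import pyarrow.parquet as pq
from shapely.geometry import box

query_window = box(-112.473156, 33.179649, -111.502991, 33.868652)

for path in ["part1.parquet", "part2.parquet", "part3.parquet", "part4.parquet"]:
    # Read only the schema/footer, then parse the GeoParquet "geo" metadata
    geo = json.loads(pq.read_schema(path).metadata[b"geo"].decode("utf-8"))
    minx, miny, maxx, maxy = geo["columns"][geo["primary_column"]]["bbox"]
    if not box(minx, miny, maxx, maxy).intersects(query_window):
        print(f"skipping {path}")  # no rows in this file can match the query window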

Working with GeoParquet in Apache Sedona

Apache Sedona 1.4.1 supports reading and writing GeoParquet files on Apache Spark 3.0 ~ 3.4. We’ll go through an example to demonstrate Apache Sedona’s GeoParquet support and show how to speed up your spatial queries using GeoParquet.

Saving DataFrames to GeoParquet Files

We’ll use the TIGER Census 2022 Shapefile dataset in our examples. Readers can navigate to https://www2.census.gov/geo/tiger/TIGER2022/TABBLOCK20/ and download the datasets. We’ve already prepared the data for the state of Arizona in our public S3 bucket, so we can simply run this code to load the Shapefiles.

from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.utils.adapter import Adapter

spatialRdd = ShapefileReader.readToGeometryRDD(sc, "s3a://wherobots-examples/data/census-2022/tl_2022_04_tabblock20")
df = Adapter.toDf(spatialRdd, spark)
df.show(5)

+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
|            geometry|STATEFP20|COUNTYFP20|TRACTCE20|BLOCKCE20|        GEOID20|    NAME20|MTFCC20|UR20|UACE20|UATYPE20|FUNCSTAT20|  ALAND20|AWATER20| INTPTLAT20|  INTPTLON20|HOUSING20|POP20|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
|POLYGON ((-109.80...|       04|       009|   961100|     1175|040099611001175|Block 1175|  G5040|   R|      |        |         S|        0|   57613|+32.9046017|-109.8078124|        0|    0|
|POLYGON ((-110.44...|       04|       009|   961600|     1025|040099616001025|Block 1025|  G5040|   R|      |        |         S|100235475|    8124|+32.8481268|-110.3910394|        3|    2|
|POLYGON ((-109.26...|       04|       003|   000501|     1186|040030005011186|Block 1186|  G5040|   R|      |        |         S| 28230312|   67735|+31.5716354|-109.2328547|        0|    0|
|POLYGON ((-109.18...|       04|       003|   000501|     1195|040030005011195|Block 1195|  G5040|   R|      |        |         S| 12013891|  451563|+31.5362891|-109.1563449|        0|    0|
|POLYGON ((-113.35...|       04|       005|   002301|     3702|040050023013702|Block 3702|  G5040|   R|      |        |         S|   118622| 1600804|+35.9649737|-113.3170363|        0|    0|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+

Now we can simply save the DataFrame as GeoParquet using df.write.format("geoparquet").save:

df.write.format("geoparquet").save(DATA_ROOT_PATH + "/tl_2022_04_tabblock20.parquet")

Loading DataFrames from GeoParquet Files

We can call sedona.read.format("geoparquet").load to load DataFrame from GeoParquet files. The geometry columns will be decoded as geometry objects automatically.

df_geoparquet = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/tl_2022_04_tabblock20.parquet")
df_geoparquet.show(5)
df_geoparquet.printSchema()

+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
|            geometry|STATEFP20|COUNTYFP20|TRACTCE20|BLOCKCE20|        GEOID20|    NAME20|MTFCC20|UR20|UACE20|UATYPE20|FUNCSTAT20|  ALAND20|AWATER20| INTPTLAT20|  INTPTLON20|HOUSING20|POP20|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
|POLYGON ((-109.80...|       04|       009|   961100|     1175|040099611001175|Block 1175|  G5040|   R|      |        |         S|        0|   57613|+32.9046017|-109.8078124|        0|    0|
|POLYGON ((-110.44...|       04|       009|   961600|     1025|040099616001025|Block 1025|  G5040|   R|      |        |         S|100235475|    8124|+32.8481268|-110.3910394|        3|    2|
|POLYGON ((-109.26...|       04|       003|   000501|     1186|040030005011186|Block 1186|  G5040|   R|      |        |         S| 28230312|   67735|+31.5716354|-109.2328547|        0|    0|
|POLYGON ((-109.18...|       04|       003|   000501|     1195|040030005011195|Block 1195|  G5040|   R|      |        |         S| 12013891|  451563|+31.5362891|-109.1563449|        0|    0|
|POLYGON ((-113.35...|       04|       005|   002301|     3702|040050023013702|Block 3702|  G5040|   R|      |        |         S|   118622| 1600804|+35.9649737|-113.3170363|        0|    0|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
only showing top 5 rows

root
 |-- geometry: geometry (nullable = true)
 |-- STATEFP20: string (nullable = true)
 |-- COUNTYFP20: string (nullable = true)
 |-- TRACTCE20: string (nullable = true)
 |-- BLOCKCE20: string (nullable = true)
 |-- GEOID20: string (nullable = true)
 |-- NAME20: string (nullable = true)
 |-- MTFCC20: string (nullable = true)
 |-- UR20: string (nullable = true)
 |-- UACE20: string (nullable = true)
 |-- UATYPE20: string (nullable = true)
 |-- FUNCSTAT20: string (nullable = true)
 |-- ALAND20: string (nullable = true)
 |-- AWATER20: string (nullable = true)
 |-- INTPTLAT20: string (nullable = true)
 |-- INTPTLON20: string (nullable = true)
 |-- HOUSING20: string (nullable = true)
 |-- POP20: string (nullable = true)

As we can see, the geometry column was read back with a geometry type. Users can directly apply ST functions on this column without first parsing it from WKB.

Interoperability with GeoPandas

The primary goal of GeoParquet is to define a standard so that Parquet files containing geospatial data can be exchanged among various geospatial libraries and platforms. For instance, GeoParquet files saved by Apache Sedona can be loaded by GeoPandas, and vice versa. We can load the GeoParquet file we saved in the previous section using GeoPandas and see that the geometry column is correctly loaded.

import geopandas

gpd_df = geopandas.read_parquet(DATA_ROOT_PATH + "/tl_2022_04_tabblock20.parquet")
gpd_df.head()

geometry STATEFP20 COUNTYFP20 TRACTCE20 BLOCKCE20          GEOID20      NAME20 MTFCC20 UR20 UACE20 UATYPE20 FUNCSTAT20    ALAND20 AWATER20   INTPTLAT20    INTPTLON20 HOUSING20 POP20
0  POLYGON ((-109.80930 32.90518, -109.80928 32.9...        04        009    961100      1175  040099611001175  Block 1175   G5040    R                          S          0    57613  +32.9046017  -109.8078124         0     0
1  POLYGON ((-110.44854 32.76838, -110.44853 32.7...        04        009    961600      1025  040099616001025  Block 1025   G5040    R                          S  100235475     8124  +32.8481268  -110.3910394         3     2
2  POLYGON ((-109.26002 31.57156, -109.25791 31.5...        04        003    000501      1186  040030005011186  Block 1186   G5040    R                          S   28230312    67735  +31.5716354  -109.2328547         0     0
3  POLYGON ((-109.18276 31.55277, -109.14511 31.5...        04        003    000501      1195  040030005011195  Block 1195   G5040    R                          S   12013891   451563  +31.5362891  -109.1563449         0     0
4  POLYGON ((-113.35416 36.04097, -113.35416 36.0...        04        005    002301      3702  040050023013702  Block 3702   G5040    R                          S     118622  1600804  +35.9649737  -113.3170363         0     0
gpd_df.plot()

Load all Census 2022 Data

The entire Census 2022 dataset takes 8 GB and contains 8 million polygons. We’ve already stored all the data in a single GeoParquet file, which readers can load directly from our public S3 bucket. It is recommended to run the following query on Spark clusters with a good network connection to AWS S3; otherwise you can download the dataset from here and store it somewhere you have fast access to.

df_census_2022 = sedona.read.format("geoparquet").load("s3a://wherobots-examples/data/census-2022-geoparquet.snappy.parquet")
df_census_2022.show(5)

+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+-------+--------+-----------+------------+---------+-----+
|            geometry|STATEFP20|COUNTYFP20|TRACTCE20|BLOCKCE20|        GEOID20|    NAME20|MTFCC20|UR20|UACE20|UATYPE20|FUNCSTAT20|ALAND20|AWATER20| INTPTLAT20|  INTPTLON20|HOUSING20|POP20|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+-------+--------+-----------+------------+---------+-----+
|POLYGON ((-91.968...|       22|       083|   970600|     3076|220839706003076|Block 3076|  G5040|   R|      |        |         S|1806215|       0|+32.2045100|-091.9584632|        0|    0|
|POLYGON ((-79.872...|       42|       129|   805200|     1000|421298052001000|Block 1000|  G5040|   U| 58162|       U|         S| 401352|       0|+40.1526521|-079.8673403|       68|  158|
|POLYGON ((-95.451...|       48|       339|   693103|     2013|483396931032013|Block 2013|  G5040|   U| 87300|       U|         S|  12340|       0|+30.3059421|-095.4503930|        9|   43|
|POLYGON ((-94.345...|       29|       037|   060904|     3004|290370609043004|Block 3004|  G5040|   U| 37189|       U|         S|  29419|       0|+38.6592501|-094.3441431|       18|   31|
|POLYGON ((-87.965...|       55|       089|   640102|     3027|550896401023027|Block 3027|  G5040|   U| 34122|       U|         S|   3807|       0|+43.3208487|-087.9652632|        4|    3|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+-------+--------+-----------+------------+---------+-----+
only showing top 5 rows

Query the dataset

Now let’s run some spatial range queries on this dataset. Here we define a range query to fetch records within the city of Phoenix:

spatial_filter = "ST_Within(geometry, ST_PolygonFromEnvelope(-112.473156, 33.179649, -111.502991, 33.868652))"

We can run the query and count the result set, and use %%time to measure time elapsed running this query:

%%time
df_census_2022.where(spatial_filter).count()

CPU times: user 15.3 ms, sys: 12.6 ms, total: 27.9 ms
Wall time: 1min 10s
Out[18]: 57812

The query took more than 1 minute to finish, and the result set contains 57812 rows. Now let’s explore ways to redistribute the dataset to achieve lower query latency.

Redistribute the dataset for faster spatial queries

GeoParquet files have bbox metadata for geometry columns. If we partition the dataset into multiple GeoParquet files such that geometry objects close to each other are stored in the same GeoParquet file, then each GeoParquet file will cover only a small portion of the entire space. We only need to scan the GeoParquet files whose bbox overlaps with the query window and can skip scanning the rest. This optimization is called “filter pushdown” and it has been implemented in Apache Sedona since 1.4.0. In this section we explore different ways to repartition the dataset to speed up spatial range queries.

Partition by State

The Census 2022 dataset comes with a STATEFP20 column, which is the state ID of each record. We can partition the dataset by this field so that records for the same state are grouped together.

df_census_2022.repartition("STATEFP20").write.partitionBy('STATEFP20').format("geoparquet").save(DATA_ROOT_PATH + '/census_2022_part_statefp20.parquet')
df_census_2022_part_statefp20 = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/census_2022_part_statefp20.parquet")

Now we can run the same spatial range query. Note that we don’t need to manually add a predicate for the partitioning field or manually figure out which state the query window overlaps with. Apache Sedona will prune GeoParquet files according to their bbox metadata.

%%time
df_census_2022_part_statefp20.where(spatial_filter).count()

CPU times: user 18.9 ms, sys: 0 ns, total: 18.9 ms
Wall time: 7.3 s
Out[17]: 57812

Now the query took less than 8 seconds. Let’s visualize the bounding boxes of each GeoParquet file to see the distribution of the partitioned dataset.

[Figure: bounding boxes of the GeoParquet files partitioned by state]

Partition by S2

If your dataset does not have a field such as STATEFP20 to directly partition by, you can partition the dataset by the S2 cell ID of the geometry. This requires selecting a proper S2 resolution level suited for your dataset. Here we repartition the dataset by the level-4 S2 cell ID of the geometry values.

from pyspark.sql.functions import expr

df_census_2022_s2 = df_census_2022.withColumn("s2", expr("array_max(ST_S2CellIds(ST_Centroid(geometry), 4))"))
df_census_2022_s2.repartition("s2").write.partitionBy("s2").format("geoparquet").save(DATA_ROOT_PATH + '/census_2022_part_s2.parquet')
df_census_2022_part_s2 = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/census_2022_part_s2.parquet")

Let’s run the same query on this dataset partitioned by S2. Note that we don’t need to add a predicate for the s2 partitioning column.

%%time
df_census_2022_part_s2.where(spatial_filter).count()

CPU times: user 15.3 ms, sys: 856 µs, total: 16.2 ms
Wall time: 8.41 s
Out[16]: 57812

The bounding boxes of each GeoParquet file look like this.

[Figure: bounding boxes of the GeoParquet files partitioned by S2 cell ID]

Sort by the GeoHash of geometry

An alternative approach is to sort the dataset by the GeoHash of the geometry values. The way GeoHash is encoded makes geometry objects that are close to each other have similar GeoHash values. We can utilize this property to redistribute our dataset for faster queries:

from pyspark.sql.functions import col, expr

num_geoparquet_files = 100

sedona.conf.set("spark.sql.shuffle.partitions", num_geoparquet_files)
sedona.conf.set("spark.sql.adaptive.enabled", "false")
df_census_2022 = df_census_2022.withColumn("geohash", expr("ST_GeoHash(ST_Centroid(geometry), 20)"))\
    .sort(col("geohash"))\
    .drop("geohash")
df_census_2022.write.format("geoparquet").save(DATA_ROOT_PATH + "/census_2022_sorted.parquet")
sedona.conf.set("spark.sql.adaptive.enabled", "true")

Running the same query again, we can see it finishes in roughly 10 seconds.

%%time
df_census_2022_sorted = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/census_2022_sorted.parquet")
df_census_2022_sorted.where(spatial_filter).count()

CPU times: user 12.4 ms, sys: 6.42 ms, total: 18.8 ms
Wall time: 11.7 s
Out[18]: 57812

The bounding boxes of each GeoParquet file look like this. Note that although the bounding boxes of the files significantly overlap with each other, we can still prune most of the files when running spatial queries.

[Figure: bounding boxes of the GeoParquet files sorted by GeoHash]

The Take-Away

Apache Sedona supports reading and writing GeoParquet files, a variant of Parquet for exchanging spatial data. Apache Sedona can perform spatial filter pushdown when running spatial range queries on GeoParquet files. You can group your GeoParquet dataset by spatial proximity to achieve faster spatial query performance.