Spatial Data, GeoParquet, and Apache Sedona

Apache Parquet is a modern columnar data file format optimized for analytical workloads, and it is widely adopted in the big data ecosystem. Spatial scientists may also want to use Parquet to store spatial data, especially when analyzing large-scale datasets on Spark or cloud data warehouses. However, problems arise when storing spatial data in Parquet files:

  1. Parquet has no native support for geometry objects such as points, linestrings, and polygons. Geometry objects need to be serialized so they can be stored as a primitive type (such as BINARY) in Parquet files, and there is no easy way to tell which column stores geometry objects by simply looking at the schema of the files. Users may adopt conventions such as storing geometry objects in a geom or geometry column, or they have to read some metadata about the dataset to find out which columns hold geometry data.
  2. There are many serialization formats, such as WKB, WKT, and GeoJSON. Users need to know which serialization format was used in order to properly read the geometry objects back from the geometry columns (see the sketch after this list).
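
To make the serialization problem concrete, here is a minimal sketch (using Shapely, which is our own choice of library for illustration): the same geometry can be encoded as WKB bytes or WKT text, and a plain Parquet schema cannot tell a reader which encoding a column contains.

from shapely.geometry import Point
from shapely import wkb, wkt

p = Point(-111.5, 33.2)
print(wkb.dumps(p).hex())  # WKB: opaque bytes, typically stored as a Parquet BINARY value
print(wkt.dumps(p))        # WKT: a human-readable text representation of the same point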

GeoParquet is an incubating OGC standard that addresses these problems. It defines how to serialize geometry objects as Parquet values, as well as the file-level metadata for describing geometry columns. Implementations that follow this standard can easily exchange Parquet files containing spatial data, and enjoy the benefits of the Parquet columnar format when running analytics on GeoParquet files.

A Brief Introduction to GeoParquet Specification

The main idea of the GeoParquet specification is that it defines a standard schema for file-level metadata attached to the footer of Parquet files. This metadata describes which columns contain geometry data and the encoding of the geometry data in each column. For example, we can load a GeoParquet file using GeoPandas and see that the column storing geometry objects is interpreted as a geometry field:

>>> import geopandas as gpd
>>> df_geom = gpd.read_parquet('./sample.parquet')
>>> df_geom
  id                                                  g
0  1  POLYGON ((-111.50299 33.17965, -111.50299 33.8...
1  2  POLYGON ((-111.50299 33.17965, -111.50299 33.8...
>>> df_geom.dtypes
id      object
g     geometry
dtype: object

We can inspect the metadata of this GeoParquet file using PyArrow. The metadata keyed by geo stores information on the geometry columns:

>>> import pyarrow as pa
>>> import pyarrow.parquet
>>> import json
>>> table = pa.parquet.read_table('./sample.parquet')
>>> metadata_str = table.schema.metadata[b'geo'].decode('utf-8')
>>> metadata = json.loads(metadata_str)
>>> print(json.dumps(metadata, indent=2))
{
  "primary_column": "g",
  "columns": {
    "g": {
      "encoding": "WKB",
      "crs": null,
      "geometry_type": "Polygon",
      "bbox": [
        -112.473156,
        33.179649,
        -111.502991,
        33.868652
      ]
    }
  },
  "version": "0.4.0",
  "creator": {
    "library": "geopandas",
    "version": "0.12.2"
  }
}

The metadata designates column g as the primary geometry column, encoded as WKB. There is some additional information about this column, such as the geometry types and the CRS of the geometries it contains. Readers can refer to the column metadata section of the specification to understand how to interpret this information.

There is an interesting piece of per-column metadata keyed by bbox: the minimum bounding box of all the geometries in the column. This metadata is useful for skipping unrelated GeoParquet files when running a spatial range query on a dataset made up of many GeoParquet files. For example, if a query window overlaps only with the bounding boxes of part1.parquet and part2.parquet, the query engine can safely skip scanning part3.parquet and part4.parquet, reducing IO cost and answering the query faster.
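
To illustrate how a reader can exploit the bbox metadata, here is a minimal sketch of bbox-based file skipping (the part file names and helper functions are hypothetical; we assume bbox is [min_x, min_y, max_x, max_y] as in the GeoParquet specification):

import json
import pyarrow.parquet as pq

def bbox_overlaps(bbox, window):
    # Both arguments are [min_x, min_y, max_x, max_y]; True if the rectangles intersect.
    min_x, min_y, max_x, max_y = bbox
    w_min_x, w_min_y, w_max_x, w_max_y = window
    return not (max_x < w_min_x or min_x > w_max_x or
                max_y < w_min_y or min_y > w_max_y)

def files_to_scan(paths, window):
    # Keep only the files whose primary geometry column bbox overlaps the query window.
    selected = []
    for path in paths:
        geo = json.loads(pq.read_schema(path).metadata[b'geo'])
        column = geo['columns'][geo['primary_column']]
        if 'bbox' not in column or bbox_overlaps(column['bbox'], window):
            selected.append(path)
    return selected

# Hypothetical part files; only those overlapping the query window get scanned.
print(files_to_scan(['part1.parquet', 'part2.parquet', 'part3.parquet', 'part4.parquet'],
                    window=[-112.5, 33.1, -111.5, 33.9]))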

Working with GeoParquet in Apache Sedona

Apache Sedona 1.4.1 supports reading and writing GeoParquet files on Apache Spark 3.0 to 3.4. We’ll go through an example to demonstrate Apache Sedona’s GeoParquet support, and show how to speed up your spatial queries using GeoParquet.

Saving DataFrames to GeoParquet Files

We’ll use the TIGER census 2022 Shapefile dataset in our examples. Readers can navigate to https://www2.census.gov/geo/tiger/TIGER2022/TABBLOCK20/ and download the datasets. We’ve already prepared the data for the state of Arizona in our public S3 bucket, so we can simply run this code to load the Shapefiles:

from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.utils.adapter import Adapter

spatialRdd = ShapefileReader.readToGeometryRDD(sc, "s3a://wherobots-examples/data/census-2022/tl_2022_04_tabblock20")
df = Adapter.toDf(spatialRdd, spark)
df.show(5)

+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
|            geometry|STATEFP20|COUNTYFP20|TRACTCE20|BLOCKCE20|        GEOID20|    NAME20|MTFCC20|UR20|UACE20|UATYPE20|FUNCSTAT20|  ALAND20|AWATER20| INTPTLAT20|  INTPTLON20|HOUSING20|POP20|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
|POLYGON ((-109.80...|       04|       009|   961100|     1175|040099611001175|Block 1175|  G5040|   R|      |        |         S|        0|   57613|+32.9046017|-109.8078124|        0|    0|
|POLYGON ((-110.44...|       04|       009|   961600|     1025|040099616001025|Block 1025|  G5040|   R|      |        |         S|100235475|    8124|+32.8481268|-110.3910394|        3|    2|
|POLYGON ((-109.26...|       04|       003|   000501|     1186|040030005011186|Block 1186|  G5040|   R|      |        |         S| 28230312|   67735|+31.5716354|-109.2328547|        0|    0|
|POLYGON ((-109.18...|       04|       003|   000501|     1195|040030005011195|Block 1195|  G5040|   R|      |        |         S| 12013891|  451563|+31.5362891|-109.1563449|        0|    0|
|POLYGON ((-113.35...|       04|       005|   002301|     3702|040050023013702|Block 3702|  G5040|   R|      |        |         S|   118622| 1600804|+35.9649737|-113.3170363|        0|    0|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+

Now we can simply save the DataFrame as GeoParquet using df.write.format("geoparquet").save:

df.write.format("geoparquet").save(DATA_ROOT_PATH + "/tl_2022_04_tabblock20.parquet")

Loading DataFrames from GeoParquet Files

We can call sedona.read.format("geoparquet").load to load a DataFrame from GeoParquet files. The geometry columns will be decoded as geometry objects automatically.

df_geoparquet = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/tl_2022_04_tabblock20.parquet")
df_geoparquet.show(5)
df_geoparquet.printSchema()

+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
|            geometry|STATEFP20|COUNTYFP20|TRACTCE20|BLOCKCE20|        GEOID20|    NAME20|MTFCC20|UR20|UACE20|UATYPE20|FUNCSTAT20|  ALAND20|AWATER20| INTPTLAT20|  INTPTLON20|HOUSING20|POP20|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
|POLYGON ((-109.80...|       04|       009|   961100|     1175|040099611001175|Block 1175|  G5040|   R|      |        |         S|        0|   57613|+32.9046017|-109.8078124|        0|    0|
|POLYGON ((-110.44...|       04|       009|   961600|     1025|040099616001025|Block 1025|  G5040|   R|      |        |         S|100235475|    8124|+32.8481268|-110.3910394|        3|    2|
|POLYGON ((-109.26...|       04|       003|   000501|     1186|040030005011186|Block 1186|  G5040|   R|      |        |         S| 28230312|   67735|+31.5716354|-109.2328547|        0|    0|
|POLYGON ((-109.18...|       04|       003|   000501|     1195|040030005011195|Block 1195|  G5040|   R|      |        |         S| 12013891|  451563|+31.5362891|-109.1563449|        0|    0|
|POLYGON ((-113.35...|       04|       005|   002301|     3702|040050023013702|Block 3702|  G5040|   R|      |        |         S|   118622| 1600804|+35.9649737|-113.3170363|        0|    0|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
only showing top 5 rows

root
 |-- geometry: geometry (nullable = true)
 |-- STATEFP20: string (nullable = true)
 |-- COUNTYFP20: string (nullable = true)
 |-- TRACTCE20: string (nullable = true)
 |-- BLOCKCE20: string (nullable = true)
 |-- GEOID20: string (nullable = true)
 |-- NAME20: string (nullable = true)
 |-- MTFCC20: string (nullable = true)
 |-- UR20: string (nullable = true)
 |-- UACE20: string (nullable = true)
 |-- UATYPE20: string (nullable = true)
 |-- FUNCSTAT20: string (nullable = true)
 |-- ALAND20: string (nullable = true)
 |-- AWATER20: string (nullable = true)
 |-- INTPTLAT20: string (nullable = true)
 |-- INTPTLON20: string (nullable = true)
 |-- HOUSING20: string (nullable = true)
 |-- POP20: string (nullable = true)

As we can see, the geometry column was read back with geometry type. Users can directly apply ST functions to this column without first parsing geometries from WKB.
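
For example, we can compute the area of each census block directly on the decoded geometry column (a small illustration we added; ST_Area is a standard Sedona SQL function):

df_geoparquet.selectExpr("GEOID20", "ST_Area(geometry) AS area").show(5)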

Interoperability with GeoPandas

The primary goal of GeoParquet is to define a standard so that Parquet files containing geospatial data can be exchanged amongst various geospatial libraries and platforms. For instance, GeoParquet files saved by Apache Sedona can be loaded by GeoPandas, and vice versa. We can load the GeoParquet files we saved in the previous section using GeoPandas and see that the geometry column is correctly loaded.

import geopandas

gpd_df = geopandas.read_parquet(DATA_ROOT_PATH + "/tl_2022_04_tabblock20.parquet")
gpd_df.head()

geometry STATEFP20 COUNTYFP20 TRACTCE20 BLOCKCE20          GEOID20      NAME20 MTFCC20 UR20 UACE20 UATYPE20 FUNCSTAT20    ALAND20 AWATER20   INTPTLAT20    INTPTLON20 HOUSING20 POP20
0  POLYGON ((-109.80930 32.90518, -109.80928 32.9...        04        009    961100      1175  040099611001175  Block 1175   G5040    R                          S          0    57613  +32.9046017  -109.8078124         0     0
1  POLYGON ((-110.44854 32.76838, -110.44853 32.7...        04        009    961600      1025  040099616001025  Block 1025   G5040    R                          S  100235475     8124  +32.8481268  -110.3910394         3     2
2  POLYGON ((-109.26002 31.57156, -109.25791 31.5...        04        003    000501      1186  040030005011186  Block 1186   G5040    R                          S   28230312    67735  +31.5716354  -109.2328547         0     0
3  POLYGON ((-109.18276 31.55277, -109.14511 31.5...        04        003    000501      1195  040030005011195  Block 1195   G5040    R                          S   12013891   451563  +31.5362891  -109.1563449         0     0
4  POLYGON ((-113.35416 36.04097, -113.35416 36.0...        04        005    002301      3702  040050023013702  Block 3702   G5040    R                          S     118622  1600804  +35.9649737  -113.3170363         0     0
gpd_df.plot()

Load all Census 2022 Data

The entire census 2022 dataset takes 8 GB and contains 8 million polygons. We’ve already stored all the data in a single GeoParquet file on our public S3 bucket, so readers can load it directly. It is recommended to run the following query on a Spark cluster with a good network connection to AWS S3; otherwise, you can download the dataset and store it somewhere you have fast access to.

df_census_2022 = sedona.read.format("geoparquet").load("s3a://wherobots-examples/data/census-2022-geoparquet.snappy.parquet")
df_census_2022.show(5)

+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+-------+--------+-----------+------------+---------+-----+
|            geometry|STATEFP20|COUNTYFP20|TRACTCE20|BLOCKCE20|        GEOID20|    NAME20|MTFCC20|UR20|UACE20|UATYPE20|FUNCSTAT20|ALAND20|AWATER20| INTPTLAT20|  INTPTLON20|HOUSING20|POP20|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+-------+--------+-----------+------------+---------+-----+
|POLYGON ((-91.968...|       22|       083|   970600|     3076|220839706003076|Block 3076|  G5040|   R|      |        |         S|1806215|       0|+32.2045100|-091.9584632|        0|    0|
|POLYGON ((-79.872...|       42|       129|   805200|     1000|421298052001000|Block 1000|  G5040|   U| 58162|       U|         S| 401352|       0|+40.1526521|-079.8673403|       68|  158|
|POLYGON ((-95.451...|       48|       339|   693103|     2013|483396931032013|Block 2013|  G5040|   U| 87300|       U|         S|  12340|       0|+30.3059421|-095.4503930|        9|   43|
|POLYGON ((-94.345...|       29|       037|   060904|     3004|290370609043004|Block 3004|  G5040|   U| 37189|       U|         S|  29419|       0|+38.6592501|-094.3441431|       18|   31|
|POLYGON ((-87.965...|       55|       089|   640102|     3027|550896401023027|Block 3027|  G5040|   U| 34122|       U|         S|   3807|       0|+43.3208487|-087.9652632|        4|    3|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+-------+--------+-----------+------------+---------+-----+
only showing top 5 rows

Query the dataset

Now let’s run some spatial range queries on this dataset. Here we define a range query that fetches records within the city of Phoenix:

spatial_filter = "ST_Within(geometry, ST_PolygonFromEnvelope(-112.473156, 33.179649, -111.502991, 33.868652))"

We can run the query and count the result set, using %%time to measure the time elapsed running this query:

%%time
df_census_2022.where(spatial_filter).count()

CPU times: user 15.3 ms, sys: 12.6 ms, total: 27.9 ms
Wall time: 1min 10s
Out[18]: 57812

The query took more than 1 minute to finish, and the result set contains 57812 rows. Now let’s explore ways to redistribute the dataset to achieve lower query latency.

Redistribute the dataset for faster spatial queries

GeoParquet files carry bbox metadata for geometry columns. If we partition the dataset into multiple GeoParquet files such that geometry objects that are close to each other are stored in the same file, then each GeoParquet file covers only a small portion of the entire space. We only need to scan the GeoParquet files whose bbox overlaps with the query window and can skip scanning the rest. This optimization is called “filter pushdown”, and it has been implemented in Apache Sedona since 1.4.0. In this section we explore different ways to repartition the dataset to speed up spatial range queries.

Partition by State

The census 2022 dataset comes with a STATEFP20 column, which holds the state ID of each record. We can partition the dataset by this field so that records for the same state are grouped together.

df_census_2022.repartition("STATEFP20").write.partitionBy('STATEFP20').format("geoparquet").save(DATA_ROOT_PATH + '/census_2022_part_statefp20.parquet')
df_census_2022_part_statefp20 = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/census_2022_part_statefp20.parquet")

Now we can run the same spatial range query. Note that we don’t need to manually add a predicate for the partitioning field or figure out which states the query window overlaps with; Apache Sedona will prune GeoParquet files according to their bbox metadata.
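
If you want to confirm that the pushdown kicks in, one option is to inspect the physical plan (a quick check we added; the exact plan output varies with the Spark and Sedona versions, but the spatial predicate should appear in the scan node):

df_census_2022_part_statefp20.where(spatial_filter).explain()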

%%time
df_census_2022_part_statefp20.where(spatial_filter).count()

CPU times: user 18.9 ms, sys: 0 ns, total: 18.9 ms
Wall time: 7.3 s
Out[17]: 57812

The query now takes less than 8 seconds. Let’s visualize the bounding boxes of the GeoParquet files to see the distribution of the partitioned dataset.

[Figure: bounding boxes of the GeoParquet files partitioned by STATEFP20]

Partition by S2

If your dataset does not have a field such as STATEFP20 to partition by directly, you can partition the dataset by the S2 cell ID of the geometry. This requires selecting an S2 resolution level suited to your dataset. Here we repartition the dataset by the level-4 S2 cell ID of each geometry’s centroid.

from pyspark.sql.functions import expr

df_census_2022_s2 = df_census_2022.withColumn("s2", expr("array_max(ST_S2CellIds(ST_Centroid(geometry), 4))"))
df_census_2022_s2.repartition("s2").write.partitionBy("s2").format("geoparquet").save(DATA_ROOT_PATH + '/census_2022_part_s2.parquet')
df_census_2022_part_s2 = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/census_2022_part_s2.parquet")
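
For reference, ST_S2CellIds returns an array of S2 cell IDs covering the input geometry at the given level, and array_max simply picks one of them as a stable partition key. Here is a quick illustration we added (the point is an arbitrary location in Phoenix):

sedona.sql("SELECT ST_S2CellIds(ST_Point(-112.07, 33.45), 4) AS cells").show(truncate=False)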

Let’s run the same query on this dataset partitioned by S2. Note that we don’t need to add a predicate for the s2 partitioning column.

%%time
df_census_2022_part_s2.where(spatial_filter).count()

CPU times: user 15.3 ms, sys: 856 µs, total: 16.2 ms
Wall time: 8.41 s
Out[16]: 57812

The bounding boxes of the resulting GeoParquet files look like this:

[Figure: bounding boxes of the GeoParquet files partitioned by level-4 S2 cell ID]

Sort by the GeoHash of geometry

An alternative approach is to sort the dataset by the GeoHash of the geometry values. The way GeoHash is encoded makes geometry objects that are close to each other have similar GeoHash values.
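
For instance, here is a quick illustration we added using Sedona’s ST_GeoHash (the two points are arbitrary nearby locations in Phoenix): nearby points share a long common GeoHash prefix, so sorting by GeoHash clusters them together.

sedona.sql("""
    SELECT ST_GeoHash(ST_Point(-112.07, 33.45), 8) AS point_a,
           ST_GeoHash(ST_Point(-112.08, 33.46), 8) AS point_b
""").show()

We can utilize this property to redistribute our dataset for faster queries: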

from pyspark.sql.functions import col, expr

num_geoparquet_files = 100

# Fix the number of shuffle partitions and disable AQE so that the sort
# produces exactly num_geoparquet_files output files.
sedona.conf.set("spark.sql.shuffle.partitions", num_geoparquet_files)
sedona.conf.set("spark.sql.adaptive.enabled", "false")
df_census_2022 = df_census_2022.withColumn("geohash", expr("ST_GeoHash(ST_Centroid(geometry), 20)"))\
    .sort(col("geohash"))\
    .drop("geohash")
df_census_2022.write.format("geoparquet").save(DATA_ROOT_PATH + "/census_2022_sorted.parquet")
sedona.conf.set("spark.sql.adaptive.enabled", "true")

Running the same query again, we can see that it finishes in about 12 seconds.

%%time
df_census_2022_sorted = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/census_2022_sorted.parquet")
df_census_2022_sorted.where(spatial_filter).count()

CPU times: user 12.4 ms, sys: 6.42 ms, total: 18.8 ms
Wall time: 11.7 s
Out[18]: 57812

The bounding boxes of the resulting GeoParquet files look like this. Note that although the bounding boxes of the files significantly overlap with each other, we can still prune most of the files when running spatial queries.

[Figure: bounding boxes of the GeoParquet files sorted by GeoHash]

The Take-Away

Apache Sedona supports reading and writing GeoParquet files, a variant of Parquet for exchanging spatial data. Apache Sedona can perform spatial filter pushdown when running spatial range queries on GeoParquet files, and you can group your GeoParquet dataset by spatial proximity to achieve faster spatial query performance.