Apache Parquet is a modern columnar data file format optimized for analytical workloads. It is widely adopted in the big data ecosystem. Spatial scientists may also want to use Parquet to store spatial data, especially when analyzing large-scale datasets on Spark or cloud data warehouses. However, problems arise when storing spatial data in Parquet files:
- Parquet does not have native support for geometry objects such as points, linestrings and polygons. These geometry objects need to be serialized to be stored as a primitive type (such as BINARY) in Parquet files, as sketched below. There's no easy way to tell which column stores geometry objects by simply looking at the schema of the files: users may adopt conventions such as storing geometry objects in a `geom` or `geometry` column, or they have to read some metadata about the dataset to find out which columns contain geometry data.
- There are many serialization formats such as WKB, WKT and GeoJSON. Users need to know which serialization format was used in order to properly read the geometry objects from geometry columns.
GeoParquet is an incubating OGC standard for addressing these problems. It defines how to serialize geometry objects as Parquet values, as well as the file-level metadata describing geometry columns. Implementations that follow this standard can easily exchange Parquet files containing spatial data, and enjoy the benefits of the Parquet columnar format when running analytics on GeoParquet files.
A Brief Introduction to GeoParquet Specification
The main idea of the GeoParquet specification is that it defines a standard schema for file-level metadata attached to the footer of Parquet files. It defines which columns hold geometry data, and the encoding of the geometry data in each column. For example, we can load a GeoParquet file using GeoPandas and see that the column storing geometry objects is interpreted as a geometry field:
>>> import geopandas as gpd
>>> df_geom = gpd.read_parquet('./sample.parquet')
>>> df_geom
id g
0 1 POLYGON ((-111.50299 33.17965, -111.50299 33.8...
1 2 POLYGON ((-111.50299 33.17965, -111.50299 33.8...
>>> df_geom.dtypes
id object
g geometry
dtype: object
We can inspect the metadata of this GeoParquet file using PyArrow. The metadata keyed by `geo` stores information on the geometry columns:
>>> import pyarrow as pa
>>> import pyarrow.parquet
>>> import json
>>> table = pa.parquet.read_table('./sample.parquet')
>>> metadata_str = table.schema.metadata[b'geo'].decode('utf-8')
>>> metadata = json.loads(metadata_str)
>>> print(json.dumps(metadata, indent=2))
{
  "primary_column": "g",
  "columns": {
    "g": {
      "encoding": "WKB",
      "crs": null,
      "geometry_type": "Polygon",
      "bbox": [
        -112.473156,
        33.179649,
        -111.502991,
        33.868652
      ]
    }
  },
  "version": "0.4.0",
  "creator": {
    "library": "geopandas",
    "version": "0.12.2"
  }
}
The metadata designates that column `g` is the primary geometry column and is encoded as WKB. There is some additional information about this column, such as the geometry types and the CRS of the geometries it contains. Readers can refer to the column metadata section of the specification to understand how to interpret this information.
There is an interesting piece of metadata for geometry columns keyed by `bbox`: it is the minimum bounding box of all the geometries in the column. This metadata is useful for skipping unrelated GeoParquet files when a user runs a spatial range query on a dataset made up of many GeoParquet files. For example, the query window shown as the red rectangle only overlaps with the bounding boxes of part1.parquet and part2.parquet, so the query engine can safely skip scanning part3.parquet and part4.parquet, reducing IO cost and answering the query faster.
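To make the pruning decision concrete, here is a minimal sketch in plain Python; the file names and bounding boxes are made up for illustration, and boxes are (min_x, min_y, max_x, max_y).
# Sketch of bbox-based file pruning: keep only files whose bbox overlaps the query window.
def intersects(a, b):
    return not (a[2] < b[0] or b[2] < a[0] or a[3] < b[1] or b[3] < a[1])

file_bboxes = {
    "part1.parquet": (-113.0, 33.0, -111.0, 34.0),
    "part2.parquet": (-112.0, 32.5, -110.5, 33.5),
    "part3.parquet": (-109.5, 31.0, -108.0, 32.0),
    "part4.parquet": (-115.0, 35.0, -113.5, 36.5),
}
query_window = (-112.473156, 33.179649, -111.502991, 33.868652)

files_to_scan = [f for f, box in file_bboxes.items() if intersects(box, query_window)]
print(files_to_scan)  # only part1.parquet and part2.parquet need to be scanned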
Working with GeoParquet in Apache Sedona
Apache Sedona 1.4.1 supports reading and writing GeoParquet files on Apache Spark 3.0 ~ 3.4. We'll go through an example to demonstrate the GeoParquet support in Apache Sedona, and how to speed up your spatial queries using GeoParquet.
Saving DataFrames to GeoParquet Files
We'll use the TIGER Census 2022 Shapefile dataset in our examples. Readers can navigate to https://www2.census.gov/geo/tiger/TIGER2022/TABBLOCK20/ and download the datasets. We've already prepared the data for the state of Arizona in our public S3 bucket, so we can simply run this code to load the Shapefiles:
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.utils.adapter import Adapter

spatialRdd = ShapefileReader.readToGeometryRDD(sc, "s3a://wherobots-examples/data/census-2022/tl_2022_04_tabblock20")
df = Adapter.toDf(spatialRdd, spark)
df.show(5)
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
| geometry|STATEFP20|COUNTYFP20|TRACTCE20|BLOCKCE20| GEOID20| NAME20|MTFCC20|UR20|UACE20|UATYPE20|FUNCSTAT20| ALAND20|AWATER20| INTPTLAT20| INTPTLON20|HOUSING20|POP20|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
|POLYGON ((-109.80...| 04| 009| 961100| 1175|040099611001175|Block 1175| G5040| R| | | S| 0| 57613|+32.9046017|-109.8078124| 0| 0|
|POLYGON ((-110.44...| 04| 009| 961600| 1025|040099616001025|Block 1025| G5040| R| | | S|100235475| 8124|+32.8481268|-110.3910394| 3| 2|
|POLYGON ((-109.26...| 04| 003| 000501| 1186|040030005011186|Block 1186| G5040| R| | | S| 28230312| 67735|+31.5716354|-109.2328547| 0| 0|
|POLYGON ((-109.18...| 04| 003| 000501| 1195|040030005011195|Block 1195| G5040| R| | | S| 12013891| 451563|+31.5362891|-109.1563449| 0| 0|
|POLYGON ((-113.35...| 04| 005| 002301| 3702|040050023013702|Block 3702| G5040| R| | | S| 118622| 1600804|+35.9649737|-113.3170363| 0| 0|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
Now we can simply save the DataFrame as GeoParquet using `df.write.format("geoparquet").save`:
df.write.format("geoparquet").save(DATA_ROOT_PATH + "/tl_2022_04_tabblock20.parquet")
Loading DataFrames from GeoParquet Files
We can call `sedona.read.format("geoparquet").load` to load a DataFrame from GeoParquet files. The geometry columns will be decoded as geometry objects automatically.
df_geoparquet = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/tl_2022_04_tabblock20.parquet")
df_geoparquet.show(5)
df_geoparquet.printSchema()
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
| geometry|STATEFP20|COUNTYFP20|TRACTCE20|BLOCKCE20| GEOID20| NAME20|MTFCC20|UR20|UACE20|UATYPE20|FUNCSTAT20| ALAND20|AWATER20| INTPTLAT20| INTPTLON20|HOUSING20|POP20|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
|POLYGON ((-109.80...| 04| 009| 961100| 1175|040099611001175|Block 1175| G5040| R| | | S| 0| 57613|+32.9046017|-109.8078124| 0| 0|
|POLYGON ((-110.44...| 04| 009| 961600| 1025|040099616001025|Block 1025| G5040| R| | | S|100235475| 8124|+32.8481268|-110.3910394| 3| 2|
|POLYGON ((-109.26...| 04| 003| 000501| 1186|040030005011186|Block 1186| G5040| R| | | S| 28230312| 67735|+31.5716354|-109.2328547| 0| 0|
|POLYGON ((-109.18...| 04| 003| 000501| 1195|040030005011195|Block 1195| G5040| R| | | S| 12013891| 451563|+31.5362891|-109.1563449| 0| 0|
|POLYGON ((-113.35...| 04| 005| 002301| 3702|040050023013702|Block 3702| G5040| R| | | S| 118622| 1600804|+35.9649737|-113.3170363| 0| 0|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
only showing top 5 rows
root
|-- geometry: geometry (nullable = true)
|-- STATEFP20: string (nullable = true)
|-- COUNTYFP20: string (nullable = true)
|-- TRACTCE20: string (nullable = true)
|-- BLOCKCE20: string (nullable = true)
|-- GEOID20: string (nullable = true)
|-- NAME20: string (nullable = true)
|-- MTFCC20: string (nullable = true)
|-- UR20: string (nullable = true)
|-- UACE20: string (nullable = true)
|-- UATYPE20: string (nullable = true)
|-- FUNCSTAT20: string (nullable = true)
|-- ALAND20: string (nullable = true)
|-- AWATER20: string (nullable = true)
|-- INTPTLAT20: string (nullable = true)
|-- INTPTLON20: string (nullable = true)
|-- HOUSING20: string (nullable = true)
|-- POP20: string (nullable = true)
As we can see, the `geometry` column was read back as a geometry column. Users can directly apply ST functions on this column without first parsing it from WKB.
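For example, a quick sketch of applying an ST function directly to the loaded column (the area here is computed in the dataset's native degree-based CRS, so the numbers are only illustrative):
# Apply a spatial function to the decoded geometry column; no manual WKB parsing needed.
df_geoparquet.selectExpr("GEOID20", "ST_Area(geometry) AS area").show(5)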
Interoperability with GeoPandas
The primary goal of GeoParquet is to define a standard so that Parquet files containing geospatial data can be exchanged among various geospatial libraries and platforms. For instance, the GeoParquet files saved by Apache Sedona can be loaded by GeoPandas and vice versa. We can load the GeoParquet file we saved in the previous section using GeoPandas and see that the geometry column is correctly loaded.
import geopandas

gpd_df = geopandas.read_parquet(DATA_ROOT_PATH + "/tl_2022_04_tabblock20.parquet")
gpd_df.head()
geometry STATEFP20 COUNTYFP20 TRACTCE20 BLOCKCE20 GEOID20 NAME20 MTFCC20 UR20 UACE20 UATYPE20 FUNCSTAT20 ALAND20 AWATER20 INTPTLAT20 INTPTLON20 HOUSING20 POP20
0 POLYGON ((-109.80930 32.90518, -109.80928 32.9... 04 009 961100 1175 040099611001175 Block 1175 G5040 R S 0 57613 +32.9046017 -109.8078124 0 0
1 POLYGON ((-110.44854 32.76838, -110.44853 32.7... 04 009 961600 1025 040099616001025 Block 1025 G5040 R S 100235475 8124 +32.8481268 -110.3910394 3 2
2 POLYGON ((-109.26002 31.57156, -109.25791 31.5... 04 003 000501 1186 040030005011186 Block 1186 G5040 R S 28230312 67735 +31.5716354 -109.2328547 0 0
3 POLYGON ((-109.18276 31.55277, -109.14511 31.5... 04 003 000501 1195 040030005011195 Block 1195 G5040 R S 12013891 451563 +31.5362891 -109.1563449 0 0
4 POLYGON ((-113.35416 36.04097, -113.35416 36.0... 04 005 002301 3702 040050023013702 Block 3702 G5040 R S 118622 1600804 +35.9649737 -113.3170363 0 0
gpd_df.plot()
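The round trip also works in the other direction: a GeoParquet file written by GeoPandas can be read back by Sedona. A minimal sketch (the output path here is hypothetical):
# Write GeoParquet from GeoPandas, then load it again with Sedona's GeoParquet reader.
gpd_df.to_parquet(DATA_ROOT_PATH + "/tl_2022_04_tabblock20_gpd.parquet")
df_roundtrip = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/tl_2022_04_tabblock20_gpd.parquet")
df_roundtrip.printSchema()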
Load all Census 2022 Data
The entire Census 2022 dataset takes 8 GB and contains 8 million polygons. We've already stored all the data in a single GeoParquet file in our public S3 bucket, so readers can load it directly. It is recommended to run the following query on a Spark cluster with a good network connection to AWS S3; otherwise, you can download the dataset from here and store it somewhere you have fast access to.
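If your Spark session is not already configured to read from public S3 buckets, one possible sketch of the relevant Hadoop S3A setting (assuming the hadoop-aws package is on the classpath) is shown below. With that in place, the dataset can be loaded directly:
# Allow anonymous (unsigned) access to the public bucket; only needed when the
# cluster has no S3 credentials configured.
sedona.sparkContext._jsc.hadoopConfiguration().set(
    "fs.s3a.aws.credentials.provider",
    "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
)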
df_census_2022 = sedona.read.format("geoparquet").load("s3a://wherobots-examples/data/census-2022-geoparquet.snappy.parquet")
df_census_2022.show(5)
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+-------+--------+-----------+------------+---------+-----+
| geometry|STATEFP20|COUNTYFP20|TRACTCE20|BLOCKCE20| GEOID20| NAME20|MTFCC20|UR20|UACE20|UATYPE20|FUNCSTAT20|ALAND20|AWATER20| INTPTLAT20| INTPTLON20|HOUSING20|POP20|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+-------+--------+-----------+------------+---------+-----+
|POLYGON ((-91.968...| 22| 083| 970600| 3076|220839706003076|Block 3076| G5040| R| | | S|1806215| 0|+32.2045100|-091.9584632| 0| 0|
|POLYGON ((-79.872...| 42| 129| 805200| 1000|421298052001000|Block 1000| G5040| U| 58162| U| S| 401352| 0|+40.1526521|-079.8673403| 68| 158|
|POLYGON ((-95.451...| 48| 339| 693103| 2013|483396931032013|Block 2013| G5040| U| 87300| U| S| 12340| 0|+30.3059421|-095.4503930| 9| 43|
|POLYGON ((-94.345...| 29| 037| 060904| 3004|290370609043004|Block 3004| G5040| U| 37189| U| S| 29419| 0|+38.6592501|-094.3441431| 18| 31|
|POLYGON ((-87.965...| 55| 089| 640102| 3027|550896401023027|Block 3027| G5040| U| 34122| U| S| 3807| 0|+43.3208487|-087.9652632| 4| 3|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+-------+--------+-----------+------------+---------+-----+
only showing top 5 rows
Query the dataset
Now let's run some spatial range queries on this dataset. Here we define a range query to fetch records within the city of Phoenix:
spatial_filter = "ST_Within(geometry, ST_PolygonFromEnvelope(-112.473156, 33.179649, -111.502991, 33.868652))"
We can run the query and count the result set, using `%%time` to measure the elapsed time:
%%time
df_census_2022.where(spatial_filter).count()
CPU times: user 15.3 ms, sys: 12.6 ms, total: 27.9 ms
Wall time: 1min 10s
Out[18]: 57812
The query took more than a minute to finish, and the result set contains 57812 rows. Now let's explore ways to redistribute the dataset to achieve lower query latency.
Redistribute the dataset for faster spatial queries
GeoParquet files carry `bbox` metadata for geometry columns. If we partition the dataset into multiple GeoParquet files such that geometry objects that are close to each other are stored in the same file, then each GeoParquet file covers only a small portion of the entire space. We only need to scan the GeoParquet files whose `bbox` overlaps with the query window and can skip scanning the rest. This optimization is called "filter pushdown", and it has been implemented in Apache Sedona since 1.4.0. In this section we explore different ways to repartition the dataset to speed up spatial range queries.
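If you want to check how such a query is planned against your own dataset, one hedged way is to look at the physical plan of the filtered DataFrame; the exact plan text varies across Spark and Sedona versions.
# Inspect how Spark plans the spatial range query over the GeoParquet source.
df_census_2022.where(spatial_filter).explain()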
Partition by State
The Census 2022 dataset comes with a `STATEFP20` column, which is the state ID of each record. We can partition the dataset by this field so that records for the same state are grouped together.
df_census_2022.repartition("STATEFP20").write.partitionBy('STATEFP20').format("geoparquet").save(DATA_ROOT_PATH + '/census_2022_part_statefp20.parquet')
df_census_2022_part_statefp20 = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/census_2022_part_statefp20.parquet")
Now we can run the same spatial range query. Note that we don't need to manually add a predicate on the partitioning field or figure out which states the query window overlaps with; Apache Sedona will prune GeoParquet files according to their `bbox` metadata.
%%time
df_census_2022_part_statefp20.where(spatial_filter).count()
CPU times: user 18.9 ms, sys: 0 ns, total: 18.9 ms
Wall time: 7.3 s
Out[17]: 57812
Now the query takes less than 8 seconds. Let's visualize the bounding boxes of each GeoParquet file to see the distribution of the partitioned dataset.
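If you want to reproduce such a plot yourself, one sketch is to read the `geo` footer metadata of every part file with PyArrow and draw the boxes with matplotlib; this assumes the partitioned dataset was written to a local path under DATA_ROOT_PATH.
# Sketch: collect the bbox from the "geo" footer metadata of each part file and plot it.
import glob, json
import pyarrow.parquet as pq
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle

fig, ax = plt.subplots()
for path in glob.glob(DATA_ROOT_PATH + "/census_2022_part_statefp20.parquet/*/*.parquet"):
    geo = json.loads(pq.read_schema(path).metadata[b"geo"])
    min_x, min_y, max_x, max_y = geo["columns"]["geometry"]["bbox"]
    ax.add_patch(Rectangle((min_x, min_y), max_x - min_x, max_y - min_y, fill=False))
ax.autoscale_view()
plt.show()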
Partition by S2
If your dataset does not have a field such as `STATEFP20` to partition by directly, you can partition the dataset by the S2 cell ID of the geometry. This requires selecting an S2 resolution level suited to your dataset. Here we repartition the dataset by the level-4 S2 cell ID of the geometry values.
from pyspark.sql.functions import expr

df_census_2022_s2 = df_census_2022.withColumn("s2", expr("array_max(ST_S2CellIds(ST_Centroid(geometry), 4))"))
df_census_2022_s2.repartition("s2").write.partitionBy("s2").format("geoparquet").save(DATA_ROOT_PATH + '/census_2022_part_s2.parquet')
df_census_2022_part_s2 = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/census_2022_part_s2.parquet")
Let's run the same query on this dataset partitioned by S2. Note that we don't need to add a predicate on the `s2` partitioning column.
%%time
df_census_2022_part_s2.where(spatial_filter).count()
CPU times: user 15.3 ms, sys: 856 µs, total: 16.2 ms
Wall time: 8.41 s
Out[16]: 57812
The bounding boxes of each GeoParquet file look like this.
Sort by the GeoHash of geometry
An alternative approach is to sort the dataset by the GeoHash of the geometry values. The way GeoHash is encoded makes geometry objects that are close to each other have similar GeoHash values. We can utilize this property to redistribute our dataset for faster queries:
from pyspark.sql.functions import expr, col

num_geoparquet_files = 100
sedona.conf.set("spark.sql.shuffle.partitions", num_geoparquet_files)
sedona.conf.set("spark.sql.adaptive.enabled", "false")
df_census_2022 = df_census_2022.withColumn("geohash", expr("ST_GeoHash(ST_Centroid(geometry), 20)"))\
.sort(col("geohash"))\
.drop("geohash")
df_census_2022.write.format("geoparquet").save(DATA_ROOT_PATH + "/census_2022_sorted.parquet")
sedona.conf.set("spark.sql.adaptive.enabled", "true")
Running the same query again, we can see it finishes in roughly 10 seconds.
%%time
df_census_2022_sorted = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/census_2022_sorted.parquet")
df_census_2022_sorted.where(spatial_filter).count()
CPU times: user 12.4 ms, sys: 6.42 ms, total: 18.8 ms
Wall time: 11.7 s
Out[18]: 57812
The bounding boxes of each GeoParquet file look like this. Note that although the bounding boxes of the files overlap significantly with each other, we can still prune most of the files when running spatial queries.
The Take-Away
Apache Sedona supports reading and writing GeoParquet files, a variant of Parquet for exchanging spatial data. Apache Sedona can perform spatial filter pushdown when running spatial range queries on GeoParquet files. You can group your GeoParquet dataset by spatial proximity to achieve faster spatial query performance.