Apache Parquet is a modern columnar data file format optimized for analytical workloads, and it is widely adopted in the big data ecosystem. Spatial scientists may also want to use parquet to store spatial data, especially when analyzing large-scale datasets on Spark or cloud data warehouses. Problems arise when storing spatial data in parquet files: parquet has no built-in geometry type, so each tool ends up choosing its own convention, for example serializing geometries as WKB in a binary column named geom or geometry, and other tools have no reliable way to discover which columns contain geometries or how they are encoded.
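To make the problem concrete, here is a minimal, hypothetical sketch (the column name geom and the file name plain.parquet are illustrative): writing geometries to plain parquet as WKB works, but nothing in the file tells a reader which column holds geometries or how they are encoded.

import pandas as pd
from shapely import wkb
from shapely.geometry import Point

# Hypothetical sketch: plain parquet has no geometry type, so we serialize
# geometries ourselves (here as WKB bytes); a reader must know, out-of-band,
# which column holds geometries and how they are encoded.
df = pd.DataFrame({
    "id": [1, 2],
    "geom": [wkb.dumps(Point(-111.5, 33.2)), wkb.dumps(Point(-112.0, 33.5))],
})
df.to_parquet("plain.parquet")            # geom is just an opaque binary column
df2 = pd.read_parquet("plain.parquet")
geoms = df2["geom"].map(wkb.loads)        # manual parsing; nothing in the file says it is WKB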
GeoParquet is an incubating OGC standard that addresses these problems. It defines how to serialize geometry objects as parquet values, as well as the file-level metadata for describing geometry columns. Implementations that follow this standard can easily exchange parquet files containing spatial data, and enjoy the benefits of the parquet columnar format when running analytics on GeoParquet files.
The main idea of the GeoParquet specification is that it defines a standard schema for file-level metadata attached to the footer of parquet files. The metadata describes which columns contain geometry data and how the geometry data in each column is encoded. For example, we can load a GeoParquet file using GeoPandas and see that the column storing geometry objects is interpreted as a geometry field:
>>> import geopandas as gpd
>>> df_geom = gpd.read_parquet('./sample.parquet')
>>> df_geom
  id                                                  g
0  1  POLYGON ((-111.50299 33.17965, -111.50299 33.8...
1  2  POLYGON ((-111.50299 33.17965, -111.50299 33.8...
>>> df_geom.dtypes
id      object
g     geometry
dtype: object
We can inspect the metadata of this GeoParquet file using PyArrow. The metadata keyed by geo stores information on the geometry columns:
>>> import pyarrow as pa
>>> import pyarrow.parquet
>>> import json
>>> table = pa.parquet.read_table('./sample.parquet')
>>> metadata_str = table.schema.metadata[b'geo'].decode('utf-8')
>>> metadata = json.loads(metadata_str)
>>> print(json.dumps(metadata, indent=2))
{
  "primary_column": "g",
  "columns": {
    "g": {
      "encoding": "WKB",
      "crs": null,
      "geometry_type": "Polygon",
      "bbox": [
        -112.473156,
        33.179649,
        -111.502991,
        33.868652
      ]
    }
  },
  "version": "0.4.0",
  "creator": {
    "library": "geopandas",
    "version": "0.12.2"
  }
}
The metadata designates column g as the primary geometry column, encoded as WKB. There is some additional information about this column, such as the geometry types and the CRS of the geometries it contains. Readers can refer to the column metadata section of the specification to understand how to interpret this information.
There is one particularly interesting piece of metadata for geometry columns, keyed by bbox: the minimum bounding box of all the geometries in the column. This metadata is useful for skipping unrelated GeoParquet files when a user runs a spatial range query on a dataset made up of many GeoParquet files. For example, the query window shown as the red rectangle only overlaps with the bounding boxes of part1.parquet and part2.parquet, so the query engine can safely skip scanning part3.parquet and part4.parquet, reducing the I/O cost and answering the query faster.
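As a rough illustration of how this pruning works (a hedged sketch using the PyArrow calls shown above; the file names and the query window are illustrative), a reader can check each file's bbox against the query window before deciding to scan it:

import json
import pyarrow.parquet as pq

# Query window (xmin, ymin, xmax, ymax); values are illustrative.
qx0, qy0, qx1, qy1 = -112.473156, 33.179649, -111.502991, 33.868652

files = ["part1.parquet", "part2.parquet", "part3.parquet", "part4.parquet"]
for path in files:
    meta = json.loads(pq.read_schema(path).metadata[b"geo"])
    col = meta["columns"][meta["primary_column"]]
    bx0, by0, bx1, by1 = col["bbox"]
    # Skip the file if its bounding box does not overlap the query window.
    if bx1 < qx0 or bx0 > qx1 or by1 < qy0 or by0 > qy1:
        continue
    table = pq.read_table(path)  # only files that may contain matches are scanned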
Apache Sedona 1.4.1 supports reading and writing GeoParquet files on Apache Spark 3.0 to 3.4. We’ll go through an example to demonstrate Apache Sedona’s GeoParquet support, and show how to speed up your spatial queries using GeoParquet.
We’ll use the TIGER census 2022 Shapefile dataset in our examples. Readers can navigate to https://www2.census.gov/geo/tiger/TIGER2022/TABBLOCK20/ and download the datasets. We’ve already prepared the data for the state of Arizona in our public S3 bucket, so we can simply run this code to load the Shapefiles:
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.utils.adapter import Adapter

spatialRdd = ShapefileReader.readToGeometryRDD(sc, "s3a://wherobots-examples/data/census-2022/tl_2022_04_tabblock20")
df = Adapter.toDf(spatialRdd, spark)
df.show(5)

+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
| geometry|STATEFP20|COUNTYFP20|TRACTCE20|BLOCKCE20| GEOID20| NAME20|MTFCC20|UR20|UACE20|UATYPE20|FUNCSTAT20| ALAND20|AWATER20| INTPTLAT20| INTPTLON20|HOUSING20|POP20|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
|POLYGON ((-109.80...| 04| 009| 961100| 1175|040099611001175|Block 1175| G5040| R| | | S| 0| 57613|+32.9046017|-109.8078124| 0| 0|
|POLYGON ((-110.44...| 04| 009| 961600| 1025|040099616001025|Block 1025| G5040| R| | | S|100235475| 8124|+32.8481268|-110.3910394| 3| 2|
|POLYGON ((-109.26...| 04| 003| 000501| 1186|040030005011186|Block 1186| G5040| R| | | S| 28230312| 67735|+31.5716354|-109.2328547| 0| 0|
|POLYGON ((-109.18...| 04| 003| 000501| 1195|040030005011195|Block 1195| G5040| R| | | S| 12013891| 451563|+31.5362891|-109.1563449| 0| 0|
|POLYGON ((-113.35...| 04| 005| 002301| 3702|040050023013702|Block 3702| G5040| R| | | S| 118622| 1600804|+35.9649737|-113.3170363| 0| 0|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
Now we can simply save the DataFrame as GeoParquet using df.write.format("geoparquet").save:
df.write.format("geoparquet").save(DATA_ROOT_PATH + "/tl_2022_04_tabblock20.parquet")
We can call sedona.read.format("geoparquet").load to load a DataFrame from GeoParquet files. The geometry columns will be decoded as geometry objects automatically.
df_geoparquet = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/tl_2022_04_tabblock20.parquet")
df_geoparquet.show(5)
df_geoparquet.printSchema()

+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
| geometry|STATEFP20|COUNTYFP20|TRACTCE20|BLOCKCE20| GEOID20| NAME20|MTFCC20|UR20|UACE20|UATYPE20|FUNCSTAT20| ALAND20|AWATER20| INTPTLAT20| INTPTLON20|HOUSING20|POP20|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
|POLYGON ((-109.80...| 04| 009| 961100| 1175|040099611001175|Block 1175| G5040| R| | | S| 0| 57613|+32.9046017|-109.8078124| 0| 0|
|POLYGON ((-110.44...| 04| 009| 961600| 1025|040099616001025|Block 1025| G5040| R| | | S|100235475| 8124|+32.8481268|-110.3910394| 3| 2|
|POLYGON ((-109.26...| 04| 003| 000501| 1186|040030005011186|Block 1186| G5040| R| | | S| 28230312| 67735|+31.5716354|-109.2328547| 0| 0|
|POLYGON ((-109.18...| 04| 003| 000501| 1195|040030005011195|Block 1195| G5040| R| | | S| 12013891| 451563|+31.5362891|-109.1563449| 0| 0|
|POLYGON ((-113.35...| 04| 005| 002301| 3702|040050023013702|Block 3702| G5040| R| | | S| 118622| 1600804|+35.9649737|-113.3170363| 0| 0|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+---------+--------+-----------+------------+---------+-----+
only showing top 5 rows

root
 |-- geometry: geometry (nullable = true)
 |-- STATEFP20: string (nullable = true)
 |-- COUNTYFP20: string (nullable = true)
 |-- TRACTCE20: string (nullable = true)
 |-- BLOCKCE20: string (nullable = true)
 |-- GEOID20: string (nullable = true)
 |-- NAME20: string (nullable = true)
 |-- MTFCC20: string (nullable = true)
 |-- UR20: string (nullable = true)
 |-- UACE20: string (nullable = true)
 |-- UATYPE20: string (nullable = true)
 |-- FUNCSTAT20: string (nullable = true)
 |-- ALAND20: string (nullable = true)
 |-- AWATER20: string (nullable = true)
 |-- INTPTLAT20: string (nullable = true)
 |-- INTPTLON20: string (nullable = true)
 |-- HOUSING20: string (nullable = true)
 |-- POP20: string (nullable = true)
As we can see, the geometry column was read back as a geometry column. Users can directly apply ST functions on this column without first parsing it from WKB.
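For example, an illustrative query (not from the original walk-through) that runs ST functions directly on the decoded column could look like this:

# Illustrative: apply ST functions to the geometry column as-is, no WKB parsing needed.
df_geoparquet.selectExpr(
    "GEOID20",
    "ST_Area(geometry) AS area",
    "ST_IsValid(geometry) AS is_valid"
).show(5)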
The primary goal of GeoParquet is to define a standard so that parquet files containing geospatial data can be exchanged among various geospatial libraries and platforms. For instance, GeoParquet files saved by Apache Sedona can be loaded by GeoPandas, and vice versa. We can load the GeoParquet files we saved in the previous section using GeoPandas and see that the geometry column is correctly loaded.
import geopandas

gpd_df = geopandas.read_parquet(DATA_ROOT_PATH + "/tl_2022_04_tabblock20.parquet")
gpd_df.head()

  geometry STATEFP20 COUNTYFP20 TRACTCE20 BLOCKCE20 GEOID20 NAME20 MTFCC20 UR20 UACE20 UATYPE20 FUNCSTAT20 ALAND20 AWATER20 INTPTLAT20 INTPTLON20 HOUSING20 POP20
0 POLYGON ((-109.80930 32.90518, -109.80928 32.9... 04 009 961100 1175 040099611001175 Block 1175 G5040 R   S 0 57613 +32.9046017 -109.8078124 0 0
1 POLYGON ((-110.44854 32.76838, -110.44853 32.7... 04 009 961600 1025 040099616001025 Block 1025 G5040 R   S 100235475 8124 +32.8481268 -110.3910394 3 2
2 POLYGON ((-109.26002 31.57156, -109.25791 31.5... 04 003 000501 1186 040030005011186 Block 1186 G5040 R   S 28230312 67735 +31.5716354 -109.2328547 0 0
3 POLYGON ((-109.18276 31.55277, -109.14511 31.5... 04 003 000501 1195 040030005011195 Block 1195 G5040 R   S 12013891 451563 +31.5362891 -109.1563449 0 0
4 POLYGON ((-113.35416 36.04097, -113.35416 36.0... 04 005 002301 3702 040050023013702 Block 3702 G5040 R   S 118622 1600804 +35.9649737 -113.3170363 0 0
gpd_df.plot()
The entire census 2022 dataset takes 8 GB and contains 8 million polygons. We’ve already stored all the data in a single GeoParquet file on our public S3 bucket, so readers can load it directly. It is recommended to run the following queries on Spark clusters with a good network connection to AWS S3; otherwise, you can download the dataset from here and store it somewhere you have fast access to.
df_census_2022 = sedona.read.format("geoparquet").load("s3a://wherobots-examples/data/census-2022-geoparquet.snappy.parquet")
df_census_2022.show(5)

+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+-------+--------+-----------+------------+---------+-----+
| geometry|STATEFP20|COUNTYFP20|TRACTCE20|BLOCKCE20| GEOID20| NAME20|MTFCC20|UR20|UACE20|UATYPE20|FUNCSTAT20|ALAND20|AWATER20| INTPTLAT20| INTPTLON20|HOUSING20|POP20|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+-------+--------+-----------+------------+---------+-----+
|POLYGON ((-91.968...| 22| 083| 970600| 3076|220839706003076|Block 3076| G5040| R| | | S|1806215| 0|+32.2045100|-091.9584632| 0| 0|
|POLYGON ((-79.872...| 42| 129| 805200| 1000|421298052001000|Block 1000| G5040| U| 58162| U| S| 401352| 0|+40.1526521|-079.8673403| 68| 158|
|POLYGON ((-95.451...| 48| 339| 693103| 2013|483396931032013|Block 2013| G5040| U| 87300| U| S| 12340| 0|+30.3059421|-095.4503930| 9| 43|
|POLYGON ((-94.345...| 29| 037| 060904| 3004|290370609043004|Block 3004| G5040| U| 37189| U| S| 29419| 0|+38.6592501|-094.3441431| 18| 31|
|POLYGON ((-87.965...| 55| 089| 640102| 3027|550896401023027|Block 3027| G5040| U| 34122| U| S| 3807| 0|+43.3208487|-087.9652632| 4| 3|
+--------------------+---------+----------+---------+---------+---------------+----------+-------+----+------+--------+----------+-------+--------+-----------+------------+---------+-----+
only showing top 5 rows
Now let’s run some spatial range queries on this dataset. Here we define a range query to fetch records within the city of Phoenix:
spatial_filter = "ST_Within(geometry, ST_PolygonFromEnvelope(-112.473156, 33.179649, -111.502991, 33.868652))"
We can run the query and count the result set, using %%time to measure the time elapsed running this query:
%%time
df_census_2022.where(spatial_filter).count()

CPU times: user 15.3 ms, sys: 12.6 ms, total: 27.9 ms
Wall time: 1min 10s
Out[18]: 57812
The query took more than 1 minute to finish, and the result set contains 57812 rows. Now let’s explore ways to redistribute the dataset to achieve lower query latency.
GeoParquet files have bbox metadata for their geometry columns. If we partition the dataset into multiple GeoParquet files such that geometry objects that are close to each other are stored in the same file, then each GeoParquet file covers only a small portion of the entire space. We only need to scan the GeoParquet files whose bbox overlaps with the query window and can skip scanning the rest. This optimization is called “filter pushdown”, and Apache Sedona has implemented it since version 1.4.0. In this section we explore different ways to repartition the dataset to speed up spatial range queries.
The census 2022 dataset comes with a STATEFP20 column, which is the state ID of each record. We can partition the dataset by this field so that records for the same state are grouped together.
df_census_2022.repartition("STATEFP20").write.partitionBy('STATEFP20').format("geoparquet").save(DATA_ROOT_PATH + '/census_2022_part_statefp20.parquet')
df_census_2022_part_statefp20 = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/census_2022_part_statefp20.parquet")
Now we can run the same spatial range query. Note that we don’t need to manually add a predicate for the partitioning field or manually figure out which state the query window overlaps with. Apache Sedona will prune GeoParquet files according to their bbox metadata.
%%time
df_census_2022_part_statefp20.where(spatial_filter).count()

CPU times: user 18.9 ms, sys: 0 ns, total: 18.9 ms
Wall time: 7.3 s
Out[17]: 57812
Now the query took less than 8 seconds. Let’s visualize the bounding boxes of each GeoParquet file to see the distribution of the partitioned dataset.
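The figure from the original post is not reproduced here, but a minimal sketch of how such a visualization could be produced (assuming the partitioned files carry bbox metadata and are readable from the local filesystem, and that matplotlib is available; the glob pattern is illustrative) is:

import glob
import json
import pyarrow.parquet as pq
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle

# Hypothetical sketch: draw the bbox recorded in each partition's geo metadata.
fig, ax = plt.subplots()
xs, ys = [], []
for path in glob.glob(DATA_ROOT_PATH + "/census_2022_part_statefp20.parquet/*/*.parquet"):
    meta = json.loads(pq.read_schema(path).metadata[b"geo"])
    x0, y0, x1, y1 = meta["columns"][meta["primary_column"]]["bbox"]
    ax.add_patch(Rectangle((x0, y0), x1 - x0, y1 - y0, fill=False))
    xs += [x0, x1]
    ys += [y0, y1]
ax.set_xlim(min(xs), max(xs))
ax.set_ylim(min(ys), max(ys))
plt.show()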
If your dataset does not have a field such as STATEFP20 to partition by directly, you can partition the dataset by the S2 cell ID of the geometry. This requires selecting a proper S2 resolution level suited to your dataset. Here we repartition the dataset by the level-4 S2 cell ID of the geometry values.
from pyspark.sql.functions import expr

df_census_2022_s2 = df_census_2022.withColumn("s2", expr("array_max(ST_S2CellIds(ST_Centroid(geometry), 4))"))
df_census_2022_s2.repartition("s2").write.partitionBy("s2").format("geoparquet").save(DATA_ROOT_PATH + '/census_2022_part_s2.parquet')
df_census_2022_part_s2 = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/census_2022_part_s2.parquet")
Let’s run the same query on this dataset partitioned by S2. Note that we don’t need to add a predicate for the s2 partitioning column.
%%time
df_census_2022_part_s2.where(spatial_filter).count()

CPU times: user 15.3 ms, sys: 856 µs, total: 16.2 ms
Wall time: 8.41 s
Out[16]: 57812
The bounding boxes of each GeoParquet file look like this.
An alternative approach is to sort the dataset by the GeoHash of the geometry values. The way GeoHash is encoded means that geometry objects that are close to each other have similar GeoHash values. We can use this property to redistribute our datasets for faster queries:
from pyspark.sql.functions import col, expr

num_geoparquet_files = 100
sedona.conf.set("spark.sql.shuffle.partitions", num_geoparquet_files)
sedona.conf.set("spark.sql.adaptive.enabled", "false")
df_census_2022 = df_census_2022.withColumn("geohash", expr("ST_GeoHash(ST_Centroid(geometry), 20)"))\
    .sort(col("geohash"))\
    .drop("geohash")
df_census_2022.write.format("geoparquet").save(DATA_ROOT_PATH + "/census_2022_sorted.parquet")
sedona.conf.set("spark.sql.adaptive.enabled", "true")
Running the same query again, we can see it finishes in roughly 12 seconds.
%%time
df_census_2022_sorted = sedona.read.format("geoparquet").load(DATA_ROOT_PATH + "/census_2022_sorted.parquet")
df_census_2022_sorted.where(spatial_filter).count()

CPU times: user 12.4 ms, sys: 6.42 ms, total: 18.8 ms
Wall time: 11.7 s
Out[18]: 57812
The bounding boxes of each GeoParquet file look like this. Note that although the bounding boxes of the files overlap with each other significantly, we can still prune most of the files when running spatial queries.
Apache Sedona supports reading and writing GeoParquet files, a variant of Parquet for exchanging spatial data. Apache Sedona can perform spatial filter pushdown when running spatial range queries on GeoParquet files, and you can group your GeoParquet dataset by spatial proximity to achieve faster spatial query performance.
Interested in learning more about Apache Sedona? Get access to this hands-on guide, created in partnership with O’Reilly, which provides real-world examples and practical solutions for the challenges of working with various types of geospatial data at scale.