    This post is a hands-on look at offshore ocean infrastructure and industrial vessel activity with Apache Sedona in Wherobots Cloud using data from Global Fishing Watch. We also see how GeoParquet can be used with this data to improve the efficiency of data retrieval and enable large-scale geospatial visualization.

    Global Fishing Watch Overview

    Earlier this month researchers affiliated with Global Fishing Watch published a study in the journal Nature that used machine learning and satellite imagery to reveal that 75 percent of the world’s industrial fishing vessels are not publicly tracked while 25 percent of transport and energy vessels are not publicly tracked. The researchers analyzed 2 million gigabytes of satellite radar and optical imagery to detect vessels and offshore infrastructure and then attempted to match these identified vessels with data from public ship tracking systems. You can find their study here along with links to all code and data used in the project.

    Global Fishing Watch is a global non-profit organization focused on creating and sharing knowledge about human activity in the oceans to ensure fair and sustainable usage of the world’s oceans. Their focus is on using big data solutions to analyze this activity (much of it from satellite imagery) and make it publicly available through a variety of open data and data products. You can find much of their open data freely available for download here.

    Exploring Global Fishing Watch Public Data With Apache Sedona

    In this post we will focus on working with the data published by Global Fishing Watch as a result of the study mentioned above which you can find in the dataset listing for "Paolo et al. (2024). Satellite mapping reveals extensive industrial activity at sea". To follow along, first create a free account in Wherobots Cloud then download the Global Fishing Watch data linked above. The dataset for this project includes two CSV files:

    • offshore_infrastructure_v20231106.csv 85MB – contains offshore infrastructure identified by satellite imagery and a machine learning algorithm, including the type of infrastructure (oil, wind, etc) and location
    • industrial_vessels_v20231013.csv 6.09GB – contains vessels detected in satellite imagery and classified as fishing or non-fishing, the location of the vessel, and if the vessel was matched to public AIS records (a system for reporting and tracking ship movements)

    We can download these data files and upload them to Wherobots Cloud via the file browser, as shown in the image below (note that I’ve uploaded a number of other files from Global Fishing Watch projects):

    Wherobots Cloud file browser

    Data uploaded to Wherobots Cloud this way is private to our user account and hosted in AWS S3. We can click the clipboard icon to the right of each file to copy the S3 URL for that file.
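
    Before running the code below in a Wherobots notebook we need a Sedona context and references to the files we just uploaded. Here is a minimal setup sketch using the Sedona Python API; the S3 paths are hypothetical placeholders, so substitute the URLs copied from the file browser.

    from sedona.spark import SedonaContext, SedonaKepler

    config = SedonaContext.builder().getOrCreate()
    sedona = SedonaContext.create(config)

    # Hypothetical placeholder paths -- copy the actual S3 URLs from the file browser
    S3_URL_DATA_ROOT = "s3://<your-wherobots-bucket>/data/"
    S3_URL_INFRASTRUCTURE = S3_URL_DATA_ROOT + "offshore_infrastructure_v20231106.csv"
    S3_URL_INDUSTRIAL = S3_URL_DATA_ROOT + "industrial_vessels_v20231013.csv"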

    Offshore Infrastructure

    Let’s start by loading the offshore infrastructure data (offshore_infrastructure_v20231106.csv). First, we’ll load this into a DataFrame.

    offshore_infra_df = sedona.read.format('csv'). \
        option('header','true').option('delimiter', ','). \
        load(S3_URL_INFRASTRUCTURE)
    
    offshore_infra_df.show(5)

    We can view the first few rows to get a sense of what data is included.

    +------------+--------------+--------------+----------------+-----------------+
    |structure_id|composite_date|         label|             lat|              lon|
    +------------+--------------+--------------+----------------+-----------------+
    |      115958|    2019-09-01|lake_maracaibo|9.71025137100954|-71.0518565233426|
    |      115958|    2018-12-01|lake_maracaibo|9.71025137100954|-71.0518565233426|
    |      115958|    2019-07-01|lake_maracaibo|9.71025137100954|-71.0518565233426|
    |      115958|    2018-06-01|lake_maracaibo|9.71025137100954|-71.0518565233426|
    |      115958|    2020-08-01|lake_maracaibo|9.71025137100954|-71.0518565233426|
    +------------+--------------+--------------+----------------+-----------------+
    only showing top 5 rows

    This dataset (the smaller of the two) has 1.4 million observations.

    offshore_infra_df.count()
    -------------------------------------
    1441242
    

    Next, we convert the string values of lat and lon to a Point geometry using the ST_POINT Spatial SQL function. Note that since the CSV format does not include a schema or types we must explicitly cast the types of any non-string values.

    offshore_infra_df = offshore_infra_df.selectExpr(
        'ST_POINT(CAST(lon AS float), CAST(lat AS float)) AS location',
        'structure_id',
        'label',
        'CAST(composite_date AS date) AS composite_date'
    )
    offshore_infra_df.createOrReplaceTempView("infrastructure")
    offshore_infra_df.show(5)

    We also cast composite_date to a date type.

    +--------------------+------------+--------------+--------------+
    |            location|structure_id|         label|composite_date|
    +--------------------+------------+--------------+--------------+
    |POINT (-71.051856...|      115958|lake_maracaibo|    2019-09-01|
    |POINT (-71.051856...|      115958|lake_maracaibo|    2018-12-01|
    |POINT (-71.051856...|      115958|lake_maracaibo|    2019-07-01|
    |POINT (-71.051856...|      115958|lake_maracaibo|    2018-06-01|
    |POINT (-71.051856...|      115958|lake_maracaibo|    2020-08-01|
    +--------------------+------------+--------------+--------------+
    only showing top 5 rows

    We can count the number of observations for each classification label using a GROUP BY operation.

    sedona.sql("SELECT COUNT(*) AS num, label FROM infrastructure GROUP BY label").show()

    We can see that "oil" is the most common classification for observed offshore infrastructure in this dataset.

    +------+--------------+
    |   num|         label|
    +------+--------------+
    |518138|           oil|
    | 42358|  possible_oil|
    |  8169| probable_wind|
    |  1082| possible_wind|
    | 23101|  probable_oil|
    |288390|lake_maracaibo|
    |436659|          wind|
    |123345|       unknown|
    +------+--------------+

    This dataset represents a time series of observations from 2017-2021, so individual pieces of infrastructure may be included multiple times. Let’s see how many distinct structures are included in the data.

    sedona.sql("""
        WITH distinct_ids AS (SELECT DISTINCT(structure_id) FROM infrastructure) 
        SELECT COUNT(*) FROM distinct_ids
    """).show()
    --------------------
    39979 

    With just under 40,000 structures we should be able to visualize them using SedonaKepler. Let’s first combine some of the classification labels to simplify the visualization.

    distinct_infra_df = sedona.sql("""
        WITH grouped AS (
            SELECT structure_id, location, 
                CASE WHEN label IN ("possible_oil", "probable_oil", "oil") THEN 'oil' 
                    WHEN label IN ("possible_wind", "probably_wind", "wind") THEN "wind" 
                    WHEN label IN ("unknown") THEN "unknown" 
                    ELSE 'other' END AS label FROM infrastructure)
        SELECT any_value(location) AS location, any_value(label) AS label, structure_id FROM grouped
        GROUP BY structure_id
    """)

    To create visualizations using SedonaKepler we can pass the DataFrame to the create_map method:

    SedonaKepler.create_map(distinct_infra_df, name="Offshore Infrastructure")

    Here we can see the distribution of offshore infrastructure structures observed in the study.

    Offshore infrastructure

    Now that we have a sense of the scale of the offshore structures identified in the study, let’s explore the industrial vessel traffic dataset.

    Industrial Vessels

    First, we’ll load the industrial vessel CSV that we uploaded to Wherobots Cloud and count the number of rows.

    industrial_vessel_df = sedona.read.format('csv'). \
        option('header','true').option('delimiter', ','). \
        load(S3_URL_INDUSTRIAL)
    
    industrial_vessel_df.count()
    -----------------------------------
    23088625

    We have just over 23 million observations. We can print the schema of the DataFrame to get a sense of the data included.

    industrial_vessel_df.printSchema()
    root
     |-- timestamp: string (nullable = true)
     |-- detect_id: string (nullable = true)
     |-- scene_id: string (nullable = true)
     |-- lat: string (nullable = true)
     |-- lon: string (nullable = true)
     |-- mmsi: string (nullable = true)
     |-- length_m: string (nullable = true)
     |-- matching_score: string (nullable = true)
     |-- fishing_score: string (nullable = true)
     |-- matched_category: string (nullable = true)
     |-- overpasses_2017_2021: string (nullable = true)

    As with the previous file we loaded, all columns are strings, so we’ll need to cast numeric and date fields explicitly, as well as convert the individual latitude and longitude values to a point geometry.

    industrial_vessel_df = industrial_vessel_df.selectExpr(
        'ST_Point(CAST(lon AS float), CAST(lat AS float)) AS location', 
        'CAST(timestamp AS Timestamp) AS timestamp', 
        'detect_id', 
        'scene_id', 
        'mmsi', 
        'CAST(length_m AS float) AS length_m', 
        'CAST(matching_score AS float) AS matching_score', 
        'CAST(fishing_score AS float) AS fishing_score', 
        'matched_category', 
        'CAST(overpasses_2017_2021 AS integer) AS overpasses_2017_2021')
    industrial_vessel_df.createOrReplaceTempView('iv')

    The data includes a matched_category field which indicates if the vessel identified was matched to the public vessel tracking system (AIS) and also if the vessel was classified as fishing or non-fishing.

    sedona.sql("""
    SELECT COUNT(*) AS num, matched_category 
    FROM iv
    GROUP BY matched_category
    """).show()

    Over 9 million vessel observations were not matched to the public vessel tracking system, indicating that many of these vessels may not have been properly reporting their location information.

    +--------+------------------+
    |     num|  matched_category|
    +--------+------------------+
    | 2864709|   matched_fishing|
    |  803624|   matched_unknown|
    | 9187828|         unmatched|
    |10232464|matched_nonfishing|
    +--------+------------------+

    Let’s visualize an aggregation of the dataset to get a sense of its spatial distribution. To do this we’ll overlay H3 hexagons on the area covered by the vessel observations, count the number of vessel observations that fall within each hexagon, and then visualize the results.

    To generate the H3 hexagons we’ll make use of two Spatial SQL functions:

    • ST_H3CellIDs – return an array of H3 cell ids that cover the given geometry at the specified resolution level
    • ST_H3ToGeom – return the H3 hexagon geometry for a given array of H3 cell ids

    You can read more about Sedona’s support for H3 in this article.

    h3_industrial_df = sedona.sql("""
    SELECT 
        COUNT(*) AS num, 
        ST_H3ToGeom(ST_H3CellIDs(location, 2, false)) as geometry, 
        ST_H3CellIDs(location, 2, false) as h3 
    FROM iv
    GROUP BY h3
    """)

    We can now visualize this aggregated data as a choropleth map using SedonaKepler.

    SedonaKepler.create_map(h3_industrial_df, name="Industrial Vessels")

    Industrial vessel activity

    Note the distorted size of the hexagons at more extreme latitudes. This is an artifact of the Web Mercator map projection. For choropleth maps like this we typically want to use an equal-area projection instead, so the area of each hexagon is preserved and the spatial distribution is easier to interpret.

    To do this we can convert our DataFrame to a GeoDataFrame, reproject it to the Equal Earth projection, and then use matplotlib to visualize the GeoDataFrame.

    import geopandas  # also requires the mapclassify package for the classification scheme below

    h3_gdf = geopandas.GeoDataFrame(h3_industrial_df.toPandas(), geometry='geometry', crs="EPSG:4326")
    h3_gdf = h3_gdf.to_crs(epsg=8857)  # Equal Earth projection
    
    ax = h3_gdf.plot(
        column="num",
        scheme="JenksCaspall",
        cmap="YlOrRd",
        legend=False,
        legend_kwds={"title": "Number of Observed Industrial Activities", "fontsize": 6, "title_fontsize": 6},
        figsize=(24,18)
    )
    
    ax.set_axis_off()

    Observed industrial activities

    Another option for visualization is to use the contextily package to include a basemap layer, but the downside is we must revert to the Web Mercator projection.

    Here we visualize not just the count of overall vessel observations, but specifically those unmatched to the public tracking system. This gives us an indication of areas where illicit fishing might be concentrated.
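
    The h3_unmatched_df DataFrame used below isn’t created in the snippets above. A minimal sketch of how it could be built, mirroring the earlier H3 aggregation but keeping only detections that were not matched to AIS, looks like this:

    h3_unmatched_df = sedona.sql("""
    SELECT 
        COUNT(*) AS num, 
        ST_H3ToGeom(ST_H3CellIDs(location, 2, false)) AS geometry, 
        ST_H3CellIDs(location, 2, false) AS h3 
    FROM iv
    WHERE matched_category = 'unmatched'
    GROUP BY h3
    """)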

    import contextily as cx

    h3_unmatched_gdf = geopandas.GeoDataFrame(h3_unmatched_df.toPandas(), geometry='geometry', crs="EPSG:4326")
    h3_unmatched_gdf = h3_unmatched_gdf.to_crs(epsg=3857)  # Web Mercator, to match the basemap tiles
    
    ax = h3_unmatched_gdf.plot(
        column="num",
        scheme="JenksCaspall",
        cmap="YlOrRd",
        legend=True,
        legend_kwds={"title": "Number of Unmatched Industrial Activities", "fontsize": 6, "title_fontsize": 6},
        figsize=(12,8)
    )
    
    cx.add_basemap(ax,source=cx.providers.OpenStreetMap.Mapnik)
    ax.set_axis_off()

    Unmatched industrial vessels

    Now that we’ve explored the data, let’s see how we can improve the performance and efficiency of working with this data as GeoParquet.

    GeoParquet

    GeoParquet is a standard that specifies how to store geospatial data in the Apache Parquet file format. The goals of GeoParquet are largely to extend the benefits of Parquet to the geospatial ecosystem and enable efficient workflows for working with geospatial attributes in columnar data.

    Parquet is a popular columnar file format that enables efficient data storage and retrieval. Efficient storage is achieved using various compression and encoding methods. Efficient retrieval is achieved by chunking the data into row groups and storing metadata about each row group, which lets a reader select only the subsets of the data within the scope of a predicate (known as predicate pushdown), and by storing column values next to each other, so only the columns necessary to complete the query are read. Additionally, Parquet datasets can be partitioned across many individual files, further allowing query engines to ignore files outside the range of the query. These features make Parquet a natural choice for efficiently storing data in cloud object stores like AWS S3.
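
    To make the row group metadata concrete, here is a small sketch using the pyarrow package (the file path is a hypothetical local copy) that prints the per-row-group column statistics a reader can use for predicate pushdown:

    import pyarrow.parquet as pq

    # Hypothetical local path to a Parquet file
    metadata = pq.read_metadata("industrial_vessels.parquet")
    print(metadata.num_row_groups)

    # Min/max statistics for the first column of the first row group --
    # a reader can skip row groups whose value ranges can't satisfy a predicate
    stats = metadata.row_group(0).column(0).statistics
    print(stats.min, stats.max, stats.null_count)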

    By specifying how to store geospatial data in the Parquet format, GeoParquet takes advantage of all the benefits of Parquet, with the added bonus of supporting interoperability within the geospatial data ecosystem.

    Let’s look at how we can take advantage of these two benefits of GeoParquet: efficient data storage and efficient retrieval.

    Efficient Data Storage With GeoParquet

    First, let’s compare the file sizes of the original dataset (CSV) with the same data in the GeoJSON and GeoParquet formats. We can save the data as GeoJSON using Sedona:

    industrial_vessel_df.repartition(1).write. \
        mode("overwrite"). \
        format("geojson"). \
        save(S3_URL_DATA_ROOT + "globalfishingwatch_industrial_vessels.json")

    Note that we repartition the data to save as a single file.

    And now to create a single un-partitioned GeoParquet file for comparison:

    industrial_vessel_df.repartition(1).write. \
        mode("overwrite"). \
        format("geoparquet"). \
        save(S3_URL_DATA_ROOT + "globalfishingwatch_industrial_vessels.parquet")

    The relative file sizes are:

    • CSV: 6.1 GB
    • GeoJSON: 9 GB
    • GeoParquet: 2.4 GB

    The same data stored as GeoParquet takes up 60% less space than the CSV equivalent and 73% less storage space than the same data stored as GeoJSON. This translates to faster query times when less data is transferred over the network.

    Much of this efficiency comes from the GeoParquet specification’s requirement that geometries be serialized as WKB, which is then encoded and compressed using the built-in compression functionality of Parquet.

    Efficient Retrieval With Partitioned GeoParquet

    A common technique when working with Parquet files is to partition the Parquet files based on the value of some attribute. When working with GeoParquet this can be done using an administrative boundary (such as by state or country). In cases where it doesn’t make sense to partition by administrative boundary, we can instead partition using a spatial index such as S2 or H3.

    By partitioning our data using a spatial index we gain efficient retrieval thanks to another feature of the GeoParquet specification: each GeoParquet file stores the bounding box of its geometries in the column metadata. When executing a spatial filter query, the query engine can check the bounding box of each partitioned GeoParquet file and quickly exclude those falling outside the spatial filter bounds. This is known as spatial predicate pushdown. Let’s see it in action.

    First, we will partition our data using the S2 spatial index and save as partitioned GeoParquet across multiple files (one file for each S2 cell):

    from pyspark.sql.functions import expr

    # Add an S2 cell id (resolution level 2) column to partition by
    industrial_vessel_df_s2 = industrial_vessel_df.withColumn("s2", expr("array_max(ST_S2CellIds(location, 2))"))
    industrial_vessel_df_s2.repartition("s2").write. \
        mode("overwrite"). \
        partitionBy("s2"). \
        format("geoparquet"). \
        save(S3_URL_DATA_ROOT + "globalfishingwatch_industrial_vessels_s2part.parquet")
    
    industrial_vessel_df_pq_part_s2 = sedona.read.format("geoparquet").load(S3_URL_DATA_ROOT + "globalfishingwatch_industrial_vessels_s2part.parquet")
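
    Each of these partition files carries the GeoParquet "geo" metadata mentioned above, including the bounding box of its geometries. As a sketch (again using pyarrow, with a hypothetical local path to one of the partition files), we can inspect it like this:

    import json
    import pyarrow.parquet as pq

    # Hypothetical local path to a single partition file from the dataset written above
    geo = json.loads(pq.read_metadata("one_s2_partition.parquet").metadata[b"geo"])
    print(geo["columns"]["location"]["encoding"])  # geometry encoding, e.g. WKB
    print(geo["columns"]["location"]["bbox"])      # [min_lon, min_lat, max_lon, max_lat]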

    Now let’s compare the performance of these various file formats by executing a spatial filter query.

    Spatial Filter Performance Comparison

    Let’s query for the number of observations in the area around the Gulf of Mexico. I drew a simple polygon to capture this area, which can be represented in WKT format as:

    POLYGON ((-84.294662 29.840644, -88.952866 30.297018, -89.831772 28.767659, -94.050522 29.61167, -97.038803 27.839076, -97.917709 21.453069, -94.489975 18.479609, -86.843491 21.616579, -80.779037 24.926295, -84.294662 29.840644))

    Gulf of Mexico

    Next, we’ll define a spatial predicate using the ST_Within Spatial SQL function.

    gulf_filter = 'ST_Within(location, ST_GeomFromWKT("POLYGON ((-84.294662 29.840644, -88.952866 30.297018, -89.831772 28.767659, -94.050522 29.61167, -97.038803 27.839076, -97.917709 21.453069, -94.489975 18.479609, -86.843491 21.616579, -80.779037 24.926295, -84.294662 29.840644))"))'

    Now we’ll test the performance using the CSV version of the data:

    %%time
    industrial_vessel_df.where(gulf_filter).count()
    ----------------------------------------------------------
    217508 (16.2 s)

    Note that this time includes loading the entire CSV file, converting the values from string to geometry types, then performing the spatial filter without using an index.

    Next, we compare the performance of executing the same spatial filter, but this time using our single GeoParquet file.

    industrial_vessel_df_pq = sedona.read. \
        format('geoparquet'). \
        load(S3_URL_DATA_ROOT + "globalfishingwatch_industrial_vessels.parquet")
    
    %%time
    industrial_vessel_df_pq.where(gulf_filter).count()
    ---------------------------------------------------------------
    217508 (4.91 s)

    The significant improvement here was due to loading less data over the network and more efficient deserialization of the geometry values from WKB.

    Next, let’s compare the performance using the partitioned GeoParquet. Sedona can exclude most of the GeoParquet files based on their bounding box metadata, significantly reducing both the data loaded over the network and the number of rows that must be filtered using the spatial predicate.

    industrial_vessel_df_pq_part_s2 = sedona.read. \
        format("geoparquet"). \
        load(S3_URL_DATA_ROOT + "globalfishingwatch_industrial_vessels_s2part.parquet")
    
    %%time
    industrial_vessel_df_pq_part_s2.where(gulf_filter).count()
    ------------------------------------------------------------------------
    217508  (1.87s)
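
    As a quick sanity check that the spatial filter is being applied at scan time, one option is to print the physical plan for the filtered DataFrame and inspect the file scan and pushed filters:

    industrial_vessel_df_pq_part_s2.where(gulf_filter).explain()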

    To summarize, the time it takes to load the data and execute the spatial filter for each file type:

    • CSV: 16.2s
    • Single GeoParquet file: 4.91s
    • Partitioned GeoParquet: 1.87s

    Thanks to the spatial predicate pushdown enabled by GeoParquet and Apache Sedona, the partitioned GeoParquet query takes 88% less time than the CSV query.

    Another exciting benefit of GeoParquet is the types of integrations throughout the geospatial data ecosystem that it enables, such as with visualization tools.

    Scalable Geospatial Visualization With Lonboard

    Matched vs unmatched vessels

    The GeoParquet specification is being developed alongside the GeoArrow specification, which enables efficient in-memory columnar representation of geometries and enables efficiently transporting in-memory geospatial data between tools.

    One thing this type of zero-copy, in-process geospatial data transport enables is efficient geospatial visualization: data can be moved to the GPU with minimal overhead for fast rendering.

    A current proposal to the GeoParquet specification includes GeoArrow as a serialization format option (in addition to WKB mentioned earlier).

    Lonboard is a relatively new Python package from the folks at Development Seed that takes advantage of GeoParquet and GeoArrow to render large scale visualizations containing millions of geometries in seconds. Since GeoArrow is not compressed it doesn’t need to be parsed before being used and the geometries can be efficiently moved to the GPU for rendering.

    Note that because the visualization moves data to the web browser and the GPU of the local machine, I downloaded the GeoParquet file and ran this example in a local Python notebook. If running in a cloud notebook environment the data will first need to be fetched over the network, so the performance impact won’t be as significant.

    Let’s see how we can use Lonboard to visualize the entirety of our industrial vessel dataset (20+ million points). First, we’ll create a GeoDataFrame loading our GeoParquet data.

    url = "../data/industrial_vessels.parquet"
    gdf = gpd.read_parquet(url)

    We’ll visualize matched and unmatched vessels distinctly, so we’ll split them into two different GeoDataFrames and two layers in Lonboard. We’ll color matched vessels in green and unmatched in red.

    from lonboard import Map, ScatterplotLayer

    unmatched_gdf = gdf.loc[gdf['matched_category']=='unmatched']
    matched_gdf = gdf.loc[gdf['matched_category']=='matched_fishing']
    
    unmatched_layer = ScatterplotLayer.from_geopandas(unmatched_gdf, get_fill_color = [200, 0, 0, 200])
    matched_layer = ScatterplotLayer.from_geopandas(matched_gdf, get_fill_color = [0, 200, 0, 200])
    map_ = Map(layers=[unmatched_layer, matched_layer])
    map_

    Lonboard visualization

    This visualization contains millions of points, yet it renders in seconds in our Jupyter notebook and offers smooth panning and scrolling since all points are rendered immediately. If you’ve ever tried to render visualizations of this scale using other tools in Jupyter, this is pretty impressive!

    Resources

    Want to keep up with the latest developer news from the Wherobots and Apache Sedona community? Sign up for the This Month In Wherobots Newsletter: