While the devil is in the details, there are broadly two ways to go about building a scalable geospatial analytics engine.

One is to write something that natively understands geospatial analytics, has specialized handling for geospatial indices and geospatial queries.

The other is to take a general purpose distributed processing engine and specialize it for geospatial processing.

One of my goals in writing Magellan was to show that "one size fits all" does indeed work for geospatial analytics at scale. That is, it is possible to take a general purpose query processing framework and adapt it for geospatial analytics and still achieve state of the art in geospatial query performance.

I chose Spark as the general purpose engine for two reasons: Spark's performance, maturity and adoption meant most if not all big data processing over time would move to Spark, making it easy to integrate geospatial analytics with the rest of the ETL/ analytics pipelines.

The other is Spark's Catalyst optimizer. Catalyst is the powerful, extensible optimizer at the heart of Spark SQL. Its extensibility is what allows Magellan to be written on top of Spark and still leverage all the benefits that the core engine provides like join reordering, cost based optimization, efficient memory layout, whole stage code generation etc.

While the journey is not complete yet, Magellan is at a point where it performs very well on geospatial analytics on large datasets. We can easily scale spatial joins to billions of points and hundreds of thousands of shapes on commodity hardware on modest clusters and have it execute in under a couple of minutes. That is some serious scale and performance!

In this blog post, we will take a peek under the hood of Magellan to understand how it operates at scale.

For the rest of this article, I am going to assume you are running an Apache Spark 2.2 cluster and Magellan version [1.0.5+].

We will be working with the NYC Taxicab dataset.

We will also use three different polygon datasets:

Dataset | Number of Polygons | Avg # of edges per polygon |
---|---|---|

NYC Neighborhoods | 310 | 104 |

NYC Census Blocks | 39192 | 14 |

NY Census Blocks | 350169 | 51 |

In the previous blog post, we were discussing a geospatial join of the form:

```
val joined = trips.join(neighborhoods)
.where($"point" within $"polygon")
```

Joining roughly **O(200 million)** points with the NYC Neighborhood dataset takes about **6 min** on a 5 node **r3 x.large** cluster, as discussed before.

What we did not consider previously was the question: how does the time consumed in this join depend on the size of the polygon dataset?

Redoing the analysis of the previous blog post on all three polygon datasets with the exact same cluster and Spark configuration gives us a better picture:

Dataset | Time taken (min) |
---|---|

NYC Neighborhoods | 6 |

NYC Census Blocks | 840 |

NY Census Blocks | > 4 days |

This is to be expected: the total time taken for the join is roughly proportional to the number of polygons in the dataset.

In the case of the naive join, the best we can do is to look at the bounding box of each polygon to decide whether a particular point has a chance of being inside. If the point is within the bounding box, we need to perform an expensive point in polygon test to determine if the point is actually inside the polygon.

Now that we have seen how the naive join works, let us do something less naive. Re run the analysis but before that, inject the following piece of code at the beginning:

```
magellan.Utils.injectRules(spark)
```

and provide a hint during the creation of the polygon dataframe what the index precision should be via:

```
spark.read.format("magellan").load($"path").index(30)
```

Dataset | Time taken (min) (5 x r3.xlarge) | Time taken (min) (5 x c3.4xlarge) |
---|---|---|

NYC Neighborhoods | 1.6 | 0.5 |

NYC Census Blocks | 3.5 | 0.9 |

NY Census Blocks | 5.2 | 1.7 |

Ok, this is much better: the time taken for the computation is nearly independent of the size of the polygon dataset, over a three order of magnitude scale in the size of the polygon dataset

So, what happened?

Briefly, Magellan injects a Spatial Join Rule into Catalyst, which ends up building spatial indices on the fly for both the point and polygon datasets. The join is then rewritten under the hood to use these indices and avoid an expensive cartesian join in the first place.

The index precision provided as hint to the data frame via `dataframe.index(precision)`

controls the granularity of the index.

To understand this better, let's start with how the point in polygon test works.

The basic problem we need to solve is: given a point and a polygon, how can we tell if the point is within the polygon or not?

There are many ways to solve this problem but perhaps the simplest is the so called ray casting algorithm.

The idea is very simple: Consider the polygon below, and two points, the red one outside the polygon and the blue one inside the polygon.

If we draw a ray from each point going out to infinity, the ray emanating from the point inside the polygon has to cross the polygon an odd number of times to escape, whereas the ray from the point outside has to cross an even number of times to escape.

So, by counting edge crossing modulo 2, we can determine if a point lies within the polygon or not.

If the number of edges of the polygon is **p**, the computational complexity of this algorithm is **O(p)**.

Without precomputing anything, and for a generic (non convex) polygon, the worst case complexity of the point in polygon test is also **O(p)** so to do better than the algorithm above, we either need convexity or we need the ability to precompute stuff about the polygon so that at query time we do less computation.

Convexity is usually not under our control, so the next best thing is to see what we can precompute in order to simplify the problem.

The first thing we can do is given a polygon, precompute the bounding box, ie. the smallest axis parallel rectangle that encloses the polygon.

Once this is done, every time we need to check if a point is within the polygon, we first check if the point is within the bounding box. If not, we know for sure the point cannot be within the polygon.

If the point is within the bounding box, we proceed to do the more expensive point in polygon test to check if the point is actually inside the polygon.

This is an attractive solution because in the best case scenario, it avoids doing the expensive **O(p)** computation.

However, there are many scenarios where this still leads to more computation than necessary. In fact, the example below is a scenario where both the red and blue point would be inside the bounding box and would be indistinguishable unless a further test was applied.

There is another more subtle issue with using bounding boxes. Suppose you have **m** points and **n** polygons and you want to know for each point which polygon it falls into. For each polygon, you also know its bounding box.

Without looking at each bounding box for every given point, you cannot a priori conclude if a point falls inside a bounding box or not.

This means, knowing the bounding box of a polygon doesn't help you localize the tests to only points that are nearby in some sense and avoid the **O(mn)** computations.

What if we knew that the set of all polygons were bounded by some large (but finite) bounding box?

In this case, you can further sub divide the bounding box into small enough cells and for each cell determine if the cell is either within, or overlaps or is disjoint from the polygon.

Similarly, for each point, you can determine which cell it falls inside.

Knowing which cell a point falls inside now allows us to prune all the polygons that do not contain/ or overlap that cell.

Furthermore, for polygons that contain that cell, we know for sure that the point is also within the polygon.

So the expensive point in polygon test needs to only be applied for the cells that overlap/ but are not contained in the polygon.

This in essence is the algorithm that Magellan employs.

There are many ways to construct these "cells". One approach is the so called Space Filling Curves. Magellan has an abstraction for constructing arbitrary types of space filling curves, though the implementation we support currently is the so called Morton Curve.

The last piece of the puzzle is to transparently create and use these space filling curves as indices behind the scenes.

This is one of the places where Magellan leverages Catalyst's extensibility.

When Catalyst sees a join of the form

```
point.join(polygon).where($"point" within $"polygon")
```

it does not know what points and polygons mean: in particular that these are geometries.

However, it parses this data frame query and passes it through a pipeline of optimizations as below:

During construction of this pipeline there are various stages where external optimizations can be added. For example, additional analyzer rules, or optimizer rules or even rules affecting physical planning can be inserted into the Catalyst optimizer.

The plan that Magellan receives at the end of the optimization phase looks something like this:

The plan has resolved all variables, has pushed filters down as far as possible but beyond that there is nothing more that Catalyst can do as it is not aware of the nature of the data.

Magellan intercepts this plan and rewrites it to the following:

The rewritten plan includes an operator for each side of the join that takes the shape, computes the index and generates tuples of the form `(shape, curve, relation)`

on either side of the join.

The `curve`

represents the space filling curve of a given resolution while the `relation`

represents how the curve relates to the geometry: is it within, overlapping or disjoint from the geometry.

The Join itself is rewritten to use the `curve`

as key, while the filter is augmented with a condition that first checks if the `curve`

is contained within the polygon in which case it short circuits the point in polygon test.

As this is all done under the hood, the user is able to automatically leverage new spatial join optimizations that Magellan adds without having to rewrite their code.

This blog post touched upon how Magellan scales geospatial queries and provided a peek under the hood of the Magellan engine.

You can also learn more about Magellan from the following:

- Magellan project on GitHub
- The slides for my Spark Summit Europe 2016 talk on Magellan
- My talk at Foss4g where I presented some results of benchmarking Magellan on a single node and compared other geospatial libraries on big data.
- My Strata 2018 Slides on Magellan, outlining its current performance, the architecture etc.

Magellan is a distributed execution engine for geospatial analytics on big data.

It is implemented on top of Apache Spark and deeply leverages modern database techniques like efficient data layout, code generation and query optimization in order to optimize geospatial queries.

The application developer writes standard sql or data frame queries to evaluate geometric expressions while the execution engine takes care of efficiently laying data out in memory during query processing, picking the right query plan, optimizing the query execution with cheap and efficient spatial indices while presenting a declarative abstraction to the developer.

Magellan is the first library to extend Spark SQL to provide a relational abstraction for geospatial analytics. I see it as an evolution of geospatial analytics engines into the emerging world of big data by providing abstractions that are developer friendly, can be leveraged by anyone who understands or uses Apache Spark while simultaneously showcasing an execution engine that is state of the art for geospatial analytics on big data.

In subsequent blogs, we will talk in depth about how Magellan is implemented, how it scales geospatial analytics and how it leverages Spark SQL, Tungsten and Catalyst in order to do so while providing a declarative abstraction for the application developer.

In this blog, I will show you how to use Magellan on a real world dataset so you as the end user can get a feel for working with the library.

For the rest of this article, I am going to assume you are running an Apache Spark 2.1+ cluster and Magellan version 1.0.4+

We will be working with the NYC Taxicab dataset. For the purposes of this article we will only pick the cab trips in 2015. It turns out that prior to 2015, there is a missing field (improvement surcharge) and while this makes for a good tutorial on ETL on how to deal with schema changes when loading data into Spark, it would take me too far from the objective of this article.

We will further assume that the files have been loaded as is into a S3 bucket called "nyc.taxicab.rides".

For the purpose of this analysis, I'll spin up a 5 node r3.xlarge cluster in Databricks, with one node the driver and the other four nodes being the workers. That gives us a total of 20 cores for processing.

The analysis will be performed in Scala using a Databricks notebook.

Our goal is to analyze the trip data and figure out which neighborhoods in New York City have the most frequent pickups, what times of the day are popular times for cab pickups from say La Guardia, and answer other questions that have to do with the distribution of cab rides by neighborhood in some part.

For this purpose, we will first need a dataset that represents New York City neighborhoods at the level of detail needed for this study.

We'll assume this dataset has also been loaded into S3 and is available in a bucket called "nyc.neighborhoods".Instead of passing credentials around while loading data from S3, we will use the concept of mount points that Databricks exposes.

We will create two mount points, one for the trip dataset and one for the neighborhood dataset as below:

```
dbutils.fs.mkdirs("/mnt/nyctaxicabanalysis")
dbutils.fs.mount(s"s3a://$AccessKey:$EncodedSecretKey@nyc.neighborhoods","/mnt/nyctaxicabanalysis/neighborhoods/")
dbutils.fs.mount(s"s3a://$AccessKey:$EncodedSecretKey@nyc.taxicab.trips","/mnt/nyctaxicabanalysis/trips/")
```

Once this is done, we can access the datasets without passing in credentials using the mount point as a symlink.

Apache Spark starting 2.0 has a CSV Reader that we can use to read the NYC taxicab trip dataset into a DataFrame. Prior to 2.0, you could also use the excellent spark-csv package.

The trip dataset has the following schema:

```
val schema = StructType(Array(
StructField("vendorId", StringType, false),
StructField("pickup_datetime", StringType, false),
StructField("dropoff_datetime", StringType, false),
StructField("passenger_count", IntegerType, false),
StructField("trip_distance", DoubleType, false),
StructField("pickup_longitude", DoubleType, false),
StructField("pickup_latitude", DoubleType, false),
StructField("rateCodeId", StringType, false),
StructField("store_fwd", StringType, false),
StructField("dropoff_longitude", DoubleType, false),
StructField("dropoff_latitude", DoubleType, false),
StructField("payment_type", StringType, false),
StructField("fare_amount", StringType, false),
StructField("extra", StringType, false),
StructField("mta_tax", StringType, false),
StructField("tip_amount", StringType, false),
StructField("tolls_amount", StringType, false),
StructField("improvement_surcharge", StringType, false),
StructField("total_amount", DoubleType, false)))
```

The following snippet of code will load the CSV data from S3 into a DataFrame and cache it.

```
val trips = sqlContext.read
.format("com.databricks.spark.csv")
.option("comment", "V")
.option("mode", "DROPMALFORMED")
.schema(schema)
.load("/mnt/nyctaxicabanalysis/trips/*")
.withColumn("point",
point($"pickup_longitude",$"pickup_latitude"))
.cache()
```

In the process of loading this data, we have converted the pickup longitude and pickup latitude into a Magellan Point to make subsequent analysis easier.

To do this, we used the withColumn function on a Spark SQL dataset to add a derived column along with Magellan's syntactic sugar to add a column by providing a BinaryExpression of the form:

```
point(longitude-expr, latitude-expr)
```

A query we are able to answer at this point is, how big is the dataset?

```
trips.count()
```

returns **146,112,989** rows in about 3 minutes. Most of the initial time is spent reading from S3, which is the reason why we are caching the subsequent dataset in Spark.

Subsequent reads will be served from cache making it much faster.

Magellan has Data Source implementations for OSM-XML, GeoJSON as well as Shapefiles. In this case, we will use the in built GeoJSON reader to load the NYC neighborhood dataset into Magellan data structures.

As the neighborhood is a connected polygon, we expect to be able to parse this dataset into a collection of polygons. Apart from the geometric datastructure, Magellan also reads any metadata for a given row into a Map that can be queried subsequently.

The following snippet of code loads the NYC neighborhood dataset from S3:

```
val neighborhoods = sqlContext.read
.format("magellan")
.option("type", "geojson")
.load("/mnt/nyctaxicabanalysis/neighborhoods/")
.select($"polygon",
$"metadata"("neighborhood").as("neighborhood"))
.cache()
```

Here we have extracted the key corresponding to the neighborhood name while loading the dataset. The expression we used to extract the value from a map is an inbuilt function in Spark SQL and points to one of the nice properties of working with Magellan, which is we get to leverage all the functions Spark SQL provides, as well as provide end users with the same consistent syntax they are used to from Spark.

How many neighborhoods are there in NYC?

```
neighborhoods.count()
```

yields **310** rows.

We can also get a estimate of how jagged is each neighborhood by calculating the average number of edges in this collection of polygons.

It turns out that on average there are about **104** edges per neighborhood.

In order to determine which neighborhood each pickup point falls under, we need to join the trip dataset with the neighborhood dataset.

Naively, this would be a cartesian join. That is, each point in the trip dataset would have to be evaluated against each neighborhood in the neighborhood dataset to determine if the point falls within a given neighborhood.

If **m** represents the number of points, **n** the number of polygons and **p** the average # of edges per polygon, the computational complexity of this join is **O(mnp)** on a single node.

That is, we would roughly be performing roughly **4 trillion** calculations to determine where each point falls in the trip dataset.

On a single core this will take more than an **hour** even ignoring constant factors.

In subsequent blog posts we will discuss

- How the point in polygon calculation works
- How Magellan operates the cartesian join close to the hardware limits by leveraging Catalyst and whole stage code generation
- How Magellan's spatial joins use indexing to speed up the join and avoid having to issue cartesian joins in the first place.

For the purpose of this article, we gloss over details of the join operation and assume that the naive cartesian join is employed.

The two datasets can be joined as below:

```
val joined = trips.join(neighborhoods)
.where($"point" within $"polygon")
```

Here we have applied a predicate that only picks out those points which are contained within a given polygon. The join as written is cartesian as it does not have a join column.

Spark SQL's Catalyst optimizer applies a few optimizations here that we will get into in subsequent articles, but the main thing to note is that the end result of executing this join on a Apache Spark cluster is that this join is executed as a Broadcast Nested Loop Join

To compute the neighborhoods that receive the most pickups from cabs, we can issue the following query on top of the joined dataset:

```
display(joined
.groupBy('neighborhood)
.count()
.orderBy($"count".desc))
```

On the 5 node cluster without whole stage code generation this join and subsequent report takes about **6 min**.

With whole stage code generation for BNL join, the entire calculation runs in about **2 minutes or less**. This is substantially faster than the naive cartesian join, as we use simple bounding box techniques along with tight code generation to avoid materializing polygons whose bounding box does not intersect a given point.

This hopefully illustrates another nice property of Magellan: we get for free optimizations that Spark's compiler applies to stages of computation along with the JVM's JIT compiler optimizations. As a result, we are able to execute this query with performance on par with the best hand written code that is aware of the end to end pipeline.

Hopefully, this article gives you a flavor of working with Magellan to analyze geospatial data at scale.

From an end user's perspective, Magellan's declarative APIs allow you to describe the geometric computations you want to perform, while the execution engine figures out the most efficient way to perform those computations. Through Magellan's interoperability with the rest of Spark SQL's ecosystem, you are empowered to use all the operators that Spark SQL provides while analyzing geospatial datasets and rest assured that the compiled query plan will execute optimally.

Follow the Magellan project on GitHub to get the latest updates and download the code. You can also download releases from here.

]]>It is implemented on top of Apache Spark and deeply leverages modern database techniques like efficient data layout, code generation and query optimization in order to optimize geospatial queries.

In this blog, I will be talking about Magellan,

]]>It is implemented on top of Apache Spark and deeply leverages modern database techniques like efficient data layout, code generation and query optimization in order to optimize geospatial queries.

In this blog, I will be talking about Magellan, geospatial analytics in general and areas of Spark SQL that we leverage in Magellan.

These topics should be of interest to end users who wish to perform scalable geospatial analytics, to developers who wish to extend Spark SQL to provide specialized libraries for query processing as well as dilettantes who like to understand the algorithms (both age old and state of the art) used in geospatial analytics.

]]>