What is Magellan?
Magellan is a distributed execution engine for geospatial analytics on big data.
It is implemented on top of Apache Spark and deeply leverages modern database techniques like efficient data layout, code generation and query optimization in order to optimize geospatial queries.
The application developer writes standard sql or data frame queries to evaluate geometric expressions while the execution engine takes care of efficiently laying data out in memory during query processing, picking the right query plan, optimizing the query execution with cheap and efficient spatial indices while presenting a declarative abstraction to the developer.
Magellan is the first library to extend Spark SQL to provide a relational abstraction for geospatial analytics. I see it as an evolution of geospatial analytics engines into the emerging world of big data by providing abstractions that are developer friendly, can be leveraged by anyone who understands or uses Apache Spark while simultaneously showcasing an execution engine that is state of the art for geospatial analytics on big data.
In subsequent blogs, we will talk in depth about how Magellan is implemented, how it scales geospatial analytics and how it leverages Spark SQL, Tungsten and Catalyst in order to do so while providing a declarative abstraction for the application developer.
In this blog, I will show you how to use Magellan on a real world dataset so you as the end user can get a feel for working with the library.
For the rest of this article, I am going to assume you are running an Apache Spark 2.1+ cluster and Magellan version 1.0.4+
We will be working with the NYC Taxicab dataset. For the purposes of this article we will only pick the cab trips in 2015. It turns out that prior to 2015, there is a missing field (improvement surcharge) and while this makes for a good tutorial on ETL on how to deal with schema changes when loading data into Spark, it would take me too far from the objective of this article.
We will further assume that the files have been loaded as is into a S3 bucket called "nyc.taxicab.rides".
For the purpose of this analysis, I'll spin up a 5 node r3.xlarge cluster in Databricks, with one node the driver and the other four nodes being the workers. That gives us a total of 20 cores for processing.
The analysis will be performed in Scala using a Databricks notebook.
Our goal is to analyze the trip data and figure out which neighborhoods in New York City have the most frequent pickups, what times of the day are popular times for cab pickups from say La Guardia, and answer other questions that have to do with the distribution of cab rides by neighborhood in some part.
For this purpose, we will first need a dataset that represents New York City neighborhoods at the level of detail needed for this study.We'll assume this dataset has also been loaded into S3 and is available in a bucket called "nyc.neighborhoods".
Instead of passing credentials around while loading data from S3, we will use the concept of mount points that Databricks exposes.
We will create two mount points, one for the trip dataset and one for the neighborhood dataset as below:
dbutils.fs.mkdirs("/mnt/nyctaxicabanalysis") dbutils.fs.mount(s"s3a://$AccessKey:$EncodedSecretKey@nyc.neighborhoods","/mnt/nyctaxicabanalysis/neighborhoods/") dbutils.fs.mount(s"s3a://$AccessKey:$EncodedSecretKey@nyc.taxicab.trips","/mnt/nyctaxicabanalysis/trips/")
Once this is done, we can access the datasets without passing in credentials using the mount point as a symlink.
Loading the trip data from S3
Apache Spark starting 2.0 has a CSV Reader that we can use to read the NYC taxicab trip dataset into a DataFrame. Prior to 2.0, you could also use the excellent spark-csv package.
The trip dataset has the following schema:
val schema = StructType(Array( StructField("vendorId", StringType, false), StructField("pickup_datetime", StringType, false), StructField("dropoff_datetime", StringType, false), StructField("passenger_count", IntegerType, false), StructField("trip_distance", DoubleType, false), StructField("pickup_longitude", DoubleType, false), StructField("pickup_latitude", DoubleType, false), StructField("rateCodeId", StringType, false), StructField("store_fwd", StringType, false), StructField("dropoff_longitude", DoubleType, false), StructField("dropoff_latitude", DoubleType, false), StructField("payment_type", StringType, false), StructField("fare_amount", StringType, false), StructField("extra", StringType, false), StructField("mta_tax", StringType, false), StructField("tip_amount", StringType, false), StructField("tolls_amount", StringType, false), StructField("improvement_surcharge", StringType, false), StructField("total_amount", DoubleType, false)))
The following snippet of code will load the CSV data from S3 into a DataFrame and cache it.
val trips = sqlContext.read .format("com.databricks.spark.csv") .option("comment", "V") .option("mode", "DROPMALFORMED") .schema(schema) .load("/mnt/nyctaxicabanalysis/trips/*") .withColumn("point", point($"pickup_longitude",$"pickup_latitude")) .cache()
In the process of loading this data, we have converted the pickup longitude and pickup latitude into a Magellan Point to make subsequent analysis easier.
To do this, we used the withColumn function on a Spark SQL dataset to add a derived column along with Magellan's syntactic sugar to add a column by providing a BinaryExpression of the form:
A query we are able to answer at this point is, how big is the dataset?
returns 146,112,989 rows in about 3 minutes. Most of the initial time is spent reading from S3, which is the reason why we are caching the subsequent dataset in Spark.
Subsequent reads will be served from cache making it much faster.
Loading the neighborhood data from S3
Magellan has Data Source implementations for OSM-XML, GeoJSON as well as Shapefiles. In this case, we will use the in built GeoJSON reader to load the NYC neighborhood dataset into Magellan data structures.
As the neighborhood is a connected polygon, we expect to be able to parse this dataset into a collection of polygons. Apart from the geometric datastructure, Magellan also reads any metadata for a given row into a Map that can be queried subsequently.
The following snippet of code loads the NYC neighborhood dataset from S3:
val neighborhoods = sqlContext.read .format("magellan") .option("type", "geojson") .load("/mnt/nyctaxicabanalysis/neighborhoods/") .select($"polygon", $"metadata"("neighborhood").as("neighborhood")) .cache()
Here we have extracted the key corresponding to the neighborhood name while loading the dataset. The expression we used to extract the value from a map is an inbuilt function in Spark SQL and points to one of the nice properties of working with Magellan, which is we get to leverage all the functions Spark SQL provides, as well as provide end users with the same consistent syntax they are used to from Spark.
How many neighborhoods are there in NYC?
yields 310 rows.
We can also get a estimate of how jagged is each neighborhood by calculating the average number of edges in this collection of polygons.
It turns out that on average there are about 104 edges per neighborhood.
In order to determine which neighborhood each pickup point falls under, we need to join the trip dataset with the neighborhood dataset.
Naively, this would be a cartesian join. That is, each point in the trip dataset would have to be evaluated against each neighborhood in the neighborhood dataset to determine if the point falls within a given neighborhood.
If m represents the number of points, n the number of polygons and p the average # of edges per polygon, the computational complexity of this join is O(mnp) on a single node.
That is, we would roughly be performing roughly 4 trillion calculations to determine where each point falls in the trip dataset.
On a single core this will take more than an hour even ignoring constant factors.
In subsequent blog posts we will discuss
- How the point in polygon calculation works
- How Magellan operates the cartesian join close to the hardware limits by leveraging Catalyst and whole stage code generation
- How Magellan's spatial joins use indexing to speed up the join and avoid having to issue cartesian joins in the first place.
For the purpose of this article, we gloss over details of the join operation and assume that the naive cartesian join is employed.
The two datasets can be joined as below:
val joined = trips.join(neighborhoods) .where($"point" within $"polygon")
Here we have applied a predicate that only picks out those points which are contained within a given polygon. The join as written is cartesian as it does not have a join column.
Spark SQL's Catalyst optimizer applies a few optimizations here that we will get into in subsequent articles, but the main thing to note is that the end result of executing this join on a Apache Spark cluster is that this join is executed as a Broadcast Nested Loop Join
To compute the neighborhoods that receive the most pickups from cabs, we can issue the following query on top of the joined dataset:
display(joined .groupBy('neighborhood) .count() .orderBy($"count".desc))
On the 5 node cluster without whole stage code generation this join and subsequent report takes about 6 min.
With whole stage code generation for BNL join, the entire calculation runs in about 2 minutes or less. This is substantially faster than the naive cartesian join, as we use simple bounding box techniques along with tight code generation to avoid materializing polygons whose bounding box does not intersect a given point.
This hopefully illustrates another nice property of Magellan: we get for free optimizations that Spark's compiler applies to stages of computation along with the JVM's JIT compiler optimizations. As a result, we are able to execute this query with performance on par with the best hand written code that is aware of the end to end pipeline.
Summary and next steps
Hopefully, this article gives you a flavor of working with Magellan to analyze geospatial data at scale.
From an end user's perspective, Magellan's declarative APIs allow you to describe the geometric computations you want to perform, while the execution engine figures out the most efficient way to perform those computations. Through Magellan's interoperability with the rest of Spark SQL's ecosystem, you are empowered to use all the operators that Spark SQL provides while analyzing geospatial datasets and rest assured that the compiled query plan will execute optimally.