Spark DataFrames are faster, aren’t they?

Recently Databricks announced the availability of DataFrames in Spark, which gives you a great opportunity to write even simpler code that executes faster, especially if you are a heavy Python/R user. In this article I will go a bit deeper than the publicly available benchmark results to show you how it really works.
stupid benchmarking

First of all I would like to say that any general benchmarking is evil. In a benchmark you always measure something abstract – simple cases of user queries, generalized workload, average concurrency, etc. Making too many approximations makes your benchmark mostly unusable by other people. I always like to watch the benchmarking of HDDs. For each new model the vendor claims that this new HDD is faster and more reliable. But in fact even the simple “sequential scan” test shows at least a 2x difference between the vendor’s run and a run in the field. When you start to wonder why this happens, you come to the understanding that the vendor used some low-level code working directly with the HDD, and of course this code serves a single purpose, “benchmark”. When you use the disk in the field, you have the OS, the filesystem abstraction on top of the disk, the IO scheduler and many other things, and it turns out that the fancy vendor benchmark is not that good.

Benchmarks are Evil

Independent benchmarks are generally the same – if you are benchmarking something (especially something expensive), you most likely get money for this benchmarking from one of the participants and you play on their side. Like selecting the best result across 5 runs for vendor1 and the worst for vendor2. It is fair, as these are really the results shown by each of the products you’re testing, but not really correct from a statistical perspective.

So recently it happened that I’ve come across the following diagram a number of times in different presentations:

Spark_Dataframe_Official_Benchmark

Nice and fancy, but most of the places I’ve seen it were missing the information about what was really run here and what optimizations allowed Spark to achieve these results. The most comprehensive article on this topic is available here, which also has a link to the GitHub repository of Reynold Xin with the benchmark code itself, but unfortunately without much detail about the environment and different parameters. In the comments people are even claiming that they cannot run the code.

So ok, why is the DataFrame faster than the RDD, and is it really so?

First of all, I repeated the benchmark on my system for different Spark versions: 1.3.1, 1.4.0 and 1.4.1, the latter being the latest one at the moment of this writing.

Second, I’ve analyzed the RDD code of this benchmark and found it suboptimal in a number of ways:

  1. It bloats the data before aggregating. I.e. the pair (x,y) is transformed to (x,(y,1)) before aggregation, to make it possible to apply a standard aggregation. But this means much more network traffic when you shuffle the data (a short sketch of this pattern follows the list)
  2. Next, it stores the data in a Parquet file and reads it into a DataFrame (or SchemaRDD, if you like the old name). Why not store it in a SequenceFile directly for benchmarking the RDD code? In this code you construct Row objects and then extract the data from them for analysis, which is not optimal
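
To make the first point concrete, here is a minimal PySpark sketch of that “bloated” averaging pattern. This is my own illustration, not the original benchmark code; `pairs` stands for a hypothetical RDD of (x, y) integer pairs:

    # Sketch of the pattern criticized above: each (x, y) becomes (x, (y, 1))
    # so that a standard reduceByKey can carry both the sum and the count.
    bloated_avg = (pairs
                   .map(lambda kv: (kv[0], (kv[1], 1)))
                   .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
                   .map(lambda kv: (kv[0], kv[1][0] / float(kv[1][1]))))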

Ok, so the idea is to run a bigger set of tests:

  1. DataFrame code
  2. Benchmark-provided aggregation code on top of an RDD
  3. More optimal aggregation code with the mapPartitions function on top of a Parquet file
  4. More optimal aggregation code with the mapPartitions function on top of a SequenceFile (a sketch of this partial aggregation follows the list)
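
Here is a sketch of what the “more optimal” mapPartitions-based aggregation from points 3 and 4 could look like. Again, this is an illustration under the same assumptions (a hypothetical `pairs` RDD of (x, y) integers), not the exact code used for the measurements:

    # Aggregate each partition into a small dict of x -> (sum, count) first,
    # so only one record per distinct x leaves each partition before the shuffle.
    def partial_avg(iterator):
        acc = {}
        for x, y in iterator:
            s, c = acc.get(x, (0, 0))
            acc[x] = (s + y, c + 1)
        return iter(acc.items())

    leaner_avg = (pairs
                  .mapPartitions(partial_avg)
                  .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
                  .map(lambda kv: (kv[0], kv[1][0] / float(kv[1][1]))))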

And of course run each of these tests on Spark versions 1.3.1, 1.4.0 and 1.4.1.

Here are the commands I used to start Spark in my environment:
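
The exact commands are not preserved in this copy of the post, but based on the description below they would look roughly like this (the master URL and deployment mode are my assumptions):

    # Sketch: 4 executors, 1 task thread and 8GB of RAM each (not the original commands)
    pyspark --master yarn-client --num-executors 4 --executor-cores 1 --executor-memory 8g
    spark-shell --master yarn-client --num-executors 4 --executor-cores 1 --executor-memory 8g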

As you can see, in each case Spark had 4 executors, each capable of running 1 task execution thread and having 8GB of RAM.

First, let’s generate the data, and let’s do it in Python just like it is done in the original benchmark:
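
The generation code itself is missing from this copy, so here is a sketch of what it could look like: 10 million (x, y) integer pairs stored both as a SequenceFile and as a Parquet file. The paths, column names and value ranges are my assumptions, and `sc`/`sqlContext` come from the pyspark shell:

    import random
    from pyspark.sql import Row

    # 10 million random (x, y) integer pairs spread over 50 partitions
    pairs = sc.parallelize(range(10 * 1000 * 1000), 50) \
              .map(lambda i: (random.randint(0, 100), random.randint(0, 10000))) \
              .cache()  # keep the same random data for both output formats

    # Store as a SequenceFile for the plain RDD tests...
    pairs.saveAsSequenceFile("/benchmark/pairs_seq")

    # ...and as a Parquet file for the DataFrame tests
    sqlContext.createDataFrame(pairs.map(lambda p: Row(x=p[0], y=p[1]))) \
              .saveAsParquetFile("/benchmark/pairs_parquet")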

Next, here is the code to run the Python tests:
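
The test code is also missing from this copy; the sketch below follows the tests described above and reuses the assumed paths and column names from the generation sketch (an illustration rather than the original code):

    import time

    def timed(name, f):
        # Run a no-argument function and print its wall-clock time
        start = time.time()
        f()
        print("%s: %.1f sec" % (name, time.time() - start))

    # 1. DataFrame aggregation: average of y grouped by x
    df = sqlContext.parquetFile("/benchmark/pairs_parquet")
    timed("DataFrame", lambda: df.groupBy("x").avg("y").collect())

    # 2. Benchmark-style RDD aggregation over the same Parquet data:
    #    Row objects are unpacked back into (x, (y, 1)) pairs before reduceByKey
    timed("RDD over Parquet", lambda: df.rdd
          .map(lambda row: (row.x, (row.y, 1)))
          .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
          .map(lambda kv: (kv[0], kv[1][0] / float(kv[1][1])))
          .collect())

    # 3. The same aggregation reading directly from the SequenceFile
    seq = sc.sequenceFile("/benchmark/pairs_seq")
    timed("RDD over SequenceFile", lambda: seq
          .map(lambda kv: (kv[0], (kv[1], 1)))
          .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
          .map(lambda kv: (kv[0], kv[1][0] / float(kv[1][1])))
          .collect())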

Now the Scala code (I don’t pretend it to be ideal):
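
The Scala code is not preserved here either; below is a sketch of the mapPartitions-based aggregation over both inputs, under the same assumptions about paths and schema (and assuming the pairs were stored as IntWritable in the SequenceFile):

    // Sketch, not the original code; sc and sqlContext come from the spark-shell
    import scala.collection.mutable.HashMap
    import org.apache.hadoop.io.IntWritable
    import org.apache.spark.rdd.RDD

    // Pre-aggregate each partition into x -> (sum, count) before the shuffle
    def avgByKey(pairs: RDD[(Int, Int)]): Array[(Int, Double)] = {
      pairs.mapPartitions { iter =>
        val acc = HashMap.empty[Int, (Long, Long)]
        iter.foreach { case (x, y) =>
          val (s, c) = acc.getOrElse(x, (0L, 0L))
          acc(x) = (s + y, c + 1)
        }
        acc.iterator
      }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
       .map { case (x, (s, c)) => (x, s.toDouble / c) }
       .collect()
    }

    // Test 3: mapPartitions aggregation on top of the Parquet file
    // (column order x, y is assumed)
    val fromParquet = sqlContext.parquetFile("/benchmark/pairs_parquet")
      .map(row => (row.getInt(0), row.getInt(1)))
    avgByKey(fromParquet)

    // Test 4: mapPartitions aggregation on top of the SequenceFile
    // (use LongWritable instead if the pairs were stored as longs)
    val fromSeq = sc.sequenceFile("/benchmark/pairs_seq", classOf[IntWritable], classOf[IntWritable])
      .map { case (k, v) => (k.get, v.get) }
    avgByKey(fromSeq)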

Here is the summary table with the benchmark results:

Spark DataFrame Benchmark Results Table

And the diagram:

DataFrame Benchmark

From this you can see some interesting points:

  1. DataFrames are really faster than any native Python code
  2. DataFrames are on par with the correct implementation of aggregation in Scala over a SequenceFile
  3. Reading the Parquet format in Scala has better performance starting from Spark 1.4.0
  4. For Python, reading from a SequenceFile works faster than reading from a Parquet file
  5. The real improvement of the DataFrame over optimal Python code is 2x, and it is caused mostly by the native implementation of aggregation, compared to the external implementation in Python which forces data transfers between the JVM and Python

In general, my position regarding DataFrames is the following:

  1. This is a big step forward in terms of performance, as it allows all the languages to utilize the same execution engine
  2. This is a big step forward in terms of platform architecture. It opens the gates for Tungsten and greater extensibility: Tungsten Design
  3. It is a good abstraction for data scientists and analysts that simplifies analytical modeling
  4. Native RDD code would still be faster, as long as you understand how Spark works and are able to produce optimal code

Project Tungsten is an interesting thing discussed at the last Spark Summit in San Francisco. Here’s the link to the related presentation. I cannot avoid mentioning that Spark uses the JVM, but the longer they fight for better performance, the closer they get to C. Take a look at the new data structure for rows that uses the sun.misc.Unsafe magic:

Tungsten_Row_Structure

For me it clearly resembles the row structure of relational databases, for instance PostgreSQL:

Postgresql_tuple_structure

And what about memory allocation? It is fighting garbage collection by using off-heap memory and unsafe memory allocations, trying to reach native C performance. For comparison, here is the memory structure of Postgres:

Postgresql_shared_memory

The last two PostgreSQL pictures were taken from Bruce Momjian’s presentation.

So here we should stop and think:

  • Spark is written in Scala, yet they make many optimizations typical for C, with full control over memory allocation. Did they choose the right technology from the beginning?
  • Spark is applying many optimizations that in the past were typical for database management systems. So why shouldn’t we use a DBMS instead of Spark?

Just a little tease on the last point. Pivotal HAWQ is based on PostgreSQL and is written in C; it is a SQL-on-Hadoop solution that provides you with a SQL interface to the data stored in HDFS. Let’s see how it would solve the same problem raised in this benchmark.

First, create the table that will hold the data:
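
The DDL itself is missing from this copy; a minimal sketch, assuming the same (x, y) integer pairs (the table name and distribution policy are my choice):

    -- Sketch of the table holding the 10 million (x, y) pairs
    CREATE TABLE benchmark_pairs (
        x INT,
        y INT
    ) DISTRIBUTED RANDOMLY;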

Next, HAWQ natively doesn’t have a randint function, only random, so we can just borrow the one from Python with a PL/Python (plpythonu) function:
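
The function definition is not shown in this copy; it would look roughly like this (assuming the plpythonu language is enabled in the database):

    -- Sketch: randint(a, b) borrowed from Python via PL/Python
    CREATE OR REPLACE FUNCTION randint(a INT, b INT) RETURNS INT AS $$
    import random
    return random.randint(a, b)
    $$ LANGUAGE plpythonu;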

Generate the same 10 million rows:
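
The insert statement is also missing; a sketch with the same assumed table and value ranges as above:

    -- Sketch: 10 million random (x, y) pairs
    INSERT INTO benchmark_pairs
    SELECT randint(0, 100), randint(0, 10000)
    FROM generate_series(1, 10000000);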

And now the benchmark:
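
The timing output is not preserved in this copy, but the aggregation itself is the same average of y grouped by x that the Spark tests above run; in psql it would be something like:

    -- Sketch: the same aggregation as in the Spark tests, timed by psql
    \timing on
    SELECT x, AVG(y) FROM benchmark_pairs GROUP BY x;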

There is no trick, this is really the first run. Of course, immediately after the insert the data would still be cached by the OS, but it is exactly the same for Spark. For Spark I didn’t include the results of the first run in the benchmark, as the first run was always worse than the second.

To be fair, here is the configuration of HAWQ I used:

It uses 4 nodes, each of them running 10 HAWQ threads (the numbers 11 here stand for the “master” daemons that work like the Spark Driver). Let’s restart Spark with 40 executor threads:
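
The restart command itself is not preserved in this copy, nor are the resulting timings; scaled to match the HAWQ setup, it would look roughly like this (4 executors with 10 task threads each; the flags and master URL are my assumptions):

    # Sketch: 4 executors x 10 task threads = 40 threads (not the original command)
    pyspark --master yarn-client --num-executors 4 --executor-cores 10 --executor-memory 8g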

As you can see, we’re not going beyond 2 seconds and the first run is 7 seconds.

So finally, Spark is little by little trying to catch up with MPP solutions in terms of performance, but unfortunately it is still far away. The only option for it to be on par with C-based solutions is to use a programming language that can work with memory directly and get away from the JVM, so in the near future we might see some C extensions of Spark to improve its performance.

8 thoughts on “Spark DataFrames are faster, aren’t they?”

  1. rungtaprateek

    Thank you for a really interesting read.

    I agree with your conclusion, but I will point out that abstractions matter. Spark’s widespread adoption, and the general mass hysteria around it, has a lot to do with its APIs being easy to use. So if they can change the engine of a moving car without disrupting it, all the more power to them.

    1. 0x0FFF Post author

      I agree. The problem here is that all the good performance optimization stuff in Spark goes into the DataFrame API and SparkSQL, which are essentially the same thing. But in fact, there are lots of options on the market that can do SQL queries over terabytes of data: first, it’s all the MPP databases (Teradata, Vertica, Greenplum, Netezza, etc.), next it is the modern SQL-on-Hadoop solutions (Impala, HAWQ). And all of them would be faster than SparkSQL and the Spark DF API, even with the optimizations they plan.

      Spark has got this kind of traction in the community because of its simple functional API, the pretty concept of RDDs and the Scala/Python REPLs. The great advantage of it is that you can do many things with Spark that you cannot do with plain SQL; you’ve got full control of execution. By improving SparkSQL/DF they are surely targeting enterprise customers, as I guess they must have started to think about profitability. But it’s not the direction that would bring them success, in my opinion.

  2. zhao xu qin

    I cannot agree with you more on “Spark is little by little trying to catch up with MPP solutions in terms of performance”.
    So far, the Spark optimizer is very young; it does very little optimization, unlike a traditional MPP which has a cost model and more candidate execution plans.
    In a traditional MPP, the cost model relies a lot on statistics. When it comes to Spark’s case, I’m thinking Spark is not a “database”, which means it does not have a data preload process and does not collect statistics; if Spark needs any statistics, it does an extra runtime data scan, and this is a big overhead.

    Any thoughts on the Spark cost model? Thanks a lot.

    1. 0x0FFF Post author

      Here’s a bit of information on how query optimization in SparkSQL really works, with some links to the code. At the moment Spark uses statistics information only for join ordering, and the only statistic used is the table size in bytes, which is not very descriptive.

      And I agree that all the MPP solutions are currently ahead of Spark both in terms of performance and adoption, but I think that Spark is moving very, very fast. It will not catch up with solutions like Teradata in terms of performance and completeness of SQL support, but that is not the goal. Spark is much easier to implement and integrate into your current infrastructure, and this is what makes it popular. And of course there is its ability to go deeper than just SQL and manually define the transformations you want to apply, their order and logic.

        1. 0x0FFF Post author

          This might be a very expensive Teradata, but it might as well be Greenplum, which will be open-sourced in a couple of months and thus free to use just like Spark. Or Vertica on AWS, which is free to use with 3 nodes and up to 1TB of storage.

    1. 0x0FFF Post author

      It depends on what you really want to test. For one it is ML performance, for another it is SQL query performance, for a third it is streaming performance, etc.
      In this article I have shown that there is no magic behind DataFrames, and that vendor benchmarks are not the best thing to look at, as they are always biased.

