Spark HDFS Integration

Spark is rapidly gaining popularity among people working with large amounts of data. And it is not a big surprise, as it offers up to 100x faster data processing compared to Hadoop MapReduce, works in memory, offers an interactive shell and is quite simple to use in general. But in my opinion the main advantage of Spark is its great integration with Hadoop – you don’t need to reinvent the wheel to make use of Spark if you already have a Hadoop cluster. With Spark you can read data from HDFS and submit jobs under the YARN resource manager so that they share resources with MapReduce jobs running in parallel (which might as well be Hive queries or Pig scripts, for instance). All of this makes Spark a great tool that should be considered by any company with a big data strategy.
It is a known fact that Spark is still in its early days, even though it’s getting popular. Mainly this means a lack of a well-formed user guide and examples. Of course, there are some on the official website, but they don’t cover the integration with HDFS well. I will try to fill this gap by providing examples of interacting with HDFS data using the Spark Python interface, also known as PySpark. I’m currently using Spark 1.2.0 (the latest one available) on top of Hadoop 2.2.0 and Hive 0.12.0 (which come with the PivotalHD 2.1 distribution, also the latest).

So, let’s start pyspark, which welcomes us with some nice ASCII art:
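A plain launch of the shell is enough for these examples (the Spark home path below is just an example, and --master yarn-client assumes a YARN-backed cluster):

cd /usr/local/spark        # or wherever Spark 1.2.0 is installed
./bin/pyspark --master yarn-client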

First, let’s create a function that prints various statistics about an RDD for us: its ID, the number of entries it contains, the number of partitions, its storage level and some sample data:
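Something along these lines does the job (the helper name is my own):

def print_rdd_info(rdd, sample_size=3):
    """Print basic statistics of an RDD: its id, number of entries,
    number of partitions, storage level and a few sample records."""
    print "RDD id:               %d" % rdd.id()
    print "Number of entries:    %d" % rdd.count()
    print "Number of partitions: %d" % rdd.getNumPartitions()
    print "Storage level:        %s" % rdd.getStorageLevel()
    print "Sample data:          %s" % rdd.take(sample_size)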

Let’s start with the simplest case: reading a text file from HDFS. But first we need to put something there, and I’ve chosen the “README.md” file shipped with Spark. To put it into HDFS, execute these commands:
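For example (I’m using /sparkdemo/hdfs as the working directory throughout):

hdfs dfs -mkdir -p /sparkdemo/hdfs
hdfs dfs -put README.md /sparkdemo/hdfs/README.md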

Now read this file directly from HDFS:
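A single textFile call is all we need; I’m using the hdfs:/// shorthand, which picks up the namenode from the cluster configuration:

# Read the file straight from HDFS and inspect it with our helper
readme = sc.textFile("hdfs:///sparkdemo/hdfs/README.md")
print_rdd_info(readme)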

OK, that was a simple case. But what if we have a TSV file there? My TSV file looks like this:

Now we read it:
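A plain textFile plus a split on tabs does it (the file name and the two-column key/value layout are just an example):

# Each line becomes a [key, value] list
tsv = sc.textFile("hdfs:///sparkdemo/hdfs/sample.tsv") \
        .map(lambda line: line.split("\t"))
print_rdd_info(tsv)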

But what if the separator is a comma? It is a bit more complicated. As a CSV file I used the same TSV with the tabs replaced by commas:
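Reading it is nearly the same: for a file without quoted fields a plain split on commas works, while the csv module handles quoting properly (again, the file name is just an example):

import csv
from StringIO import StringIO

# Naive variant: good enough when no field contains a comma
simple_csv = sc.textFile("hdfs:///sparkdemo/hdfs/sample.csv") \
               .map(lambda line: line.split(","))

# Safer variant: let the csv module deal with quoting and escaping
parsed_csv = sc.textFile("hdfs:///sparkdemo/hdfs/sample.csv") \
               .map(lambda line: csv.reader(StringIO(line.encode("utf-8"))).next())
print_rdd_info(parsed_csv)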

OK, what if the file is compressed? Compression is handled automatically by the Hadoop classes!
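The input format picks the codec from the file extension, so the code does not change at all (here I’ve gzipped the same CSV as an example):

# Reading a gzipped copy looks exactly the same as the uncompressed case
gz_csv = sc.textFile("hdfs:///sparkdemo/hdfs/sample.csv.gz") \
           .map(lambda line: line.split(","))
print_rdd_info(gz_csv)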

So, in general, we now see that key-value CSV files can be handled easily by Spark in both compressed and uncompressed form. But CSV is not the best option for storing your files on HDFS. Let’s run some experiments with sequence files and map files. To create some sequence files for this test I’ve written a simple Java MapReduce application that just reads the input README.md file and writes its contents to a sequence file using the default identity mapper and reducer:
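A minimal sketch of such a job looks like this (a simplified version, not the exact code I ran; the compression settings are just one possible choice) – it relies on Hadoop’s default identity Mapper and Reducer and only wires up the input and output formats:

// Copy (offset, line) pairs from a text file into a block-compressed SequenceFile
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class TextToSequenceFile {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "text-to-seqfile");
        job.setJarByClass(TextToSequenceFile.class);
        // No Mapper/Reducer classes set: Hadoop falls back to the identity implementations
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // Block-compressed output with the Gzip codec
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
        TextInputFormat.addInputPath(job, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}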

Now let’s read these files:
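sc.sequenceFile infers the Writable key and value classes by itself, and the same call also reads the data files inside a map file directory (the output paths of the MapReduce job are just examples):

seq = sc.sequenceFile("hdfs:///sparkdemo/hdfs/readme_seq")
print_rdd_info(seq)

# A block-compressed sequence file or a map file reads exactly the same way
seq_gz = sc.sequenceFile("hdfs:///sparkdemo/hdfs/readme_seq_gz")
print_rdd_info(seq_gz)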

As you can see, we can read them in any combination. But what about writing out sequence files? PySpark has a built-in method for writing out a sequence file, but it accepts only the codec, not the compression type. Let’s use the more general one that allows us to specify the Hadoop OutputFormat and a Hadoop configuration:
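Concretely, saveAsSequenceFile only takes a compression codec, so to also control the compression type we can go through saveAsHadoopFile with the old-API SequenceFileOutputFormat and a Hadoop configuration dictionary (the output path and codec choice are just an example):

# Write the pairs back out as a block-compressed, gzipped sequence file
seq.saveAsHadoopFile(
    "hdfs:///sparkdemo/hdfs/readme_seq_block",
    "org.apache.hadoop.mapred.SequenceFileOutputFormat",
    conf={"mapred.output.compress": "true",
          "mapred.output.compression.type": "BLOCK",
          "mapred.output.compression.codec":
              "org.apache.hadoop.io.compress.GzipCodec"})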

Here is the listing of files created by these calls:

And finally, the most complicated example. Let’s read a Parquet file. To do so, we first need to create the Parquet file itself, and Hive will help us with this. My example covers working with Hive 0.12, so I had to download and manually add the Parquet jar files to my cluster; here’s the guide: https://cwiki.apache.org/confluence/display/Hive/Parquet
Now that we’ve made it, let’s create the Parquet file:
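On Hive 0.12 the Parquet SerDe and input/output formats have to be spelled out by hand (STORED AS PARQUET only appeared in 0.13); the column layout and the source table below are just placeholders:

-- Parquet-backed table living under /sparkdemo/hdfs/hive_parquet_table
CREATE EXTERNAL TABLE hive_parquet_table (key INT, value STRING)
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
  INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
  OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
LOCATION '/sparkdemo/hdfs/hive_parquet_table';

-- Fill it from any existing table (some_source_table is a placeholder)
INSERT OVERWRITE TABLE hive_parquet_table
SELECT key, value FROM some_source_table;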

OK, with this executed we will have a /sparkdemo/hdfs/hive_parquet_table directory created on HDFS with the Parquet file we wanted. Now let’s read it with Spark:
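One convenient way to read it back in Spark 1.2 is through Spark SQL’s SQLContext; parquetFile returns a SchemaRDD, which is still a regular RDD of rows, so the helper from the beginning keeps working:

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
parquet_rdd = sqlContext.parquetFile("hdfs:///sparkdemo/hdfs/hive_parquet_table")
parquet_rdd.printSchema()
print_rdd_info(parquet_rdd)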

And to wrap up, here’s a sample query on top of this Parquet file:
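Registering the SchemaRDD as a temporary table lets us run SQL directly on the Parquet data (the table name is arbitrary):

parquet_rdd.registerTempTable("parquet_demo")
result = sqlContext.sql("SELECT COUNT(*) FROM parquet_demo")
print result.collect()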

In this article I have covered the basics of Spark integration with HDFS. I hope that in the future this will be covered better in the official documentation, but for now I think my article will be helpful for many Spark users.
