January 6, 2018
This is the 2nd post in a 5-part Apache Spark blog series. In the previous blog we looked at why we need a tool like Spark, what makes it a faster cluster computing system, and its core components.
In this blog we will work with actual data using the Spark core API: RDDs, transformations and actions.
An RDD (Resilient Distributed Dataset) is the main logical data unit in Spark. An RDD is a distributed collection of objects. Distributed means that each RDD is divided into multiple partitions, and each partition can reside in memory or be stored on disk on different machines in a cluster. RDDs are immutable (read-only) data structures: you can’t change the original RDD, but you can always transform it into a different RDD with all the changes you want.
To quote the book Learning Spark, “In Spark all work is expressed as creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result.”
As we discussed in the previous blog, Spark lets you program in Scala, Java, Python and R. For this blog we will be working with the Scala API.
RDDs can be created in 2 ways:
1. Parallelizing an existing collection.
2. Loading an external dataset from HDFS (or any other Hadoop-supported storage system).
Let’s see how to create RDDs both ways.
To execute any operation in Spark, you first have to create an object of the SparkContext class.
A SparkContext represents the connection to our existing Spark cluster and provides the entry point for interacting with Spark.
We need a SparkContext instance so that we can interact with Spark and distribute our jobs.
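Outside spark-shell you create the context yourself. The following is a minimal sketch, assuming a local test run; the application name and the `local[*]` master are illustrative choices, not values from this series.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumed settings for a local test run; a real cluster would use its own master URL.
val conf = new SparkConf()
  .setAppName("rdd-basics") // hypothetical application name
  .setMaster("local[*]")    // run locally, one worker thread per core

// getOrCreate returns the already running context if there is one.
val sc = SparkContext.getOrCreate(conf)

println(sc.master)
```

Using getOrCreate rather than `new SparkContext(conf)` means the same lines should also work inside spark-shell, where a context already exists.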
If you are running Spark commands from spark-shell, a SparkContext is already created for you; you can see it by typing sc at the console.
Now let’s create an RDD by parallelizing a simple Scala collection, using SparkContext’s parallelize method:
scala> val a=sc.parallelize(List('a' , 'b', 'c', 'd', 'a', 'f', 'd', 'c', 'b', 'c'), 3)
Here, the 1st parameter is a list of characters and the 2nd is the number of partitions, i.e. 3, so Spark will distribute the data across 3 partitions. By specifying the number of partitions you can tune how the data is spread across the cluster.
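To check how the data was actually split, you can inspect the partitions. This is a small sketch that assumes the spark-shell `sc`; the name `letters` is only illustrative.

```scala
// Same collection as above, split into 3 partitions.
val letters = sc.parallelize(List('a', 'b', 'c', 'd', 'a', 'f', 'd', 'c', 'b', 'c'), 3)

// Number of partitions Spark created for this RDD.
println(letters.partitions.length)

// glom gathers the elements of each partition into one array,
// so printing the arrays shows which elements ended up together.
letters.glom().collect().foreach(p => println(p.mkString(", ")))
```

Note that glom followed by collect brings everything to the driver, so it is only suitable for small RDDs like this one.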
Please note that here we have only defined the RDD; the data is not loaded yet. This means a problem with the data may surface only later, when you actually access it. The computation that creates the data in an RDD is done only when the data is referenced by an action, for example when the RDD is collected or written out.
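One way to see this deferred behaviour is to point an RDD at a file that does not exist. The sketch below assumes the spark-shell `sc`, and the path is hypothetical and assumed to be missing.

```scala
import scala.util.Try

// Hypothetical path that is assumed not to exist. Defining the RDD still succeeds.
val missing = sc.textFile("/no/such/dir/cars.txt")

// A transformation on it is also only recorded, not run.
val lengths = missing.map(_.length)

// The action forces Spark to read the input, so the failure surfaces here.
val result = Try(lengths.count())
println(result.isFailure)
```

The first two statements return immediately without touching the file system; only count makes Spark look for the input.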
Let’s check the contents of the RDD.
scala> a.first
first returns the first element of the RDD.
res1: Char = a
To view all the contents you can use collect:
scala> a.collect
res2: Array[Char] = Array(a, b, c, d, a, f, d, c, b, c)
You should avoid calling collect on a huge dataset, since it brings all the data to the driver program, i.e. a single machine, and can result in an out-of-memory error.
Now let’s filter the RDD to keep only the elements with value ‘a’.
scala> a.filter(x=>x=='a').collect
res3: Array[Char] = Array(a, a)
If you want to count the occurrences of each element, you can use map followed by reduceByKey:
scala> a.map(x=>(x,1)).reduceByKey((x,y)=>x+y).collect
res4: Array[(Char, Int)] = Array((f,1), (c,3), (d,2), (a,2), (b,2))
Here, map pairs each element with the integer 1 as an initial count, and reduceByKey sums the counts per key to give the total number of occurrences of each element.
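For a simple frequency count like this, the countByValue action is an alternative worth knowing. This sketch assumes the spark-shell `sc`; the name `chars` is illustrative.

```scala
val chars = sc.parallelize(List('a', 'b', 'c', 'd', 'a', 'f', 'd', 'c', 'b', 'c'), 3)

// countByValue is an action: it returns a local Map on the driver, not an RDD.
val counts: scala.collection.Map[Char, Long] = chars.countByValue()

// Sort by key only to make the printed order stable.
counts.toSeq.sortBy(_._1).foreach { case (ch, n) => println(s"$ch -> $n") }
```

Because the result lands on the driver, this is only a good fit when the number of distinct values is small; the map and reduceByKey version keeps the result distributed.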
Spark provides a rich set of operators to manipulate RDDs. RDDs support 2 kinds of operations: transformations and actions. Let’s understand them next.
Transformations create a new RDD from an existing RDD, like the map, reduceByKey and filter we just saw. Transformations are computed lazily, that is, only when a result is demanded. We will look at lazy evaluation in more detail in a later part.
Now let’s create an RDD from a text file:
scala> val rawData=sc.textFile("path to/cars.txt")
Each line of the file is a tab-separated record with the fields make, model, mpg, cylinders, engine displacement, horsepower, weight, acceleration, year and origin.
Data and code are available on GitHub.
First we have to cleanse the data and bring it into an appropriate format for processing.
case class cars(make:String, model:String, mpg:Int, cylinders:Int, engine_disp:Int, horsepower:Int, weight:Int, accelerate:Double, year:Int, origin:String)
Let’s use the map transformation for that:
scala> val carsData=rawData.map(x=>x.split("\t")).
map(x=>cars(x(0).toString, x(1).toString, x(2).toInt, x(3).toInt, x(4).toInt, x(5).toInt, x(6).toInt, x(7).toDouble, x(8).toInt, x(9).toString))
Here we first split the tab-separated data and then mapped each field to the correct data type using the Scala case class cars.
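The map above throws if a line has a missing or non-numeric field. A more defensive parser could be sketched as follows; the `Car` class and the `parseCar` helper are hypothetical names introduced here, and the sample line reuses a record from this post.

```scala
import scala.util.Try

// Same shape as the case class above; the name Car is illustrative.
case class Car(make: String, model: String, mpg: Int, cylinders: Int,
               engineDisp: Int, horsepower: Int, weight: Int,
               accelerate: Double, year: Int, origin: String)

// Hypothetical helper: returns None for lines that cannot be parsed.
def parseCar(line: String): Option[Car] = {
  val f = line.split("\t").map(_.trim)
  if (f.length != 10) None
  else Try(Car(f(0), f(1), f(2).toInt, f(3).toInt, f(4).toInt, f(5).toInt,
               f(6).toInt, f(7).toDouble, f(8).toInt, f(9))).toOption
}

val good = "amc\tamc gremlin\t21\t6\t199\t90\t2648\t15.0\t70\tAmerican"
val bad  = "amc\tamc gremlin\tn/a"

println(parseCar(good).map(_.weight)) // Some(2648)
println(parseCar(bad))                // None
```

With an RDD, something like `rawData.flatMap(line => parseCar(line))` should then keep only the lines that parse cleanly.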
scala> carsData.take(2)
will give you the first 2 records from the data.
res0: Array[cars] = Array(cars(amc, amc ambassador dpl, 15, 8, 390, 190, 3850, 8.5, 70, American), cars(amc, amc gremlin ,21, 6, 199, 90, 2648, 15.0, 70, American))
Now, suppose you want to see how many cars of American, Japanese and European origin there are.
scala> carsData.map(x=>(x.origin,1)).reduceByKey((x,y)=>x+y).collect
res2: Array[(String, Int)] = Array((American,47), (European,9), (Japanese,6))
You can filter to keep only the cars of American origin:
scala> val americanCars=carsData.filter(x=>x.origin=="American")
americanCars: org.apache.spark.rdd.RDD[cars] = FilteredRDD at filter at 18.
filter is a transformation: it doesn’t return a result, it returns a new RDD.
Now let’s do something more difficult: calculate the average weight of cars by their make.
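scala> val makeWeightSum=americanCars.map(x=>(x.make,x.weight.toInt)).
  combineByKey((x:Int) => (x, 1),                           // first weight seen for a make becomes (sum, count)
  (acc:(Int, Int), x:Int) => (acc._1 + x, acc._2 + 1),      // add another weight within a partition
  (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))  // merge results across partitions
scala> makeWeightSum.collect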
res3: Array[(String, (Int, Int))] = Array((chevrolet,(34609,10)), (plymouth,(23680,7)),
(buick,(11281,3)), (pontiac,(17311,4)), (hi,(4732,1)), (chevy,(4376,1)), (mercury,(2220,1)),
(amc,(29153,9)), (dodge,(12900,3)), (ford,(30333,8)))
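The roles of the three functions passed to combineByKey can be imitated with plain Scala collections. This is only a simulation to build intuition, not how Spark implements it; the two "partitions" and their weights are made up.

```scala
// Two pretend partitions of (make, weight) pairs; the values are made up.
val partitions = Seq(
  Seq(("ford", 3000), ("amc", 2500), ("ford", 3200)),
  Seq(("ford", 2800), ("amc", 2700))
)

val createCombiner = (w: Int) => (w, 1)
val mergeValue     = (acc: (Int, Int), w: Int) => (acc._1 + w, acc._2 + 1)
val mergeCombiners = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)

// Step 1: within each partition, build one (sum, count) per key.
val perPartition = partitions.map { part =>
  part.foldLeft(Map.empty[String, (Int, Int)]) { case (m, (k, w)) =>
    m.updated(k, m.get(k).map(mergeValue(_, w)).getOrElse(createCombiner(w)))
  }
}

// Step 2: merge the per-partition results key by key.
val combined = perPartition.reduce { (left, right) =>
  right.foldLeft(left) { case (m, (k, c)) =>
    m.updated(k, m.get(k).map(mergeCombiners(_, c)).getOrElse(c))
  }
}

println(combined("ford")) // (9000,3)
println(combined("amc"))  // (5200,2)
```

The first function runs once per key per partition, the second for every further value of that key in the same partition, and the third when partial results from different partitions meet.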
Now let’s take the average:
scala> makeWeightSum.map(x=>(x._1,x._2._1/x._2._2)).collect
res4: Array[(String, Int)] = Array((chevrolet,3460), (plymouth,3382), (buick,3760), (pontiac,4327), (hi,4732), (chevy,4376), (mercury,2220), (amc,3239), (dodge,4300), (ford,3791))
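The averages above are whole numbers because both the sum and the count are Int, so the division truncates. If fractional averages are wanted, one option is to convert to Double first. The sketch below assumes the spark-shell `sc` and rebuilds a few of the (sum, count) pairs from the output above as a stand-in RDD.

```scala
// (make, (weight sum, count)) pairs copied from the output above.
val sums = sc.parallelize(Seq(
  ("chevrolet", (34609, 10)),
  ("buick", (11281, 3)),
  ("ford", (30333, 8))
))

// mapValues leaves the key untouched; toDouble avoids integer division.
val avgWeight = sums.mapValues { case (sum, count) => sum.toDouble / count }

// collectAsMap is an action that returns the pairs as a local Map.
val byMake = avgWeight.collectAsMap()
println(byMake("chevrolet"))
```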
RDDs maintain a graph of how one RDD is transformed into another, called the lineage graph, which lets Spark recompute any intermediate RDD in case of failure. This is how Spark achieves fault tolerance. For the transformations above, the lineage runs from rawData to carsData to americanCars to makeWeightSum.
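You can ask an RDD to describe its own lineage with toDebugString. The sketch assumes the spark-shell `sc` and uses a small made-up chain in place of the cars data.

```scala
// A small stand-in chain; the data is made up for illustration.
val base     = sc.parallelize(Seq(("ford", 3000), ("amc", 2500), ("ford", 3200)))
val filtered = base.filter { case (_, w) => w > 2600 }
val summed   = filtered.reduceByKey(_ + _)

// toDebugString describes the chain of parent RDDs Spark would replay after a failure.
println(summed.toDebugString)
```

The exact text of the output depends on the Spark version, so it is not reproduced here.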
Actions return the final results of RDD computations. An action triggers execution, using the lineage graph to load the data into the original RDD, carry out all intermediate transformations, and return the final results to the driver program or write them out to the file system.
first, take, reduce, collect and count are some of the actions in Spark.
If you want to see how many cars of American origin there are in our data:
scala> americanCars.count
res5: Long = 47
If you want to do multiple computations on the same data, you can explicitly store it in memory:
carsData.cache() or carsData.persist()
Using these you can keep raw or intermediate data in memory (persist can also store it on disk in case the data is large).
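Called without arguments, both cache() and persist() use the memory-only storage level. To allow spilling to disk you pass a storage level explicitly, as in this sketch; it assumes the spark-shell `sc`, and the numbers are made up.

```scala
import org.apache.spark.storage.StorageLevel

val numbers = sc.parallelize(1 to 1000)      // made-up data
val squares = numbers.map(n => n.toLong * n)

// Keep partitions in memory and spill to disk if they do not fit.
squares.persist(StorageLevel.MEMORY_AND_DISK)

val total = squares.sum() // the first action computes and stores the partitions
val maxSq = squares.max() // later actions can reuse the stored partitions

squares.unpersist()       // release the storage when done
```

Persisting is itself lazy: nothing is stored until the first action runs.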
You can save output to the file system using an action such as saveAsTextFile.
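A minimal sketch of saving with saveAsTextFile follows. It assumes the spark-shell `sc`, rebuilds the origin counts from earlier as a stand-in RDD, and writes to a temporary directory chosen only for illustration.

```scala
val origins = sc.parallelize(Seq(("American", 47), ("European", 9), ("Japanese", 6)))

// Hypothetical output location; the target directory must not already exist.
val outDir = java.nio.file.Files.createTempDirectory("cars").resolve("origin-counts").toString

// Format each pair as a tab-separated line, then write one part file per partition.
origins.map { case (origin, n) => s"$origin\t$n" }.saveAsTextFile(outDir)

// Reading the directory back gives one line per saved record.
println(sc.textFile(outDir).count())
```

The output is a directory of part files rather than a single file, which is why reading it back uses the directory path.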
As we saw previously, even when we define an RDD the data is not loaded immediately, and none of the transformations is computed right away, until you call an action like collect or count or save the output to the file system.
So the data is not loaded until it is needed, which avoids tying up memory in advance. Spark also uses lazy evaluation to reduce the number of passes it has to make over the data by chaining operations together.
In this blog we saw how to work with actual data using transformations and actions in the Spark core API, and looked at important Spark concepts like the lineage graph and lazy evaluation.
In the next blog we will explore the SparkSQL component, which processes structured and semi-structured data more efficiently using simple SQL queries.