In the previous blog we played around with actual data using the Spark core API and understood the basic building block of Spark, i.e. RDDs. We also executed some basic operations using transformations and actions.
In this blog we will focus on the Spark SQL component, which lets us interactively query relational data with low latency. We will explore the new DataFrame API, which efficiently processes tabular data and lets us manipulate it using simple SQL queries or a rich library of functions.
Spark SQL:
Spark SQL is a distributed and fault-tolerant query engine.
It allows users to run interactive queries on structured and semi-structured data. Structured data is simply tabular data that you can break down into rows and columns.
Spark SQL provides a programming abstraction called DataFrames and a rich library of functions to load and process data, including string manipulation, date arithmetic, common math operations, etc.
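To give a flavour of that function library, here is a hypothetical snippet (assuming Spark 1.5+ and a DataFrame named df with empname and salary columns, neither of which we have created yet) that upper-cases a string column, rounds a computed salary and fetches the current date:
import org.apache.spark.sql.functions._

df.select(
  upper(df("empname")),           // string manipulation
  round(df("salary") * 1.1, 2),   // common math operations
  current_date()                  // date helpers
).show()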
Spark SQL provides the following capabilities:
Seamless Integration
Spark SQL allows you to write queries inside Spark programs, using either SQL or the DataFrame API.
You can also apply normal Spark operations (map, filter, reduceByKey, etc.) to SQL query results.
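As a rough sketch (the DataFrame df and its columns are illustrative, matching the employee data used later in this post), you can drop down to the RDD API after a DataFrame query:
val highPaid = df.filter(df("salary") > 150000)

// .rdd exposes the result as an RDD[Row]; map, filter, reduceByKey etc. apply as usual
val countByDept = highPaid.rdd
  .map(row => (row.getAs[String]("dept"), 1))
  .reduceByKey(_ + _)
countByDept.collect().foreach(println)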
Support for a Variety of Data Formats and Sources
DataFrames and SQL provide a common way to access a variety of data sources, including Hive, Avro, Parquet, Cassandra, CSV, ORC, JSON, and JDBC. You can load, query and join data across these sources.
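For example, a rough sketch (assuming the sqlContext created later in this post; the paths, connection details and column names are made up):
val parquetDF = sqlContext.read.parquet("/data/employees.parquet")
val jsonDF    = sqlContext.read.json("/data/employees.json")

val jdbcDF = sqlContext.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/hr")
  .option("dbtable", "departments")
  .load()

// DataFrames loaded from different sources can be joined directly
parquetDF.join(jdbcDF, parquetDF("dept") === jdbcDF("dept_name")).show()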
Hive Compatibility
You don't need to make any changes to the data in your existing Hive metastore to make it work with Spark, nor do you need to change your Hive queries. Spark SQL reuses the Hive frontend and metastore, giving you full compatibility with existing Hive data, queries, SerDes and UDFs.
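A minimal sketch, assuming spark-shell is built with Hive support and a Hive table named employees already exists in the metastore (both are assumptions):
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.sql("SELECT dept, count(*) AS cnt FROM employees GROUP BY dept").show()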
Standard Connectivity for JDBC or ODBC
A server mode provides industry-standard JDBC and ODBC connectivity for business intelligence tools. You can use your existing BI tools like Tableau.
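For example, Spark ships with a Thrift JDBC/ODBC server and the beeline client; the commands below (run from the Spark installation directory, using the default host and port, which may differ in your setup) sketch how a SQL client would connect:
sbin/start-thriftserver.sh
bin/beeline -u jdbc:hive2://localhost:10000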
Performance and Scalability
At the core of Spark SQL is the Catalyst optimizer, an extensible query optimizer; together with columnar storage and code generation it makes queries fast. Spark SQL scales to thousands of nodes and multi-hour queries using the Spark engine, which provides full mid-query fault tolerance.
What are DataFrames?
In Spark, a DataFrame is a distributed collection of data organized into named columns.
It is equivalent to a table in a relational database or a data frame in R/Python.
DataFrames can be constructed from a wide array of sources such as:
- structured data files,
- tables in Hive,
- external databases, or
- existing RDDs (see the sketch below).
The DataFrame API provides operations to filter, group, or compute aggregates, and can be used together with Spark SQL.
Once created, DataFrames can be manipulated using various domain-specific-language (DSL) functions.
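For instance, here is a minimal sketch (names and values are illustrative) of building a DataFrame from an existing RDD of case-class objects; it relies on the sqlContext we create in the next section:
case class Employee(empname: String, dept: String, salary: Long)

// toDF() comes from the SQLContext implicits
import sqlContext.implicits._

val employeeRDD = sc.parallelize(Seq(
  Employee("emp1", "IT", 160000L),
  Employee("emp2", "Sales", 120000L)))

val employeesFromRDD = employeeRDD.toDF()
employeesFromRDD.show()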
Initializing Spark SQL
First, open the spark-shell:
spark/bin/spark-shell
To get started with Spark SQL, we need to add a few imports to our program:
import org.apache.spark.sql.SQLContext
We need to create a SQLContext. A SQLContext wraps the SparkContext and adds functions for working with structured data.
Constructing a SQLContext in Scala:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Processing JSON data using DataFrame
Now, let's load our JSON data from the employee.json file using the SQLContext's read.json method:
scala> val employeeDF = sqlContext.read.json("../employee.json")
employeeDF: org.apache.spark.sql.DataFrame = [address: struct<...>, dept: string, empid: bigint, empname: string, salary: bigint]
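For reference, employee.json holds one JSON object per line with the fields inferred above; the record below is purely illustrative, and the sub-fields of the nested address column (truncated in the output above) are not reproduced here:
{"empid": 1, "empname": "emp1", "dept": "IT", "salary": 200000, "address": {...}}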
Let's see what our DataFrame contains:
scala> employeeDF.show

If you want to view the schema of our employeeDF in tree format, use printSchema:
scala> employeeDF.printSchema

We can see how many records there are in employeeDF using count:
scala> employeeDF.count
res5: Long = 4
If we want to see an employee's name and his/her department, we use a select clause on the DataFrame:
scala> employeeDF.select("empname","dept").show

Now let's do some filtering on our data and find the highly paid employees:
scala> employeeDF.filter(employeeDF("salary") > 150000).show()

Now let's give everybody a hike of 20,000 for the annual appraisal cycle:
scala> employeeDF.select(employeeDF("empname"),employeeDF("salary")+20000).show

Save the results of the new DataFrame to the file system using write.save:
scala> employeeDF.select(employeeDF("empname"),employeeDF("salary")+20000 as "New_Salary").write.save("employeeswithImprovedSalaries")
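write.save uses Parquet by default in Spark 1.x; you can pick a different format explicitly and read the saved result back, roughly like this (the JSON output path is made up):
// save the same result as JSON instead of the default Parquet
employeeDF.select(employeeDF("empname"), (employeeDF("salary") + 20000).as("New_Salary"))
  .write.format("json").save("employeesWithImprovedSalariesJson")

// read the Parquet output written above back into a DataFrame
val savedDF = sqlContext.read.load("employeeswithImprovedSalaries")
savedDF.show()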
In the next blog we will see how to query this data using plain SQL syntax.
Conclusion:
In this blog we explored the Spark SQL engine and its features for querying tabular data efficiently. We also learnt about DataFrames and the various operators for manipulating JSON data in DataFrames.
In the next blog we will see how to convert a DataFrame to a temporary table and execute SQL queries against it, and we will explore the spark-csv parsing library to parse CSV data efficiently.
Stay tuned for more.