Processing JSON data using Spark SQL Engine: DataFrame API

October 21, 2015

In the previous blog we worked with real data using the Spark core API and learned about RDDs, the basic building blocks of Spark. We also ran some basic operations using transformations and actions.

In this blog we will focus on the Spark SQL component, which lets us query structured data interactively with low latency. We will explore the new DataFrame API, which processes tabular data efficiently and lets us manipulate it using simple SQL queries or a rich library of functions.

IMG0

SparkSQL:

SparkSQL is a distributed, fault-tolerant query engine.

It allows users to run interactive queries on structured and semi-structured data. Structured data is simply tabular data that can be broken down into rows and columns.

SparkSQL provides a programming abstraction called DataFrames and a rich library of functions to load and process data, including string manipulation, date arithmetic and common math operations.

SparkSQL provides the following capabilities:

Seamless Integration

Spark SQL allows you to write queries inside Spark programs, using either SQL or the DataFrame API.
You can apply normal Spark functions (map, filter, reduceByKey etc.) to SQL query results.
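
As a rough illustration of mixing the two styles, the sketch below runs a SQL query and then continues with ordinary RDD operations on the result. It assumes a spark-shell session of the Spark 1.x era in which `sc` and `sqlContext` already exist; the sample records and the table name `sample_employees` are made up for this example and are not the article's data.

```scala
// Assumes spark-shell, where `sc` and `sqlContext` are already defined.
// The records below are hypothetical sample data.
val sampleJson = sc.parallelize(Seq(
  """{"empname": "Asha", "dept": "Engineering", "salary": 160000}""",
  """{"empname": "Ravi", "dept": "Engineering", "salary": 120000}""",
  """{"empname": "Meera", "dept": "Sales", "salary": 90000}"""
))
val sampleDF = sqlContext.read.json(sampleJson)

// Expose the DataFrame to SQL under a hypothetical table name.
sampleDF.registerTempTable("sample_employees")

// 1. Query with SQL ...
val engineers = sqlContext.sql(
  "SELECT dept, salary FROM sample_employees WHERE dept = 'Engineering'")

// 2. ... then keep going with plain RDD functions on the result rows.
// JSON integers are inferred as bigint, hence getLong.
val totalByDept = engineers.rdd.
  map(row => (row.getString(0), row.getLong(1))).
  reduceByKey(_ + _).
  collect()

totalByDept.foreach(println)
```

The trailing-dot chaining style is used so the snippet can be pasted line by line into the shell.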

Supports a Variety of Data Formats and Sources

DataFrames and SQL provide a common way to access a variety of data sources, including Hive, Avro, Parquet, Cassandra, CSV, ORC, JSON, and JDBC. You can load, query and join data across these sources.

Hive Compatibility

You don't need to change the data in your existing Hive metastore to make it work with Spark, nor do you need to change your Hive queries. Spark SQL reuses the Hive frontend and metastore, giving you full compatibility with existing Hive data, queries, SerDes and UDFs.

Standard Connectivity for JDBC or ODBC

A server mode provides industry-standard JDBC and ODBC connectivity for business intelligence tools, so you can keep using existing BI tools such as Tableau.

Performance and Scalability

At the core of Spark SQL is the Catalyst optimizer, which leverages advanced programming language features in a novel way to build an extensible query optimizer. Together with columnar storage and code generation, this makes queries fast. Spark SQL scales to thousands of nodes and multi-hour queries using the Spark engine, which provides full mid-query fault tolerance.

What are DataFrames?

In Spark, a DataFrame is a distributed collection of data organized into named columns.

It is equivalent to a table in a relational database or a data frame in R/Python.

DataFrames can be constructed from a wide array of sources such as:

  • structured data files,
  • tables in Hive,
  • external databases, or
  • existing RDDs.
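
The last source in the list, an existing RDD, can be sketched as follows. This assumes spark-shell with `sc` and `sqlContext` defined; the `Employee` case class and its rows are hypothetical and only loosely mirror the fields used later in this post.

```scala
// Assumes spark-shell, where `sc` and `sqlContext` are already defined.
import sqlContext.implicits._   // brings toDF() into scope for RDDs of case classes

// Hypothetical record type and rows, for illustration only.
case class Employee(empid: Long, empname: String, dept: String, salary: Long)

val employeeRDD = sc.parallelize(Seq(
  Employee(1L, "Asha", "Engineering", 160000L),
  Employee(2L, "Meera", "Sales", 90000L)
))

// Column names and types are taken from the case class fields.
val fromRdd = employeeRDD.toDF()
fromRdd.printSchema()
```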

The DataFrame API provides operations to filter, group, and compute aggregates, and it can be used together with Spark SQL.

Once created, DataFrames can be manipulated using various domain-specific language (DSL) functions.
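
To make the filter, group and aggregate operations concrete, here is a minimal sketch that chains them. It assumes spark-shell with `sc` and `sqlContext` defined, and it uses hypothetical sample records rather than the article's employee.json.

```scala
// Assumes spark-shell, where `sc` and `sqlContext` are already defined.
import org.apache.spark.sql.functions.{avg, max}

// Hypothetical sample data, one JSON object per element.
val sampleDF = sqlContext.read.json(sc.parallelize(Seq(
  """{"empname": "Asha", "dept": "Engineering", "salary": 160000}""",
  """{"empname": "Ravi", "dept": "Engineering", "salary": 120000}""",
  """{"empname": "Meera", "dept": "Sales", "salary": 90000}"""
)))

// Filter rows, group them by department, then compute two aggregates per group.
val deptStats = sampleDF.
  filter(sampleDF("salary") > 50000).
  groupBy("dept").
  agg(avg("salary"), max("salary"))

deptStats.show()
```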

Initializing Spark SQL

First, open spark-shell:

spark/bin/spark-shell

To get started with Spark SQL, we need to add an import to our program:

import org.apache.spark.sql.SQLContext

We need to create a SQLContext. A SQLContext wraps the SparkContext and adds functions for working with structured data.

Constructing a SQL context in Scala

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
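
Outside spark-shell there is no predefined `sc`, so a standalone program has to build the SparkContext itself. The following is a minimal sketch under that assumption; the object name `EmployeeApp`, the `local[*]` master and the single sample record are all hypothetical. It is meant to be compiled and run as its own application, not pasted into a running shell, which already owns a SparkContext.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical standalone entry point, for illustration only.
object EmployeeApp {

  // Builds the contexts, runs a tiny job, and always stops Spark afterwards.
  def run(): Long = {
    // "local[*]" runs Spark inside this JVM; on a cluster the master is
    // usually supplied through spark-submit instead of being hard-coded.
    val conf = new SparkConf().setAppName("EmployeeApp").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      val df = sqlContext.read.json(sc.parallelize(Seq("""{"empname": "Asha"}""")))
      df.count()
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"records: ${run()}")
}
```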

Processing JSON data using DataFrame

Now, let's load our JSON data from the employee.json file

IMG1

using sqlContext's read.json method

scala> val employeeDF = sqlContext.read.json("../employee.json")
employeeDF: org.apache.spark.sql.DataFrame = [address: struct
string>, dept: string, empid: bigint, empname: string, salary: bigint]
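
In the Spark versions of this era, read.json expects every line of the input to be one complete JSON object, and a nested object is inferred as a struct column. The sketch below shows both points with an in-memory RDD of strings in place of a file. It assumes spark-shell with `sc` and `sqlContext` defined; the records, and in particular the `city` field inside `address`, are hypothetical, since the actual contents of employee.json are not reproduced here.

```scala
// Assumes spark-shell, where `sc` and `sqlContext` are already defined.
// Each element plays the role of one line in a JSON file (hypothetical data).
val jsonLines = sc.parallelize(Seq(
  """{"empid": 1, "empname": "Asha", "dept": "Engineering", "salary": 160000, "address": {"city": "Pune"}}""",
  """{"empid": 2, "empname": "Meera", "dept": "Sales", "salary": 90000, "address": {"city": "Mumbai"}}"""
))

// The schema is inferred from the data; `address` becomes a struct column.
val sampleDF = sqlContext.read.json(jsonLines)
sampleDF.printSchema()

// Nested fields are addressed with dot notation.
val cities = sampleDF.select("empname", "address.city")
cities.show()
```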

Let’s see what our DataFrame contains

scala> employeeDF.show

table 1

If you want to view the schema of employeeDF in tree format, use printSchema

scala> employeeDF.printSchema

table 2

We can see how many records there are in employeeDF using count

scala> employeeDF.count
res5: Long = 4

To see each employee's name and department, use select on the DataFrame

scala> employeeDF.select("empname","dept").show

table 3

Now let’s do some filtering on our data to find the highly paid employees

scala> employeeDF.filter(employeeDF("salary") > 150000).show()

table 4
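
The same filter can be written in a few equivalent ways. The sketch below compares them on hypothetical sample data, assuming spark-shell with `sc` and `sqlContext` defined; which style to use is mostly a matter of taste.

```scala
// Assumes spark-shell, where `sc` and `sqlContext` are already defined.
import sqlContext.implicits._   // enables the $"column" syntax

// Hypothetical sample data.
val sampleDF = sqlContext.read.json(sc.parallelize(Seq(
  """{"empname": "Asha", "dept": "Engineering", "salary": 160000}""",
  """{"empname": "Ravi", "dept": "Engineering", "salary": 120000}""",
  """{"empname": "Meera", "dept": "Sales", "salary": 90000}"""
)))

val byApply  = sampleDF.filter(sampleDF("salary") > 150000)  // style used in this post
val byDollar = sampleDF.filter($"salary" > 150000)           // $"..." column shorthand
val byString = sampleDF.filter("salary > 150000")            // SQL expression string

// Conditions combine with && and ||; column equality is written ===.
val combined = sampleDF.filter($"salary" > 100000 && $"dept" === "Engineering")
combined.show()
```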

Now let’s give everybody a raise of 20,000 for the annual appraisal cycle.

scala> employeeDF.select(employeeDF("empname"),employeeDF("salary")+20000).show

table 5
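
When the computed value should sit next to all the existing columns, withColumn is an alternative to listing every column in select. A minimal sketch, assuming spark-shell with `sc` and `sqlContext` defined; the sample data and the column name `new_salary` are hypothetical.

```scala
// Assumes spark-shell, where `sc` and `sqlContext` are already defined.
// Hypothetical sample data.
val sampleDF = sqlContext.read.json(sc.parallelize(Seq(
  """{"empname": "Asha", "salary": 160000}""",
  """{"empname": "Meera", "salary": 90000}"""
)))

// Keeps every existing column and appends the computed one.
// DataFrames are immutable, so sampleDF itself is unchanged.
val raised = sampleDF.withColumn("new_salary", sampleDF("salary") + 20000)
raised.show()
```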

Save the results of the new DataFrame to the file system using write.save. When no format is specified, Spark writes Parquet files by default.

scala> employeeDF.select(employeeDF("empname"),employeeDF("salary")+20000 as "New_Salary").write.save("employeeswithImprovedSalaries")
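
If a different output format is wanted, the writer accepts an explicit format and save mode. The sketch below writes JSON and reads it back. It assumes spark-shell running in local mode, so that a temporary directory on the local disk is a valid output location, with `sc` and `sqlContext` defined; the sample data and the output path are hypothetical.

```scala
// Assumes spark-shell in local mode, where `sc` and `sqlContext` are defined.
import java.nio.file.Files

// Hypothetical sample data.
val sampleDF = sqlContext.read.json(sc.parallelize(Seq(
  """{"empname": "Asha", "salary": 160000}""",
  """{"empname": "Ravi", "salary": 120000}""",
  """{"empname": "Meera", "salary": 90000}"""
)))

val improved = sampleDF.select(
  sampleDF("empname"),
  (sampleDF("salary") + 20000).as("New_Salary"))

// Hypothetical output location inside a fresh temporary directory.
val outDir = Files.createTempDirectory("employees").resolve("improved").toString

// Explicit format and mode; "overwrite" replaces any earlier output at outDir.
improved.write.format("json").mode("overwrite").save(outDir)

// Read the saved files back to check the result.
val reloaded = sqlContext.read.format("json").load(outDir)
reloaded.show()
```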

In the next blog we will see how to query this data using simple SQL syntax.

Conclusion:

In this blog we explored the Spark SQL engine and its features for querying tabular data efficiently. We also learned about DataFrames and the operators used to manipulate JSON data in them.

In the next blog we will see how to convert a DataFrame to a temporary table and execute SQL queries against it, and we will explore the spark-csv library for parsing CSV data efficiently.

Stay tuned for more.

About Author

Poonam Ligade

Poonam Ligade currently works as a Data Engineer at Serendio, a California-based Big Data analytics company. She has around 4 years of experience in handling Big Data applications and worked on the Apache Hadoop stack for 3 years. She has been working with the Apache Spark stack and Apache Cassandra since early 2014. Her interests lie in distributed computing, NoSQL databases and machine learning. You can connect with her on LinkedIn.
