Getting Smart With: Spark SQL

Overview:

One of the first steps in learning Spark is understanding how to work with Spark SQL, followed by the DataFrame and Dataset concepts. Spark SQL is the Spark module for structured data processing. Unlike the basic RDD API, its interfaces give Spark more information about the structure of both the data and the computation being performed, and Spark SQL uses this extra information internally to perform additional optimizations. There are several ways to interact with Spark SQL, including SQL and the Dataset API.

To learn the basics of Apache Spark, you can start here.

Table of Contents:

  • Spark Session
  • Creating DataFrames
  • DataFrame Operations
  • Running SQL Queries Programmatically
  • Global Temporary View
  • Creating Datasets
  • Interoperating with RDDs

Prerequisites:

Spark Session:

The SparkSession class is the entry point to all of Spark's functionality. To create a basic SparkSession, use SparkSession.builder():

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Spark SQL app")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
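
When trying the snippets outside a cluster or spark-shell, the builder also needs a master URL. The following is a minimal sketch, assuming local mode is acceptable for experimentation; the object name LocalSessionSketch and the helper localSession are hypothetical and not part of Spark.

```scala
import org.apache.spark.sql.SparkSession

object LocalSessionSketch {
  // Builds (or reuses) a session that runs Spark inside the current JVM.
  def localSession(): SparkSession =
    SparkSession.builder()
      .appName("Spark SQL app")
      .master("local[*]") // assumption: local mode, using all available cores
      .getOrCreate()

  def main(args: Array[String]): Unit = {
    val spark = localSession()
    println(s"Running Spark ${spark.version}")
    spark.stop() // release the local resources when done
  }
}
```

On a real cluster the master is normally supplied by spark-submit, so hard-coding it as above is only reasonable for local experiments.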

Creating DataFrames:

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or Python, but with richer optimizations under the hood. DataFrames can be created from a variety of sources, including structured data files, Hive tables, external databases, and existing RDDs.

val df = spark.read.json("examples/src/main/resources/people.json")
// df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

// Displays the content of the DataFrame
df.show()
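
If the people.json sample file is not at hand, a DataFrame with the same two columns can be built from an in-memory collection. This is a sketch under that assumption; the object InMemoryDataFrameSketch and the helper peopleDF are illustrative names, and the three records mirror the sample output shown later in this post.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object InMemoryDataFrameSketch {
  // Builds a DataFrame with the columns (age, name) without reading a file.
  def peopleDF(spark: SparkSession): DataFrame = {
    import spark.implicits._ // enables .toDF on local collections
    // Option[Long] models the missing age of the first record.
    Seq[(Option[Long], String)](
      (None, "Michael"),
      (Some(30L), "Andy"),
      (Some(19L), "Justin")
    ).toDF("age", "name")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark SQL app")
      .master("local[*]") // assumption: local mode
      .getOrCreate()
    peopleDF(spark).show()
    spark.stop()
  }
}
```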

DataFrame Operations:

The following example prints the schema of the DataFrame; more operations will be covered in a later article.

import spark.implicits._

// Print the schema in a tree format
df.printSchema()

Check out the Best Practices of Apache Spark

Running SQL Queries Programmatically:

The sql method on a SparkSession lets an application run SQL queries programmatically and returns the result as a DataFrame, as the following example shows.

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
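
Because the result of spark.sql is an ordinary DataFrame, the same query can usually be written either as a SQL string or with DataFrame methods. The sketch below illustrates this with a simple filter; the object SqlVersusApiSketch, its two helpers, and the view name people_sketch are hypothetical, and the data is the inline three-row sample rather than people.json.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object SqlVersusApiSketch {
  // Names of people aged 21 or older, expressed as a SQL string.
  def adultNamesSql(spark: SparkSession, people: DataFrame): Seq[String] = {
    people.createOrReplaceTempView("people_sketch")
    spark.sql("SELECT name FROM people_sketch WHERE age >= 21 ORDER BY name")
      .collect().map(_.getString(0)).toSeq
  }

  // The same query expressed with DataFrame methods.
  def adultNamesApi(people: DataFrame): Seq[String] =
    people.filter(col("age") >= 21).orderBy("name").select("name")
      .collect().map(_.getString(0)).toSeq

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark SQL app")
      .master("local[*]") // assumption: local mode
      .getOrCreate()
    import spark.implicits._
    val people = Seq[(Option[Long], String)](
      (None, "Michael"), (Some(30L), "Andy"), (Some(19L), "Justin")
    ).toDF("age", "name")
    println(adultNamesSql(spark, people))
    println(adultNamesApi(people))
    spark.stop()
  }
}
```

Rows with a null age are dropped by both versions, since a comparison against null is never true.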

Global Temporary View:

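Temporary views in Spark SQL are session-scoped: they disappear when the session that created them terminates. To share a temporary view across all sessions and keep it alive until the Spark application terminates, create a global temporary view instead. Global temporary views are tied to the system-preserved database global_temp, so they must be referenced by their qualified name, for example SELECT * FROM global_temp.people.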

df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()

// A global temporary view is visible from other sessions of the same application
spark.newSession().sql("SELECT * FROM global_temp.people").show()
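
The difference in scope can be checked directly by registering one view of each kind and looking them up from a second session. This is a minimal sketch; the object ViewScopeSketch, the helper canSee, and the view names people_session and people_global are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

object ViewScopeSketch {
  // True when the given session can resolve the view name.
  // Asking for the columns forces Spark to analyze the relation.
  def canSee(session: SparkSession, viewName: String): Boolean =
    Try(session.table(viewName).columns).isSuccess

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark SQL app")
      .master("local[*]") // assumption: local mode
      .getOrCreate()
    import spark.implicits._

    val df = Seq(("Andy", 30L)).toDF("name", "age")
    df.createOrReplaceTempView("people_session")      // session-scoped
    df.createOrReplaceGlobalTempView("people_global") // application-scoped

    val other = spark.newSession()
    println(canSee(spark, "people_session"))            // the creating session
    println(canSee(other, "people_session"))            // a different session
    println(canSee(other, "global_temp.people_global")) // global view, qualified name
    spark.stop()
  }
}
```

Only the second lookup should fail, because the session-scoped view is not registered in the new session.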

Creating Datasets:

Datasets are similar to RDDs, but instead of Java serialization or Kryo they use a specialized Encoder to serialize objects for processing or for transmission over the network. Both encoders and standard serialization turn an object into bytes, but encoders are generated dynamically and use a format that lets Spark perform many operations, such as filtering, sorting, and hashing, without deserializing the bytes back into an object.

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Hamiz", 32)).toDS()
caseClassDS.show()

val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
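
Once the data is a Dataset[Person], transformations can use plain Scala lambdas over the case class fields, and the compiler checks the field names and types. A minimal sketch of this, assuming the same Person shape as above; the object TypedDatasetSketch and the helper namesOlderThan are hypothetical names.

```scala
import org.apache.spark.sql.SparkSession

object TypedDatasetSketch {
  // Defined at object level so Spark can derive an encoder for it.
  case class Person(name: String, age: Long)

  // Typed filter and map: the lambdas receive Person objects, not generic Rows.
  def namesOlderThan(spark: SparkSession, people: Seq[Person], minAge: Long): Seq[String] = {
    import spark.implicits._
    people.toDS()
      .filter(_.age > minAge)
      .map(_.name)
      .collect()
      .toSeq
      .sorted
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark SQL app")
      .master("local[*]") // assumption: local mode
      .getOrCreate()
    val people = Seq(Person("Hamiz", 32), Person("Justin", 19))
    println(namesOlderThan(spark, people, 21))
    spark.stop()
  }
}
```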

Interoperating with RDDs:

Spark SQL supports two different methods for converting existing RDDs into Datasets.

1st Method: Inferring the Schema Using Reflection

The Scala interface for Spark SQL can automatically convert an RDD containing case classes into a DataFrame. The case class defines the schema of the table: the names of its arguments are read using reflection and become the column names. Case classes can also be nested or contain complex types such as Seqs or Arrays. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file and convert it to a DataFrame
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val adultsDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 21 AND 35")

// The columns of a row in the result can be accessed by field index
adultsDF.map(adult => "Name: " + adult(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// +-------------+

// or by field name
adultsDF.map(adult => "Name: " + adult.getAs[String]("name")).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// +-------------+

// No pre-defined encoders for Dataset[Map[K,V]], so define one explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Encoders for primitive types and case classes can also be defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
adultsDF.map(adult => adult.getValuesMap[Any](List("name", "age"))).collect()
// Array[Map[String,Any]] = Array(Map(name -> Michael, age -> 29), Map(name -> Andy, age -> 30))
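
The remark about nested case classes and complex types can be made concrete with a small sketch. The case classes Address and Customer, the object NestedSchemaSketch, and the sample record are all invented for illustration; the point is only to show how reflection maps a Seq field to an array column and a nested case class to a struct column.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object NestedSchemaSketch {
  // Hypothetical case classes: one nested, one holding a Seq.
  case class Address(city: String, zip: String)
  case class Customer(name: String, emails: Seq[String], address: Address)

  // Converts an RDD of case class instances; the schema is inferred by reflection.
  def customersDF(spark: SparkSession): DataFrame = {
    import spark.implicits._ // implicit conversion from RDD to DataFrame
    spark.sparkContext
      .parallelize(Seq(Customer("Andy", Seq("andy@example.com"), Address("Karachi", "74000"))))
      .toDF()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark SQL app")
      .master("local[*]") // assumption: local mode
      .getOrCreate()
    val df = customersDF(spark)
    df.printSchema()                          // emails: array, address: struct
    df.select("name", "address.city").show()  // nested fields use dot notation
    spark.stop()
  }
}
```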

2nd Method: Programmatically Specify the Schema

When case classes cannot be defined ahead of time, for example when the structure of the records is only known at runtime, a DataFrame can be created through a programmatic interface that lets you construct a schema and then apply it to an existing RDD. This method is more verbose and takes three steps:

  1. Create an RDD of Rows from the original RDD.
  2. Create the schema, represented by a StructType, matching the structure of the Rows in the RDD created in Step 1.
  3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.

import org.apache.spark.sql.Row

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
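
The example above declares every column as StringType. When the column types are known, the same three steps work with a typed schema, as long as each Row value matches its declared type. The sketch below assumes the name/age layout of people.txt and uses inline lines instead of the file; the object TypedSchemaSketch, the helper toDataFrame, and the view name people_typed are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object TypedSchemaSketch {
  // Step 2: the schema, with age declared as an integer instead of a string.
  val schema: StructType = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true)
  ))

  def toDataFrame(spark: SparkSession, lines: Seq[String]): DataFrame = {
    // Step 1: an RDD of Rows; the age is parsed so it matches IntegerType.
    val rowRDD = spark.sparkContext
      .parallelize(lines)
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim.toInt))
    // Step 3: apply the schema to the RDD of Rows.
    spark.createDataFrame(rowRDD, schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark SQL app")
      .master("local[*]") // assumption: local mode
      .getOrCreate()
    val df = toDataFrame(spark, Seq("Michael, 29", "Andy, 30", "Justin, 19"))
    df.createOrReplaceTempView("people_typed")
    // Numeric comparison works because age is an integer column.
    spark.sql("SELECT name FROM people_typed WHERE age BETWEEN 21 AND 35").show()
    spark.stop()
  }
}
```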

Thanks for reading!

To conclude, we have looked at Spark SQL in detail and learned how to create DataFrames and Datasets. In the next blog, we will build some hands-on projects related to this article. Make sure to subscribe to my blog to get upcoming updates.

If you have any questions, feel free to ask in the comment section.

See you next time,

@TechAE
