class: center, middle, inverse
# Introduction to SparkSQL and DataFrames

Gabor Feher, Lynx Analytics

.fadeout[Press **P** to read the speaker notes.]

---

# Lynx Analytics

We are a big data analytics company focused on big graphs.
That's how we ended up using Spark.

???

Our graph analysis product, LynxKite, is built on top of Spark, because we are processing a lot of data.

First I will talk about Spark SQL in general, and then I will show some examples with real data.

---

# Current state of the world in Spark

- RDD is a distributed collection of arbitrary Java objects.
- DataFrame is a distributed collection with a schema, like a relational table with columns. You can also use SQL with it.

???

A little bit of history: Spark originally offered the RDD API. RDD means Resilient Distributed Dataset. It provides a way for the programmer to manipulate a lot of data distributed across a cluster of machines, just by holding on to RDD objects in their program. An RDD looks a bit like an array in single-threaded programming, except that the real data it represents stays on possibly many other machines.

DataFrames arrived in Spark 1.3. The idea is the same, but while RDDs are collections of arbitrary objects, DataFrames are more like tables in a relational database: they consist of rows and have columns. You can also use SQL to work with them. This will be the main focus of my talk.

---

# Why Spark SQL?

- Concepts that users know very well
  - SQL queries and relational database tables
  - Data frames like in R or Pandas
- DataFrames have a schema, which allows for performance optimization
  - More optimized planning (Catalyst)
  - More efficient processing (Tungsten)

???

So what's the point of all this SQL stuff?

There are many users who know SQL and relational databases very well. Also, popular tools like R and the Pandas Python library are built around the concept of data frames. So one benefit is usability, but there is another one too. DataFrames have a pre-defined schema: the user tells Spark what the columns are and what their types are before starting to work with the data. Spark cannot see inside the objects stored in an RDD, but with DataFrames the schema allows Spark to store the rows in much less memory than arbitrary Java objects, and to process them more efficiently. This is called Project Tungsten.

---

# DataFrame vs RDD

- DataFrame: less flexible but more optimized
- RDD: more flexible, the user has to optimize manually

???

To sum up: DataFrames are more optimized, and RDDs are more flexible. For example, when you aggregate a DataFrame, there is an operator to sum up values, but there is no operator to take their product. Another difference: with RDDs you can run arbitrary code on the data, while with DataFrames you can only pick from a pre-defined set of operations. These operations are well optimized, and if you combine several of them, Spark may be able to do further optimizations across them. The engine that does this is called the Catalyst Optimizer. (A small code sketch of the flexibility difference follows at the end of the next slide's notes.)

---

# Let's have some action!

- Data about airports and flights between them:
  - http://openflights.org/data.html
  - One file with airports: airports.dat
  - One file with routes: routes.dat
  - These are in fact CSV files.
  - I have added header lines manually.
- We will use Spark 1.6.1 on my laptop.
  - http://spark.apache.org/downloads.html
  - Download a pre-built version, not the source code!

???

Quick look into the files and the download page, then moving on!
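(If anyone asks about the sum-versus-product remark from the DataFrame vs RDD slide, here is a minimal sketch. It assumes the Spark shell, where `sc` and `sqlContext` are predefined, and the data is made up for illustration:)

```scala
import sqlContext.implicits._

// A toy RDD of key-value pairs.
val pairs = sc.parallelize(Seq(("a", 2.0), ("a", 3.0), ("b", 4.0)))

// RDD: any aggregation function works, e.g. a product...
val products = pairs.reduceByKey(_ * _)  // ("a", 6.0), ("b", 4.0)

// DataFrame: pick from the built-in aggregations, e.g. sum;
// there is no built-in product aggregation.
pairs.toDF("key", "value").groupBy("key").sum("value").show()
```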
---

# First steps with the Spark Shell

- We will use the Scala interface

```
./spark-1.6.1-bin-hadoop2.6/bin/spark-shell \
  --packages com.databricks:spark-csv_2.10:1.4.0
```

```scala
scala> val x = 1 + 1
x: Int = 2

scala> val y = x + 1
y: Int = 3

scala> println("hello, world")
hello, world

scala>
```

???

Spark is written in Scala, and LynxKite is also written in Scala, so I will demonstrate DataFrames with Scala today. You could also use them from Java, Python or even R.

The easiest way to start playing is the Spark Shell, which is packaged with Spark. It executes the lines you enter one by one, as you press enter. On this slide I am showing that you can run basic Scala code in it.

There is one trick here. I know we will be reading CSV files later today, so I am starting the shell with a command-line flag that activates the CSV-reading plugin of Spark. Other file formats like JSON or Parquet don't need plugins; bad luck for us today.

---

# Load the data

Read the two CSV files with:

```scala
val routes = sqlContext.read.
  format("com.databricks.spark.csv").
  option("header", "true").
  option("inferSchema", "true").
  load("/path/to/routes.dat")

val airports = sqlContext.read.
  format("com.databricks.spark.csv").
  option("header", "true").
  option("inferSchema", "true").
  load("/path/to/airports.dat")
```

???

I have cheated a little and added a header as the first row of each file, listing all the column names. The loading code is shorter that way.

When reading a file, we need to specify the format and the file path. We also tell Spark to read the first line as a header and to figure out the data type of each column automatically. (Basically: is it a string or a number?)

---

# See the data

- We can print 20 lines, or even more.
- In Scala there is no need for `()` to call a parameterless method.

```scala
routes.show
airports.show(30)
```

???

---

# Example: filtering

There are different ways to say the same thing:

```scala
val hunPorts = airports.
  filter(airports("country") === "Hungary")

val hunPorts = airports.
  filter($"country" === "Hungary")

// Let's type in the shortest:
val hunPorts = airports.
  filter("country = 'Hungary'")
hunPorts.show

// Chaining commands:
airports.filter("country = 'Hungary'").show
```

???

Different ways to express the same thing. For filtering, we need to specify a logical expression that involves some columns. Columns can be referred to in different ways; I am sticking with the easiest-to-type version today.

When doing several operations based on a DataFrame, there are two ways to go: create a new DataFrame on each line and use it in the next line, or write all the operations chained into one expression (possibly a multi-line one). Behind the scenes there is no real difference between the two options, so pick the one that looks nicer in your situation.

---

# Example: select

Let's convert the altitude of cities from feet to meters.
Save the list of cities over 1000m to a file.

```scala
val x = airports.
  selectExpr("city", "alt * 0.3048 as alt_m")
val y = x.filter("alt_m > 1000")
y.show
y.write.format("json").
  save("/path/to/city_elev.json")
```

It creates a directory with two files?!

Clue: `y.rdd.partitions.size`

???

Let's create a JSON file, so that we can see that Spark handles different formats easily.

When we look at the written output, it turns out to be a directory with two files. What?! This is because Spark splits up the data into partitions. Different partitions can be processed on different computers, but one partition stays on one computer. In this case, Spark has chosen to split this data into two partitions. We can even print the number of partitions.
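If we wanted a single output file, one option is to merge everything into one partition before writing. A minimal sketch (the output path is made up; `coalesce` is part of the Spark 1.6 DataFrame API):

```scala
// coalesce(1) pulls the data into a single partition, so the
// output directory will contain just one data file.
y.coalesce(1).write.format("json").
  save("/path/to/city_elev_single.json")
```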
Also, this is a good place to discuss the "laziness" property of Spark. When we enter the first and second lines, Spark doesn't do any computation yet. It just takes a note that in order to compute `x`, it will need to do this "select" on airports, and in order to compute `y`, it will need to do this filter on `x`. The actual work only happens when an action like `show` or `save` asks for results.

---

# SQL example: aggregation

Print the top airports by number of outgoing flights:

```scala
routes.registerTempTable("routes")
sqlContext.sql(
  "SELECT src_iata, COUNT(dst_iata) AS count " +
  "FROM routes " +
  "GROUP BY src_iata " +
  "ORDER BY count DESC").show()
```

---

# SQL example: joining

Replace the source and destination IATA codes in the routes table with city names:

```scala
routes.show
airports.registerTempTable("airports")
sqlContext.sql(
  "SELECT a1.city AS city1, a2.city AS city2 " +
  "FROM routes, airports AS a1, airports AS a2 " +
  "WHERE a1.iata = routes.src_iata AND " +
  "      a2.iata = routes.dst_iata").show()
```

---

# Let's peek behind the scenes

http://localhost:4040

- SQL logical plan, physical plan
- Jobs, stages, RDDs

???

---

# Q&A

Questions?

---

# Links

http://spark.apache.org/docs/latest/sql-programming-guide.html

http://www.slideshare.net/databricks/2015-0616-spark-summit

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset

https://spark-summit.org/2015/events/deep-dive-into-project-tungsten-bringing-spark-closer-to-bare-metal/
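???

If it comes up during Q&A: each SQL example in this talk can also be written with the DataFrame API, in the spirit of "different ways to say the same thing". A minimal sketch of the aggregation example (method names from the Spark 1.6 DataFrame API):

```scala
// Count outgoing flights per source airport, busiest first.
// groupBy(...).count() produces a column named "count".
routes.groupBy("src_iata").count().
  orderBy($"count".desc).show()
```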