class: center, middle, inverse
# Spark SQL
in network analytics

Daniel Darabos, Lynx Analytics

.fadeout[Press **P** to read the speaker notes.]

---

# Lynx Analytics

Big data analytics company focused on big graphs:

- social networks
- mobile phone call graphs
- banking transaction networks

???

We provide clients with valuable "insights", such as estimating the age of pre-paid subscribers based on a call graph.

As the company accumulated a great deal of expertise in this domain, we decided to create a comprehensive product for it.

---

# LynxKite

Lynx Analytics' core product, takes care of:

- visualizing big graphs
- calculating graph metrics
- graph manipulation
- finding structure in graphs
- ...

Written in Scala, uses Apache Spark for scalable computation and AngularJS for the frontend.

???

Example calculation: PageRank. Example manipulation: merging vertices by a matching attribute. "..." stands for machine learning, aggregation, import/export, etc. We have over 100 graph operations.

---

class: center, middle, inverse

# Demo

???

I just want to show this is a real product and anchor the presentation.

Segue into SQL: analysts are used to using SQL for looking at data. You cannot have an analytical tool without SQL.

---

We added SQL earlier this year.

```sql
SELECT * FROM vertices WHERE age > 18;

SELECT src_id, dst_id, src_age - dst_age AS diff
FROM edges
ORDER BY diff DESC
LIMIT 10;
```

# Why?

- Widely known
- Easy to use
- Flexible
- Spark does it well

???

We've added a SQL box at the bottom of the project page. It's not very flashy. It's not animated and not 3D. Still, the users love it!

We love it too, because it can fill in for missing functionality. If we don't have an operation for some calculation, it is often possible to hack it up with SQL in a live instance until we can get out a release with a proper implementation.

Have I convinced you to add a SQL box to all of your products now? Okay, here's how you do it.

---

# Our data

```scala
trait Table {
  def idSet: VertexSet
  def columns: Map[String, Attribute[_]]
}
```

???

This is the API on our end. We have built up tables of vertices and edges. These tables have an ID set and a bunch of attributes of various types, identified by string names. The attributes can provide RDDs, the distributed data representation that is the foundation of Apache Spark.

Now we want to run SQL on this.

---

# Running SQL

```scala
trait Table {
  def idSet: VertexSet
  def columns: Map[String, Attribute[_]]

  def toDF(ctx: sql.SQLContext): sql.DataFrame =
    new TableRelation(this, ctx).toDF
}
```

And then:

```scala
val ctx = sqlContext.newSession()
vertexTbl.toDF(ctx).registerTempTable("vertices")
edgeTbl.toDF(ctx).registerTempTable("edges")
ctx.sql("SELECT COUNT(*) FROM vertices")
```

???

Once we can create a DataFrame from the Table, everything is easy. We create a throwaway `SQLContext` as a scope where we can register tables called `vertices` and `edges` without interfering with other projects people are working on at the same time. We create and register DataFrames in this context. Then we can run SQL queries on it and return the results to the UI.
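Here is a minimal sketch of that last step. The `runForUI` helper, the `maxRows` limit and the `Seq`-of-`Seq`s return shape are assumptions for illustration, not our actual code:

```scala
import org.apache.spark.sql

// Hypothetical helper: run a user's query and collect a bounded
// sample of rows that a frontend can render.
def runForUI(ctx: sql.SQLContext, query: String, maxRows: Int = 100): Seq[Seq[Any]] = {
  val df = ctx.sql(query)
  // Never collect an unbounded result onto the driver.
  df.limit(maxRows).collect().map(_.toSeq).toSeq
}
```

The point is just that a DataFrame is the end of the pipeline: `limit` and `collect` are all it takes to hand rows to the UI.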
Apparently the magic happens in `TableRelation`. How does it create a DataFrame?

---

```scala
class TableRelation(
    table: controllers.Table,
    val sqlContext: sql.SQLContext)
    extends sql.sources.BaseRelation
    with sql.sources.TableScan
    with sql.sources.PrunedScan {
  def toDF = sqlContext.baseRelationToDataFrame(this)

  // Implement BaseRelation.
  val schema: sql.types.StructType = {
    val fields = table.columns.toSeq.map {
      case (name, attr) => sql.types.StructField(
        name = name,
        dataType = Table.dfType(attr.typeTag))
    }
    sql.types.StructType(fields)
  }
  // ... continued on next slide ...
```

???

This is the Spark Data Source API. It is used for implementing support for file formats like CSV, but it's also a good fit for creating DataFrames out of your own data representations.

We implement three interfaces. `BaseRelation` can describe its schema. You can see the implementation of that here: it just looks at the attributes in our table and creates a `StructField` for each, then builds a `StructType` of these fields.

---

```scala
// ... continued from previous slide ...

  // Implement TableScan.
  def buildScan(): RDD[sql.Row] = buildScan(schema.fieldNames)

  // Implement PrunedScan.
  def buildScan(wantedColumns: Array[String]): RDD[sql.Row] = {
    val emptyRows = table.idSet.rdd.mapValues(_ => Seq[Any]())
    val rdds = wantedColumns.toSeq.map(name => table.columns(name).rdd)
    val seqRows = rdds.foldLeft(emptyRows) { (seqs, rdd) =>
      seqs.leftOuterJoin(rdd).mapValues {
        case (seq, opt) => seq :+ opt.getOrElse(null)
      }
    }
    seqRows.values.map(sql.Row.fromSeq(_))
  }
}
```

???

`TableScan` can return the contents of the entire DataFrame as an RDD of `Row`s that conform to the specified schema.

`PrunedScan` is a smarter interface for columnar data sources. It specifies the list of columns it wants to access. We store the data for our columns in separate directories. If a SQL query touches just one or two columns, we can save a lot of time by reading only those columns from disk.

About the implementation: if you don't want to learn anything about Scala, plug your ears now.

Check out the last line: we want to return an RDD of `sql.Row`s, and we can create `Row`s from `Seq`s. So we are going to create an RDD of `Seq`s that contain all the requested attributes.

An attribute is not necessarily defined for all vertices, but we can use `null` for missing values. A plain join of the attributes would lose the rows with missing values though, so we use an "outer" join, which gives us an `Option` that may contain the value or may be `None`. `getOrElse` resolves this `Option` to the actual value or to `null`.

But the most interesting part is the `foldLeft`. First we create an RDD with an empty `Seq` for each vertex (`emptyRows`). Then we join each attribute RDD to it, appending each joined value to the vertex's `Seq`. At the end, the `Seq` contains all the attribute values for the vertex.

You can unplug your ears now.
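If you want to play with the trick in isolation, here is a toy, self-contained version on plain RDDs. The vertex IDs, attribute data and the local `SparkContext` are made up for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("rows"))

// Three vertices; "age" is missing for vertex 3, "name" for vertex 2.
val emptyRows = sc.parallelize(Seq(1L, 2L, 3L)).map(id => (id, Seq[Any]()))
val age = sc.parallelize(Seq[(Long, Any)](1L -> 20, 2L -> 35))
val name = sc.parallelize(Seq[(Long, Any)](1L -> "Ann", 3L -> "Bob"))

// One leftOuterJoin per attribute; missing values become null.
val rows = Seq(age, name).foldLeft(emptyRows) { (seqs, attr) =>
  seqs.leftOuterJoin(attr).mapValues {
    case (seq, opt) => seq :+ opt.getOrElse(null)
  }
}
rows.collect.foreach(println)
// Prints something like:
// (1,List(20, Ann))  (2,List(35, null))  (3,List(null, Bob))
```

The real `buildScan` is the same pattern; the attribute RDDs just come from our storage instead of `parallelize`.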
---

# Surprise benefits

Import and export to:

- CSV
- JDBC
- JSON
- Parquet
- ORC
- Hive

???

The Spark DataFrame API has bindings to a bunch of formats that have now all become available to us. We had our own CSV and JDBC import/export code, but we threw it away and use DataFrames for this now.

The import code is not very exciting: it just takes a DataFrame apart into `Attribute`s and creates a `Table` from it. We get the benefits of Spark's mature data format handlers; for example, the CSV code can infer data types.

We do exporting via SQL now. For the default case where you want to export everything, it still just takes a click, because we fill in the default `SELECT *` query. But it gives you the power to export only part of the data, whichever way you want to slice it. (For example, to export only some attributes.)
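For reference, a sketch of what these bindings look like on the Spark side. The helper names and options are assumptions, and on Spark 1.x the CSV binding comes from the separate `spark-csv` package rather than the built-in `csv` format:

```scala
import org.apache.spark.sql

// Hypothetical import path: any DataFrame source can become a Table.
def readCsv(ctx: sql.SQLContext, path: String): sql.DataFrame =
  ctx.read
    .format("csv")                  // built into Spark 2.x; spark-csv on 1.x
    .option("header", "true")
    .option("inferSchema", "true")  // the "mature handlers": type inference for free
    .load(path)

// Hypothetical export path: run the user's query, write out the result.
def exportParquet(ctx: sql.SQLContext, query: String, path: String): Unit =
  ctx.sql(query).write.parquet(path)
```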
---

# Surprise benefits

Tables are a nice abstraction.

- Import tables as vertices/edges.
- Project vertices/edges are available as tables.

Crazy combinations!

- Import the vertices of one project as a set of edges in another.

???

We had issues with the Cartesian product of import operations and data sources. We had operations for importing vertices from CSV, importing vertices from JDBC, importing edges from CSV, importing edges from JDBC, importing segmentations from CSV, etc. We dreaded the day we had to add a new data source, because it would mean adding five new operations.

The table-based API we created fixed this. We have a method for each data source that just creates a `Table` from that source. Then we have import operations that take a `Table` and import it into a project as vertices, edges, a segmentation, etc. It's painless now to add a new data source or a new import operation.
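Sketched with made-up signatures (the real LynxKite method names surely differ), the shape is:

```scala
trait Table  // as on the "Our data" slide

// One small method per data source...
def tableFromCsv(path: String): Table = ???
def tableFromJdbc(url: String, table: String): Table = ???

// ...and one import operation per target, all taking a Table.
def importAsVertices(t: Table): Unit = ???
def importAsEdges(t: Table): Unit = ???
def importAsSegmentation(t: Table): Unit = ???
```

N data sources and M import targets now cost N + M small methods instead of N × M bespoke operations.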
---

class: links

Apache Spark: http://spark.apache.org/

Me:

This presentation: [http://rnd.lynxanalytics.com/presentations/spark-meetup-2016-sql-analytics](http://rnd.lynxanalytics.com/presentations/spark-meetup-2016-sql-analytics)

# Questions?