class: center, middle, inverse
# Rich, type-safe, composable graph transformations
Daniel Darabos, Lynx Analytics

.fadeout[Press **P** to read the speaker notes.]

---

# Lynx Analytics

Big data analytics company focused on big graphs:

- social networks
- mobile phone call graphs
- banking transaction networks

???

We provide clients with valuable "insights", such as estimating the age of pre-paid subscribers based on a call graph.

As the company accumulated a great deal of expertise in this domain, we decided to create a comprehensive product for it.

---

# LynxKite

Lynx Analytics' core product. Takes care of:

- visualizing big graphs
- calculating graph metrics
- graph manipulation
- finding structure in graphs
- ...

Written in Scala. Uses Apache Spark for scalable computation and AngularJS for the frontend.

???

Example calculation: PageRank. Example manipulation: merging vertices by a matching attribute.

"..." stands for machine learning, aggregation, import/export, etc. We have over 100 graph operations.

---

class: center, middle, inverse

# Demo

???

I just want to show this is a real product and anchor the presentation.

See, PageRank works! I didn't lie. Now I filter by gender: this is the PageRank histogram for men.

All of this is built on a solid foundation of graph representation.

---

# Graph representation

Just 4 types of immutable graph entities:

- Vertex Set
- Edge Bundle
- Attribute
- Scalar
???

This is all I'm going to talk about for the rest of this talk. If you're interested in other topics, chat me up after the talk.

A graph is made up of vertices connected by edges. E.g. vertices can be bank accounts and edges can be transactions.

Why are vertices in sets while edges are in bundles? Vertices are small dots, you cannot bundle them. They only have an ID. Edges have IDs too, but they also have a source and a destination vertex ID. Vertex IDs and edge IDs are not different kinds of things. You can even have edges going between edges. Sounds weird, but it has a lot of practical value. (Just wait a few slides.)

Attributes assign some value (string, double, anything) to IDs. (Either vertex or edge IDs.) E.g. account balances or transaction dates.

Scalars just hold any kind of value that is not associated with IDs. E.g. the average account balance for the whole bank.

I've capitalized the names because these are the actual class names.

---

# Graph representation

Just 4 subclasses of `MetaGraphEntity`:

- `VertexSet`
- `EdgeBundle`
- `Attribute[T]`
- `Scalar[T]`

???

Now these are Scala types. Scala is a JVM language so it interoperates with Java and has the same foundations of classes, objects, methods, etc. It's a functional language, which makes it especially suited for distributed computation. I will briefly talk about distributed computation with Apache Spark, but I will focus on everything that happens before the computation.

These are called _meta_ graph entities because they do not contain the actual gigabytes of data associated with them. We will get to the data, but the data is only needed for the actual computation, and many interesting things happen before that.
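Conceptually, each entity stands for data shaped roughly like this. (A hypothetical simplification for this talk, not the real representation.)

```scala
type ID = Long
// VertexSet    ~ Set[ID]            -- just IDs
// EdgeBundle   ~ Map[ID, (ID, ID)]  -- edge ID -> (src ID, dst ID)
// Attribute[T] ~ Map[ID, T]         -- vertex or edge ID -> value
// Scalar[T]    ~ T                  -- a single value
```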
Besides being "functional", Scala is also world-famous for its rich type system. `VertexSet` and `EdgeBundle` are simple classes. But `Attribute[T]` and `Scalar[T]` have something going on. The `[T]` at the end is a type parameter. A subscriber name would be an `Attribute[String]`. The average call length for the month would be a `Scalar[Double]`. It's like C++ templates or Java generics.

---

# Operations

Kinda like a function. Naïve code:

```scala
def pagerank(
    iterations: Int,
    vs: VertexSet,
    es: EdgeBundle,
    weight: Attribute[Double]): Attribute[Double] = {
  // Calculate PageRank.
}
```

Easy to invoke. Type-safe.

???

An example operation is one that calculates PageRank. The PageRank of a vertex is an approximation of the likelihood of ending up at this vertex if we start from a random vertex and follow edges at random.

Obviously a function is easy to invoke. Just call it. It's also type-safe: if you accidentally swap the vertices and edges, the compiler will catch your mistake.

--

.red[Persistence?]

???

The big issue with this simple function interface is persistence. As long as the only interface to our code is a programming API, plain functions can work. We write a program that calls the `pagerank` function and other operations one after the other. But each operation can take hours to compute. The computations can fail. We want to add another operation at the end. We don't want to re-run the program from the start each time.

Plus in LynxKite the processing workflow is not described in Scala code. It's clicked together via the UI. We need to represent these logical functions as something that can be persisted. At the same time we want to retain the compile-time safety and the ease of invocation of functions.

Our solution is graph-specific, but you can steal most of the ideas for other domains too. The desire to persist a complex, slow, ever-growing computation is universal.

Let's decompose operations. What are they really made of?

---

# Operations

Take parameters. Take inputs. Generate outputs. Can compute outputs.

???

In the PageRank example the parameter would be the number of iterations to do. The parameters describe the algorithm without attaching it to specific inputs. The input signature can depend on the parameters.

The inputs are graph entities that the operation will work on. For PageRank these would be the vertex set and edge bundle of a graph and a `Double`-valued attribute describing the edge weights. (A higher weight means we are more likely to follow that edge in the random walk.) Once we fill in the inputs of an operation, we call it an operation instance.

PageRank would have a single output, a `Double`-valued vertex attribute. The meta-level output is available as soon as the meta-level inputs have been used to create a meta graph operation instance. But this instance is also capable of computing the actual data of the outputs if it is given the actual data of its inputs.
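One way to picture this decomposition is an interface roughly like the following. (Hypothetical names, not LynxKite's real API — that comes on later slides.)

```scala
// Toy sketch of the decomposition, for illustration only.
trait Operation {
  def inputNames: Set[String]   // Which entities the operation needs.
  def outputNames: Set[String]  // Which entities it promises to produce.
  // Given the actual data of the inputs, compute the actual outputs.
  def execute(inputs: Map[String, Any]): Map[String, Any]
}

// Parameters are constructor arguments; they describe the algorithm
// without attaching it to specific inputs.
case class PageRankSketch(iterations: Int) extends Operation {
  def inputNames = Set("vs", "es", "weight")
  def outputNames = Set("pagerank")
  def execute(inputs: Map[String, Any]) =
    Map("pagerank" -> computePageRank(inputs))
  private def computePageRank(inputs: Map[String, Any]): Any = ???  // Elided.
}
```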
Inputs and outputs are identified by string names in the input and output signatures. Any entity is defined by the generating operation instance and the name of the entity in the operation's output signature. Now you will understand the full definition of the entity types.

---

# Meta graph
???

We can plot this data flow network. Entities go into operations and new entities come out.

---

# Meta graph

???

Later operations can use the outputs of earlier operations. We call this network the "meta graph".

Some entities in this graph may have already been computed. If we need one entity and it has not been computed yet, we need to run its source operation. Before we can run that, we need to make sure its inputs have already been computed. You could also call this a dependency graph. I bet you've met this in some form.
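The recursive idea can be sketched like this — `isComputed`, `inputsOf` and `run` are hypothetical stand-ins, not the real API:

```scala
// Hypothetical helpers, for illustration only.
def isComputed(e: MetaGraphEntity): Boolean = ???
def inputsOf(i: MetaGraphOperationInstance): Seq[MetaGraphEntity] = ???
def run(i: MetaGraphOperationInstance): Unit = ???

// If an entity is missing, compute its dependencies first, then its source.
def ensureComputed(entity: MetaGraphEntity): Unit =
  if (!isComputed(entity)) {
    val instance = entity.source                // The instance that outputs it.
    inputsOf(instance).foreach(ensureComputed)  // Dependencies first.
    run(instance)  // Computes all outputs of the instance, including `entity`.
  }
```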
I'll talk more about how we represent this network in our API.

---

# MetaGraphEntity

```scala
sealed trait MetaGraphEntity {
  val source: MetaGraphOperationInstance
  val name: String
}

case class VertexSet(
  source: MetaGraphOperationInstance,
  name: String) extends MetaGraphEntity
```

???

A "sealed" trait is like an interface that can only be implemented in the current file, so you know there will never be a 5th type of `MetaGraphEntity`.

Operation instances in turn are defined by the operation, the operation parameters, and the input entities. This may sound circular, but it's possible for an operation not to have any inputs. For example an operation that imports a CSV file would just have the name of the file as a parameter, but it would have no input entities. Such an operation is less interesting though, so let's step back to PageRank.

---

# Unsafe operations

```scala
val g = ExampleGraph().result
val pr = {
  val op = PageRank(iterations = 10)
  op(
    "vs" -> g("vertices"),
    "es" -> g("edges"),
    "weight" -> g("edgeWeight")).result("pagerank")
}
```

???

At an early stage our code looked roughly like this. Now these operation objects can be persisted all right. It's fairly simple to implement something like this. Raise your hands if you have implemented something like this! Just me? Okay. Anyway, we were not happy with this.

--

.red[No compile-time checks!]

???

These string identifiers are terrible. You can make a typo and it will not be found out during compilation. You could add constants for them, but then you can still use the wrong constant. You could swap vertices and edges and it would only hit assertions at run time.

We have a powerful UI and every day our users do something we never anticipated. This reads well in the product brochure, but it is just another way of saying we cannot be sure our unit tests cover everything that can happen. We really want those compile-time safeties back. And this is the tricky part.

I'll start from the operation definition this time and work toward the safe and pretty invocation.

---

# PageRank

```scala
class Input {
  val vs = vertexSet
  val es = edgeBundle(src = vs, dst = vs)
  val weights = edgeAttribute[Double](es)
}

class Output(inputs: Input) {
  val pagerank = vertexAttribute[Double](inputs.vs)
}

class PageRank(iterations: Int) {
  // Code describing actual computation comes here.
}
```

(Pseudocode.)

???

This is the idealized code of PageRank. The real code is not much different, but the differences are very interesting. First, let's get to know this code.

We declare the inputs, give them names, like `vs`, and types, like `vertexSet`. We also set up constraints, such as declaring that the edge bundle called `es` will have to go from `vs` to `vs`. (So it's a set of edges on this set of vertices.) For attributes we declare their types (`Double` in both cases) and what they belong to (`es` and `vs`).

The operation is a plain class. The constructor takes some parameters. Let's look at the differences!

---

# PageRank

```scala
class Input extends MagicInputSignature {
  val vs = vertexSet
  val es = edgeBundle(src = vs, dst = vs)
  val weights = edgeAttribute[Double](es)
}

class Output(
  implicit instance: MetaGraphOperationInstance,
  inputs: Input) extends MagicOutput(instance) {
  val pagerank = vertexAttribute[Double](inputs.vs.entity)
}

case class PageRank(iterations: Int)
  extends TypedMetaGraphOp[Input, Output] {
  // Code describing actual computation comes here.
}
```

???

`Input` now inherits from `MagicInputSignature`. This is the class that defines `vertexSet`, `edgeBundle`, etc. These are placeholders for actual meta graph entities. These objects store the constraints and check them when the input is built. (This is a nice advantage over the plain function interface!)

They also know their own name. `vs.name` is the string `"vs"`. This is the "magic" part in `MagicInputSignature`, since this is done with reflection. To find out the name of a placeholder, we just check all the fields in the class with reflection and find the name of the field that contains the placeholder. Did you expect more magic?
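Here is a standalone toy version of the naming trick — not the real `MagicInputSignature`, just the mechanism:

```scala
// Toy version of the reflection-based naming; not the real class.
class ToySignature {
  val vs = new AnyRef
  val es = new AnyRef
  // Find the field whose value is this exact object and return its name.
  def nameOf(placeholder: AnyRef): String =
    getClass.getDeclaredFields.find { f =>
      f.setAccessible(true)
      f.get(this) eq placeholder
    }.get.getName
}

val sig = new ToySignature
sig.nameOf(sig.vs)  // "vs"
```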
Let's look at `MagicOutput` then! Actually it's almost identical. The only extra magic is accessing `inputs.vs`.

There is an asymmetry here. The inputs have to be declared before the operation instance is created. (Because the operation instance is created by providing its inputs.) But the outputs are only declared once the operation instance exists. So they depend on the input meta graph entities, not the placeholders.

You can see that the `Output` class takes an "implicit" `instance` parameter. How is it used? `inputs.vs` is a placeholder and `inputs.vs.entity` is a `VertexSet` instance. How does the placeholder get the entity?

---

```scala
trait MetaGraphOperationInstance {
  val operation: MetaGraphOp
  val inputs: MetaDataSet
}

case class MetaDataSet(
  vertexSets: Map[String, VertexSet],
  edgeBundles: Map[String, EdgeBundle],
  attributes: Map[String, Attribute[_]],
  scalars: Map[String, Scalar[_]])

abstract class MagicInputSignature {
  def nameOf(p: Placeholder): String  // Reflection.

  class VertexSetPlaceholder extends Placeholder {
    def entity(
      implicit instance: MetaGraphOperationInstance): VertexSet = {
      instance.inputs.vertexSets(nameOf(this))
    }
  }
}
```

???

In Scala if you have an implicit variable of type `T` in a scope, and a function call takes an implicit parameter of type `T`, the implicit variable will be passed in automatically. I mean _implicitly_!

So when we said `inputs.vs.entity`, it's a function call and it passes in the `instance` variable. The placeholder can figure out its own name and look it up in the list of inputs of the operation instance.

Similar magic is used throughout our operation code.
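If implicits are new to you, here is a self-contained toy example of the mechanism, unrelated to LynxKite:

```scala
object ImplicitDemo extends App {
  case class Context(name: String)

  // The second parameter list is implicit: callers can omit it
  // whenever an implicit Context is in scope.
  def greet(greeting: String)(implicit ctx: Context): String =
    s"$greeting, ${ctx.name}!"

  implicit val ctx = Context("world")
  println(greet("Hello"))  // The compiler fills in ctx: "Hello, world!"
}
```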
Let's look at how an operation is invoked.

---

# Running an operation

```scala
val g = ExampleGraph().result
val pr = {
  val op = PageRank(iterations = 10)
  op(
    op.vs, g.vertices)(
    op.es, g.edges)(
    op.weights, g.edgeWeight).result.pagerank
}
```

Boilerplate not bad compared to a function call. Type-safe, typo-safe.

???

`ExampleGraph` is an operation with no parameters and no inputs. It's a favorite for unit tests. In the first line we take all its outputs.

We create a PageRank operation by providing its parameters. Then we provide its inputs. The fields in the input signature are used as keys for setting the inputs. This provides compile-time safety against typos. But it also provides type safety: if you swap `g.vertices` and `g.edges`, the program will not compile. If `edgeWeight` were a `String` attribute, the program would not compile.

At run time we also automatically check that `g.edgeWeight` is in fact an attribute on `g.edges` and that `g.edges` is in fact an edge bundle from `g.vertices` to `g.vertices`. While this check does not happen at compile time, it still happens before any actual computation.

I have added a few slides between the operation definition and this invocation, but if you have a compiler running in your brain, it may be raising a bunch of red flags. `op` is an instance of `PageRank`. `vs` is a field of `Input`. Why do we write `op.vs`? That shouldn't work! Well, this is Scala. We have defined an implicit conversion that can convert an operation into its input type. `op` does not have a field called `vs`, but there is an implicit conversion to a class that does. So the compiler adds the conversion. It's as if we wrote `op.inputs.vs`, just a bit shorter.

The code above in fact does not compute PageRank. It just sets up the entities at the meta level. There is no connection to Spark so far. It is very simple to go from the "meta" level to the "data" level.

---

# Actually running an operation

```scala
// Lazy operations.
val g = ExampleGraph().result
val pr = PageRank.run(  // Convenience wrapper.
  g.vertices, g.edges, g.edgeWeight)

// Actual execution on actual data:
println(s"PageRank values: ${pr.rdd.collect.toSeq}")
```

???

Accessing `pr.rdd` triggers the computation. Before it can execute `PageRank`, it needs to generate all its inputs. This triggers the execution of `ExampleGraph`. Once that's done, `PageRank` runs, and we get the Apache Spark RDD (a distributed array) from `PageRank`'s output. `RDD.collect` just fetches the contents of the RDD from the distributed system to the local machine.

Now you know how everything works. This is how we write code that uses operations.
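These slides elide the computation itself ("code describing actual computation comes here"). As a rough illustration of the kind of Spark code that lives there — much simplified and hypothetical, not the real implementation:

```scala
import org.apache.spark.rdd.RDD

// A much simplified compute step: out-degree per vertex.
// Edges are (edge ID, (src ID, dst ID)) pairs; IDs are Longs.
def outDegree(edges: RDD[(Long, (Long, Long))]): RDD[(Long, Int)] =
  edges
    .map { case (_, (src, _)) => (src, 1) }  // One count per outgoing edge.
    .reduceByKey(_ + _)                      // Sum counts per source vertex.
```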
Let's look at a more complex example that uses a couple more operations.

---

# Filter

```scala
val g = ExampleGraph().result
val pr = PageRank.run(/* ... */)
val f = {
  val op = FilterVerticesByAttribute("=male")
  op(
    op.vs, g.vertices)(
    op.attr, g.gender).result
}
```

???

`FilterVerticesByAttribute` filters the vertices by the attribute. Who knew. It outputs a filtered vertex set. But `pr`, the PageRank attribute, is defined on the original vertex set. How do we get a filtered version of it?

---

# Filter

```scala
val g = ExampleGraph().result
val pr = PageRank.run(/* ... */)
val f = {
  val op = FilterVerticesByAttribute("=male")
  op(
    op.vs, g.vertices)(
    op.attr, g.gender).result
}
val fpr = {
  val op = PullOverAttribute[Double]()
  val result = op(
    op.originalAttribute, pr)(
    op.originalVertices, g.vertices)(
    op.destinationVertices, f.filtered)(
    op.projection, f.transformation).result
  result.destinationAttribute
}
```

???

Besides the filtered vertex set, `FilterVerticesByAttribute` also outputs an edge bundle that represents the transformation. This is an edge bundle from the original vertices to the filtered vertices.

`PullOverAttribute` is an operation that projects an attribute from one vertex set to another, given a projection from one to the other. This is how we get the filtered PageRank attribute.

Does it seem like too much code? There is actually some redundancy. Like I said, we check at run time that the inputs are consistent. We check that `originalAttribute` is indeed defined on `originalVertices`. But if we can check that, why do we even need to specify those inputs? Indeed we don't have to. I just wanted to make the example clearer.

---

# Filter

```scala
val g = ExampleGraph().result
val pr = PageRank.run(/* ... */)
val f = {
  val op = FilterVerticesByAttribute("=male")
  op(
    // op.vs, g.vertices)(
    op.attr, g.gender).result
}
val fpr = {
  val op = PullOverAttribute[Double]()
  val result = op(
    op.originalAttribute, pr)(
    // op.originalVertices, g.vertices)(
    // op.destinationVertices, f.filtered)(
    op.projection, f.transformation).result
  result.destinationAttribute
}
```

???

Both of these operations are small and quite abstract, but they compose in very nice ways. `PullOverAttribute` is very flexible because it can handle any attribute type. Check out its input signature.

---

## PullOverAttribute

```scala
class Input[T] extends MagicInputSignature {
  val originalVertices = vertexSet
  val destinationVertices = vertexSet
  val originalAttribute = vertexAttribute[T](originalVertices)
  val projection = edgeBundle(
    src = originalVertices,
    dst = destinationVertices,
    EdgeBundleProperties(isReversedFunction = true))
}

class Output[T](
  implicit instance: MetaGraphOperationInstance,
  inputs: Input[T]) extends MagicOutput(instance) {
  implicit val tt = inputs.originalAttribute.typeTag
  val destinationAttribute =
    vertexAttribute[T](inputs.destinationVertices.entity)
}
```

???

There are more lines than for PageRank, but that's mostly just because of the longer field names. One new thing is the `[T]` type parameter. It's going to take an `Attribute[T]` and output another `Attribute[T]`.

Java generics are just compile-time. A `List[Int]` and a `List[String]` are the same at run time. (As long as they are both empty...) Scala runs on the same VM, so it cannot distinguish between `Attribute[Int]` and `Attribute[Double]` either.

If we only stored attributes in local variables (like `val pr = PageRank.run(...)`) we would be fine. The compiler would be able to track the type. But we have a web UI and users can add and remove attributes by clicking around. The list of attribute names and types is not in the code. So we must be able to tell and check types at run time.

`TypeTag`s are Scala's solution for this. A `TypeTag` is a full type specification as data that you can store in a variable. So we store the `TypeTag` for every attribute and scalar.

When we created an `Attribute[Double]` in PageRank's output, the compiler generated a `TypeTag` for `Double`. It automatically generates `TypeTag`s for any concrete type. But `PullOverAttribute` outputs an `Attribute[T]` and the compiler cannot generate a `TypeTag` for type `T`. We must provide it. So we take it from the input attribute. That was easy!
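A tiny standalone example of `TypeTag` in action (nothing LynxKite-specific):

```scala
import scala.reflect.runtime.universe._

// The `T: TypeTag` context bound asks the compiler to pass along
// a run-time description of T.
def describe[T: TypeTag](value: T): String = s"$value: ${typeOf[T]}"

describe(1.0)      // "1.0: Double"
describe("hello")  // "hello: String"
```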
Edge bundles can also be looked at as vertex sets, so `PullOverAttribute` also works for edge attributes.

---

# Summary

                | Functions      | String keys | Scala magic
----------------|----------------|-------------|--------------
Invocation      | .no1[simplest] | .no2[okay]  | .no2[okay]
Implementation  | .no1[simplest] | .no2[okay]  | .no3[complex]
Compiler checks | .no1[yes]      | .no3[no]    | .no1[yes]
Persistence     | .no3[no]       | .no1[yes]   | .no1[yes]
Constraints     | .no3[no]       | .no2[maybe] | .no1[yes]

???

I know very well the effects of Scala on the human brain. If you just woke up, here's a summary of what we're proud of.

Our fancy solution recovers the important compiler checks of simple functions while implementing persistence based on string keys. The declarative constraints (like validating that an edge bundle in the input applies to the vertex set in the input) are a nice bonus. The complexity is the price we pay for this.

Complexity is expensive because it increases the likelihood of bugs and makes any debugging more difficult. But persistence is a must, and the lack of compile-time checks with string keys would have enabled a lot of bugs. In practice we made a very good deal. The tricky code is very small and I can't recall any bugs we found in it.

---

class: links

Scala:
http://scala-lang.org/

Apache Spark:
http://spark.apache.org/

Me:
This presentation:
http://rnd.lynxanalytics.com/presentations/bdu2016/

# Questions?