Spark and SPARQL; RDF Graphs and GraphX

Some interesting possibilities for working together.

In Spark Is the New Black in IBM Data Magazine, I recently wrote about how popular the Apache Spark framework is for both Hadoop and non-Hadoop projects these days, and how for many people it goes so far as to replace one of Hadoop's fundamental components: MapReduce. (I still have trouble writing "Spar" without writing "ql" after it.) While waiting for that piece to be copyedited, I came across 5 Reasons Why Spark Matters to Business by my old XML.com editor Edd Dumbill and 5 reasons to turn to Spark for big data analytics in InfoWorld, giving me a total of 10 reasons that Spark... is getting hotter.

I originally became interested in Spark because one of its key libraries is GraphX, Spark's API for working with graphs of nodes and arcs. The "GraphX: Unifying Data-Parallel and Graph-Parallel Analytics" paper by GraphX's inventors (pdf) has a whole section on RDF as related work, saying "we adopt some of the core ideas from the RDF work including the triples view of graphs." The possibility of using such a hot new Big Data technology with RDF was intriguing, so I decided to look into it.

I thought it would be interesting to output a typical GraphX graph as RDF so that I could perform SPARQL queries on it that were not typical of GraphX processing, and then to go the other way: read a good-sized RDF dataset into GraphX and do things with it that would not be typical of SPARQL processing. I have had some success at both, so I think that RDF and GraphX systems have much to offer each other.

This wouldn't have been very difficult if I weren't learning the Scala programming language as I went along, but GraphX libraries are not available for Python or Java yet, so what you see below is essentially my first Scala program. The class handouts of Swedish Institute of Computer Science senior researcher Amir H. Payberah were a huge help in my attempts to learn Scala, Spark, and GraphX. I stumbled across them in some web searches while trying to get a Scala GraphX program to compile. I had already looked through several introductions to Scala and Spark, but his PDFs introducing Scala, Spark, and graph processing (especially the GraphX parts) still lit a lot of "a-ha" lightbulbs for me. He has since encouraged me to share the link to course materials for his current course on cloud computing.

I already had a general idea of how functional programming languages work. One of the lightbulbs that Dr. Payberah's work lit for me was why they're valuable, at least when you use Spark from Scala. Spark provides higher-order functions that hand off your own functions and data to structures that can be stored in distributed memory. This makes possible the kinds of interactive and iterative tasks (for example, machine learning) that generally don't work well with Hadoop's batch-oriented MapReduce model. Apparently, Spark versions of tasks that would work fine with MapReduce also run much faster. Their better use of memory lets them avoid much of the disk I/O that is typical of MapReduce jobs.
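Here is a small sketch of the higher-order-function idea that runs without a cluster. It uses an ordinary Scala Seq in place of an RDD. The sample data comes from the example graph used below. The isProf function and the HigherOrderSketch object are illustrative names. RDDs also offer filter and map methods that take function arguments, so the same function value could presumably be passed to an RDD created with sc.parallelize. Spark would then apply it to the data wherever the partitions live.

```scala
object HigherOrderSketch {
  // (name, role) pairs taken from the example graph in this post
  val people: Seq[(String, String)] = Seq(
    ("rxin", "student"), ("jgonzal", "postdoc"),
    ("franklin", "prof"), ("istoica", "prof")
  )

  // An ordinary function value that we hand to a higher-order function
  val isProf: ((String, String)) => Boolean = { case (_, role) => role == "prof" }

  // filter and map each take a function as their argument
  def profNames(data: Seq[(String, String)]): Seq[String] =
    data.filter(isProf).map { case (name, _) => name }

  def main(args: Array[String]): Unit =
    println(profNames(people)) // prints List(franklin, istoica)
}
```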

Spark lets you use this distributed memory by providing a data structure called a Resilient Distributed Dataset, or RDD. When you store your data in RDDs, you can let Spark take care of their distribution across a computing cluster. GraphX lets you store a set of nodes, arcs, and—crucially for us RDF types—extra information about each in RDDs. To output a "typical" GraphX graph structure as RDF, I took the Example Property Graph example in the Apache Spark GraphX Programming Guide and expanded it a bit. (If experienced Scala programmers don't gag when they see my program, they will in my next installment, where I show how I read RDF into GraphX RDDs. Corrections welcome.)

My Scala program below, like the Example Property Graph mentioned above, creates two RDDs. One, called users, stores nodes representing people at a university. The other, called relationships, stores the edges that connect those nodes. GraphX uses long integers (its VertexId type) as node identifiers, such as the 3L and 7L values shown below. You'll see that the graph can store additional information about nodes: for example, that node 3L is named "rxin" and has the role "student". It can also store additional information about edges: for example, that the user represented by 5L has an "advisor" relationship to user 3L. I added a few extra nodes and edges to give the eventual SPARQL queries a little more to work with.

Once the node and edge RDDs are defined, the program creates a graph from them. After that, I added code to output RDF triples about node relationships to other nodes (in RDF parlance, object property triples). This code uses a base URI defined at the top of the program to turn node names and edge labels into URIs. It produced triples such as <http://snee.com/xpropgraph#istoica> <http://snee.com/xpropgraph#colleague> <http://snee.com/xpropgraph#franklin> in the output. Finally, the program outputs non-relationship values (literal properties), producing triples such as <http://snee.com/xpropgraph#rxin> <http://snee.com/xpropgraph#role> "student".
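GraphX's triplets view pairs each edge with the attributes of its source and destination vertices. That is why the output code can refer to t.srcAttr._1, t.attr, and t.dstAttr._1. The following sketch imitates that join with plain Scala maps and sequences so the mechanics are visible without Spark. It uses a subset of the example data. The Triplet case class and the toNTriple helper are hypothetical stand-ins for illustration, not GraphX API.

```scala
object TripletSketch {
  val baseURI = "http://snee.com/xpropgraph#"

  // Vertex attributes keyed by ID, as in the users RDD: (name, role)
  val users: Map[Long, (String, String)] = Map(
    (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
    (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))
  )

  // (srcId, dstId, attr), mirroring Edge(srcId, dstId, attr)
  val edges: Seq[(Long, Long, String)] =
    Seq((3L, 7L, "collab"), (5L, 3L, "advisor"), (2L, 5L, "colleague"), (5L, 7L, "pi"))

  // Hypothetical stand-in for GraphX's EdgeTriplet
  case class Triplet(srcAttr: (String, String), attr: String, dstAttr: (String, String))

  // Join each edge with both endpoints' attributes, which is what graph.triplets provides
  def triplets: Seq[Triplet] =
    edges.map { case (src, dst, attr) => Triplet(users(src), attr, users(dst)) }

  // Turn node names and the edge label into URIs by prefixing the base URI
  def toNTriple(t: Triplet): String =
    s"<$baseURI${t.srcAttr._1}> <$baseURI${t.attr}> <$baseURI${t.dstAttr._1}> ."

  def main(args: Array[String]): Unit = triplets.map(toNTriple).foreach(println)
}
```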

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object ExamplePropertyGraph {
    def main(args: Array[String]): Unit = {
        val baseURI = "http://snee.com/xpropgraph#"
        // Run Spark in local mode with a single worker thread
        val conf = new SparkConf().setMaster("local").setAppName("ExamplePropertyGraph")
        val sc = new SparkContext(conf)

        // Create an RDD for the vertices
        val users: RDD[(VertexId, (String, String))] =
            sc.parallelize(Array(
                (3L, ("rxin", "student")),
                (7L, ("jgonzal", "postdoc")),
                (5L, ("franklin", "prof")),
                (2L, ("istoica", "prof")),
                // Following lines are new data
                (8L, ("bshears", "student")),
                (9L, ("nphelge", "student")),
                (10L, ("asmithee", "student")),
                (11L, ("rmutt", "student")),
                (12L, ("ntufnel", "student"))
            ))
        // Create an RDD for edges
        val relationships: RDD[Edge[String]] =
            sc.parallelize(Array(
                Edge(3L, 7L, "collab"),
                Edge(5L, 3L, "advisor"),
                Edge(2L, 5L, "colleague"),
                Edge(5L, 7L, "pi"),
                // Following lines are new data
                Edge(5L, 8L, "advisor"),
                Edge(2L, 9L, "advisor"),
                Edge(5L, 10L, "advisor"),
                Edge(2L, 11L, "advisor")
            ))
        // Build the initial Graph
        val graph = Graph(users, relationships)

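        // Output object property triples. Each triplet becomes a string first;
        // collect() then brings the strings back to the driver so that println
        // writes to the driver's console even when running on a cluster
        graph.triplets.map(t =>
            s"<$baseURI${t.srcAttr._1}> <$baseURI${t.attr}> <$baseURI${t.dstAttr._1}> ."
        ).collect().foreach(println)

        // Output literal property triples
        users.map(t =>
            s"""<$baseURI${t._2._1}> <${baseURI}role> "${t._2._2}" ."""
        ).collect().foreach(println)

        sc.stop()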

    }
}
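The literal-output line wraps each role in double quotes without escaping anything. That works for values like "student". It would produce invalid N-Triples, though, if a value ever contained a double quote, a backslash, or a line break. N-Triples requires each of those characters to be written as a backslash escape inside a quoted literal. Here is a minimal sketch of an escaping helper that applies that rule. The escapeLiteral and literalTriple names are hypothetical, and the sketch does not address making node names safe for use in URIs.

```scala
object LiteralEscaping {
  // N-Triples string literals may not contain a raw quote, backslash, LF, or CR
  def escapeChar(c: Char): String = c match {
    case '\\'  => "\\\\"
    case '"'   => "\\\""
    case '\n'  => "\\n"
    case '\r'  => "\\r"
    case other => other.toString
  }

  def escapeLiteral(s: String): String = s.iterator.map(escapeChar).mkString

  // Hypothetical helper: subject and predicate are full URIs, value is a plain literal
  def literalTriple(subject: String, predicate: String, value: String): String =
    s"""<$subject> <$predicate> "${escapeLiteral(value)}" ."""

  def main(args: Array[String]): Unit =
    println(literalTriple("http://snee.com/xpropgraph#rxin",
      "http://snee.com/xpropgraph#role", "visiting \"student\""))
}
```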

The program writes out the RDF with full URIs for every resource, but I'm showing a Turtle version here that uses prefixes to help it fit on this page better:

@prefix xp: <http://snee.com/xpropgraph#> . 

xp:istoica  xp:colleague xp:franklin .
xp:istoica  xp:advisor   xp:nphelge .
xp:istoica  xp:advisor   xp:rmutt .
xp:rxin     xp:collab    xp:jgonzal .
xp:franklin xp:advisor   xp:rxin .
xp:franklin xp:pi        xp:jgonzal .
xp:franklin xp:advisor   xp:bshears .
xp:franklin xp:advisor   xp:asmithee .
xp:rxin     xp:role      "student" .
xp:jgonzal  xp:role      "postdoc" .
xp:franklin xp:role      "prof" .
xp:istoica  xp:role      "prof" .
xp:bshears  xp:role      "student" .
xp:nphelge  xp:role      "student" .
xp:asmithee xp:role      "student" .
xp:rmutt    xp:role      "student" .
xp:ntufnel  xp:role      "student" .
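Converting full URIs to the xp: prefix can itself be scripted. This sketch rewrites N-Triples lines like the ones the program prints into prefixed names. It assumes every URI to be shortened begins with the single base URI and that local names contain no ">" characters. It does not reproduce the column alignment of the listing above. The PrefixSketch object and abbreviate function are hypothetical names.

```scala
import java.util.regex.{Matcher, Pattern}

object PrefixSketch {
  val baseURI = "http://snee.com/xpropgraph#"
  val prefix = "xp"

  // Matches <baseURI + localName>; Pattern.quote keeps the URI's characters literal
  private val FullURI = ("<" + Pattern.quote(baseURI) + "([^>]*)>").r

  // Replace each full URI with a prefixed name such as xp:rxin
  def abbreviate(ntriple: String): String =
    FullURI.replaceAllIn(ntriple, m => Matcher.quoteReplacement(s"$prefix:${m.group(1)}"))

  def main(args: Array[String]): Unit = {
    println(s"@prefix $prefix: <$baseURI> .")
    println()
    println(abbreviate(s"<${baseURI}istoica> <${baseURI}colleague> <${baseURI}franklin> ."))
  }
}
```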

My first SPARQL query of the RDF asked this: for each person with advisees, how many do they have?

PREFIX xp: <http://snee.com/xpropgraph#>

SELECT ?person (COUNT(?advisee) AS ?advisees)
WHERE {
  ?person xp:advisor ?advisee
}
GROUP BY ?person

Here is the result:

--------------------------
| person      | advisees |
==========================
| xp:franklin | 3        |
| xp:istoica  | 2        |
--------------------------
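For comparison, the same grouping and counting can be done directly on the edge data. This Spark-free sketch uses a Scala Seq of (subject, predicate, object) name triples in place of the relationships RDD. The AdviseeCount object is a hypothetical name. On a real pair RDD, countByKey would be a comparable operation.

```scala
object AdviseeCount {
  // (subject, predicate, object) names for every edge in the example graph
  val edges: Seq[(String, String, String)] = Seq(
    ("rxin", "collab", "jgonzal"), ("franklin", "advisor", "rxin"),
    ("istoica", "colleague", "franklin"), ("franklin", "pi", "jgonzal"),
    ("franklin", "advisor", "bshears"), ("istoica", "advisor", "nphelge"),
    ("franklin", "advisor", "asmithee"), ("istoica", "advisor", "rmutt")
  )

  // Like ?person xp:advisor ?advisee ... GROUP BY ?person with COUNT(?advisee)
  def adviseeCounts: Map[String, Int] =
    edges.collect { case (person, "advisor", _) => person }
      .groupBy(identity)
      .map { case (person, occurrences) => (person, occurrences.size) }

  def main(args: Array[String]): Unit =
    adviseeCounts.toSeq.sortBy(-_._2).foreach { case (p, n) => println(s"$p $n") }
}
```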

The next query asks about the roles of rxin's collaborators:

PREFIX xp: <http://snee.com/xpropgraph#>

SELECT ?collaborator ?role
WHERE {
  xp:rxin xp:collab ?collaborator . 
  ?collaborator xp:role ?role . 
}

As it turns out, there's only one:

----------------------------
| collaborator | role      |
============================
| xp:jgonzal   | "postdoc" |
----------------------------
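The query's two triple patterns amount to a join: first find rxin's collab edges, then look up each collaborator's role. Here is a plain-Scala sketch of that join over a subset of the example data. The CollaboratorRoles object and its method are hypothetical names.

```scala
object CollaboratorRoles {
  val edges: Seq[(String, String, String)] = Seq(
    ("rxin", "collab", "jgonzal"), ("franklin", "advisor", "rxin"), ("franklin", "pi", "jgonzal")
  )
  val roles: Map[String, String] = Map(
    "rxin" -> "student", "jgonzal" -> "postdoc", "franklin" -> "prof"
  )

  // Like: xp:rxin xp:collab ?collaborator . ?collaborator xp:role ?role .
  def collaboratorRoles(person: String): Seq[(String, String)] =
    for {
      (s, p, collaborator) <- edges
      if s == person && p == "collab"
      role <- roles.get(collaborator) // people with no role drop out, as in the join
    } yield (collaborator, role)

  def main(args: Array[String]): Unit = collaboratorRoles("rxin").foreach(println)
}
```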

Does nphelge have a relationship to any prof, and if so, who and what relationship?

PREFIX xp: <http://snee.com/xpropgraph#>

SELECT ?person ?relationship
WHERE {

  ?person xp:role "prof" . 

  { xp:nphelge ?relationship ?person }
  UNION
  { ?person ?relationship xp:nphelge }

}

And here is our answer:

-----------------------------
| person     | relationship |
=============================
| xp:istoica | xp:advisor   |
-----------------------------
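The UNION checks edges in both directions, because the relationship could point from nphelge to a professor or from a professor to nphelge. Here is a sketch of the same logic on plain Scala collections, using a subset of the example data. The UnionSketch object and its methods are hypothetical names.

```scala
object UnionSketch {
  val edges: Seq[(String, String, String)] = Seq(
    ("istoica", "colleague", "franklin"), ("istoica", "advisor", "nphelge"),
    ("franklin", "advisor", "bshears"), ("istoica", "advisor", "rmutt")
  )
  val roles: Map[String, String] = Map(
    "istoica" -> "prof", "franklin" -> "prof", "nphelge" -> "student",
    "bshears" -> "student", "rmutt" -> "student"
  )

  // The two UNION branches: edges leaving target, then edges arriving at target
  def neighbors(target: String): Seq[(String, String)] =
    edges.collect { case (`target`, rel, other) => (other, rel) } ++
      edges.collect { case (other, rel, `target`) => (other, rel) }

  // Keep only neighbors whose role is "prof", like ?person xp:role "prof"
  def profRelationships(target: String): Seq[(String, String)] =
    neighbors(target).filter { case (person, _) => roles.get(person).contains("prof") }

  def main(args: Array[String]): Unit = profRelationships("nphelge").foreach(println)
}
```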

A hardcore RDF person will have two questions about the sample data:

  • What about properties of edges? For example, what if I wanted to say that an xp:advisor property was an rdfs:subPropertyOf the Dublin Core property dc:contributor?

  • The ability to assign properties such as a name of "rxin" and a role of "student" to a node like 3L is nice, but what if I don't have a consistent set of properties that will be assigned to every node—for example, if I've aggregated person data from two different sources that don't use all the same properties to describe these persons?

Neither of those was difficult with GraphX, and next month I'll show my approach. I'll also show how I applied that approach to let a GraphX program read in any RDF and then perform GraphX operations on it.

