Graph Analytics Using Big Data

An overview and a short tutorial showing how to analyze a dataset using Apache Spark, GraphFrames, and Java.



We will now give these raw columns a schema. To do this, we map each row of the data into a Java POJO, as shown. Our POJO class is Airport.

JavaRDD<Airport> airportsRdd = rawDataAirport.javaRDD().map(row -> {
	Airport ap = new Airport();
	ap.setAirportId(row.getString(0));
	ap.setState(row.getString(2));
	...
	return ap;
});
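The Airport class itself is not shown here, so the following is only a guess at its shape: a minimal sketch of a serializable bean with getters and setters. The property names are inferred from the queries used later in this article. The `id` property, the `AirportSketch` helper, the column positions 1, 3 and 4, and the sample row are all assumptions made for illustration.

```java
import java.io.Serializable;

// Hypothetical sketch of the Airport bean. Property names are inferred from
// the queries in this article; the author's real class may differ. In a real
// project this would be a public class in its own file.
class Airport implements Serializable {
    private String id;               // graph vertex id (assumed: the IATA code)
    private String airportId;
    private String airportName;
    private String state;
    private String country;
    private String airportIataCode;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getAirportId() { return airportId; }
    public void setAirportId(String airportId) { this.airportId = airportId; }
    public String getAirportName() { return airportName; }
    public void setAirportName(String airportName) { this.airportName = airportName; }
    public String getState() { return state; }
    public void setState(String state) { this.state = state; }
    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }
    public String getAirportIataCode() { return airportIataCode; }
    public void setAirportIataCode(String code) { this.airportIataCode = code; }
}

class AirportSketch {
    // Builds an Airport from one row's columns. Indexes 0 and 2 follow the
    // mapping code in the article; indexes 1, 3 and 4 are assumptions.
    static Airport fromColumns(String[] cols) {
        Airport ap = new Airport();
        ap.setAirportId(cols[0]);
        ap.setAirportName(cols[1]);
        ap.setState(cols[2]);
        ap.setCountry(cols[3]);
        ap.setAirportIataCode(cols[4]);
        ap.setId(cols[4]); // reuse the IATA code as the vertex id (assumption)
        return ap;
    }

    public static void main(String[] args) {
        // Sample row made up for illustration, not read from the dataset.
        Airport ap = fromColumns(new String[] {
            "1", "Indira Gandhi International Airport", "Delhi", "India", "DEL"});
        System.out.println(ap.getId() + " -> " + ap.getAirportName() + ", " + ap.getState());
    }
}
```

The getters and setters matter because a bean class passed to `createDataFrame` is read through its properties, so each property becomes a column in the resulting dataset.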

 

We can convert this RDD into a dataset, since a dataset is easier to query and work with.

Dataset<Row> airports = session.createDataFrame(airportsRdd.rdd(), Airport.class);
airports.createOrReplaceTempView("airports");

 

Similarly, we can load the data for routes, as shown.

Dataset<Row> rawDataRoute = session.read().csv("data/flight/routes.dat");

 

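Again, we can load each row into a Java POJO, Route, and store the result in an RDD.

JavaRDD<Route> routesRdd = rawDataRoute.javaRDD().map(row -> {
	Route r = new Route();
	// Column indexes 2 and 4 are assumptions; the original does not state them.
	r.setSrc(row.getString(2)); // source airport code
	r.setDst(row.getString(4)); // destination airport code
	...
	return r;
});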

 

We will convert it back to a dataset as we did earlier for airports.

Dataset<Row> routes = session.createDataFrame(routesRdd.rdd(), Route.class);

 

Now we have two datasets: airports and routes. Recall that in computer science, graphs are built from nodes and edges. From the perspective of a graph, our nodes are the airports, and they are connected by edges, which are the routes between them.

So, to build the graph using GraphFrames, we provide the nodes and edges, that is, airports and routes:

GraphFrame gf = new GraphFrame(airports, routes);

 

GraphFrames requires an ‘id’ attribute in your vertices and corresponding ‘src’ and ‘dst’ attributes in your edges (check that our POJOs have these attributes).
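To make this naming requirement concrete, here is a small plain-Java sketch (it does not use the GraphFrames API) of a sanity check you might run on your own data: every edge's source and destination should match some vertex id. The class name `GraphShapeCheck`, the method `danglingEdges`, and the code `XXX` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class GraphShapeCheck {
    // Returns the edges whose source or destination is not a known vertex id.
    // Each edge is a two-element array: {src, dst}.
    static List<String[]> danglingEdges(Set<String> vertexIds, String[][] edges) {
        List<String[]> dangling = new ArrayList<>();
        for (String[] edge : edges) {
            if (!vertexIds.contains(edge[0]) || !vertexIds.contains(edge[1])) {
                dangling.add(edge);
            }
        }
        return dangling;
    }

    public static void main(String[] args) {
        // Vertex ids play the role of the 'id' column of the airports dataset.
        Set<String> vertexIds = new HashSet<>(Arrays.asList("DEL", "BOM", "BHJ"));
        // Edges play the role of the 'src' and 'dst' columns of the routes dataset.
        // "XXX" is a made-up code that matches no vertex.
        String[][] edges = { {"DEL", "BOM"}, {"BOM", "BHJ"}, {"DEL", "XXX"} };
        for (String[] edge : danglingEdges(vertexIds, edges)) {
            System.out.println("Unmatched edge: " + edge[0] + " -> " + edge[1]);
        }
    }
}
```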

Now our graph object is ready, and it sits on top of big data using a Spark and GraphFrames stack.

gf.vertices().show();

 

This would print the vertices (and their attributes)

 

 

Now let’s look at the airports in India, which is quite easy with GraphFrames:

gf.vertices().filter("country = 'India'").show();

This shows the first few rows of airports in India.

 

 

The state column on the right-hand side of this output shows airports in different cities, such as ‘Ahmedabad’, ‘Mumbai’, and ‘Bhopal’.

Now let’s find the total number of airports in India. This is a simple query, as shown:

System.out.println("Airports in India ----> " + gf.vertices().filter("country = 'India'").count());

 

This would print the number of airports in India as

Airports in India ----> 125

Note: This is as per this dataset. You might need to check on the web for the authenticity of this info.

 

Let’s now find the total number of flights going in and out of Indira Gandhi International Airport in Delhi.

To do this, compute the degree of each vertex (the number of edges flowing in and out of it) and run a query on the result:

Dataset<Row> degreesDS = gf.degrees();
degreesDS.createOrReplaceTempView("DEGREES");

session.sql("select a.airportName, a.State, a.Country, d.degree from AIRPORTS a, "
		+ "DEGREES d where a.airportIataCode = d.id and d.id = 'DEL'").show();

 

And this would print the data as

 

 

That’s quite a lot, right? 527 flights flow in and out of this airport, counting each route in the dataset as one flight.

We can also break this number into flights flowing into versus out of this airport. To do this, use the inDegrees() and outDegrees() methods instead of degrees(); the rest of the code is similar. I leave this code for you to write on your own, but when you run it you should see the following output:

Flights Going out : Indira Gandhi International Airport , Delhi , India , 264
Flights Going in : Indira Gandhi International Airport , Delhi , India , 263
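Note that the two numbers add up to the total degree found earlier (264 + 263 = 527). As a way to see what these three counts mean, here is a plain-Java sketch that computes them from a tiny edge list. It does not use Spark or GraphFrames; the class `DegreeSketch` and the four sample edges are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class DegreeSketch {
    // Each edge is {src, dst}. An edge adds 1 to the in-degree of its dst.
    static Map<String, Integer> inDegrees(String[][] edges) {
        Map<String, Integer> counts = new HashMap<>();
        for (String[] edge : edges) {
            counts.merge(edge[1], 1, Integer::sum);
        }
        return counts;
    }

    // An edge adds 1 to the out-degree of its src.
    static Map<String, Integer> outDegrees(String[][] edges) {
        Map<String, Integer> counts = new HashMap<>();
        for (String[] edge : edges) {
            counts.merge(edge[0], 1, Integer::sum);
        }
        return counts;
    }

    // The total degree of a vertex is its in-degree plus its out-degree.
    static Map<String, Integer> degrees(String[][] edges) {
        Map<String, Integer> counts = new HashMap<>(inDegrees(edges));
        outDegrees(edges).forEach((id, n) -> counts.merge(id, n, Integer::sum));
        return counts;
    }

    public static void main(String[] args) {
        // Sample edges made up for illustration.
        String[][] edges = { {"DEL", "BOM"}, {"BOM", "DEL"}, {"BOM", "BHJ"}, {"BHJ", "BOM"} };
        System.out.println("BOM in=" + inDegrees(edges).get("BOM")
                + " out=" + outDegrees(edges).get("BOM")
                + " total=" + degrees(edges).get("BOM"));
        // prints: BOM in=2 out=2 total=4
    }
}
```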

 

Similarly, we can find the flights going in or out of other airports too. Also, since this is plain SQL, you can write a query to find the top airports in the country by the number of flights going in and out.

Let’s now find the direct flights that run between Delhi and Mumbai. This is again a simple query:

session.sql("select a.airlineName, r.src, r.dst from ROUTES r, AIRLINES a "
		+ "where r.src = 'DEL' and r.dst = 'BOM' and r.airlineCode = a.airlineId").show();

 

And this would print the airlines giving direct flights between Delhi and Mumbai:

 

 

The same data can be fetched using the concept of ‘triplets’ in a graph, as shown:

gf.triplets().filter("src.airportIataCode='DEL' and dst.airportIataCode='BOM'").show();

 

This would print the data as

 

 

Now let’s look for a triplet, that is, a direct flight, between Delhi and Bhuj:

gf.triplets().filter("src.airportIataCode='DEL' and dst.airportIataCode='BHJ'").show();

 

This would print the result as

 

 

This is an empty result, so no direct flight exists. Now we need to find out whether there is a ‘single stop flight’ to this destination. Here comes the power of graphs: we can do this with a breadth-first search limited to paths of at most two edges.

Dataset<Row> sfoToBufDS = gf.bfs().fromExpr("id = 'DEL'").toExpr("id = 'BHJ'").maxPathLength(2).run();

 

As you can see above, the result of the breadth-first search is also a dataset, which we store in a variable. Next, we register this dataset as a temporary view called ‘sfo_to_buf’:

sfoToBufDS.createOrReplaceTempView("sfo_to_buf");

 

Finally, we query this temporary view to get the state of the starting vertex, the state of the connecting vertex, and the state where the flight ends (in our case, Bhuj). Because from and to are also SQL keywords, these column names are quoted with backticks. We also print the output to the console by invoking the show method.

session.sql("select distinct `from`.state, v1.state, `to`.state from sfo_to_buf").show(100);

 

This would print the result on the screen as

 

 

Thus, to go from Delhi to Bhuj, you can take a flight to Mumbai and then a direct flight from Mumbai to Bhuj. You can polish the result further by changing the queries and also showing the airlines.
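To show what a breadth-first search with a maximum path length does, here is a plain-Java sketch on a tiny edge list. It is not the GraphFrames implementation, and it returns only one shortest path. The class `BfsSketch` and the sample edges are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

class BfsSketch {
    // Returns one shortest path from 'from' to 'to' that uses at most
    // maxPathLength edges, or an empty list if no such path exists.
    static List<String> shortestPath(String[][] edges, String from, String to, int maxPathLength) {
        // Adjacency list built from the {src, dst} pairs.
        Map<String, List<String>> adjacent = new HashMap<>();
        for (String[] edge : edges) {
            adjacent.computeIfAbsent(edge[0], k -> new ArrayList<>()).add(edge[1]);
        }

        Map<String, String> parent = new HashMap<>();   // vertex -> previous vertex on the path
        Map<String, Integer> depth = new HashMap<>();   // vertex -> edges from the start
        Deque<String> queue = new ArrayDeque<>();
        queue.add(from);
        depth.put(from, 0);

        while (!queue.isEmpty()) {
            String current = queue.poll();
            if (current.equals(to)) {
                // Walk the parent links back to the start to rebuild the path.
                LinkedList<String> path = new LinkedList<>();
                for (String v = to; v != null; v = parent.get(v)) {
                    path.addFirst(v);
                }
                return path;
            }
            if (depth.get(current) == maxPathLength) {
                continue; // do not expand beyond the allowed number of edges
            }
            for (String next : adjacent.getOrDefault(current, Collections.emptyList())) {
                if (!depth.containsKey(next)) {
                    depth.put(next, depth.get(current) + 1);
                    parent.put(next, current);
                    queue.add(next);
                }
            }
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        // Sample edges made up for illustration: no direct DEL -> BHJ edge.
        String[][] edges = { {"DEL", "BOM"}, {"BOM", "DEL"}, {"BOM", "BHJ"}, {"BHJ", "BOM"} };
        System.out.println(shortestPath(edges, "DEL", "BHJ", 2)); // prints [DEL, BOM, BHJ]
        System.out.println(shortestPath(edges, "DEL", "BHJ", 1)); // prints []
    }
}
```

With a maximum path length of 1 the search finds nothing, which mirrors the empty triplet result; raising the limit to 2 reveals the one-stop route through Mumbai.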

Lastly, let’s look at an important and more complicated piece. Suppose I ask you to rank the airports in India by their importance. One way to do this is to count the flights going in and out. Another way is to use the PageRank algorithm. With PageRank, it is not just the number of flights that matters: we also look at how each airport is directly connected to other important airports, and based on this each airport gets a score and a ranking. PageRank is bundled with GraphFrames, so the code is a one-liner, as shown:

Dataset<Row> pg = gf.pageRank().resetProbability(0.15).maxIter(5).run().vertices();

 

This operation can run for a long time, as it goes through all the nodes and edges. Can you imagine how Google would do this on the whole amount of data they have?

Printing this dataset shows the output. As you can see, each airport gets a pagerank value, shown in the right-hand column.

 

 

The higher the pagerank value, the more important the airport is. To get a more meaningful result, sorted by pagerank, let’s run another query on top of this page-ranked dataset, as shown:

pg.createOrReplaceTempView("pageranks");
				
session.sql("select * from pageranks order by pagerank desc").show(20);

 

This would print the result as

 

 

Looks like our PageRank results are quite good. Our topmost airport is Delhi, followed by Mumbai, then Kolkata and Bangalore. Not bad.
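To make the scoring idea concrete, here is a plain-Java sketch of a textbook PageRank iteration on a tiny edge list. It is an illustration only, not the GraphFrames implementation, so the numbers GraphFrames reports will differ. The class `PageRankSketch` and the sample edges are made up; the parameter names mirror resetProbability and maxIter from the call above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class PageRankSketch {
    // Textbook PageRank by repeated iteration. Each edge is {src, dst}.
    // Simplification: vertices with no outgoing edges are not treated
    // specially, so the sample graph gives every vertex an outgoing edge.
    static Map<String, Double> pageRank(String[][] edges, double resetProbability, int maxIter) {
        Set<String> vertices = new TreeSet<>();
        Map<String, Integer> outDegree = new HashMap<>();
        for (String[] edge : edges) {
            vertices.add(edge[0]);
            vertices.add(edge[1]);
            outDegree.merge(edge[0], 1, Integer::sum);
        }
        int n = vertices.size();

        // Start with the same score for every vertex.
        Map<String, Double> rank = new HashMap<>();
        for (String v : vertices) {
            rank.put(v, 1.0 / n);
        }

        for (int i = 0; i < maxIter; i++) {
            // Every vertex keeps a small base score...
            Map<String, Double> next = new HashMap<>();
            for (String v : vertices) {
                next.put(v, resetProbability / n);
            }
            // ...and receives an equal share of the score of each vertex
            // that points to it.
            for (String[] edge : edges) {
                double share = rank.get(edge[0]) / outDegree.get(edge[0]);
                next.merge(edge[1], (1 - resetProbability) * share, Double::sum);
            }
            rank = next;
        }
        return rank;
    }

    public static void main(String[] args) {
        // Sample edges made up for illustration: BOM is connected to both others.
        String[][] edges = { {"DEL", "BOM"}, {"BOM", "DEL"}, {"BOM", "BHJ"}, {"BHJ", "BOM"} };
        Map<String, Double> rank = pageRank(edges, 0.15, 5);
        for (Map.Entry<String, Double> entry : rank.entrySet()) {
            System.out.println(entry.getKey() + " " + entry.getValue());
        }
    }
}
```

In this sample, BOM ends up with the highest score because both other airports send their whole score to it, while it splits its own score between them.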

Summary

This article explored how easy it is to do complex graph analytics on big data using Apache Spark and GraphFrames. In this article we delved into path analytics, but graph analytics can help us analyze massive social networks too.

Learn more on big data and graph analytics:

  1. Big Data Analytics with Java, from Packt Publishing.
  2. Apache Spark official website
  3. GraphFrames official link

Bio: Rajat Mehta is a VP at JPMorgan Chase and author of Big Data Analytics with Java: Data analysis, visualization & machine learning techniques.
