Beginner's Guide: Apache Spark Machine Learning with Large Data

This tutorial walks through using Spark's machine learning capabilities and Scala to train a logistic regression classifier on a larger-than-memory dataset.



3. Set up and run Apache Spark in standalone mode


If you don’t have Apache Spark on your machine, you can simply download it from the Spark web page http://spark.apache.org/. Please use version 1.5.1. Direct link to the pre-built version: http://d3kbcqa49mib13.cloudfront.net/spark-1.5.1-bin-hadoop2.6.tgz

You are ready to run Spark in standalone mode if Java is installed on your computer. If not, install Java first.
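
You can check whether Java is available with the following terminal command:

java -version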

For Unix systems and Macs, uncompress the file and copy it to any directory. That directory is now your Spark directory.

Run the Spark master:

sbin/start-master.sh

Run the Spark slave:

sbin/start-slaves.sh

Run Spark shell:

bin/spark-shell
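
By default the shell starts with a local master. To attach it to the standalone master you started above, pass the master URL explicitly (spark://localhost:7077 assumes the master runs on your local machine with the default port; the exact URL is shown at the top of the master's web UI on port 8080):

bin/spark-shell --master spark://localhost:7077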

The Spark shell runs your Scala commands interactively.
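
As a quick sanity check, you can run a small computation right in the shell. This snippet is only an illustration, not part of the tutorial's pipeline:

// counts the even numbers in 1..1000; the shell prints 500
sc.parallelize(1 to 1000).filter(_ % 2 == 0).count()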

Windows users can find instructions here: http://nishutayaltech.blogspot.in/2015/04/how-to-run-apache-spark-on-windows7-in.html

If you are working in cluster mode in a Hadoop environment, I’m assuming you already know how to run the Spark shell.

4. Importing libraries


For this end-to-end scenario we are going to use Scala, the primary language for Apache Spark.

// General-purpose XML library
import scala.xml._

// Spark data manipulation libraries
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Spark machine learning libraries
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.Pipeline

5. Parsing XML


We need to extract the Title, Body, and Tags from the input XML file and create a single data frame with Id, Tags, and Text columns. First, let’s remove the XML header and footer. I assume that the input file is located in the same directory where you run the spark-shell command.

// Read the file line by line, then drop the XML declaration
// and the opening and closing <posts> tags
val fileName = "Posts.small.xml"
val textFile = sc.textFile(fileName)
val postsXml = textFile.map(_.trim).
   filter(!_.startsWith("<?xml version=")).
   filter(_ != "<posts>").
   filter(_ != "</posts>")

Spark has good functions for parsing the JSON and CSV formats. For XML we need to write several additional lines of code to create a data frame by specifying the schema programmatically.
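
For comparison, this is all it takes for JSON in Spark 1.5, where the schema is inferred automatically (the file name below is just a placeholder):

// hypothetical input file; Spark infers the schema from the data
val jsonDf = sqlContext.read.json("posts.json")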

Note that when Scala loads the XML, it automatically converts escaped entity codes such as “&lt;a&gt;” into actual tags such as “<a>”. Also, we are going to concatenate the title and body, remove all remaining tags and newline characters from the body, and collapse duplicated spaces.

val postsRDD = postsXml.map { s =>
   val xml = XML.loadString(s)

   // Each line is one post; pull out the attributes we need
   val id = (xml \ "@Id").text
   val tags = (xml \ "@Tags").text

   // Strip HTML tags (including tags with attributes),
   // then collapse newlines and repeated spaces
   val title = (xml \ "@Title").text
   val body = (xml \ "@Body").text
   val bodyPlain = ("<[^>]+>".r).replaceAllIn(body, " ")
   val text = (title + " " + bodyPlain).replaceAll("\n", " ").replaceAll("( )+", " ")

   Row(id, tags, text)
}
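
If you want to peek at one parsed record before applying a schema, take the first element of the RDD:

postsRDD.first()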

To create a data frame, the schema has to be applied to the RDD.

// All three columns are strings; the last argument of
// StructField marks the column as nullable
val schemaString = "Id Tags Text"
val schema = StructType(
   schemaString.split(" ").map(fieldName =>
      StructField(fieldName, StringType, true)))

val postsDf = sqlContext.createDataFrame(postsRDD, schema)
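
To confirm that the schema was applied as expected, print it:

postsDf.printSchema()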

Now you can take a look at your data frame.

postsDf.show()