Web app and RESTful services using the MEAN stack with a Spark + Scala back end

I have developed a MEAN stack application which shows San Francisco food inspection details.
Source: San Francisco Food Inspections (the Food Inspections – LIVES Standard dataset)

I have used Spark, Scala, MongoDB, Node.js, and AngularJS to build this.

My Spark job reads the input CSV data containing the food inspection details, processes it, and stores the results in MongoDB as collections. There are two collections, allFoodInspection and filterFoodInspection. The first one holds all the data; the second holds the business name, the distinct risk category, and the number of risks committed.
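
For context, a minimal sketch of the kind of aggregation the Spark job performs for the filterFoodInspection collection is shown below. This is not the actual job from the repository; the column names business_name and risk_category are assumptions based on the LIVES Standard CSV header, and the MongoDB write is only indicated in a comment.

package com.spark

import org.apache.spark.sql.SparkSession

object SFFoodInspectionSketch {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("SFFoodInspectionSketch").getOrCreate()

    //Read the LIVES Standard CSV; the header row supplies the column names
    val inspections = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(args(0))

    //Per business: each distinct risk category and how many risks were recorded for it
    val riskCounts = inspections
      .filter(inspections("risk_category").isNotNull)
      .groupBy("business_name", "risk_category")
      .count()

    riskCounts.show()
    //The real job persists both the full data set and this aggregate to MongoDB
    //(for example via the MongoDB Spark connector) as the two collections.
    spark.stop()
  }
}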

My MEAN stack REST layer reads the data from MongoDB, processes it, and exposes it, and the web layer consumes that data to display a table and draw a chart.

Let us see how we can execute this.

  1. Install Scala, sbt and Spark on your machine if you are using Ubuntu. Refer to my post How to install Scala, SBT and Spark in Ubuntu (included below) to know how to install these.
  2. Clone the git repository https://github.com/dkbalachandar/sf-food-inspection-spark.git, go inside the sf-food-inspection-spark folder and run ‘sbt assembly’ to create a fat JAR with all the dependencies. Here I have used Spark 2.0.2 and Scala 2.11.8 (Spark 2.0.2 is compatible with Scala 2.11.x).
    If you don’t use compatible versions you will end up with lots of errors; a sample build.sbt pinning these versions is sketched after this list.
  3. Copy the ../sf-food-inspection-spark/target/scala-2.11/sf-food-inspection-spark-assembly-1.0.jar to the /usr/local/spark folder
  4. Download Food_Inspections_-_LIVES_Standard.csv from https://data.sfgov.org/browse?q=food+inspection and move it to the /usr/local/spark folder
  5. Install MongoDB with the below steps
    
     sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 0C49F3730359A14518585931BC711F9BA15703C6
     echo "deb http://repo.mongodb.org/apt/ubuntu trusty/mongodb-org/3.4 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.4.list
     sudo apt-get update
     sudo apt-get install -y mongodb-org
     sudo service mongod start
    
    

    Run the Spark job with the below command

    
    bin/spark-submit --class com.spark.SFFoodInspectionAnalysis --master local sf-food-inspection-spark-assembly-1.0.jar file:///usr/local/spark/Food_Inspections_-_LIVES_Standard.csv 
    
    
  6. Then check MongoDB and make sure the data has been inserted into the collections and is available. Open a terminal window, type ‘mongo’ and press Enter; it will open a shell window. Then use the below commands to verify the data
    
      show dbs
      use sfFood
      show collections
      db.allFoodInspection.find()
      db.filterFoodInspection.find()
    
    
  7. Clone the git repository https://github.com/dkbalachandar/sf-food-inspection-web.git and go inside the sf-food-inspection-web folder, then run the below commands to build and run the application
    
      npm install
      node server.js
    
    
  8. Open http://localhost:8081 and check the page. I have used the data to create a table and display a chart with the details.
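
For reference, a minimal build.sbt pinning the compatible versions mentioned in step 2 might look like the sketch below. The real build file in the repository may differ, and building the fat JAR additionally needs the sbt-assembly plugin enabled in project/plugins.sbt.

name := "sf-food-inspection-spark"

version := "1.0"

scalaVersion := "2.11.8"

//Spark 2.0.2 artifacts are built for Scala 2.11.x, so scalaVersion above must match
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.2" % "provided",
  "org.apache.spark" %% "spark-sql"  % "2.0.2" % "provided"
)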

Below are some of the screenshots taken from the application.

How to install Scala, SBT and Spark in Ubuntu

Install Scala 2.11.8 and SBT


sudo apt-get remove scala-library scala
sudo wget www.scala-lang.org/files/archive/scala-2.11.8.deb
sudo dpkg -i scala-2.11.8.deb
echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823
sudo apt-get update
sudo apt-get install sbt


Install Spark 2.0.2


sudo wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz
sudo chmod -R 755 spark-2.0.2-bin-hadoop2.7.tgz
sudo gunzip spark-2.0.2-bin-hadoop2.7.tgz
sudo tar -xvf spark-2.0.2-bin-hadoop2.7.tar
sudo mv spark-2.0.2-bin-hadoop2.7 spark
sudo mv spark /usr/local/spark

Open the ~/.bashrc file, add SPARK_HOME and update PATH
sudo vi ~/.bashrc
  export SPARK_HOME="/usr/local/spark"
  export PATH=$PATH:$SPARK_HOME/bin

source ~/.bashrc

Spark Scala Unit Testing

In this post, I am going to show an example of writing unit test cases for a Spark Scala job and running them with Maven.

Assume that we have a set of XML files which contain user information such as first name and last name. The middle name and county name are optional fields, but the XML files do contain empty nodes for these two fields. Our job is to read those files, remove those empty nodes, and write the updated content into a text file, either in a local or a Hadoop environment.

The sample XML content is given below.

 

<persons>
    <person>
        <firstName>Bala</firstName>
        <middleName/>
        <lastName>Samy</lastName>
        <countyName/>
    </person>
    <person>
        <firstName>Bala1</firstName>
        <middleName/>
        <lastName>Samy1</lastName>
        <countyName/>
    </person>
</persons>

 

The Spark Scala code for reading the XML files and removing the empty nodes is given below.


package com

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.Map

object EmptyTagReplacer {

  def main(args: Array[String]) {

    if (args.length < 2) {
      println("Usage <inputDir> <outputDir>")
      return
    }
    val conf = new SparkConf().setAppName("EmptyTagReplacer")
    val sc = new SparkContext(conf)

    val inFile = args(0)
    val outFile = args(1)

    val input: Map[String, String] = sc.wholeTextFiles(inFile).collectAsMap()
    searchAndReplaceEmptyTags(sc, input, outFile)
    sc.stop()
  }

  def searchAndReplaceEmptyTags(sc: SparkContext, inputXml: Map[String, String], outFile: String):
  scala.collection.mutable.ListBuffer[String] = {

    val outputXml = new scala.collection.mutable.ListBuffer[String]()
    val htmlTags = List("<middleName/>", "<countyName/>")
    inputXml.foreach { case (fileName, content) =>
      var newContent = content
      //Remove each empty tag from the file content
      for (tag <- htmlTags) {
        newContent = newContent.replace(tag, "")
      }
      //Write the cleaned content out (skipped when no output path is given, e.g. in unit tests)
      if (outFile.nonEmpty) {
        sc.parallelize(List(newContent)).saveAsTextFile(outFile + "/" + fileName)
      }
      outputXml += newContent
    }
    outputXml
  }

  def countTags(sc: SparkContext, xmlRecords: List[String]): List[Int] = {

    var middleNameTagCounter = sc.accumulator(0)
    var countyTagCounter = sc.accumulator(0)
    val middleNameRegex = "<middleName/>".r
    val countyRegEx = "<countyName/>".r
    xmlRecords.foreach { content =>
      middleNameTagCounter += middleNameRegex.findAllIn(content).length
      countyTagCounter += countyRegEx.findAllIn(content).length
    }
    List(middleNameTagCounter.value, countyTagCounter.value)
  }
}

Now the test case for the above Spark job is given below.



package com

import java.io.File

import com.holdenkarau.spark.testing.SharedSparkContext
import org.apache.commons.io.FileUtils
import org.scalatest.FunSuite
import collection.mutable.Map


class EmptyTagReplacerTest extends FunSuite with SharedSparkContext {


  test("Empty HTML tag replacer test") {

    //Read the file content and create a content map keyed by file name
    val content: String = FileUtils.readFileToString(new File("./src/test/resources/text-files/xml1"), "UTF-8")

    println("content: " + content)
    val contentMap = collection.mutable.Map[String, String]()
    contentMap += ("fileName" -> content)
    //Call searchAndReplaceMethod to remove empty Nodes
    val outputContent: scala.collection.mutable.ListBuffer[String] = EmptyTagReplacer.searchAndReplaceEmptyTags(sc, contentMap, "")
    val counts: List[Int] = EmptyTagReplacer.countTags(sc, outputContent.toList)
    println(counts)
    val expected = List(0, 0)
    assert(counts == expected)
  }
}


You have to include the scala-maven-plugin and scalatest-maven-plugin in pom.xml to make this work.

Please refer to my GitHub repo to know more: https://github.com/dkbalachandar/scala-spark-test

How to use Spark-CSV for data analysis

In this post, I am going to show an example with the spark-csv API. The main objective is to use the spark-csv API to read a CSV file, do some data analysis, and write the output to a CSV file.

I have taken the dataset for this analysis from https://www.kaggle.com/kaggle/us-baby-names. This dataset is a big CSV file which contains the id, name, year, gender and count of baby names in the USA.

The sample data is given below.


Id	Name	Year	Gender	Count
1	Mary	1880	F	7065
2	Anna	1880	F	2604
3	Emma	1880	F	2003
3	Emma	1882	M	2003

My use case is to read this whole dataset, filter out the records based on the name provided during execution, and write the output to a file. This is a very simple use case.

With the above sample data, my output will look like the below. Assume that during the execution I am passing the name Emma.


Id	Name	Year	Gender	Count
3	Emma	1880	F	2003
3	Emma	1882	M	2003

Include the below dependencies in your build.sbt file


libraryDependencies ++= Seq("com.databricks" % "spark-csv_2.10" % "1.4.0",
                             "org.apache.spark" % "spark-sql_2.10" % "1.6.1" % "provided")

Refer to the below Spark Scala code.


package com.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object USANameAnalysis {
  def main(args: Array[String]) {

    if (args.length < 3) {

      /*
      Run as below
      bin/spark-submit --class com.spark.USANameAnalysis --master local spark-scala-examples-assembly-1.0.jar
      file:///usr/local/spark/NationalNames.csv file:///usr/local/spark/output Zura

      */
      println("Usage inputFile outputFile nameToQuery")
      return
    }

    val conf = new SparkConf().setAppName("NAME-ANALYSIS")

    //Scala Spark Context.
    val sc = new SparkContext(conf)

    //Create the SQL context
    val sqlContext = new SQLContext(sc)

    //Load the CSV data
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(args(0))

    df.printSchema()

    df.columns.foreach(println)

    //Then filter with name and output the data to an another CSV file
    val selectedData = df.filter(df("Name") === args(2))

    selectedData.collect().foreach(println)

    selectedData.write.format("com.databricks.spark.csv")
       .option("header", "true")
       .save(args(1))
    /*
     Output file content is given below

     Id,Name,Year,Gender,Count
     32455,Zura,1893,F,6
     35552,Zura,1894,F,5
     108497,Zura,1913,F,5
     143367,Zura,1917,F,6

     */

     /*
      We can also map the DF to a table and query against it.

      df.registerTempTable("USA_NAME_DATA")
      val query = "SELECT * FROM USA_NAME_DATA where Name IN ('" + args(2) + "')"
      val specificResults = sqlContext.sql(query).collect()
      specificResults.foreach(println)

     */
    sc.stop()
  }
}


Refer to my GitHub repo for the code: https://github.com/dkbalachandar/spark-scala-examples

Spark Job for Removing Empty XML tags

The below Spark program will read a set of XML files, remove the empty tags, and finally write the output as a sequence file.

package com.spark

import org.apache.spark.{SparkConf, SparkContext}

//Find the empty tags, remove them and then write the result as sequence files
object TagHandler {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage <Source File or Directory> <Destination File or Directory>")
      // bin/spark-submit --class com.spark.TagHandler --master local tagHandler-assembly-1.0.jar /data/xml /data/output
      return
    }

    val inFile = args(0)
    val outFile = args(1)

    val htmlTags = List("<sub/>", "<sup/>", "<i/>", "<b/>")

    val conf = new SparkConf().setAppName("TagHandler")
    val sc = new SparkContext(conf)
    val wholeFiles = sc.wholeTextFiles(inFile)

    val cleaned = wholeFiles.collect().map { case (fileName, content) =>
      var newContent = content
      //Remove each empty tag from the file content
      for (tag <- htmlTags) {
        newContent = newContent.replace(tag, "")
      }
      (fileName, newContent)
    }
    //Save all the cleaned files as one sequence file output
    sc.parallelize(cleaned.toSeq).saveAsSequenceFile(outFile)
    sc.stop()
  }
}

 

Spark XML – How to replace hyphen symbols found in XML elements

Using a regular expression:


import org.apache.spark.{SparkConf, SparkContext}

object TransFormXmlApp {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage <Source File> <Destination File>")
      return
    }
    val inFile = args(0)
    val outFile = args(1)
    val conf = new SparkConf().setAppName("TransFormXmlApp")
    val sc = new SparkContext(conf)
    val wholeFiles = sc.wholeTextFiles(inFile)
    //Match an opening or closing tag name, which may contain hyphens
    val htmlReg = "</?[a-zA-Z-]+".r
    wholeFiles.collect().foreach { case (fileName, content) =>
      val newContent = htmlReg.replaceAllIn(content,  m => m.toString.replace("-","_"))
      println(newContent)
      val data = sc.parallelize(List(newContent))
      data.saveAsTextFile(outFile)
    }
    sc.stop()
  }
}

Using the scala-xml API:


import org.apache.spark.{SparkConf, SparkContext}

import scala.xml._
import scala.xml.transform._

object TransFormXmlApp {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage <Source File> <Destination File>")
      return
    }
    val inFile = args(0)
    val outFile = args(1)
    val conf = new SparkConf().setAppName("TransFormXmlApp")
    val sc = new SparkContext(conf)
    val wholeFiles = sc.wholeTextFiles(inFile)

    //Replace the hyphen with an underscore in element names
    val hyphenAsUnderScoreRule = new RewriteRule {
      override def transform(nodes: scala.xml.Node): Seq[Node] = nodes match {
        case e @ Elem(prefix, label, attribs, scope, children @ _*) =>
          Elem(prefix, label.replace("-", "_"), attribs, scope, false, children: _*)
        case _ => nodes
      }
    }
    //Remove the hyphen from element names (alternative rule, not applied below)
    val removeHyphenRule = new RewriteRule {
      override def transform(nodes: scala.xml.Node): Seq[Node] = nodes match {
        case e @ Elem(prefix, label, attribs, scope, children @ _*) =>
          Elem(prefix, label.replace("-", ""), attribs, scope, false, children: _*)
        case _ => nodes
      }
    }
    wholeFiles.collect().foreach { case (fileName, content) =>
      val updatedXml = new RuleTransformer(hyphenAsUnderScoreRule).transform(XML.loadString(content))
      val data = sc.parallelize(updatedXml)
      data.saveAsTextFile(outFile)
    }
    sc.stop()
  }
}

Spark SQL + XML – How to escape column names with hyphen symbol

Assume that you have the below XML content

<?xml version="1.0" encoding="UTF-8"?>
<users>
    <user>
        <first-name>Bala</first-name>
        <middle-name></middle-name>
        <last-name>Samy</last-name>
        <age>60</age>
    </user>
    <user>
        <first-name>Adam</first-name>
        <middle-name></middle-name>
        <last-name>Williams</last-name>
        <age>60</age>
    </user>
</users>

You want to write a Spark SQL program to parse this content and run queries against it. We can query all the data, but if you want to run a query with a WHERE clause against the columns first-name, last-name or middle-name, the query won't work because those column names contain a hyphen.

Now you have two solutions.
Either get rid of the hyphen symbol by replacing it with an underscore or removing it (please refer to my other post, Spark XML – How to replace hyphen symbols found in XML elements, to know how to do this), or surround the column name with backticks. I have used the second solution. Please refer to my code below.


import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object SparkUserDataOperation {

  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage <inputFile>")
      return
    }
    val inFile = args(0)
    val conf = new SparkConf().setAppName("SparkUserDataOperation")
    val sc = new SparkContext(conf)
    //Create the SQL context
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read
      .format("com.databricks.spark.xml")
      .option("rootTag", "users")
      .option("rowTag", "user")
      .load(inFile)

    df.printSchema()
    df.columns.foreach(println)
    //Map it to a table
    df.registerTempTable("users")
    //Query it
    val allResults = sqlContext.sql("SELECT * FROM users").collect()
    print("Print all records::")
    allResults.foreach(println)
    //Note that first-name is surrounded with backticks.
    val specificResults = sqlContext.sql("SELECT * FROM users where `first-name` IN ('Bala')").collect()
    print("Print Query records::")
    specificResults.foreach(println)
    sc.stop()
  }
}
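
Note that the com.databricks.spark.xml format used above comes from the spark-xml package, so it must be on the classpath. A build.sbt dependency along these lines would pull it in (the artifact and version shown here are assumptions; pick the ones matching your Scala and Spark versions):

//spark-xml provides the com.databricks.spark.xml data source used above (version is an assumption)
libraryDependencies += "com.databricks" % "spark-xml_2.10" % "0.3.3"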