Web app and RESTful services using the MEAN stack, with a Spark + Scala back end

I have developed a MEAN stack application that shows San Francisco food inspection details.
Source: Food Inspection (use Food Inspections – LIVES Standard)

I have used Spark, Scala, MongoDB, Node.js and AngularJS to do this.

My Spark job reads the input CSV data, which contains the food inspection details, processes it, and stores the results in MongoDB as collections. There are two collections: allFoodInspection and filterFoodInspection. The first one has all the data, and the second one has the business name, the unique risk category and the number of risks committed.
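To make the second collection more concrete, here is a minimal sketch of that kind of aggregation in plain Scala collections, without Spark or MongoDB. The `Inspection` case class, its field names and the sample rows are assumptions made up for illustration, and skipping rows with an empty risk category is also my assumption. The actual job in the repository may do this differently.

```scala
// Hypothetical record shape for illustration; the real CSV has many more columns.
final case class Inspection(businessName: String, riskCategory: String)

object RiskSummarySketch {
  // Count how many times each (business name, risk category) pair occurs.
  def summarize(rows: Seq[Inspection]): Map[(String, String), Int] =
    rows
      .filter(_.riskCategory.nonEmpty) // assumption: ignore rows with no recorded risk
      .groupBy(r => (r.businessName, r.riskCategory))
      .map { case (key, group) => key -> group.size }

  def main(args: Array[String]): Unit = {
    // Made-up sample rows, not taken from the real dataset.
    val rows = Seq(
      Inspection("Cafe A", "High Risk"),
      Inspection("Cafe A", "High Risk"),
      Inspection("Cafe A", "Low Risk"),
      Inspection("Bakery B", "")
    )
    // Print one line per (business, risk category) pair with its count.
    summarize(rows).toSeq.sortBy(_._1).foreach { case ((name, risk), count) =>
      println(s"$name, $risk, $count")
    }
  }
}
```

Each entry of the resulting map corresponds to one document in a collection like filterFoodInspection: a business name, a risk category and a count.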

My MEAN stack REST layer reads the data from MongoDB, processes it and exposes it. The web layer consumes that data, displays it and uses it to draw a chart.

Let us see how we can execute this.

  1. If you are using Ubuntu, install Scala, sbt and Spark on your machine first. Refer to my other post for the steps: How to install Scala, SBT and Spark in Ubuntu
  2. Clone the git repository https://github.com/dkbalachandar/sf-food-inspection-spark.git, go into the sf-food-inspection-spark folder and run ‘sbt assembly’ to create a fat jar with all the dependencies. Here I have used Spark 2.0.2 and Scala 2.11.8 (Spark 2.0.2 is compatible with Scala 2.11.x).
    If you don’t use compatible versions, you will end up with lots of errors.
  3. Copy ../sf-food-inspection-spark/target/scala-2.11/sf-food-inspection-spark-assembly-1.0.jar to the /usr/local/spark folder
  4. Download Food_Inspections_-_LIVES_Standard.csv from https://data.sfgov.org/browse?q=food+inspection and move it to the /usr/local/spark folder
  5. Install MongoDB with the steps below
    
     sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 0C49F3730359A14518585931BC711F9BA15703C6
     echo "deb http://repo.mongodb.org/apt/ubuntu trusty/mongodb-org/3.4 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.4.list
     sudo apt-get update
     sudo apt-get install -y mongodb-org
     sudo service mongod start
    
    

    Once MongoDB is running, run the Spark job from the /usr/local/spark folder with the command below

    
    bin/spark-submit --class com.spark.SFFoodInspectionAnalysis --master local sf-food-inspection-spark-assembly-1.0.jar file:///usr/local/spark/Food_Inspections_-_LIVES_Standard.csv 
    
    
  6. Then check MongoDB and make sure that the collections exist and the data has been inserted. Open a terminal window, type ‘mongo’ and press Enter to open the Mongo shell. Then use the commands below to verify the data
    
      show dbs
      use sfFood
      show collections
      db.allFoodInspection.find()
      db.filterFoodInspection.find()
    
    
  7. Clone the git repository https://github.com/dkbalachandar/sf-food-inspection-web.git, go into the sf-food-inspection-web folder, then run the commands below to build and run the application
    
      npm install
      node server.js
    
    
  8. Open http://localhost:8081 and check the page. It shows the data in a table and displays a chart with the details.

Here are some of the screenshots taken from the application


How to use Spark-CSV for data analysis

In this post, I am going to show an example using the spark-csv API. The main objective is to use the spark-csv API to read a CSV file, analyze the data and write the output to a CSV file.

I have taken the dataset for this analysis from https://www.kaggle.com/kaggle/us-baby-names. This dataset is a big CSV file which contains the name, year and total count details of baby names in the USA.

The sample data is given below.


Id	Name	Year	Gender	Count
1	Mary	1880	F	7065
2	Anna	1880	F	2604
3	Emma	1880	F	2003
3	Emma	1882	M	2003

My use case is to read this whole data set, keep only the records whose Name matches the one provided during the execution, and write the output to a file. This is a very simple use case.

So with the above sample data, and assuming that I pass the Name as Emma during the execution, my output will look like the below.


Id	Name	Year	Gender	Count
3	Emma	1880	F	2003
3	Emma	1882	M	2003
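Before bringing Spark in, the filtering logic itself can be sketched in plain Scala on the sample rows above. This is only an illustration: the `NameFilterSketch` object and `filterByName` function are hypothetical names, and the naive split on commas assumes no quoted fields, which a real CSV reader such as spark-csv handles for you.

```scala
object NameFilterSketch {
  // Keep the header line plus every data line whose Name column equals `name`.
  // Assumes `lines` is non-empty and that fields never contain quoted commas.
  def filterByName(lines: Seq[String], name: String): Seq[String] = {
    val header = lines.head
    val nameIdx = header.split(",").indexOf("Name")
    header +: lines.tail.filter(_.split(",")(nameIdx) == name)
  }

  def main(args: Array[String]): Unit = {
    // The sample rows from this post, written as comma-separated lines.
    val sample = Seq(
      "Id,Name,Year,Gender,Count",
      "1,Mary,1880,F,7065",
      "2,Anna,1880,F,2604",
      "3,Emma,1880,F,2003",
      "3,Emma,1882,M,2003"
    )
    // Prints the header followed by the two Emma rows.
    filterByName(sample, "Emma").foreach(println)
  }
}
```

The Spark version does the same thing with a DataFrame filter on the Name column, which scales to the full dataset.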

Include the dependencies below in your build.sbt file


libraryDependencies ++= Seq("com.databricks" % "spark-csv_2.10" % "1.4.0",
                             "org.apache.spark" % "spark-sql_2.10" % "1.6.1" % "provided")

Refer to the Scala Spark code below.


package com.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object USANameAnalysis {
  def main(args: Array[String]): Unit = {

    if (args.length < 3) {

      /*
      Run as below
      bin/spark-submit --class com.spark.USANameAnalysis --master local spark-scala-examples-assembly-1.0.jar
      file:///usr/local/spark/NationalNames.csv file:///usr/local/spark/output Zura

      */
      println("Usage inputFile outputFile nameToQuery")
      return
    }

    val conf = new SparkConf().setAppName("NAME-ANALYSIS")

    //Scala Spark Context.
    val sc = new SparkContext(conf)

    //Create the SQL context
    val sqlContext = new SQLContext(sc)

    //Load the CSV data
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(args(0))

    df.printSchema()

    df.columns.foreach(println)

    //Then filter by name and write the matching rows to another CSV file
    val selectedData = df.filter(df("Name") === args(2))

    selectedData.collect().foreach(println)

    selectedData.write.format("com.databricks.spark.csv")
       .option("header", "true")
       .save(args(1))
    /*
     Output file content is given below

     Id,Name,Year,Gender,Count
     32455,Zura,1893,F,6
     35552,Zura,1894,F,5
     108497,Zura,1913,F,5
     143367,Zura,1917,F,6

     */

     /*
      We can also map the DF to a table and query against it.

      df.registerTempTable("USA_NAME_DATA")
      val query = "SELECT * FROM USA_NAME_DATA where Name IN ('" + args(2) + "')"
      val specificResults = sqlContext.sql(query).collect()
      specificResults.foreach(println)

     */
    sc.stop()
  }
}


Refer to my GitHub repo for the code: https://github.com/dkbalachandar/spark-scala-examples