‘Library Fine’ problem in Hackerrank – solution in Scala

Problem Statement:

The Head Librarian at a library wants you to make a program that calculates the fine for returning the book after the return date. You are given the actual and the expected return dates. Calculate the fine as follows:

If the book is returned on or before the expected return date, no fine will be charged; in other words, the fine is 0.

If the book is returned in the same month as the expected return date, Fine = 15 Hackos × Number of late days

If the book is not returned in the same month but in the same year as the expected return date, Fine = 500 Hackos × Number of late months

If the book is not returned in the same year, the fine is fixed at 10000 Hackos.
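
As a side note (the actual solution below uses java.util.Calendar), the rules map directly to a small function; here is a minimal sketch of the same logic using java.time:


import java.time.LocalDate

object FineSketch {

  //Fine rules applied in order: returned on time, different year, different month, same month
  def fine(actual: LocalDate, due: LocalDate): Int =
    if (!actual.isAfter(due)) 0
    else if (actual.getYear != due.getYear) 10000
    else if (actual.getMonthValue != due.getMonthValue) 500 * (actual.getMonthValue - due.getMonthValue)
    else 15 * (actual.getDayOfMonth - due.getDayOfMonth)
}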

Refer to the link below to know more about this problem: https://www.hackerrank.com/challenges/library-fine

Solution in Scala


import java.util.{Calendar, Scanner}

object Solution {

    def main(args: Array[String]) {
       val scanner: Scanner = new Scanner(System.in)

    //Actual Returned Date
    val aDate: Int = scanner.nextInt
    val aMonth: Int = scanner.nextInt
    val aYear: Int = scanner.nextInt

    //Due Date
    val dDate: Int = scanner.nextInt
    val dMonth: Int = scanner.nextInt
    val dYear: Int = scanner.nextInt

    val isValidData: Boolean = ((aDate >= 1 && aDate <= 31) && (dDate >= 1 && dDate <= 31) && (aMonth >= 1 && aMonth <= 12) && (dMonth >= 1 && dMonth <= 12) && (aYear >= 1 && aYear <= 3000) && (dYear >= 1 && dYear <= 3000))

    var fineAmount: Int = 0
    if (isValidData) {
      val actualCalendar: Calendar = Calendar.getInstance()
      actualCalendar.set(aYear, aMonth, aDate)

      val dCalendar: Calendar = Calendar.getInstance()
      dCalendar.set(dYear, dMonth, dDate)

      if ((actualCalendar.getTime == dCalendar.getTime) || actualCalendar.getTime.before(dCalendar.getTime)) {
        fineAmount = 0
      }
      else if (actualCalendar.getTime.after(dCalendar.getTime) && aYear == dYear) {
        fineAmount = if ((aMonth == dMonth)) 15 * (aDate - dDate) else 500 * (aMonth - dMonth)
      }
      else {
        fineAmount = 10000
      }
    }
    println(fineAmount)
    }
}

Scala: Enum Creation Examples

An enum is used for creating a group of constants such as the days of the week, colors, etc.

In this post, I am going to show how to create enums in Scala.

There are two ways,
1. Using Scala Enumeration
2. Using Scala Traits

Using Scala Enumeration

Scala has an Enumeration class which can be extended to create an enum. Check the example below.


object ScalaEnumObject {

  def main(args: Array[String]) {
    println("Event:")
    Event.values foreach println
  }
  //Extends Enumeration class
  object Event extends Enumeration {
    type Event = Value
    val CREATE, READ, UPDATE, REMOVE = Value
  }
}

The output is shown below:


Event:
CREATE
READ
UPDATE
REMOVE

Using Scala Traits

We can also use traits to create enums. A trait is similar to a Java interface, so we can create a sealed trait and then, for each enum value, create a case object which extends that trait.

Check the example below.


object ScalaEnumObject {

  def main(args: Array[String]) {
    println("Directions:")
    val directions = List(EAST,WEST,SOUTH,NORTH)
    directions.foreach(direction => println (direction.name))
  }

   //Using Trait
  sealed trait Direction {
    def name: String
  }

  case object EAST extends Direction {
    val name = "E"
  }

  case object WEST extends Direction {
    val name = "W"
  }

  case object SOUTH extends Direction {
    val name = "S"
  }

  case object NORTH extends Direction {
    val name = "N"
  }
}

The output is shown below:


Directions:
E
W
S
N
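
Because the trait is sealed, all of its implementations are known at compile time, so the compiler can warn about non-exhaustive pattern matches. Below is a minimal sketch of such a match (assuming the Direction definitions above are in scope; the description strings are made up for illustration):


  //Pattern matching on the sealed trait: the compiler warns if a case object is missed
  def describe(direction: Direction): String = direction match {
    case EAST  => "heading east (" + EAST.name + ")"
    case WEST  => "heading west (" + WEST.name + ")"
    case SOUTH => "heading south (" + SOUTH.name + ")"
    case NORTH => "heading north (" + NORTH.name + ")"
  }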

Web app and RESTful services using MEAN stack and back end with Spark + Scala

I have developed a MEAN stack application which shows the San Francisco food inspection details.
Source: Food Inspection (use the Food Inspections – LIVES Standard dataset)

I have used Spark, Scala, MongoDB, NodeJS and AngularJS to do this.

My Spark job reads the input CSV data containing food inspection details, processes it and stores the data in MongoDB as collections. I have allFoodInspection and filterFoodInspection collections here. The first one has all the data and the second one has the business name, the unique risk category and the number of risks committed.

My MEAN stack REST layer reads the data from MongoDB, processes it and exposes it, and the web layer uses that data to display a table and draw a chart.
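
The snippet below is only a minimal sketch of the aggregation step performed by the Spark job, assuming illustrative column names (business_name, risk_category) rather than the dataset's actual schema; the MongoDB write that the real job performs is omitted here.


import org.apache.spark.sql.SparkSession

object FoodInspectionAggregationSketch {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("FoodInspectionAggregationSketch").getOrCreate()

    //Read the LIVES-standard CSV file (path and column names are illustrative)
    val inspections = spark.read
      .option("header", "true")
      .csv("file:///usr/local/spark/Food_Inspections_-_LIVES_Standard.csv")

    //Count the violations per business and risk category, similar in spirit
    //to what ends up in the filterFoodInspection collection
    val riskCounts = inspections
      .groupBy("business_name", "risk_category")
      .count()

    riskCounts.show(10)
    spark.stop()
  }
}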

Let us see how we can execute this.

  1. Install Scala, SBT and Spark on your machine if you are using Ubuntu. Refer to my other post to know how to install these: How to install Scala, SBT and Spark in Ubuntu
  2. Clone the git repository https://github.com/dkbalachandar/sf-food-inspection-spark.git, go inside the sf-food-inspection-spark folder and run ‘sbt assembly’ to create a fat jar with all the dependencies. Here I have used Spark 2.0.2 and Scala 2.11.8 (Spark 2.0.2 is compatible with Scala 2.11.x).
    If you don’t use compatible versions then you will end up with lots of errors. A sample build.sbt sketch is shown after this list.
  3. Copy ../sf-food-inspection-spark/target/scala-2.11/sf-food-inspection-spark-assembly-1.0.jar to the /usr/local/spark folder
  4. Download Food_Inspections_-_LIVES_Standard.csv from https://data.sfgov.org/browse?q=food+inspection and move it to the /usr/local/spark folder
  5. Install MongoDB with the steps below:
    
     sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 0C49F3730359A14518585931BC711F9BA15703C6
     echo "deb http://repo.mongodb.org/apt/ubuntu trusty/mongodb-org/3.4 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.4.list
     sudo apt-get update
     sudo apt-get install -y mongodb-org
     sudo service mongod start
    
    

    Run the Spark job with the command below:

    
    bin/spark-submit --class com.spark.SFFoodInspectionAnalysis --master local sf-food-inspection-spark-assembly-1.0.jar file:///usr/local/spark/Food_Inspections_-_LIVES_Standard.csv 
    
    
  6. Then check MongoDB and its collections and make sure that the data has been inserted and is available. Open up a terminal window, type ‘mongo’ and press enter. It will open a shell window. Then use the commands below to verify the data:
    
      show dbs
      use sfFood
      show collections
      db.allFoodInspection.find()
      db.filterFoodInspection.find()
    
    
  7. Clone the git repository https://github.com/dkbalachandar/sf-food-inspection-web.git and go inside the sf-food-inspection-web folder, then run the commands below to build and run the application:
    
      npm install
      node server.js
    
    
  8. Open http://localhost:8081 and check the page. I have used the data to create a table and display a chart with the details.
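
As mentioned in step 2, the build definition has to pin compatible Spark and Scala versions. The build.sbt below is only a rough sketch under that assumption; the dependency list and assembly settings of the actual repository may differ.


name := "sf-food-inspection-spark"

version := "1.0"

scalaVersion := "2.11.8"  //Spark 2.0.2 is built against Scala 2.11.x

libraryDependencies ++= Seq(
  //Spark itself is "provided" because spark-submit supplies it at runtime
  "org.apache.spark" %% "spark-core" % "2.0.2" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.0.2" % "provided",
  //A MongoDB Spark connector release compatible with Spark 2.0.x (an assumed choice)
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.0.0"
)

//project/plugins.sbt would also need the sbt-assembly plugin, e.g.
//addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")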

Below are some of the screenshots taken from the application.

How to install Scala, SBT and Spark in Ubuntu

Install Scala 2.11.8 and SBT


sudo apt-get remove scala-library scala
sudo wget www.scala-lang.org/files/archive/scala-2.11.8.deb
sudo dpkg -i scala-2.11.8.deb
echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823
sudo apt-get update
sudo apt-get install sbt


Install Spark 2.0.2


sudo wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz
sudo chmod -R 755 spark-2.0.2-bin-hadoop2.7.tgz
sudo gunzip spark-2.0.2-bin-hadoop2.7.tgz
sudo tar -xvf spark-2.0.2-bin-hadoop2.7.tar
sudo mv spark-2.0.2-bin-hadoop2.7 spark
sudo mv spark /usr/local/spark

Open the .bashrc file, add SPARK_HOME and update PATH:
sudo vi ~/.bashrc
  export SPARK_HOME="/usr/local/spark"
  export PATH=$PATH:$SPARK_HOME/bin

source ~/.bashrc

Spark Scala Unit Testing

In this post, I am going to show an example of writing unit test cases for a Spark Scala job and running them with Maven.

Assume that we have a set of XML files which have user information such as first name, last name, etc. Assume that middle name and county name are optional fields but the XML files contain empty nodes for these two fields. So our job is to read those files, remove those empty nodes and output the updated content into a text file, either in a local environment or a Hadoop environment.

The sample XML content is given below,

 

<persons>
    <person>
        <firstName>Bala</firstName>
        <middleName/>
        <lastName>Samy</lastName>
        <countyName/>
    </person>
    <person>
        <firstName>Bala1</firstName>
        <middleName/>
        <lastName>Samy1</lastName>
        <countyName/>
    </person>
</persons>

 

The Spark Scala code for reading the XML files and removing the empty nodes is given below.


package com

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.Map

object EmptyTagReplacer {

  def main(args: Array[String]) {

    if (args.length < 2) {
      println("Usage <inputDir> <outputDir>")
      return
    }
    val conf = new SparkConf().setAppName("EmptyTagReplacer")
    val sc = new SparkContext(conf)

    val inFile = args(0)
    val outFile = args(1)

    val input: Map[String, String] = sc.wholeTextFiles(inFile).collectAsMap()
    searchAndReplaceEmptyTags(sc, input, outFile)
    sc.stop()
  }

  def searchAndReplaceEmptyTags(sc: SparkContext, inputXml: Map[String, String], outFile: String):
  scala.collection.mutable.ListBuffer[String] = {

    val outputXml = new scala.collection.mutable.ListBuffer[String]()
    val htmlTags = List("<middleName/>", "<countyName/>")
    inputXml.foreach { case (fileName, content) =>
      var newContent = content
      //Remove each empty tag from the file content
      for (tag <- htmlTags) {
        newContent = newContent.replace(tag, "")
      }
      //Write the cleaned content only when an output path is supplied
      //(the unit test passes an empty outFile and only inspects the returned content)
      if (outFile.nonEmpty) {
        sc.parallelize(List(newContent)).saveAsTextFile(outFile + "/" + fileName)
      }
      outputXml += newContent
    }
    outputXml
  }

  def countTags(sc: SparkContext, xmlRecords: List[String]): List[Int] = {

    var middleNameTagCounter = sc.accumulator(0)
    var countyTagCounter = sc.accumulator(0)
    val middleNameRegex = "<middleName/>".r
    val countyRegEx = "<countyName/>".r
    xmlRecords.foreach { content =>
      middleNameTagCounter += middleNameRegex.findAllIn(content).length
      countyTagCounter += countyRegEx.findAllIn(content).length
    }
    List(middleNameTagCounter.value, countyTagCounter.value)
  }
}

Now the test case for testing the above Spark job is given below.



package com

import java.io.File

import com.holdenkarau.spark.testing.SharedSparkContext
import org.apache.commons.io.FileUtils
import org.scalatest.FunSuite
import collection.mutable.Map

//import scala.io.Source._

class EmptyTagReplacerTest extends FunSuite with SharedSparkContext {


  test("Empty HTML tag replacer test") {

    //Read the content and create a content Map.
    //val content: String = scala.io.Source.fromFile("./src/test/resources/text-files/xml1").mkString
    val content: String =  FileUtils.readFileToString(new File("./src/test/resources/text-files/xml1"), "UTF-8")

    println("content"+content)
    val contentMap = collection.mutable.Map[String, String]()
    contentMap.+=("fileName" -> content)
    //Call searchAndReplaceMethod to remove empty Nodes
    val outputContent: scala.collection.mutable.ListBuffer[String] = EmptyTagReplacer.searchAndReplaceEmptyTags(sc, contentMap, "")
    val counts: List[Int] = EmptyTagReplacer.countTags(sc, outputContent.toList)
    println(counts)
    val expected = List(0, 0)
    assert(counts == expected)
  }
}


You have to include the scala-maven-plugin and scalatest-maven-plugin in pom.xml to make this work.

Please refer to my GitHub repo to know more: https://github.com/dkbalachandar/scala-spark-test

How to use Spark-CSV for data analysis

In this post, I am going to show an example with the spark-csv API. The main objective is to use the spark-csv API to read a CSV file, do some data analysis and write the output to a CSV file.

I have taken the dataset for this analysis from https://www.kaggle.com/kaggle/us-baby-names. This dataset is a big CSV file which contains the name, year and total count details of baby names in the USA.

The sample data is given below.


Id	Name	Year	Gender	Count
1	Mary	1880	F	7065
2	Anna	1880	F	2604
3	Emma	1880	F	2003
3	Emma	1882	M	2003

My use case is to read this whole dataset, filter out the records based on the Name provided during the execution and write the output to a file. This is a very simple use case.

So with the above sample data, my output will look like below, assuming that during the execution I pass the Name as Emma:


Id	Name	Year	Gender	Count
3	Emma	1880	F	2003
3	Emma	1882	M	2003

Include the dependencies below in your build.sbt file:


libraryDependencies ++= Seq("com.databricks" % "spark-csv_2.10" % "1.4.0",
                             "org.apache.spark" % "spark-sql_2.10" % "1.6.1" % "provided")

Refer to the Scala Spark code below:


package com.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object USANameAnalysis {
  def main(args: Array[String]) {

    if (args.length < 3) {

      /*
      Run as below
      bin/spark-submit --class com.spark.USANameAnalysis --master local spark-scala-examples-assembly-1.0.jar
      file:///usr/local/spark/NationalNames.csv file:///usr/local/spark/output Zura

      */
      println("Usage inputFile outputFile nameToQuery")
      return
    }

    val conf = new SparkConf().setAppName("NAME-ANALYSIS")

    //Scala Spark Context.
    val sc = new SparkContext(conf)

    //Create the SQL context
    val sqlContext = new SQLContext(sc)

    //Load the CSV data
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(args(0))

    df.printSchema()

    df.columns.foreach(println)

    //Then filter with name and output the data to an another CSV file
    val selectedData = df.filter(df("Name") === args(2))

    selectedData.collect().foreach(println)

    selectedData.write.format("com.databricks.spark.csv")
       .option("header", "true")
       .save(args(1))
    /*
     Output file content is given below

     Id,Name,Year,Gender,Count
     32455,Zura,1893,F,6
     35552,Zura,1894,F,5
     108497,Zura,1913,F,5
     143367,Zura,1917,F,6

     */

     /*
      We can also map the DF to a table and query against it.

      df.registerTempTable("USA_NAME_DATA")
      val query = "SELECT * FROM USA_NAME_DATA where Name IN ('" + args(1) + "')"
      val specificResults = sqlContext.sql(query).collect()
      specificResults.foreach(println)

     */
    sc.stop()
  }
}


Refer to my GitHub repo for the code: https://github.com/dkbalachandar/spark-scala-examples

Spark Job for Removing Empty XML tags

The Spark program below will read a set of XML files, parse them, remove the empty tags and finally write the output as a sequence file.

package com.spark

import org.apache.spark.{SparkConf, SparkContext}

//Find empty tags and remove it and then write as sequence files
object TagHandler {

  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage <Source File or Directory> <Destination File or Directory>")
      // bin/spark-submit --class com.spark.TagHandler --master local tagHandler-assembly-1.0.jar /data/xml /data/output
      return
    }

    val inFile = args(0)
    val outFile = args(1)

    val htmlTags = List("<sub/>", "<sup/>", "<i/>", "<b/>")

    val conf = new SparkConf().setAppName("TagHandler")
    val sc = new SparkContext(conf)
    val wholeFiles = sc.wholeTextFiles(inFile)

    //Remove each empty tag from the content of every file
    val cleanedFiles = wholeFiles.map { case (fileName, content) =>
      var newContent = content
      for (tag <- htmlTags) {
        newContent = newContent.replace(tag, "")
      }
      (fileName, newContent)
    }

    //Write the (fileName, cleanedContent) pairs as a sequence file
    cleanedFiles.saveAsSequenceFile(outFile)
    sc.stop()
  }
}