‘Library Fine’ problem on HackerRank – solution in Scala

Problem Statement:

The Head Librarian at a library wants you to make a program that calculates the fine for returning a book after its expected return date. You are given the actual and the expected return dates. Calculate the fine as follows:

If the book is returned on or before the expected return date, no fine will be charged; in other words, the fine is 0.

If the book is returned in the same month as the expected return date, Fine = 15 Hackos × Number of late days

If the book is not returned in the same month but in the same year as the expected return date, Fine = 500 Hackos × Number of late months

If the book is not returned in the same year, the fine is fixed at 10000 Hackos.

Refer to the link below for the full problem statement: https://www.hackerrank.com/challenges/library-fine
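
The rules above can be captured in a small pure function. The sketch below is only an illustration (the names are my own); the full Calendar-based solution follows in the next section.

//A minimal sketch of the fine rules as a pure function (names are illustrative)
def libraryFine(aDay: Int, aMonth: Int, aYear: Int,
                dDay: Int, dMonth: Int, dYear: Int): Int = {
  if (aYear > dYear) 10000                                                        //returned in a later year: fixed fine
  else if (aYear == dYear && aMonth > dMonth) 500 * (aMonth - dMonth)             //later month, same year
  else if (aYear == dYear && aMonth == dMonth && aDay > dDay) 15 * (aDay - dDay)  //later day, same month
  else 0                                                                          //returned on or before the due date
}

For example, if the book is due on 6 6 2015 and is returned on 9 6 2015, it is three days late in the same month, so the fine is 15 × 3 = 45 Hackos.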

Solution in Scala


import java.util.{Calendar, Scanner}

object Solution {

    def main(args: Array[String]) {
    val scanner: Scanner = new Scanner(System.in)

    //Actual return date
    val aDate: Int = scanner.nextInt
    val aMonth: Int = scanner.nextInt
    val aYear: Int = scanner.nextInt

    //Expected (due) return date
    val dDate: Int = scanner.nextInt
    val dMonth: Int = scanner.nextInt
    val dYear: Int = scanner.nextInt

    //Validate against the problem constraints: 1 <= day <= 31, 1 <= month <= 12, 1 <= year <= 3000
    val isValidData: Boolean = (aDate >= 1 && aDate <= 31) && (dDate >= 1 && dDate <= 31) &&
      (aMonth >= 1 && aMonth <= 12) && (dMonth >= 1 && dMonth <= 12) &&
      (aYear >= 1 && aYear <= 3000) && (dYear >= 1 && dYear <= 3000)

    var fineAmount: Int = 0
    if (isValidData) {
      //Calendar months are 0-based, so subtract 1; clear() zeroes the time-of-day fields so the dates compare exactly
      val actualCalendar: Calendar = Calendar.getInstance()
      actualCalendar.clear()
      actualCalendar.set(aYear, aMonth - 1, aDate)

      val dCalendar: Calendar = Calendar.getInstance()
      dCalendar.clear()
      dCalendar.set(dYear, dMonth - 1, dDate)

      if ((actualCalendar.getTime == dCalendar.getTime) || actualCalendar.getTime.before(dCalendar.getTime)) {
        fineAmount = 0
      }
      else if (actualCalendar.getTime.after(dCalendar.getTime) && aYear == dYear) {
        fineAmount = if ((aMonth == dMonth)) 15 * (aDate - dDate) else 500 * (aMonth - dMonth)
      }
      else {
        fineAmount = 10000
      }
    }
    println(fineAmount)
    }
}


How to install Scala, SBT and Spark on Ubuntu

Install Scala 2.11.8 and SBT


sudo apt-get remove scala-library scala
sudo wget www.scala-lang.org/files/archive/scala-2.11.8.deb
sudo dpkg -i scala-2.11.8.deb
echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823
sudo apt-get update
sudo apt-get install sbt


Install Spark 2.0.2


sudo wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz
sudo chmod -R 755 spark-2.0.2-bin-hadoop2.7.tgz
sudo gunzip spark-2.0.2-bin-hadoop2.7.tgz
sudo tar -xvf spark-2.0.2-bin-hadoop2.7.tar
sudo mv spark-2.0.2-bin-hadoop2.7 spark
sudo mv spark /usr/local/spark

Open the ~/.bashrc file, add SPARK_HOME, and update PATH:
sudo vi ~/.bashrc
  export SPARK_HOME="/usr/local/spark"
  export PATH=$PATH:$SPARK_HOME/bin

source ~/.bashrc
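
Once the PATH is updated, you can do a quick sanity check by launching spark-shell and running a tiny Scala snippet (sc is the SparkContext the shell creates for you):

//Run inside spark-shell
val nums = sc.parallelize(1 to 100)
println(nums.sum())  //should print 5050.0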

Spark Scala Unit Testing

In this post, I am going to show an example of writing unit test cases for a Spark Scala job and running them with Maven.

Assume that we have a set of XML files containing user information such as first name and last name. Middle name and county name are optional fields, but the XML files contain empty nodes for these two fields. Our job is to read those files, remove the empty nodes, and write the updated content to a text file, either in a local environment or in a Hadoop environment.

The sample XML content is given below,

 

<persons>
    <person>
        <firstName>Bala</firstName>
        <middleName/>
        <lastName>Samy</lastName>
        <countyName/>
    </person>
    <person>
        <firstName>Bala1</firstName>
        <middleName/>
        <lastName>Samy1</lastName>
        <countyName/>
    </person>
</persons>

 

The Spark Scala code for reading the XML files and removing the empty nodes is given below.


package com

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.Map

object EmptyTagReplacer {

  def main(args: Array[String]) {

    if (args.length < 2) {
      println("Usage <inputDir> <outputDir>")
      return
    }
    val conf = new SparkConf().setAppName("EmptyTagReplacer")
    val sc = new SparkContext(conf)

    val inFile = args(0)
    val outFile = args(1)

    val input: Map[String, String] = sc.wholeTextFiles(inFile).collectAsMap()
    searchAndReplaceEmptyTags(sc, input, outFile)
    sc.stop()
  }

  def searchAndReplaceEmptyTags(sc: SparkContext, inputXml: Map[String, String], outFile: String):
  scala.collection.mutable.ListBuffer[String] = {

    val outputXml = new scala.collection.mutable.ListBuffer[String]()
    val htmlTags = List("<middleName/>", "<countyName/>")
    inputXml.foreach { case (fileName, content) =>
      var newContent = content
      for (tag <- htmlTags) {
        //Remove every occurrence of the empty tag
        newContent = newContent.replace(tag, "")
      }
      //Write the cleaned content once per file (skipped when no output directory is given, e.g. in unit tests)
      if (outFile.nonEmpty) {
        val data = sc.parallelize(List(newContent))
        data.saveAsTextFile(outFile + "/" + fileName)
      }
      outputXml += newContent
    }
    outputXml
  }

  def countTags(sc: SparkContext, xmlRecords: List[String]): List[Int] = {

    var middleNameTagCounter = sc.accumulator(0)
    var countyTagCounter = sc.accumulator(0)
    val middleNameRegex = "<middleName/>".r
    val countyRegEx = "<countyName/>".r
    xmlRecords.foreach { content =>
      middleNameTagCounter += middleNameRegex.findAllIn(content).length
      countyTagCounter += countyRegEx.findAllIn(content).length
    }
    List(middleNameTagCounter.value, countyTagCounter.value)
  }
}

Now, the test case for the above Spark job is given below.



package com

import java.io.File

import com.holdenkarau.spark.testing.SharedSparkContext
import org.apache.commons.io.FileUtils
import org.scalatest.FunSuite
import collection.mutable.Map

//import scala.io.Source._

class EmptyTagReplacerTest extends FunSuite with SharedSparkContext {


  test("Empty HTML tag replacer test") {

    //Read the content and create a content Map.
    //val content: String = scala.io.Source.fromFile("./src/test/resources/text-files/xml1").mkString
    val content: String =  FileUtils.readFileToString(new File("./src/test/resources/text-files/xml1"), "UTF-8")

    println("content: " + content)
    val contentMap = collection.mutable.Map[String, String]()
    contentMap += ("fileName" -> content)
    //Call searchAndReplaceMethod to remove empty Nodes
    val outputContent: scala.collection.mutable.ListBuffer[String] = EmptyTagReplacer.searchAndReplaceEmptyTags(sc, contentMap, "")
    val counts: List[Int] = EmptyTagReplacer.countTags(sc, outputContent.toList)
    println(counts)
    val expected = List(0, 0)
    assert(counts == expected)
  }
}


You have to include the scala-maven-plugin and scalatest-maven-plugin in pom.xml to make this work.
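
If you build with sbt instead of Maven, an equivalent test setup in build.sbt would look roughly like the sketch below; the artifact versions are illustrative assumptions and should be matched to your Spark version:

//build.sbt (sketch) - versions below are illustrative
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
  "org.scalatest" %% "scalatest" % "2.2.6" % "test",
  "com.holdenkarau" %% "spark-testing-base" % "1.6.1_0.3.3" % "test"
)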

Please refer to my GitHub repo to know more: https://github.com/dkbalachandar/scala-spark-test

How to use Spark-CSV for data analysis

In this post, I am going to show an example with the spark-csv API. The main objective is to use the spark-csv API to read a CSV file, do some data analysis, and write the output to another CSV file.

I have taken the dataset for this analysis from https://www.kaggle.com/kaggle/us-baby-names. This dataset is a big CSV file containing the id, name, year, gender, and total count of baby names in the USA.

The sample data is given below.


Id	Name	Year	Gender	Count
1	Mary	1880	F	7065
2	Anna	1880	F	2604
3	Emma	1880	F	2003
3	Emma	1882	M	2003

My use case is to read this whole dataset, filter the records based on the name provided at execution time, and write the output to a file. This is a very simple use case.

With the above sample data, and assuming that I pass the name Emma during execution, my output will look like this:


Id	Name	Year	Gender	Count
3	Emma	1880	F	2003
3	Emma	1882	M	2003

Include the dependencies below in your build.sbt file:


libraryDependencies ++= Seq("com.databricks" % "spark-csv_2.10" % "1.4.0",
                             "org.apache.spark" % "spark-sql_2.10" % "1.6.1" % "provided")

Refer to the Scala Spark code below.


package com.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object USANameAnalysis {
  def main(args: Array[String]) {

    if (args.length < 3) {

      /*
      Run as below
      bin/spark-submit --class com.spark.USANameAnalysis --master local spark-scala-examples-assembly-1.0.jar
      file:///usr/local/spark/NationalNames.csv file:///usr/local/spark/output Zura

      */
      println("Usage inputFile outputFile nameToQuery")
      return
    }

    val conf = new SparkConf().setAppName("NAME-ANALYSIS")

    //Scala Spark Context.
    val sc = new SparkContext(conf)

    //Create the SQL context
    val sqlContext = new SQLContext(sc)

    //Load the CSV data
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(args(0))

    df.printSchema()

    df.columns.foreach(println)

    //Then filter with name and output the data to an another CSV file
    val selectedData = df.filter(df("Name") === args(2))

    selectedData.collect().foreach(println)

    selectedData.write.format("com.databricks.spark.csv")
       .option("header", "true")
       .save(args(1))
    /*
     Output file content is given below

     Id,Name,Year,Gender,Count
     32455,Zura,1893,F,6
     35552,Zura,1894,F,5
     108497,Zura,1913,F,5
     143367,Zura,1917,F,6

     */

     /*
      We can also map the DF to a table and query against it.

      df.registerTempTable("USA_NAME_DATA")
      val query = "SELECT * FROM USA_NAME_DATA where Name IN ('" + args(2) + "')"
      val specificResults = sqlContext.sql(query).collect()
      specificResults.foreach(println)

     */
    sc.stop()
  }
}


Refer to my GitHub repo for the code: https://github.com/dkbalachandar/spark-scala-examples

Spark Job for Removing Empty XML tags

The Spark program below reads a set of XML files, removes the given empty tags, and finally writes the output as a sequence file.

package com.spark

import org.apache.spark.{SparkConf, SparkContext}

//Find empty tags and remove it and then write as sequence files
object TagHandler {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage <Source File or Directory> <Destination File or Directory>")
      // bin/spark-submit --class com.spark.TagHandler --master local tagHandler-assembly-1.0.jar /data/xml /data/output
      return
    }

    val inFile = args(0)
    val outFile = args(1)

    val htmlTags = List("<sub/>", "<sup/>", "<i/>", "<b/>")

    val conf = new SparkConf().setAppName("TagHandler")
    val sc = new SparkContext(conf)
    val wholeFiles = sc.wholeTextFiles(inFile)

    val cleaned = wholeFiles.map { case (fileName, content) =>
      var newContent = content
      for (tag <- htmlTags) {
        //Strip every occurrence of the empty tag
        newContent = newContent.replace(tag, "")
      }
      (fileName, newContent)
    }
    //Write all (fileName, cleanedContent) pairs once as a sequence file
    cleaned.saveAsSequenceFile(outFile)
    sc.stop()
  }
}
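
To verify the output, you can read the sequence file back, for example from spark-shell. A quick sketch (the path below is illustrative; use whatever you passed as the destination):

//Read the (fileName, cleanedContent) pairs back and print a short preview of each
val pairs = sc.sequenceFile[String, String]("/data/output")
pairs.collect().foreach { case (file, xml) => println(file + " -> " + xml.take(80)) }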

 

Spark XML – How to replace hyphen symbols found in XML elements

Using a regular expression:


import org.apache.spark.{SparkConf, SparkContext}

object TransFormXmlApp {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage <Source File> <Destination File>")
      return
    }
    val inFile = args(0)
    val outFile = args(1)
    val conf = new SparkConf().setAppName("TransFormXmlApp")
    val sc = new SparkContext(conf)
    val wholeFiles = sc.wholeTextFiles(inFile)
    //Matches an opening or closing tag name, e.g. "<first-name" or "</first-name"
    val htmlReg = "</?[a-zA-Z-]+".r
    wholeFiles.collect().foreach { case (fileName, content) =>
      val newContent = htmlReg.replaceAllIn(content,  m => m.toString.replace("-","_"))
      println(newContent)
      val data = sc.parallelize(List(newContent))
      data.saveAsTextFile(outFile)
    }
    sc.stop()
  }
}
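
To see what the regex rewrite does, here is a small standalone illustration (not part of the job) applied to a single element:

val htmlReg = "</?[a-zA-Z-]+".r
val sample = "<first-name>Bala</first-name>"
val fixed = htmlReg.replaceAllIn(sample, m => m.toString.replace("-", "_"))
//fixed is "<first_name>Bala</first_name>" - only the tag names are rewritten, not the text content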

Using the scala-xml API:


import org.apache.spark.{SparkConf, SparkContext}

import scala.xml._
import scala.xml.transform._

object TransFormXmlApp {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage <Source File> <Destination File>")
      return
    }
    val inFile = args(0)
    val outFile = args(1)
    val conf = new SparkConf().setAppName("TransFormXmlApp")
    val sc = new SparkContext(conf)
    val wholeFiles = sc.wholeTextFiles(inFile)

    //Replace hyphen with underscore in element names
    val hyphenAsUnderScoreRule = new RewriteRule {
      override def transform(nodes: scala.xml.Node): Seq[Node] = nodes match {
        case e@Elem(prefix, label, attribs, scope, children@_*) => Elem(prefix, label.replace("-", "_"), attribs, scope, false, children: _*)
        case _ => nodes
      }
    }
    //Remove hyphen symbol
    val removeHyphenRule = new RewriteRule {
      override def transform(nodes: scala.xml.Node): Seq[Node] = nodes match {
        case e@Elem(prefix, label, attribs, scope, children@_*) => Elem(prefix, label.replace("-", ""), attribs, scope, false, children: _*)
        case _ => nodes
      }
    }
    wholeFiles.collect().foreach { case (fileName, content) =>
      val updatedXml = new RuleTransformer(hyphenAsUnderScoreRule).transform(XML.loadString(content))
      val data = sc.parallelize(updatedXml)
      data.saveAsTextFile(outFile)
    }
    sc.stop()
  }
}
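
The rewrite rule can also be tried outside Spark against a plain XML literal. A minimal sketch:

import scala.xml._
import scala.xml.transform._

val rule = new RewriteRule {
  override def transform(n: Node): Seq[Node] = n match {
    case Elem(prefix, label, attribs, scope, children@_*) =>
      Elem(prefix, label.replace("-", "_"), attribs, scope, false, children: _*)
    case other => other
  }
}
val in = <user><first-name>Bala</first-name></user>
val out = new RuleTransformer(rule).transform(in)
println(out.mkString)  //prints <user><first_name>Bala</first_name></user>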

Spark SQL + XML – How to escape column names with hyphen symbol

Assume that you have the XML content below:

<?xml version="1.0" encoding="UTF-8"?>
<users>
    <user>
        <first-name>Bala</first-name>
        <middle-name></middle-name>
        <last-name>Samy</last-name>
        <age>60</age>
    </user>
    <user>
        <first-name>Adam</first-name>
        <middle-name></middle-name>
        <last-name>Williams</last-name>
        <age>60</age>
    </user>
</users>

You want to write a Spark SQL program to parse this content and run queries against it. You can query all the data, but if you want to run a query with a WHERE clause against the columns first-name, last-name, or middle-name, the query won't work because those column names contain a hyphen.

Now you have two options. Either get rid of the hyphen by replacing it with an underscore or removing it (please refer to my other post, Spark XML – How to replace hyphen symbols found in XML elements, to see how), or escape the column name with the backtick symbol. I have used the second option. Please refer to my code below.


import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object SparkUserDataOperation {

  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage <inputFile>")
      return
    }
    val inFile = args(0)
    val conf = new SparkConf().setAppName("SparkUserDataOperation")
    val sc = new SparkContext(conf)
    //Create the SQL context
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read
      .format("com.databricks.spark.xml")
      .option("rootTag", "users")
      .option("rowTag", "user")
      .load(inFile)

    df.printSchema()
    df.columns.foreach(println)
    //Map it to a table
    df.registerTempTable("users")
    //Query it
    val allResults = sqlContext.sql("SELECT * FROM users").collect()
    print("Print all records::")
    allResults.foreach(println)
     //Note that the first-name is surrounded with backtick symbol.
    val specificResults = sqlContext.sql("SELECT * FROM users where `first-name` IN ('Bala')").collect()
    print("Print Query records::")
    specificResults.foreach(println)
    sc.stop()
  }
}
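
As a side note, with the DataFrame API the column name is passed as a plain string, so no backtick escaping is needed. A minimal sketch using the df loaded above:

//Alternative to the SQL query: filter through the DataFrame API
val byFirstName = df.filter(df("first-name") === "Bala")
byFirstName.show()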