How to use Spark-CSV for data analysis

In this post, I will show an example that uses the spark-csv API. The objective is to read a CSV file with spark-csv, perform a simple analysis, and write the output to another CSV file.

I have taken the dataset for this analysis from https://www.kaggle.com/kaggle/us-baby-names. It is a large CSV file containing the name, year, gender, and total count of baby names in the USA.

The sample data is given below.


Id	Name	Year	Gender	Count
1	Mary	1880	F	7065
2	Anna	1880	F	2604
3	Emma	1880	F	2003
3	Emma	1882	M	2003

My use case is to read the whole dataset, filter the records based on a name provided at execution time, and write the output to a file. It is a very simple use case.

With the above sample data, and assuming I pass the name Emma during execution, the output will look like this:


Id	Name	Year	Gender	Count
3	Emma	1880	F	2003
3	Emma	1882	M	2003

Include the dependencies below in your build.sbt file:


libraryDependencies ++= Seq("com.databricks" % "spark-csv_2.10" % "1.4.0",
                             "org.apache.spark" % "spark-sql_2.10" % "1.6.1" % "provided")

Refer to the Scala Spark code below:


package com.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object USANameAnalysis {
  def main(args: Array[String]) {

    if (args.length < 3) {

      /*
      Run as below:
      bin/spark-submit --class com.spark.USANameAnalysis --master local spark-scala-examples-assembly-1.0.jar
      file:///usr/local/spark/NationalNames.csv file:///usr/local/spark/output Zura
      */
      println("Usage: inputFile outputFile nameToQuery")
      return
    }

    val conf = new SparkConf().setAppName("NAME-ANALYSIS")

    //Create the Spark context
    val sc = new SparkContext(conf)

    //Create the SQL context
    val sqlContext = new SQLContext(sc)

    //Load the CSV data
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(args(0))

    //Print the inferred schema
    df.printSchema()

    //Print the column names
    df.columns.foreach(println)

    //Filter by name, then write the matching rows to another CSV file
    val selectedData = df.filter(df("Name") === args(2))

    //Collect and print the matching rows (fine here since the filtered result is small)
    selectedData.collect().foreach(println)

    selectedData.write.format("com.databricks.spark.csv")
       .option("header", "true")
       .save(args(1))
    /*
     The output is written as a directory of part files; their content is given below:

     Id,Name,Year,Gender,Count
     32455,Zura,1893,F,6
     35552,Zura,1894,F,5
     108497,Zura,1913,F,5
     143367,Zura,1917,F,6
     */

     /*
      We can also map the DF to a table and query against it.

      df.registerTempTable("USA_NAME_DATA")
      val query = "SELECT * FROM USA_NAME_DATA WHERE Name IN ('" + args(2) + "')"
      val specificResults = sqlContext.sql(query).collect()
      specificResults.foreach(println)

     */
    sc.stop()
  }
}
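
Note that save(args(1)) writes the result as a directory of part files rather than a single CSV. If you need one file, you can coalesce the DataFrame to a single partition before writing. Below is a minimal sketch; the "-single" suffix on the output path is my own hypothetical choice to avoid overwriting the earlier output.

//Coalesce to one partition so Spark writes a single part file
selectedData.coalesce(1)
  .write.format("com.databricks.spark.csv")
  .option("header", "true")
  .save(args(1) + "-single")

Keep in mind that coalescing to one partition routes all the data through a single task, so it only makes sense when the filtered result is small.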


Refer to my GitHub repo for the complete code: https://github.com/dkbalachandar/spark-scala-examples
