Riak HTTP API Commands/Services

We can access a Riak database through the Protocol Buffers API or the HTTP API. In this post, I am going to show how we can use the HTTP API to check the data, inspect bucket properties, create an index, and query the data.

Replace RIAK_SERVER_NAME with the appropriate Riak server name.

To view the statistics data:
http://RIAK_SERVER_NAME:8098/stats
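The stats endpoint returns a fairly large JSON document. If you are calling it from a shell with cURL, you can pretty-print it with any JSON formatter, for example (assuming Python is installed):

curl http://RIAK_SERVER_NAME:8098/stats | python -m json.tool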

List all the buckets:
http://RIAK_SERVER_NAME:8098/types/default/buckets?buckets=true

List the keys of a bucket:
http://RIAK_SERVER_NAME:8098/types/default/buckets/BUCKET_NAME/keys?keys=true

Replace BUCKET_NAME with the actual bucket name.

Check the bucket properties:
http://RIAK_SERVER_NAME:8098/types/default/buckets/BUCKET_NAME/props

Replace BUCKET_NAME with the actual bucket name.

Create an index:

Each bucket should be assigned a search index for querying purposes. Run the command below to create the index.

curl -XPUT http://RIAK_SERVER_NAME:8098/search/index/INDEX_NAME

Install cURL first and then run the above command, or alternatively install the Advanced REST Client Chrome plugin and access http://RIAK_SERVER_NAME:8098/search/index/INDEX_NAME with the PUT method.

To check whether the index was created or not, access the below service and check the response.

http://RIAK_SERVER_NAME:8098/search/index/INDEX_NAME
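If the index exists, the response is a small JSON document similar to the example below (this assumes the default schema; the exact fields can vary by Riak version). A 404 response means the index has not been created yet.

{"name":"INDEX_NAME","n_val":3,"schema":"_yz_default"}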

Assign the index to a bucket:

curl -XPUT http://RIAK_SERVER_NAME:8098/buckets/BUCKET_NAME/props -H 'Content-Type: application/json' -d '{"props":{"search_index":"INDEX_NAME"}}'

Delete an object by key:
curl -X “DELETE” http://RIAK_SERVER_NAME:8098/buckets/BUCKET_NAME/keys/KEY

Replace BUCKET_NAME, INDEX_NAME, and KEY appropriately.

To check whether the index is assigned to the bucket or not, access the below service and check the response.

http://RIAK_SERVER_NAME:8098/types/default/buckets/BUCKET_NAME/props

Check the response and make sure that the search_index property value is your INDEX_NAME.
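For reference, the relevant part of the response should look roughly like this (all other bucket properties omitted):

{"props":{"search_index":"INDEX_NAME"}}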

Retrieve the data by key:

http://RIAK_SERVER_NAME:8098/types/default/buckets/BUCKET_NAME/keys/KEY_VALUE

Replace BUCKET_NAME and KEY_VALUE appropriately.

Query with search index:

Assume that you have stored the model below in the Riak database, keyed by the customer ID or any other unique ID.


import java.io.Serializable;

public class RiakCustomerData implements Serializable {

    private static final long serialVersionUID = 7818749365067437253L;
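
    // Note: the field-name suffixes used below (_s, _l) follow the dynamic-field
    // convention of Riak Search's default Solr schema (_yz_default): *_s fields are
    // indexed as strings and *_l fields as longs, so no custom schema is needed.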
    
    private String customerId_s;

    private long timestamp_l;

    private String firstName_s;

    private String lastName_s;

    private String orderData_s;

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        RiakCustomerData that = (RiakCustomerData) o;

        if (timestamp_l != that.timestamp_l) return false;
        if (customerId_s != null ? !customerId_s.equals(that.customerId_s) : that.customerId_s != null) return false;
        if (firstName_s != null ? !firstName_s.equals(that.firstName_s) : that.firstName_s != null) return false;
        if (lastName_s != null ? !lastName_s.equals(that.lastName_s) : that.lastName_s != null) return false;
        return !(orderData_s != null ? !orderData_s.equals(that.orderData_s) : that.orderData_s != null);

    }

    @Override
    public int hashCode() {
        int result = customerId_s != null ? customerId_s.hashCode() : 0;
        result = 31 * result + (int) (timestamp_l ^ (timestamp_l >>> 32));
        result = 31 * result + (firstName_s != null ? firstName_s.hashCode() : 0);
        result = 31 * result + (lastName_s != null ? lastName_s.hashCode() : 0);
        result = 31 * result + (orderData_s != null ? orderData_s.hashCode() : 0);
        return result;
    }
}

Now assume a scenario where you want to list all the customers whose first name is John. To do this, access the below service:

http://RIAK_SERVER_NAME:8098/search/query/INDEX_NAME?q=firstName_s:John&sort=timestamp_l%20desc
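If you need to run the same query from application code instead of a browser, the short Scala sketch below uses only the JDK's HttpURLConnection. RIAK_SERVER_NAME and INDEX_NAME are the same placeholders used above, and the wt=json parameter asks for a JSON response instead of the default XML.


import java.net.{HttpURLConnection, URL}

import scala.io.Source

object RiakSearchQueryExample {

  def main(args: Array[String]): Unit = {
    //RIAK_SERVER_NAME and INDEX_NAME are placeholders, replace them with real values
    val url = new URL("http://RIAK_SERVER_NAME:8098/search/query/INDEX_NAME" +
      "?wt=json&q=firstName_s:John&sort=timestamp_l%20desc")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("GET")
    //The body is a Solr-style JSON document containing the matching customer records
    val response = Source.fromInputStream(conn.getInputStream).mkString
    println(response)
    conn.disconnect()
  }
}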

Spark Scala Unit Testing

In this post, I am going to show an example of writing unit test cases for a Spark Scala job and running them with Maven.

Assume that we have a set of XML files which contain user information such as first name and last name. The middle name and county name are optional fields, but the XML files do contain empty nodes for these two fields. Our job is to read those files, remove the empty nodes, and write the updated content to a text file, either in a local environment or in a Hadoop environment.

The sample XML content is given below,


<persons>
    <person>
        <firstName>Bala</firstName>
        <middleName/>
        <lastName>Samy</lastName>
        <countyName/>
    </person>
    <person>
        <firstName>Bala1</firstName>
        <middleName/>
        <lastName>Samy1</lastName>
        <countyName/>
    </person>
</persons>


The Spark Scala code for reading the XML files and removing the empty nodes is given below.


package com

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.Map

object EmptyTagReplacer {

  def main(args: Array[String]) {

    if (args.length < 2) {
      println("Usage: <inputDir> <outputDir>")
      return
    }
    val conf = new SparkConf().setAppName("EmptyTagReplacer")
    val sc = new SparkContext(conf)

    val inFile = args(0)
    val outFile = args(1)

    val input: Map[String, String] = sc.wholeTextFiles(inFile).collectAsMap()
    searchAndReplaceEmptyTags(sc, input, outFile)
    sc.stop()
  }

  def searchAndReplaceEmptyTags(sc: SparkContext, inputXml: Map[String, String], outFile: String):
  scala.collection.mutable.ListBuffer[String] = {

    val outputXml = new scala.collection.mutable.ListBuffer[String]()
    val emptyTags = List("<middleName/>", "<countyName/>")
    inputXml.foreach { case (fileName, content) =>
      var newContent = content
      for (tag <- emptyTags) {
        //Remove every occurrence of the empty tag from the content
        newContent = newContent.replace(tag, "")
      }
      //Save the cleaned content only when an output directory is provided
      if (outFile.nonEmpty) {
        sc.parallelize(Seq(newContent)).saveAsTextFile(outFile + "/" + fileName)
      }
      outputXml += newContent
    }
    outputXml
  }

  def countTags(sc: SparkContext, xmlRecords: List[String]): List[Int] = {

    var middleNameTagCounter = sc.accumulator(0)
    var countyTagCounter = sc.accumulator(0)
    val middleNameRegex = "<middleName/>".r
    val countyRegEx = "<countyName/>".r
    xmlRecords.foreach { content =>
      middleNameTagCounter += middleNameRegex.findAllIn(content).length
      countyTagCounter += countyRegEx.findAllIn(content).length
    }
    List(middleNameTagCounter.value, countyTagCounter.value)
  }
}
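
To run this job, package it with your build tool and submit it with spark-submit. The jar name below is only a placeholder for whatever artifact your build produces:

bin/spark-submit --class com.EmptyTagReplacer --master local[2] your-build-artifact.jar <inputDir> <outputDir>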

Now the test case for the above Spark job is given below,



package com

import java.io.File

import com.holdenkarau.spark.testing.SharedSparkContext
import org.apache.commons.io.FileUtils
import org.scalatest.FunSuite
import collection.mutable.Map

//import scala.io.Source._

class EmptyTagReplacerTest extends FunSuite with SharedSparkContext {


  test("Empty HTML tag replacer test") {

    //Read the content and create a content Map.
    //val content: String = scala.io.Source.fromFile("./src/test/resources/text-files/xml1").mkString
    val content: String =  FileUtils.readFileToString(new File("./src/test/resources/text-files/xml1"), "UTF-8")

    println("content"+content)
    val contentMap = collection.mutable.Map[String, String]()
    contentMap.+=("fileName" -> content)
    //Call searchAndReplaceMethod to remove empty Nodes
    val outputContent: scala.collection.mutable.ListBuffer[String] = EmptyTagReplacer.searchAndReplaceEmptyTags(sc, contentMap, "")
    val counts: List[Int] = EmptyTagReplacer.countTags(sc, outputContent.toList)
    println(counts)
    val expected = List(0, 0)
    assert(counts == expected)
  }
}


You have to include the scala-maven-plugin and the scalatest-maven-plugin in your pom.xml to compile the Scala sources and run this test with mvn test.
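
A minimal sketch of the relevant pom.xml plugins section is given below; the plugin versions are only placeholders, so adjust them to match your Scala and ScalaTest versions.


<build>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.2</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest-maven-plugin</artifactId>
      <version>1.0</version>
      <executions>
        <execution>
          <id>test</id>
          <goals>
            <goal>test</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>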

Please refer to my GitHub repo to know more: https://github.com/dkbalachandar/scala-spark-test

How to use Spark-CSV for data analysis

In this post, I am going to show an example with the spark-csv API. The main objective is to use the spark-csv API to read a CSV file, do some data analysis, and write the output to another CSV file.

I have taken the dataset for this analysis from https://www.kaggle.com/kaggle/us-baby-names. This dataset is a big CSV file which contains the name, year, gender, and count details of baby names in the USA.

The sample data is given below.


Id	Name	Year	Gender	Count
1	Mary	1880	F	7065
2	Anna	1880	F	2604
3	Emma	1880	F	2003
3	Emma	1882	M	2003

My use case is to read this whole dataset, filter the records based on the Name provided during execution, and write the output to a file. This is a very simple use case.

So with the above sample data, my output will look like the table below. Assume that during the execution I am passing the Name as Emma.


Id	Name	Year	Gender	Count
3	Emma	1880	F	2003
3	Emma	1882	M	2003

Include the dependencies below in your build.sbt file:


libraryDependencies ++= Seq("com.databricks" % "spark-csv_2.10" % "1.4.0",
                             "org.apache.spark" % "spark-sql_2.10" % "1.6.1" % "provided")

Refer to the Spark Scala code below,


package com.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object USANameAnalysis {
  def main(args: Array[String]) {

    if (args.length < 3) {

      /*
      Run as below
      bin/spark-submit --class com.spark.USANameAnalysis --master local spark-scala-examples-assembly-1.0.jar
      file:///usr/local/spark/NationalNames.csv file:///usr/local/spark/output Zura

      */
      println("Usage inputFile outputFile nameToQuery")
      return
    }

    val conf = new SparkConf().setAppName("NAME-ANALYSIS")

    //Scala Spark Context.
    val sc = new SparkContext(conf)

    //Create the SQL context
    val sqlContext = new SQLContext(sc)

    //Load the CSV data
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(args(0))

    df.printSchema()

    df.columns.foreach(println)

    //Then filter with name and output the data to an another CSV file
    val selectedData = df.filter(df("Name") === args(2))

    selectedData.collect().foreach(println)

    selectedData.write.format("com.databricks.spark.csv")
       .option("header", "true")
       .save(args(1))
    /*
     Output file content is given below

     Id,Name,Year,Gender,Count
     32455,Zura,1893,F,6
     35552,Zura,1894,F,5
     108497,Zura,1913,F,5
     143367,Zura,1917,F,6

     */

     /*
      We can also map the DF to a table and query against it.

      df.registerTempTable("USA_NAME_DATA")
      val query = "SELECT * FROM USA_NAME_DATA where Name IN ('" + args(2) + "')"
      val specificResults = sqlContext.sql(query).collect()
      specificResults.foreach(println)

     */
    sc.stop()
  }
}
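
Instead of bundling spark-csv into an assembly jar, you can also pull it in at submit time with the --packages flag. The jar name below is only a placeholder for your own build artifact:

bin/spark-submit --packages com.databricks:spark-csv_2.10:1.4.0 --class com.spark.USANameAnalysis --master local your-app.jar <inputFile> <outputFile> <nameToQuery>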


Refer to my GitHub repo for the code: https://github.com/dkbalachandar/spark-scala-examples