Spark SQL + XML – How to escape column names that contain hyphens

Assume that you have the following XML content:

<?xml version="1.0" encoding="UTF-8"?>
<users>
    <user>
        <first-name>Bala</first-name>
        <middle-name></middle-name>
        <last-name>Samy</last-name>
        <age>60</age>
    </user>
    <user>
        <first-name>Adam</first-name>
        <middle-name></middle-name>
        <last-name>Williams</last-name>
        <age>60</age>
    </user>
</users>

You want to write a Spark SQL program to parse this content and run queries against it. You can query all the data, but a query with a WHERE clause against the columns first-name, middle-name, or last-name won't work, because those column names contain hyphens and Spark's SQL parser reads first-name as the arithmetic expression first minus name.

Now you have two solutions: either get rid of the hyphen symbol by replacing it with an underscore or with nothing (please refer to my other post, Spark XML – How to replace hyphen symbols found in XML elements, to see how), or use the backtick symbol around the column name in the query. I have used the second solution in the full program further below.
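
For completeness, here is a minimal sketch of the first approach (this snippet is my addition, not part of the program below). It assumes the same df and sqlContext that the full program creates; withColumnRenamed is a standard DataFrame method.

//Sketch: rename every column, replacing hyphens with underscores
val cleaned = df.columns.foldLeft(df) { (d, name) =>
  d.withColumnRenamed(name, name.replace("-", "_"))
}
cleaned.registerTempTable("users")
//No backticks needed now
sqlContext.sql("SELECT * FROM users WHERE first_name = 'Bala'")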


import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object SparkUserDataOperation {

  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage: SparkUserDataOperation <inputFile>")
      sys.exit(1)
    }
    val inFile = args(0)
    val conf = new SparkConf().setAppName("SparkUserDataOperation")
    val sc = new SparkContext(conf)
    //Create the SQL context
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read
      .format("com.databricks.spark.xml")
      .option("rootTag", "users")
      .option("rowTag", "user")
      .load(inFile)

    df.printSchema()
    df.columns.foreach(println)
    //Map it to a table
    df.registerTempTable("users")
    //Query it
    val allResults = sqlContext.sql("SELECT * FROM users").collect()
    println("Print all records::")
    allResults.foreach(println)
    //Note that first-name is surrounded with backticks
    val specificResults = sqlContext.sql("SELECT * FROM users where `first-name` IN ('Bala')").collect()
    println("Print Query records::")
    specificResults.foreach(println)
    sc.stop()
  }
}
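
As an aside, the backticks are only required inside SQL text. Here is a minimal sketch (my addition, using the df from the program above): the DataFrame Column API accepts the hyphenated name directly, so no escaping is needed.

//Sketch: the Column API takes the raw column name, no backticks required
val viaApi = df.filter(df("first-name") === "Bala")
viaApi.collect().foreach(println)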


Spark SQL + Scala

I have created a Spark Scala program to analyze population data with the Spark SQL API.

The data set used for this analysis can be found at https://github.com/dkbalachandar/spark-scala-examples/blob/master/src/main/resources/population.json

The Spark program below reads this data set, maps it to a table, and runs queries against it.



package com.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object SparkSQLPopulationDataAnalysis {
  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage: SparkSQLPopulationDataAnalysis <inputFile>")
      sys.exit(1)
    }
    val inputFile = args(0)
    val conf = new SparkConf().setAppName("SPARK-SQL")
    val sc = new SparkContext(conf)
    //Create the SQL context
    val sqlContext = new SQLContext(sc)
    //Read the data from the JSON file
    val population = sqlContext.read.json(inputFile)
    //Map it to a table
    population.registerTempTable("population")
    //Query it
    val allResults = sqlContext.sql("SELECT * FROM population").collect()
    print("Print all records::")
    allResults.foreach(println)
    val queryResults = sqlContext.sql("SELECT age, females, males, total, year FROM population where age IN (1,2,3)").collect()
    print("Query and Print records::")
    queryResults.foreach(println)
    sc.stop()
  }
}
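
The same WHERE ... IN query can also be written with the DataFrame API rather than a SQL string. A minimal sketch (my addition), using the population DataFrame from the program above; Column.isin is standard DataFrame API:

//Sketch: equivalent query through the Column API
val viaApi = population
  .filter(population("age").isin(1, 2, 3))
  .select("age", "females", "males", "total", "year")
viaApi.collect().foreach(println)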

Please refer to https://raw.githubusercontent.com/dkbalachandar/spark-scala-examples for more examples.