Spark Cogroup

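Spark Cogroup: When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable[V], Iterable[W])) tuples. Every key that appears in either dataset shows up once in the result; a key missing from one side gets an empty Iterable for that side. This operation is also called groupWith.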
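To make the shape of the result concrete, here is a minimal sketch of the same grouping logic written with plain Scala collections, so it runs without Spark. The helper name cogroupLocal and the sample data are made up for illustration; they are not part of the Spark API, and Spark's own implementation is distributed and works differently internally.

```scala
// Plain Scala collections stand in for two pair RDDs; no Spark is needed.
// cogroupLocal is a hypothetical helper used only to illustrate the result shape.
def cogroupLocal[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
  // Collect every key that appears on either side.
  val keys = (left.map(_._1) ++ right.map(_._1)).distinct
  keys.map { k =>
    // A key missing from one side yields an empty sequence for that side.
    val vs = left.filter(_._1 == k).map(_._2)
    val ws = right.filter(_._1 == k).map(_._2)
    (k, (vs, ws))
  }.toMap
}

val scores = Seq(("a", 1), ("b", 2), ("a", 3))
val labels = Seq(("a", "x"), ("c", "y"))

val grouped = cogroupLocal(scores, labels)
grouped.foreach(println)
```

Key "a" collects both of its scores and its one label, key "b" gets an empty label sequence, and key "c" gets an empty score sequence.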

In this post, we are going to see how we can use Spark Cogroup with an example.

Cogroup can be used to join multiple pair RDDs. Assume that we have three pair RDDs: employeeRdd contains a list of employee objects, addressRdd contains a list of address objects, and departmentRdd contains a list of department objects. The key for each of these RDDs is empId. Now we want to join all three RDDs with a single cogroup call.

case class declaration:


  case class Employee(name: String, empId: String)
  case class Address(streetAddress: String, city: String, zipCode: String, empId: String)
  case class Department(department: String, empId: String)


In the code below, we use the above case classes and a cogroup to join the three RDDs. The snippet assumes a SparkContext named sc, as provided by spark-shell. Here, departmentRdd does not contain department details for empId “4”, so for that key the cogroup returns an empty list of department objects alongside the list of address objects.



  // Each RDD is keyed by empId.
  val employeeRdd = sc.parallelize(List(
    ("1", Employee(name = "John", empId = "1")),
    ("2", Employee(name = "Peter", empId = "2")),
    ("3", Employee(name = "Adam", empId = "3")),
    ("4", Employee(name = "Wade", empId = "4"))
  ))

  val addressRdd = sc.parallelize(List(
    ("1", Address("Walker St", "Columbus", "43202", "1")),
    ("2", Address("Runner St", "Columbus", "43202", "2")),
    ("3", Address("Long St", "Columbus", "43202", "3")),
    ("4", Address("River St", "Columbus", "43202", "4"))
  ))

  // No department entry for empId "4".
  val departmentRdd = sc.parallelize(List(
    ("1", Department("Sales", "1")),
    ("2", Department("Engg", "2")),
    ("3", Department("Marketing", "3"))
  ))

  // cogroup yields one Iterable per RDD for every key.
  // employees.head assumes every key has an employee record, which holds for this data.
  val employeeDetails = employeeRdd.cogroup(addressRdd, departmentRdd)
    .mapValues { case (employees, addresses, departments) =>
      (employees.head, addresses.toList, departments.toList)
    }

  employeeDetails.collect().foreach(println)


The output is shown below. Cogroup does not sort by key, so the rows may appear in a different order:


(4,(Employee(Wade,4),List(Address(River St,Columbus,43202,4)),List()))
(1,(Employee(John,1),List(Address(Walker St,Columbus,43202,1)),List(Department(Sales,1))))
(2,(Employee(Peter,2),List(Address(Runner St,Columbus,43202,2)),List(Department(Engg,2))))
(3,(Employee(Adam,3),List(Address(Long St,Columbus,43202,3)),List(Department(Marketing,3))))
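
Code that consumes these results has to cope with the empty lists, such as the missing department for empId “4”. The following is a minimal sketch of one way to do that in plain Scala; summarize is a hypothetical helper, not part of Spark, and the placeholder strings are arbitrary choices. The case classes are repeated so the snippet runs on its own.

```scala
// Case classes repeated from the post so the snippet is self-contained.
case class Employee(name: String, empId: String)
case class Address(streetAddress: String, city: String, zipCode: String, empId: String)
case class Department(department: String, empId: String)

// Hypothetical helper (not part of Spark): renders one cogrouped value as text.
// headOption turns an empty list into None, so a missing record becomes a
// placeholder instead of an exception.
def summarize(value: (Employee, List[Address], List[Department])): String = {
  val (employee, addresses, departments) = value
  val city = addresses.headOption.map(_.city).getOrElse("unknown city")
  val dept = departments.headOption.map(_.department).getOrElse("no department")
  s"${employee.name}: $dept, $city"
}

// The value that cogroup produced for empId "4" in the output above.
val wade = (
  Employee("Wade", "4"),
  List(Address("River St", "Columbus", "43202", "4")),
  List.empty[Department]
)

println(summarize(wade)) // prints "Wade: no department, Columbus"
```

In the same session as the code above, the helper could be applied to the whole RDD with employeeDetails.mapValues(summarize), skipping the repeated case class definitions.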
