Spark Cogroup

Spark Cogroup: When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.

In this post, we are going to see how we can use Spark Cogroup with an example.

Cogroup can be used to join multiple pair RDDs. Assume that we have three pair RDDs: employeeRdd contains a list of Employee objects, addressRdd contains a list of Address objects, and departmentRdd contains a list of Department objects. The key for all three RDDs is empId. We now want to join them with a single cogroup call.

Case class declarations:


  case class Employee(name: String, empId: String)
  case class Address(streetAddress: String, city: String, zipCode:String, empId: String)
  case class Department(department: String, empId: String)


The code below uses these case classes and cogroup to join the three RDDs. Here, departmentRdd has no department entry for empId “4”, so for that key cogroup returns an empty list of departments alongside the employee and the list of addresses.



  // Each RDD is keyed by empId.
  val employeeRdd = sc.parallelize(List(
    ("1", Employee(name = "John", empId = "1")),
    ("2", Employee(name = "Peter", empId = "2")),
    ("3", Employee(name = "Adam", empId = "3")),
    ("4", Employee(name = "Wade", empId = "4"))
  ))

  val addressRdd = sc.parallelize(List(
    ("1", Address("Walker St", "Columbus", "43202", "1")),
    ("2", Address("Runner St", "Columbus", "43202", "2")),
    ("3", Address("Long St", "Columbus", "43202", "3")),
    ("4", Address("River St", "Columbus", "43202", "4"))
  ))

  // No department entry for empId "4".
  val departmentRdd = sc.parallelize(List(
    ("1", Department("Sales", "1")),
    ("2", Department("Engg", "2")),
    ("3", Department("Marketing", "3"))
  ))

  // cogroup yields one Iterable per input RDD for every key.
  // employees.head assumes each key has at least one employee; it throws on an empty Iterable.
  val employeeDetails = employeeRdd.cogroup(addressRdd, departmentRdd)
    .mapValues { case (employees, addresses, departments) =>
      (employees.head, addresses.toList, departments.toList)
    }

  employeeDetails.collect().foreach(println)


The output is given below (the order of the rows is not guaranteed):


(4,(Employee(Wade,4),List(Address(River St,Columbus,43202,4)),List()))
(1,(Employee(John,1),List(Address(Walker St,Columbus,43202,1)),List(Department(Sales,1))))
(2,(Employee(Peter,2),List(Address(Runner St,Columbus,43202,2)),List(Department(Engg,2))))
(3,(Employee(Adam,3),List(Address(Long St,Columbus,43202,3)),List(Department(Marketing,3))))
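
To make the grouping behaviour concrete outside Spark, here is a minimal in-memory sketch written in TypeScript (the language used in the Angular part of this post). It is not Spark code: the cogroup function and the Groups type are hypothetical names introduced only for illustration, and the sketch handles two inputs with string keys. It shows why a key that is missing from one input still appears in the result with an empty list.

```typescript
// In-memory illustration of cogroup semantics; this is not the Spark API.
// For every key, collect the values from each input into its own list.
type Groups<A, B> = Record<string, [A[], B[]]>;

function cogroup<A, B>(left: [string, A][], right: [string, B][]): Groups<A, B> {
  const result: Groups<A, B> = {};

  // Return the entry for a key, creating it with two empty lists if needed.
  const entryFor = (key: string): [A[], B[]] => {
    if (!Object.prototype.hasOwnProperty.call(result, key)) {
      result[key] = [[], []];
    }
    return result[key];
  };

  for (const [key, value] of left) entryFor(key)[0].push(value);
  for (const [key, value] of right) entryFor(key)[1].push(value);
  return result;
}

// Key "4" has an address but no department, as in the Spark example above.
const grouped = cogroup(
  [["3", "Long St"], ["4", "River St"]],
  [["3", "Marketing"]]
);

console.log(JSON.stringify(grouped["3"])); // [["Long St"],["Marketing"]]
console.log(JSON.stringify(grouped["4"])); // [["River St"],[]]
```

The empty second list for key "4" corresponds to the List() that Spark prints for the missing department.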

Angular Pipe

An Angular pipe is used in an HTML template to transform a value for display: it takes data as input and returns the transformed output.

The syntax of an Angular pipe is given below, followed by an example that uses the built-in lowercase pipe:
{{ data | pipe }}

{{ 'Hello world' | lowercase }}

Angular ships with many built-in pipes, and we can create custom pipes as well.

Assume that you have an array of names and want to join them with commas. The steps below show how to do that.

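First, create a file named join.pipe.ts inside a pipe folder (this matches the "./pipe/join.pipe" import path used in app.module.ts) and copy the code below into it.

pipe/join.pipe.ts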


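import { Pipe, PipeTransform } from '@angular/core';

// Registers the pipe under the name used in templates: {{ value | join }}
@Pipe({
    name: 'join'
})
export class JoinPipe implements PipeTransform {
    // Joins the array with commas; a missing (null/undefined) input yields an empty string.
    public transform(input: string[]): string {
        return input ? input.join(",") : "";
    }
}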
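
To check what the pipe's transform logic returns without setting up an Angular project, the same expression can be tried as a plain function. This is only a sketch: joinNames is a hypothetical stand-in for JoinPipe.transform, written without the @Pipe decorator so that it runs on its own.

```typescript
// Hypothetical stand-in with the same logic as JoinPipe.transform,
// minus the Angular decorator, so it runs without @angular/core.
function joinNames(input: string[] | null | undefined): string {
  return input ? input.join(",") : "";
}

console.log(joinNames(["John", "Peter", "Adam"])); // John,Peter,Adam
console.log(joinNames(undefined) === "");          // true: missing input gives an empty string
console.log(joinNames([]) === "");                 // true: an empty array also gives an empty string
```

The check on input matters because a component property may still be undefined when the template is first rendered.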

Then add the pipe to the declarations in app.module.ts, as shown below.


...
import {JoinPipe} from "./pipe/join.pipe"
....

    declarations: [JoinPipe]

....
export class AppModule {
}

Assume that app.component.ts contains a string array named “names”. The app.component.ts file is not shown here.

Update app.component.html as below to use the join pipe to combine the names.


All names : {{ names | join }}
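
Angular passes anything written after a colon in the template, for example {{ names | join:', ' }}, to transform as an additional argument. As a possible extension of the pipe above, the sketch below adds an optional separator. The class name JoinWithSeparator is hypothetical; in an Angular project it would carry the @Pipe decorator and implement PipeTransform, both omitted here so that the snippet runs on its own.

```typescript
// Hypothetical variant of the join pipe with an optional separator argument.
// The @Pipe decorator and PipeTransform interface are left out deliberately
// so that this sketch has no dependency on @angular/core.
class JoinWithSeparator {
  transform(input: string[] | null | undefined, separator: string = ","): string {
    return input ? input.join(separator) : "";
  }
}

const joinWithSeparator = new JoinWithSeparator();

console.log(joinWithSeparator.transform(["John", "Peter"]));        // John,Peter
console.log(joinWithSeparator.transform(["John", "Peter"], ", "));  // John, Peter
```

Keeping the comma as the default means existing templates that use {{ names | join }} would keep producing the same output.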