Riak – Code to do sorting on the custom object using Protobuf API

In Riak, we can store data as in a database and query it as in Solr, because Riak Search is built on Solr.

Assume we have an application that collects employee comments on a particular topic. An employee can add multiple comments, and all of them are stored in a Riak bucket together with the employee id. The requirement is to fetch all the comments made by a given employee, ordered by timestamp.

The program below does exactly that. Its steps are:

1. Create a search index
2. Associate the search index with the bucket
3. Add the comments
4. Query the search index and sort the results on storedTime_l
5. Once you have the Riak keys, retrieve the comments one by one

import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.buckets.StoreBucketProperties;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.api.commands.search.Search;
import com.basho.riak.client.api.commands.search.StoreIndex;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakNode;
import com.basho.riak.client.core.operations.SearchOperation;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.query.search.YokozunaIndex;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.cas.osd.mp.utils.MnConstants;

import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;


public class RiakMain {

    public static void main(String[] args) {
        RiakClient riakClient = null;
        Namespace namespace = new Namespace("riakbucket");
        try {
            //Get the riak client
            riakClient = getRiakClient();
            //Create the index
            StoreIndex index = new StoreIndex.Builder(new YokozunaIndex("bucket_index")).build();
            riakClient.execute(index);
            //Index creation is asynchronous, so wait a few seconds before using the new index
            Thread.sleep(5000);
            //Assign the index to the bucket named 'riakbucket'
            StoreBucketProperties sbp = new StoreBucketProperties.Builder(namespace).
                    withAllowMulti(true).withSearchIndex("bucket_index").build();
            riakClient.execute(sbp);
            //Create the comments
            createTheComments(riakClient, namespace);
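            //Retrieve the comments sorted by timestamp. Note that the comments are stored under Riak-generated keys,
            //so the keys are not known in advance. Other key schemes are possible.
            //First fetch the matching keys through the search index, then retrieve the data for each key.
            //Indexing is asynchronous, so freshly stored comments may take a moment to appear in the results.
            Search searchOp = new Search.Builder("bucket_index", "employeeId_s:emp1").sort("storedTime_l desc").
                    withStart(0).
                    withRows(1000).build();
            List<String> keys = new ArrayList<>();
            SearchOperation.Response response = riakClient.execute(searchOp);
            List<Map<String, List<String>>> results = response.getAllResults();
            for (Map<String, List<String>> map : results) {
                //MnConstants.YZ_RK is the author's own constant; it presumably holds "_yz_rk",
                //the field under which Riak Search returns the Riak key of each match
                keys.addAll(map.get(MnConstants.YZ_RK));
            }
            for (String key : keys) {
                getData(riakClient, namespace, key);
            }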

        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            if (riakClient != null) {
                riakClient.shutdown();
            }

        }
    }

    private static RiakClient getRiakClient() throws UnknownHostException {

        //Change the host accordingly
        RiakNode node = new RiakNode.Builder().withRemoteAddress("localhost")
                .withRemotePort(8087).build();
        RiakCluster riakCluster = new RiakCluster.Builder(node).build();
        riakCluster.start();
        return new RiakClient(riakCluster);
    }

    private static void createTheComments(RiakClient riakClient, Namespace ns) throws ExecutionException, InterruptedException {
        CommentsRiakObject commentsRiakObject1 = new CommentsRiakObject();
        commentsRiakObject1.setEmployeeId_s("emp1");
        commentsRiakObject1.setComments_s("Comment 1");
        commentsRiakObject1.setStoredTime_l(System.currentTimeMillis());
        StoreValue storeValue1 = new StoreValue.Builder(commentsRiakObject1).withNamespace(ns).build();
        riakClient.execute(storeValue1);

        CommentsRiakObject commentsRiakObject2 = new CommentsRiakObject();
        commentsRiakObject2.setEmployeeId_s("emp2");
        commentsRiakObject2.setComments_s("Comment 2");
        commentsRiakObject2.setStoredTime_l(System.currentTimeMillis());
        StoreValue storeValue2 = new StoreValue.Builder(commentsRiakObject2).withNamespace(ns).build();
        riakClient.execute(storeValue2);

        CommentsRiakObject commentsRiakObject3 = new CommentsRiakObject();
        commentsRiakObject3.setEmployeeId_s("emp1");
        commentsRiakObject3.setComments_s("Comment 3");
        commentsRiakObject3.setStoredTime_l(System.currentTimeMillis());
        StoreValue storeValue3 = new StoreValue.Builder(commentsRiakObject3).withNamespace(ns).build();
        riakClient.execute(storeValue3);
    }

    private static void getData(RiakClient riakClient, Namespace namespace, String key) throws ExecutionException, InterruptedException {
        Location loc = new Location(namespace, key);
        FetchValue fetchOp = new FetchValue.Builder(loc).build();
        CommentsRiakObject commentsRiakObject = riakClient.execute(fetchOp).getValue(CommentsRiakObject.class);
        System.out.println(commentsRiakObject);
    }

    public static class CommentsRiakObject implements Serializable {

        private String employeeId_s;

        private String comments_s;

        private long storedTime_l;

        public String getEmployeeId_s() {
            return employeeId_s;
        }

        public void setEmployeeId_s(String employeeId_s) {
            this.employeeId_s = employeeId_s;
        }

        public String getComments_s() {
            return comments_s;
        }

        public void setComments_s(String comments_s) {
            this.comments_s = comments_s;
        }

        public long getStoredTime_l() {
            return storedTime_l;
        }

        public void setStoredTime_l(long storedTime_l) {
            this.storedTime_l = storedTime_l;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;

            if (o == null || getClass() != o.getClass()) return false;

            CommentsRiakObject that = (CommentsRiakObject) o;

            return new EqualsBuilder()
                    .append(storedTime_l, that.storedTime_l)
                    .append(employeeId_s, that.employeeId_s)
                    .append(comments_s, that.comments_s)
                    .isEquals();
        }

        @Override
        public int hashCode() {
            return new HashCodeBuilder(17, 37)
                    .append(employeeId_s)
                    .append(comments_s)
                    .append(storedTime_l)
                    .toHashCode();
        }

        @Override
        public String toString() {
            return new ToStringBuilder(this)
                    .append("employeeId_s", employeeId_s)
                    .append("comments_s", comments_s)
                    .append("storedTime_l", storedTime_l)
                    .toString();
        }
    }
}
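
The key-extraction loop in the program above can be tried without a running Riak node. The following is a minimal sketch, assuming the search results have the shape List<Map<String, List<String>>> and that the Riak key of each match sits under the "_yz_rk" field. The class and method names (SearchKeyExtractor, extractKeys) and the sample keys are hypothetical and are not part of the Riak client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Riak client.
final class SearchKeyExtractor {

    // Field under which Riak Search returns the Riak key of each matching document.
    static final String YZ_RK = "_yz_rk";

    // Collects the Riak keys in the order in which the search returned the documents,
    // so a sort applied by the query is preserved.
    static List<String> extractKeys(List<Map<String, List<String>>> results) {
        List<String> keys = new ArrayList<>();
        for (Map<String, List<String>> doc : results) {
            List<String> riakKeys = doc.get(YZ_RK);
            if (riakKeys != null) { // skip documents that carry no key field
                keys.addAll(riakKeys);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Two hand-made documents standing in for a search response sorted newest first.
        Map<String, List<String>> newest = new HashMap<>();
        newest.put(YZ_RK, Collections.singletonList("key-newest"));
        newest.put("storedTime_l", Collections.singletonList("2000"));

        Map<String, List<String>> oldest = new HashMap<>();
        oldest.put(YZ_RK, Collections.singletonList("key-oldest"));
        oldest.put("storedTime_l", Collections.singletonList("1000"));

        System.out.println(extractKeys(Arrays.asList(newest, oldest))); // prints [key-newest, key-oldest]
    }
}
```

The null check is a defensive addition: the original loop would throw a NullPointerException if a document came back without the key field.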

Interface Description Language

An interface description language (IDL) is a format for describing the interface of an application (App1) in a language-independent way, so that any other application (App2) can use the description to communicate with App1.

For example, suppose App1 is a set of web services written in Java, and another application (App2) wants to consume those services. We describe the App1 services in an IDL so that App2, whatever language it is written in, can use them easily.

A few IDLs are listed below:

Web Services Description Language
Web Application Description Language
RESTful Service Description Language
Protocol Buffers
Apache Thrift
Avro IDL
JSON-WSP

Why we use Protocol Buffers instead of JSON

The definition of Protocol Buffers is given below:

“Protocol Buffers are a way of encoding structured data in an efficient yet extensible format”

  1. It allows us to define the schema in a .proto file, from which we generate the mapping classes. Reading that file makes it easy to see the request and response fields.
  2. Backward compatibility is easy because each field in the .proto file is numbered.
  3. It is best suited to backend applications. Ours is a REST application that talks to another backend application, and that backend supports only the Protobuf format, so we have no choice but to use Protobuf.
  4. We can mark each field as required or optional (in proto2 syntax). New optional fields can then be added later without any change to existing client applications.
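
Point 2 can be illustrated with a small hand-rolled sketch. It is not the code that the Protobuf compiler generates. It assumes only the basic wire-format rules (a field key is the field number shifted left by three bits combined with a wire type, integers are varints, strings are length-delimited) and handles just those two wire types. The class name FieldNumberSketch, the method names, and the field numbers 1 to 3 are hypothetical. A newer writer adds field 3, and an older reader that knows only fields 1 and 2 skips it by its number instead of failing.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration, not generated Protobuf code.
final class FieldNumberSketch {

    // Writes an unsigned varint: 7 bits per byte, least significant group first.
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    // Reads a varint starting at pos[0] and advances pos[0] past it.
    static long readVarint(byte[] data, int[] pos) {
        long result = 0;
        int shift = 0;
        while (true) {
            byte b = data[pos[0]++];
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
    }

    // Field key = (field number << 3) | wire type; wire type 2 is length-delimited.
    static void writeString(ByteArrayOutputStream out, int fieldNumber, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, ((long) fieldNumber << 3) | 2);
        writeVarint(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // Wire type 0 is a varint-encoded integer.
    static void writeLong(ByteArrayOutputStream out, int fieldNumber, long value) {
        writeVarint(out, (long) fieldNumber << 3);
        writeVarint(out, value);
    }

    // An "old" reader that knows only string fields 1 and 2 and skips everything else.
    static Map<Integer, String> readOldSchema(byte[] data) {
        Map<Integer, String> known = new TreeMap<>();
        int[] pos = {0};
        while (pos[0] < data.length) {
            long key = readVarint(data, pos);
            int fieldNumber = (int) (key >>> 3);
            int wireType = (int) (key & 0x7);
            if (wireType == 0) {
                readVarint(data, pos); // integer field unknown to this reader: read and discard
            } else if (wireType == 2) {
                int length = (int) readVarint(data, pos);
                if (fieldNumber == 1 || fieldNumber == 2) {
                    known.put(fieldNumber, new String(data, pos[0], length, StandardCharsets.UTF_8));
                }
                pos[0] += length; // move past the payload whether or not it was kept
            } else {
                throw new IllegalArgumentException("Wire type not handled in this sketch: " + wireType);
            }
        }
        return known;
    }

    public static void main(String[] args) {
        // A "new" writer that has added field 3 (a timestamp) to the message.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, 1, "emp1");
        writeString(out, 2, "Comment 1");
        writeLong(out, 3, 1500000000000L);
        System.out.println(readOldSchema(out.toByteArray())); // prints {1=emp1, 2=Comment 1}
    }
}
```

Because every value is tagged with its field number and wire type, the reader can tell how many bytes to skip for a field it has never heard of, which is what makes adding fields safe for older clients.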