Riak – Code to do sorting on the custom object using Protobuf API

In Riak, we can store like a DB and query like Solr.

Assume that we have an application which collects the employee comments about a particular topic. An employee can add multiple comments. So all these comments are stored in a Riak bucket with the employee id and you have a requirement to fetch
all the comments made by an employee based on the timestamp.

The below program will do the same. The steps for this program is given below

1. Create a search index
2. Assign the bucket with the search index
3. Add the different comments
4. Then query with search index and do the sorting on the storedTime
5. Once you get the Riak keys, then retrieve the comments one by one.

import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.buckets.StoreBucketProperties;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.api.commands.search.Search;
import com.basho.riak.client.api.commands.search.StoreIndex;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakNode;
import com.basho.riak.client.core.operations.SearchOperation;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.query.search.YokozunaIndex;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.cas.osd.mp.utils.MnConstants;

import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;


public class RiakMain {

    public static void main(String[] args) {
        RiakClient riakClient = null;
        Namespace namespace = new Namespace("riakbucket");
        try {
            //Get the riak client
            riakClient = getRiakClient();
            //Create the index
            StoreIndex index = new StoreIndex.Builder(new YokozunaIndex("bucket_index")).build();
            riakClient.execute(index);
            //Assign the index to the bucket named 'riakbucket'
            Thread.sleep(5000);
            StoreBucketProperties sbp = new StoreBucketProperties.Builder(namespace).
                    withAllowMulti(true).withSearchIndex("bucket_index").build();
            riakClient.execute(sbp);
            //Create the comments
            createTheComments(riakClient, namespace);
            //Retrieve it based on the timestamp. note that i am using the Riak Default key for storing the comments.
            //We may have different approach for handling this.
            //So first Fetch the keys first, then retrieve the data
            Search searchOp = new Search.Builder("bucket_index", "employeeId_s:emp1").sort("storedTime_l desc").
                    withStart(0).
                    withRows(1000).build();
            List keys = new ArrayList();
            SearchOperation.Response response = riakClient.execute(searchOp);
            List<Map<String, List>> results = response.getAllResults();
            for (Map<String, List> map : results) {
                keys.addAll(map.get(MnConstants.YZ_RK));
            }
            for (String key : keys) {
                getData(riakClient, namespace, key);
            }

        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            if (riakClient != null) {
                riakClient.shutdown();
            }

        }
    }

    private static RiakClient getRiakClient() throws UnknownHostException {

        //Change the host accordingly
        RiakNode node = new RiakNode.Builder().withRemoteAddress("localhost")
                .withRemotePort(8087).build();
        RiakCluster riakCluster = new RiakCluster.Builder(node).build();
        riakCluster.start();
        return new RiakClient(riakCluster);
    }

    private static void createTheComments(RiakClient riakClient, Namespace ns) throws ExecutionException, InterruptedException {
        CommentsRiakObject commentsRiakObject1 = new CommentsRiakObject();
        commentsRiakObject1.setEmployeeId_s("emp1");
        commentsRiakObject1.setComments_s("Comment 1");
        commentsRiakObject1.setStoredTime_l(System.currentTimeMillis());
        StoreValue storeValue1 = new StoreValue.Builder(commentsRiakObject1).withNamespace(ns).build();
        riakClient.execute(storeValue1);

        CommentsRiakObject commentsRiakObject2 = new CommentsRiakObject();
        commentsRiakObject2.setEmployeeId_s("emp2");
        commentsRiakObject2.setComments_s("Comment 2");
        commentsRiakObject2.setStoredTime_l(System.currentTimeMillis());
        StoreValue storeValue2 = new StoreValue.Builder(commentsRiakObject2).withNamespace(ns).build();
        riakClient.execute(storeValue2);

        CommentsRiakObject commentsRiakObject3 = new CommentsRiakObject();
        commentsRiakObject3.setEmployeeId_s("emp1");
        commentsRiakObject3.setComments_s("Comment 3");
        commentsRiakObject3.setStoredTime_l(System.currentTimeMillis());
        StoreValue storeValue3 = new StoreValue.Builder(commentsRiakObject3).withNamespace(ns).build();
        riakClient.execute(storeValue3);
    }

    private static void getData(RiakClient riakClient, Namespace namespace, String key) throws ExecutionException, InterruptedException {
        Location loc = new Location(namespace, key);
        FetchValue fetchOp = new FetchValue.Builder(loc).build();
        CommentsRiakObject commentsRiakObject = riakClient.execute(fetchOp).getValue(CommentsRiakObject.class);
        System.out.println(commentsRiakObject);
    }

    public static class CommentsRiakObject implements Serializable {

        private String employeeId_s;

        private String comments_s;

        private long storedTime_l;

        public String getEmployeeId_s() {
            return employeeId_s;
        }

        public void setEmployeeId_s(String employeeId_s) {
            this.employeeId_s = employeeId_s;
        }

        public String getComments_s() {
            return comments_s;
        }

        public void setComments_s(String comments_s) {
            this.comments_s = comments_s;
        }

        public long getStoredTime_l() {
            return storedTime_l;
        }

        public void setStoredTime_l(long storedTime_l) {
            this.storedTime_l = storedTime_l;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;

            if (o == null || getClass() != o.getClass()) return false;

            CommentsRiakObject that = (CommentsRiakObject) o;

            return new EqualsBuilder()
                    .append(storedTime_l, that.storedTime_l)
                    .append(employeeId_s, that.employeeId_s)
                    .append(comments_s, that.comments_s)
                    .isEquals();
        }

        @Override
        public int hashCode() {
            return new HashCodeBuilder(17, 37)
                    .append(employeeId_s)
                    .append(comments_s)
                    .append(storedTime_l)
                    .toHashCode();
        }

        @Override
        public String toString() {
            return new ToStringBuilder(this)
                    .append("employeeId_s", employeeId_s)
                    .append("comments_s", comments_s)
                    .append("storedTime_l", storedTime_l)
                    .toString();
        }
    }
}

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s