Riak – Sorting custom objects with the Protobuf API

Riak lets us store data like a database and query it like Solr.

Assume that we have an application that collects employee comments about a particular topic. An employee can add multiple comments, so all of these comments are stored in a Riak bucket together with the employee id. The requirement is to fetch
all the comments made by a given employee, sorted by timestamp.

The program below does exactly that. Its steps are as follows:

1. Create a search index
2. Assign the bucket with the search index
3. Add the different comments
4. Query the search index for the employee and sort the results on storedTime
5. Once you have the Riak keys, retrieve the comments one by one.

import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.buckets.StoreBucketProperties;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.api.commands.search.Search;
import com.basho.riak.client.api.commands.search.StoreIndex;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakNode;
import com.basho.riak.client.core.operations.SearchOperation;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.query.search.YokozunaIndex;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.cas.osd.mp.utils.MnConstants;

import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;


/**
 * Demonstrates storing custom objects in a Riak bucket and retrieving them
 * sorted by a field through a search index.
 *
 * <p>The flow is: create a search index, attach it to the bucket, store a few
 * comments, query the index for one employee sorted by {@code storedTime_l}
 * (descending), then fetch each matching object by its Riak key.
 */
public class RiakMain {

    /** Name of the search index created and attached to the bucket. */
    private static final String INDEX_NAME = "bucket_index";

    /** Pause between creating the index and attaching it to the bucket. */
    private static final long INDEX_CREATION_WAIT_MILLIS = 5000L;

    /**
     * Runs the end-to-end example against the Riak node configured in
     * {@link #getRiakClient()}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        RiakClient riakClient = null;
        Namespace namespace = new Namespace("riakbucket");
        try {
            //Get the riak client
            riakClient = getRiakClient();
            //Create the index
            StoreIndex index = new StoreIndex.Builder(new YokozunaIndex(INDEX_NAME)).build();
            riakClient.execute(index);
            //NOTE(review): presumably this pause gives the new index time to become
            //available before it is assigned to the bucket - confirm the required delay.
            Thread.sleep(INDEX_CREATION_WAIT_MILLIS);
            //Assign the index to the bucket named 'riakbucket'
            StoreBucketProperties sbp = new StoreBucketProperties.Builder(namespace)
                    .withAllowMulti(true)
                    .withSearchIndex(INDEX_NAME)
                    .build();
            riakClient.execute(sbp);
            //Create the comments
            createTheComments(riakClient, namespace);
            //Retrieve them based on the timestamp. Note that the Riak-generated default key
            //is used for storing the comments; other key strategies are possible.
            //First fetch the keys through the search index, then retrieve the data.
            //NOTE(review): the search runs immediately after the stores; if indexing is
            //asynchronous the freshly stored comments may not be visible yet - confirm.
            Search searchOp = new Search.Builder(INDEX_NAME, "employeeId_s:emp1")
                    .sort("storedTime_l desc")
                    .withStart(0)
                    .withRows(1000)
                    .build();
            SearchOperation.Response response = riakClient.execute(searchOp);
            List<String> keys = collectKeys(response.getAllResults());
            for (String key : keys) {
                getData(riakClient, namespace, key);
            }

        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            //Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            if (riakClient != null) {
                riakClient.shutdown();
            }
        }
    }

    /**
     * Collects the Riak keys from the search result documents, preserving the
     * order in which the documents were returned.
     *
     * @param results search result documents, each mapping a field name to its values
     * @return the values found under {@code MnConstants.YZ_RK} in every document;
     *         documents without that field are skipped
     */
    private static List<String> collectKeys(List<Map<String, List<String>>> results) {
        List<String> keys = new ArrayList<String>();
        for (Map<String, List<String>> document : results) {
            //Presumably YZ_RK names the field holding the Riak key - verify against MnConstants.
            List<String> documentKeys = document.get(MnConstants.YZ_RK);
            if (documentKeys != null) {
                keys.addAll(documentKeys);
            }
        }
        return keys;
    }

    /**
     * Builds a client for a single-node cluster at {@code localhost:8087} and
     * starts the cluster.
     *
     * @return a client backed by the started cluster
     * @throws UnknownHostException if the remote address cannot be resolved
     */
    private static RiakClient getRiakClient() throws UnknownHostException {

        //Change the host accordingly
        RiakNode node = new RiakNode.Builder().withRemoteAddress("localhost")
                .withRemotePort(8087).build();
        RiakCluster riakCluster = new RiakCluster.Builder(node).build();
        riakCluster.start();
        return new RiakClient(riakCluster);
    }

    /**
     * Stores three sample comments: two for {@code emp1} and one for {@code emp2}.
     *
     * @param riakClient client used to execute the store operations
     * @param ns         namespace (bucket) the comments are written to
     * @throws ExecutionException   if a store operation fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private static void createTheComments(RiakClient riakClient, Namespace ns) throws ExecutionException, InterruptedException {
        storeComment(riakClient, ns, "emp1", "Comment 1");
        storeComment(riakClient, ns, "emp2", "Comment 2");
        storeComment(riakClient, ns, "emp1", "Comment 3");
    }

    /**
     * Stores one comment stamped with the current time; no key is supplied on
     * the store operation.
     */
    private static void storeComment(RiakClient riakClient, Namespace ns, String employeeId, String comment)
            throws ExecutionException, InterruptedException {
        CommentsRiakObject commentsRiakObject = new CommentsRiakObject();
        commentsRiakObject.setEmployeeId_s(employeeId);
        commentsRiakObject.setComments_s(comment);
        commentsRiakObject.setStoredTime_l(System.currentTimeMillis());
        StoreValue storeValue = new StoreValue.Builder(commentsRiakObject).withNamespace(ns).build();
        riakClient.execute(storeValue);
    }

    /**
     * Fetches the comment stored under {@code key} and prints it to standard output.
     *
     * @param riakClient client used to execute the fetch
     * @param namespace  namespace (bucket) the comment lives in
     * @param key        Riak key of the comment
     * @throws ExecutionException   if the fetch operation fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private static void getData(RiakClient riakClient, Namespace namespace, String key) throws ExecutionException, InterruptedException {
        Location loc = new Location(namespace, key);
        FetchValue fetchOp = new FetchValue.Builder(loc).build();
        CommentsRiakObject commentsRiakObject = riakClient.execute(fetchOp).getValue(CommentsRiakObject.class);
        System.out.println(commentsRiakObject);
    }

    /**
     * A single employee comment as stored in Riak.
     *
     * <p>NOTE(review): the {@code _s} / {@code _l} suffixes presumably match the
     * search schema's dynamic string / long fields, which is what makes
     * {@code employeeId_s} queryable and {@code storedTime_l} sortable - confirm
     * against the schema in use.
     */
    public static class CommentsRiakObject implements Serializable {

        private String employeeId_s;

        private String comments_s;

        private long storedTime_l;

        public String getEmployeeId_s() {
            return employeeId_s;
        }

        public void setEmployeeId_s(String employeeId_s) {
            this.employeeId_s = employeeId_s;
        }

        public String getComments_s() {
            return comments_s;
        }

        public void setComments_s(String comments_s) {
            this.comments_s = comments_s;
        }

        public long getStoredTime_l() {
            return storedTime_l;
        }

        public void setStoredTime_l(long storedTime_l) {
            this.storedTime_l = storedTime_l;
        }

        /** Two comments are equal when employee id, text and stored time all match. */
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;

            if (o == null || getClass() != o.getClass()) return false;

            CommentsRiakObject that = (CommentsRiakObject) o;

            return new EqualsBuilder()
                    .append(storedTime_l, that.storedTime_l)
                    .append(employeeId_s, that.employeeId_s)
                    .append(comments_s, that.comments_s)
                    .isEquals();
        }

        /** Hash over the same three fields used by {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return new HashCodeBuilder(17, 37)
                    .append(employeeId_s)
                    .append(comments_s)
                    .append(storedTime_l)
                    .toHashCode();
        }

        @Override
        public String toString() {
            return new ToStringBuilder(this)
                    .append("employeeId_s", employeeId_s)
                    .append("comments_s", comments_s)
                    .append("storedTime_l", storedTime_l)
                    .toString();
        }
    }
}

Advertisements

Build Operate Check – Acceptance Pattern

Build-Operate-Check is one of the acceptance test patterns, and it is a good pattern to follow when writing JUnit test cases.

The first step is to build the input data and hold it in memory. The second step is to operate on that data, which means calling the code under test with it. The final step is to check the results of that operation.

Please refer to the example below.


package com.dao.test;

import org.junit.Test;
import static org.junit.Assert.assertNotNull;
import com.dao.ContactDAO;
import com.model.Contact;

public class ContactDAOTest{
    @Test
    public void testCreateContactData() throws Exception {
        //Build the test data
        Contact contact = buildContactData("bala","dublin");
        //Call the ContactDAO to create the contact information. Assume that 
        //Create() returns an id after the successful creation of the contact in DB
        String id = new ContactDAO().create(contact);
        //check the id and make sure that its not null/blank
        assertNotNull(id);
    }

    private Contact buildContactData(String name, String city) {
        Contact contact = new Contact();
        contact.setName(name);
        contact.setCity(city);
        return contact;
    }
}



package com.model;
/**
 * Simple mutable bean holding a contact's id, name and city.
 *
 * <p>Declared without {@code static}: that modifier is only legal on nested
 * classes and is a compile error on a top-level class.
 */
public class Contact {

    private String id;
    private String name;
    private String city;

    /** @return the contact id, or {@code null} if none has been set */
    public String getId() {
        return id;
    }

    /** @param id the contact id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the contact name, or {@code null} if none has been set */
    public String getName() {
        return name;
    }

    /** @param name the contact name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the contact city, or {@code null} if none has been set */
    public String getCity() {
        return city;
    }

    /** @param city the contact city */
    public void setCity(String city) {
        this.city = city;
    }
}