Riak HTTP API Commands/Services

We can access the Riak database through the Protocol Buffers API and the HTTP API. In this post, I am going to show how we can use the HTTP API to check the data, view bucket properties, create an index, and query the data.

Replace RIAK_SERVER_NAME with the appropriate Riak server name

To view the statistics data:
http://RIAK_SERVER_NAME:8098/stats

List out all the buckets:
http://RIAK_SERVER_NAME:8098/types/default/buckets?buckets=true

Bucket Keys:
http://RIAK_SERVER_NAME:8098/types/default/buckets/BUCKET_NAME/keys?keys=true

Replace BUCKET_NAME with the actual bucket name

Check the bucket properties:
http://RIAK_SERVER_NAME:8098/types/default/buckets/BUCKET_NAME/props

Replace BUCKET_NAME with the actual bucket name

Create an index:

Each bucket should be assigned a search index for querying purposes. Run the command below to create the index.

curl -XPUT http://RIAK_SERVER_NAME:8098/search/index/INDEX_NAME

Install curl first and then run the above command, or install the Advanced REST Client Chrome plugin and access http://RIAK_SERVER_NAME:8098/search/index/INDEX_NAME with the PUT method.

To check whether the index was created, access the below service and check the response.

http://RIAK_SERVER_NAME:8098/search/index/INDEX_NAME

Assign the index to a bucket:

curl -XPUT http://RIAK_SERVER_NAME:8098/buckets/BUCKET_NAME/props -H 'Content-Type: application/json' -d '{"props":{"search_index":"INDEX_NAME"}}'

Delete an object with key:
curl -X “DELETE” http://RIAK_SERVER_NAME:8098/buckets/BUCKET_NAME/keys/KEY

Replace BUCKET_NAME and INDEX_NAME appropriately

To check whether the index is assigned to the bucket, access the below service and check the response.

http://RIAK_SERVER_NAME:8098/types/default/buckets/BUCKET_NAME/props

Check the response and make sure that the search_index property value is your INDEX_NAME
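
For example, the props portion of the response should contain a fragment like the one below (other properties omitted here):

{"props":{"name":"BUCKET_NAME", ... , "search_index":"INDEX_NAME", ... }}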

Retrieve the data by key:

http://RIAK_SERVER_NAME:8098/types/default/buckets/BUCKET_NAME/keys/KEY_VALUE

Replace BUCKET_NAME and KEY_VALUE appropriately.

Query with search index:

Assume that you have stored the below model in the Riak database, with the customer id (or any other unique id) as the key.


import java.io.Serializable;

public class RiakCustomerData implements Serializable {

    private static final long serialVersionUID = 7818749365067437253L;
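
    // The field-name suffixes follow the dynamic-field convention of the default
    // Riak Search (Yokozuna) schema: _s = string, _l = long (see the
    // "Riak Data Storage" section below).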
    
    private String customerId_s;

    private long timestamp_l;

    private String firstName_s;

    private String lastName_s;

    private String orderData_s;

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        RiakCustomerData that = (RiakCustomerData) o;

        if (timestamp_l != that.timestamp_l) return false;
        if (customerId_s != null ? !customerId_s.equals(that.customerId_s) : that.customerId_s != null) return false;
        if (firstName_s != null ? !firstName_s.equals(that.firstName_s) : that.firstName_s != null) return false;
        if (lastName_s != null ? !lastName_s.equals(that.lastName_s) : that.lastName_s != null) return false;
        return !(orderData_s != null ? !orderData_s.equals(that.orderData_s) : that.orderData_s != null);

    }

    @Override
    public int hashCode() {
        int result = customerId_s != null ? customerId_s.hashCode() : 0;
        result = 31 * result + (int) (timestamp_l ^ (timestamp_l >>> 32));
        result = 31 * result + (firstName_s != null ? firstName_s.hashCode() : 0);
        result = 31 * result + (lastName_s != null ? lastName_s.hashCode() : 0);
        result = 31 * result + (orderData_s != null ? orderData_s.hashCode() : 0);
        return result;
    }
}
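
For instance, such an object could be stored through the HTTP API with a PUT; the key and field values below are only illustrative:

curl -XPUT http://RIAK_SERVER_NAME:8098/types/default/buckets/BUCKET_NAME/keys/KEY_VALUE \
  -H 'Content-Type: application/json' \
  -d '{"customerId_s":"cust-1","timestamp_l":1469000000000,"firstName_s":"John","lastName_s":"Doe","orderData_s":"order-data"}'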

Now assume you want to list all the customers whose first name is John. To do this, access the below service:

http://RIAK_SERVER_NAME:8098/search/query/INDEX_NAME?q=firstName_s:John&sort=timestamp_l%20desc
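
The same query can also be run with curl; wt=json is a standard Solr parameter (passed through by Riak Search) that asks for a JSON response:

curl 'http://RIAK_SERVER_NAME:8098/search/query/INDEX_NAME?q=firstName_s:John&sort=timestamp_l%20desc&wt=json'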


Riak HTTP API commands to create a search index and assign it to a bucket

The Riak HTTP port number is 8098, and below are the commands to create a search index and assign it to a bucket. I use curl to run them.

Bucket Name: bucket
Search index Name: bucketIndex

Create the search index: 
curl -XPUT http://localhost:8098/search/index/bucketIndex

Assign the search index to a bucket: 
curl -XPUT http://localhost:8098/buckets/bucket/props -H 'Content-Type: application/json' -d '{"props":{"search_index":"bucketIndex"}}'

Check the bucket properties: 
curl http://localhost:8098/types/default/buckets/bucket/props

Riak – Code to sort custom objects using the Protobuf API

In Riak, we can store data like a key/value database and query it like Solr (through Riak Search).

Assume that we have an application which collects employee comments about a particular topic. An employee can add multiple comments. All these comments are stored in a Riak bucket along with the employee id, and we have a requirement to fetch all the comments made by an employee, sorted by timestamp.

The program below does exactly that. Its steps are:

1. Create a search index
2. Assign the search index to the bucket
3. Add a few comments
4. Query with the search index, sorting on storedTime_l
5. Once you get the Riak keys, retrieve the comments one by one

import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.buckets.StoreBucketProperties;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.api.commands.search.Search;
import com.basho.riak.client.api.commands.search.StoreIndex;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakNode;
import com.basho.riak.client.core.operations.SearchOperation;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.query.search.YokozunaIndex;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;

import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;


public class RiakMain {

    public static void main(String[] args) {
        RiakClient riakClient = null;
        Namespace namespace = new Namespace("riakbucket");
        try {
            //Get the riak client
            riakClient = getRiakClient();
            //Create the index
            StoreIndex index = new StoreIndex.Builder(new YokozunaIndex("bucket_index")).build();
            riakClient.execute(index);
            //Wait briefly for the index creation to complete
            Thread.sleep(5000);
            //Assign the index to the bucket named 'riakbucket'
            StoreBucketProperties sbp = new StoreBucketProperties.Builder(namespace).
                    withAllowMulti(true).withSearchIndex("bucket_index").build();
            riakClient.execute(sbp);
            //Create the comments
            createTheComments(riakClient, namespace);
            //Retrieve the comments sorted by timestamp. Note that the Riak-generated default key is used when storing the comments;
            //a different key scheme could be used for this as well.
            //So first fetch the matching keys via search, then retrieve the data for each key.
            Search searchOp = new Search.Builder("bucket_index", "employeeId_s:emp1").sort("storedTime_l desc").
                    withStart(0).
                    withRows(1000).build();
            List<String> keys = new ArrayList<>();
            SearchOperation.Response response = riakClient.execute(searchOp);
            List<Map<String, List<String>>> results = response.getAllResults();
            for (Map<String, List<String>> map : results) {
                keys.addAll(map.get("_yz_rk")); // _yz_rk is the Riak key field returned in Yokozuna search results
            }
            for (String key : keys) {
                getData(riakClient, namespace, key);
            }

        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            if (riakClient != null) {
                riakClient.shutdown();
            }

        }
    }

    private static RiakClient getRiakClient() throws UnknownHostException {

        //Change the host accordingly
        RiakNode node = new RiakNode.Builder().withRemoteAddress("localhost")
                .withRemotePort(8087).build();
        RiakCluster riakCluster = new RiakCluster.Builder(node).build();
        riakCluster.start();
        return new RiakClient(riakCluster);
    }

    private static void createTheComments(RiakClient riakClient, Namespace ns) throws ExecutionException, InterruptedException {
        CommentsRiakObject commentsRiakObject1 = new CommentsRiakObject();
        commentsRiakObject1.setEmployeeId_s("emp1");
        commentsRiakObject1.setComments_s("Comment 1");
        commentsRiakObject1.setStoredTime_l(System.currentTimeMillis());
        StoreValue storeValue1 = new StoreValue.Builder(commentsRiakObject1).withNamespace(ns).build();
        riakClient.execute(storeValue1);

        CommentsRiakObject commentsRiakObject2 = new CommentsRiakObject();
        commentsRiakObject2.setEmployeeId_s("emp2");
        commentsRiakObject2.setComments_s("Comment 2");
        commentsRiakObject2.setStoredTime_l(System.currentTimeMillis());
        StoreValue storeValue2 = new StoreValue.Builder(commentsRiakObject2).withNamespace(ns).build();
        riakClient.execute(storeValue2);

        CommentsRiakObject commentsRiakObject3 = new CommentsRiakObject();
        commentsRiakObject3.setEmployeeId_s("emp1");
        commentsRiakObject3.setComments_s("Comment 3");
        commentsRiakObject3.setStoredTime_l(System.currentTimeMillis());
        StoreValue storeValue3 = new StoreValue.Builder(commentsRiakObject3).withNamespace(ns).build();
        riakClient.execute(storeValue3);
    }

    private static void getData(RiakClient riakClient, Namespace namespace, String key) throws ExecutionException, InterruptedException {
        Location loc = new Location(namespace, key);
        FetchValue fetchOp = new FetchValue.Builder(loc).build();
        CommentsRiakObject commentsRiakObject = riakClient.execute(fetchOp).getValue(CommentsRiakObject.class);
        System.out.println(commentsRiakObject);
    }

    public static class CommentsRiakObject implements Serializable {

        private String employeeId_s;

        private String comments_s;

        private long storedTime_l;

        public String getEmployeeId_s() {
            return employeeId_s;
        }

        public void setEmployeeId_s(String employeeId_s) {
            this.employeeId_s = employeeId_s;
        }

        public String getComments_s() {
            return comments_s;
        }

        public void setComments_s(String comments_s) {
            this.comments_s = comments_s;
        }

        public long getStoredTime_l() {
            return storedTime_l;
        }

        public void setStoredTime_l(long storedTime_l) {
            this.storedTime_l = storedTime_l;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;

            if (o == null || getClass() != o.getClass()) return false;

            CommentsRiakObject that = (CommentsRiakObject) o;

            return new EqualsBuilder()
                    .append(storedTime_l, that.storedTime_l)
                    .append(employeeId_s, that.employeeId_s)
                    .append(comments_s, that.comments_s)
                    .isEquals();
        }

        @Override
        public int hashCode() {
            return new HashCodeBuilder(17, 37)
                    .append(employeeId_s)
                    .append(comments_s)
                    .append(storedTime_l)
                    .toHashCode();
        }

        @Override
        public String toString() {
            return new ToStringBuilder(this)
                    .append("employeeId_s", employeeId_s)
                    .append("comments_s", comments_s)
                    .append("storedTime_l", storedTime_l)
                    .toString();
        }
    }
}

Riak – Code to create the index and assign it to a bucket using Protobuf API


import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.buckets.StoreBucketProperties;
import com.basho.riak.client.api.commands.search.StoreIndex;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakNode;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.query.search.YokozunaIndex;

import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;


public class RiakMain {

    public static void main(String[] args) {
        RiakClient riakClient = null;
        try {
            riakClient = getRiakClient();
            //Create the index
            StoreIndex index = new StoreIndex.Builder(new YokozunaIndex("bucket_index")).build();
            riakClient.execute(index);
            //Wait briefly for the index creation to complete
            Thread.sleep(5000);
            //Assign the index to the bucket named 'riakbucket'
            StoreBucketProperties sbp = new StoreBucketProperties.Builder(new Namespace("riakbucket")).
                    withAllowMulti(true).withSearchIndex("bucket_index").build();
            riakClient.execute(sbp);
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            if (riakClient != null) {
                riakClient.shutdown();
            }

        }
    }

    private static RiakClient getRiakClient() throws UnknownHostException {

        //Change the host accordingly
        RiakNode node = new RiakNode.Builder().withRemoteAddress("localhost")
                .withRemotePort(8087).build();
        RiakCluster riakCluster = new RiakCluster.Builder(node).build();
        riakCluster.start();
        return new RiakClient(riakCluster);
    }
}

Riak pluggable backend

Riak supports pluggable backends, which means we can choose a different storage backend according to our needs. The available storage backends are:

  • BitCask
  • LevelDB
  • Memory
  • Multi

BitCask
Bitcask is an Erlang application that provides an API for storing and retrieving key/value data using log-structured hash tables that provide very fast access.

By default, Riak uses this storage backend. Its major weakness is its key storage behavior: it keeps all keys in memory at all times, which means the Riak node must have enough memory to hold your entire keyspace, plus additional space for other operational components and the operating-system-resident filesystem buffer space.
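
The backend is selected per node in riak.conf; a minimal sketch, assuming the Riak 2.x riak.conf format (other accepted values include leveldb, memory, and multi, and the node has to be restarted after changing it):

storage_backend = bitcask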

LevelDB

eLevelDB is an Erlang application that encapsulates LevelDB, an open-source, on-disk key/value store created by Google Fellows Jeffrey Dean and Sanjay Ghemawat.

LevelDB is a relatively new entrant into the growing list of key/value database libraries, but it has some very interesting qualities that we believe make it an ideal candidate for use in Riak. LevelDB’s storage architecture is more like BigTable’s memtable/sstable model than it is like Bitcask. This design and implementation provide the possibility of a storage engine without Bitcask’s RAM limitation

Reads become slower as the number of levels grows, which is the major weakness of this backend.

Memory
All data is stored in in-memory tables and is never persisted to disk or any other storage. It uses Erlang ETS tables to manage the data.

Multi
Riak allows us to run multiple backends within a single Riak cluster, so each bucket can be stored in a different storage backend. Any combination of the three available backends (Bitcask, LevelDB, and Memory) can be used.

We can create a bucket type that specifies the backend and then create buckets under that bucket type, as shown below. In this way we can leverage the multi-backend option.
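
For example, assuming a backend named leveldb_mult has been defined in the multi-backend configuration (the type and backend names here are only placeholders), a bucket type can be pointed at it and then activated:

riak-admin bucket-type create leveldb_type '{"props":{"backend":"leveldb_mult"}}'
riak-admin bucket-type activate leveldb_type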

Riak Data Storage

If you are trying to store custom objects in Riak, make sure to name the fields appropriately.

For example,
Assume the below custom object

SaveData is a model class with the below fields:
userId
timestamp
value

If we save this information, it will work fine, but we will have an issue when trying to retrieve the data through the index (each bucket should have a search index for effective searching and faster retrieval).

So we should hint the data type in the field name. For example, userId is a string field, so we rename it to userId_s; timestamp is a long value, so we rename it to timestamp_l; and value is a string field, so we rename it to value_s.
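
A minimal sketch of the renamed model, assuming the default search schema is used (the suffixes map to its dynamic field types):

import java.io.Serializable;

public class SaveData implements Serializable {

    private String userId_s;   // _s => indexed as a string
    private long timestamp_l;  // _l => indexed as a long
    private String value_s;    // _s => indexed as a string

    // getters and setters omitted for brevity
}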

Oracle Coherence vs Riak

I have used Oracle Coherence and the Riak database over the last two years, and have noted down the differences below.

Oracle Coherence | Riak
In-memory data grid | High-performance key/value data store; a NoSQL, distributed data store, which means the data gets distributed across multiple nodes
Commercial product | Apache license, but a commercial version is also available
In Coherence, we have to create a cache, then we can store the data in key/value format | In Riak, we create a bucket (similar to a cache in Coherence), then we can store the data in key/value format
Supports API calls, queries, and REST API calls | Supports Protobuf API calls and also provides REST APIs
The object to be stored is either a Java object or a POF (Portable Object Format) object | It can store JSON/binary data
We can use CohQL (a lightweight query language) to interact with Coherence caches | We can use the REST API to interact with Riak buckets