Sunday, December 22, 2013

Reactive Cassandra ...

Or an adventure on reading data reactively from Cassandra.

Overview
Let's first try to define what reactive means from programming point of view.

Functional reactive programming is programming paradigm for reactive programming using the building blocks of functional programming. 

Functional programming is a programming paradigm, a style of building the structure and the elements of computer programs, that treats computation, as the evaluation of mathematical functions thats avoids state and mutable data. Functional programming emphasises functions that produce results that depend only on  their inputs and not on program state.

How can we do functional programming in Java? Java is Object Oriented Programming language where mutable state is present everywhere.

Any java developer around the world have used any of the interfaces: java.lang.Runnable, java.util.Comparator, java.util.concurrent.Callable or java.awt.event.ActionListener. All of this interfaces are having only single method declared. These interfaces are known as Single Abstract Methods or SAM. A popular way as these are used is by creating Anonymous inner classes. 


public class RunnableTest {
  public static void main(Sting[] args){
    new Thread(new Runnable(){
      @Override
      public void run(){
        System.out.println("A new thread is running ...");
      }
    }).start();
  }
}
 

Functional Programming in Java is hard since function is not included in the language specification. It will become simpler in Java 8 with introduction of “lambda’s”. But how can we do functional programming in Java?
Let’s see a simple example.



@FunctionalInterface
public interface Worker {
   public void doWork();
}
public class FunctionalWorker {
  public static void main(String[] args){
    // anonymous inner class way
    execute( new Worker(){
      @Override
      public void doWork() {
        System.out.println ("working ...");
      }
    });
    // lambda's way
    execute(() -> System.out.println("working in lambda's way ..."));
  }
  
  public static void execute(Worker worker){
    worker.doWork();
  }
}



Reactive programming is a programming paradigm oriented around data flows and the propagation of changes. For example, in imperative programming setting, a := b+c, would mean that a is being assigned the result of  b +c in the instant the expression is evaluated. Later values of b or c can be changed without effect on a. In reactive programming, the value of a will be automatically updated based on the new values.

So, we should have a good understanding of what Functional Reactive Programming is, so let's go and build a prototype...

Reading data reactively from Cassandra

Cassandra is one of the NoSql storage out there is quite popular. 
Let's imagine that we have to build an Avatar service. This service will store avatars meta information and the content directly in cassandra. 

The java driver that we are using is providing us support to query cassandra asynchronous, through the executeAsync() method. The invocation of this method will return a Future. As we all know java Futures are block-able and could not be composed.

Ok, so we have async support but we still need a way to be able to read it reactively...

Netflix built and later open sourced the RxJava library that is providing Functional Reactive Programming for Java (plus other JVM languages).


Functional reactive offers efficient execution and composition by providing a collection of operators capable of filtering, selecting, transforming, combining and composing Observable's.
The Observable data type can be thought of as a "push" equivalent to Iterable which is "pull". With an Iterable, the consumer pulls values from the producer and the thread blocks until those values arrive. By contrast with the Observable type, the producer pushes values to the consumer whenever values are available. This approach is more flexible, because values can arrive synchronously or asynchronously.
The Observable type adds two missing semantics to the Gang of Four's Observer pattern, which are available in the Iterable type:
  1. The ability for the producer to signal to the consumer that there is no more data available.
  2. The ability for the producer to signal to the consumer that an error has occurred.

With these two simple additions, we have unified the Iterable and Observable types. The only difference between them is the direction in which the data flows. This is very important because now any operation we perform on an Iterable, can also be performed on an Observable. 

Let's see how we can combine the RxJava and Cassandra async query execution to build an Observable.

package net.devsprint.reactive.cassandra;
 
import java.util.concurrent.ExecutorService;
 
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
 
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
 
/**
 * Wraps an async execution of Datastax Java driver into an observable.
 * 
 */
public class ObservableCassandra {
 
 public static Observable executeAsync(final Session execution,
   final String query, final ExecutorService executorService) {
  return Observable.create(new Observable.OnSubscribeFunc() {
 
   @Override
   public Subscription onSubscribe(final Observer observer) {
    try {
     Futures.addCallback(execution.executeAsync(query),
      new FutureCallback() {
 
      @Override
      public void onSuccess(ResultSet result) {
       observer.onNext(result);
       observer.onCompleted();
      }
 
      @Override
      public void onFailure(Throwable t) {
       observer.onError(t);
      }
      }, executorService);
    } catch (Throwable e) {
     // If any Throwable can be thrown from
     // executeAsync
     observer.onError(e);
    }
    return Subscriptions.empty();
   }
  });
 }
 
}

executeAsync() method is returning a Guava Listenable Future. Adding a callback on this future allows us to properly implement the Observer interface.

A simple service could be implemented as following:

package net.devsprint.reactive.cassandra;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
import rx.Observable;
 
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
 
public class AvatarService {
 private static final String QUERY = "select * avatars";
 private static final ExecutorService executorService = Executors
   .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 
 private final Session session;
 
 public AvatarService(Session session) {
  this.session = session;
 }
 
 Observable getAvatars() {
  return ObservableCassandra
    .executeAsync(session, QUERY, executorService);
 }
 
}


Assuming that the query is heavy, we are providing a separate execution context where the callback will be executed.

With these two classes we have an Avatar service that could be started but it will not do any thing. It will start fetching the data from Cassandra only when there will be at least one subscriber.

A complete example could be found at Reactive Cassandra.


Meta: this post is part of the Java Advent Calendar and is licensed under the Creative Commons 3.0 Attribution license. If you like it, please spread the word by sharing, tweeting, FB, G+ and so on!

No comments:

Post a Comment