Reactive Development Using Vert.x

Lately, it seems like we’re hearing about the latest and greatest frameworks for Java: tools like Ninja, SparkJava, and Play. Each one is opinionated and makes you feel like you need to redesign your entire application to make use of its wonderful features. That’s why I was so relieved when I discovered Vert.x. Vert.x isn’t a framework, it’s a toolkit, and it’s unopinionated and liberating. Vert.x doesn’t want you to redesign your entire application to make use of it, it just wants to make your life easier. Can you write your entire application in Vert.x? Sure! Can you add Vert.x capabilities to your existing Spring/Guice/CDI applications? Yep! Can you use Vert.x inside of your existing JavaEE applications? Absolutely! And that’s what makes it amazing.

Background

Vert.x was born when Tim Fox decided that he liked a lot of what was being developed in the NodeJS ecosystem, but he didn’t like some of the trade-offs of working in V8: Single-threadedness, limited library support, and JavaScript itself. Tim set out to write a toolkit which was unopinionated about how and where it is used, and he decided that the best place to implement it was on the JVM. So, Tim and the community set out to create an event-driven, non-blocking, reactive toolkit which in many ways mirrored what could be done in NodeJS, but also took advantage of the power available inside of the JVM. Node.x was born and it later progressed to become Vert.x.

Overview

Vert.x is designed around an event bus, which is how different parts of the application can communicate in a non-blocking, thread-safe manner. Parts of it were modeled after the Actor methodology exhibited by Erlang and Akka. It is also designed to take full advantage of today’s multi-core processors and highly concurrent programming demands. As such, all Vert.x VERTICLES are implemented as single-threaded by default. Unlike NodeJS though, Vert.x can run MANY verticles in MANY threads. Additionally, you can specify that some verticles are “worker” verticles and CAN be multi-threaded. And to really add some icing on the cake, Vert.x has low-level support for multi-node clustering of the event bus via the use of Hazelcast. It has gone on to include many other amazing features which are too numerous to list here, but you can read more in the official Vert.x docs.
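
To make the verticle and event bus ideas concrete, here is a minimal sketch (assuming Vert.x 3.x, which is what the rest of this article uses; the HelloVerticle class and the “greetings” address are purely illustrative and not part of the example project):

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;

public class HelloVerticle extends AbstractVerticle {
    @Override
    public void start() {
        // Register a consumer on the event bus and reply to each message
        vertx.eventBus().consumer("greetings", msg -> msg.reply("Hello, " + msg.body()));
    }

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        // Deploy the verticle, then send it a message and print the reply
        vertx.deployVerticle(new HelloVerticle(), deployed ->
                vertx.eventBus().send("greetings", "Vert.x",
                        reply -> System.out.println(reply.result().body())));
    }
}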

The first thing you need to know about Vert.x is, similar to NodeJS, never block the current thread. Everything in Vert.x is set up, by default, to use callbacks/futures/promises. Instead of doing synchronous operations, Vert.x provides async methods for doing most I/O and processor-intensive operations which might block the current thread. Now, callbacks can be ugly and painful to work with, so Vert.x optionally provides an API based on RxJava which implements the same functionality using the Observer pattern. Finally, Vert.x makes it easy to use your existing classes and methods by providing the executeBlocking(Function f) method on many of its asynchronous APIs. This means you can choose how you prefer to work with Vert.x instead of the toolkit dictating to you how it must be used.
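
For instance, wrapping an existing blocking call might look something like this sketch (someBlockingCall() is a hypothetical stand-in for any slow, synchronous method; in Vert.x 3.x the first lambda receives a Future to complete):

vertx.<String>executeBlocking(future -> {
    // This lambda runs on a worker thread, so it is safe to block here
    future.complete(someBlockingCall());
}, res -> {
    // This lambda runs back on the event loop once the blocking work is done
    if (res.succeeded()) {
        System.out.println("Result: " + res.result());
    } else {
        res.cause().printStackTrace();
    }
});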

The second thing to know about Vert.x is that it is composed of verticles, modules, and nodes. Verticles are the smallest unit of logic in Vert.x, and are usually represented by a single class. Verticles should be simple and single-purpose, following the UNIX philosophy. A group of verticles can be put together into a module, which is usually packaged as a single JAR file. A module represents a group of related functionality which, when taken together, could represent an entire application or just a portion of a larger distributed application. Lastly, nodes are single instances of the JVM which are running one or more modules/verticles. Because Vert.x has clustering built in from the ground up, Vert.x applications can span nodes either on a single machine or across multiple machines in multiple geographic locations (though latency can hinder performance).

Example Project

Now, I’ve been to a number of meetups and conferences lately where the first thing they show you when talking about reactive programming is how to build a chat room application. That’s all well and good, but it doesn’t really help you to completely understand the power of reactive development. Chat room apps are simplistic. We can do better. In this tutorial, we’re going to take a legacy Spring application and convert it to take advantage of Vert.x. This has multiple purposes: it shows that the toolkit is easy to integrate with existing Java projects, it allows us to take advantage of existing tools which may be entrenched parts of our ecosystem, and it also lets us follow the DRY principle in that we don’t have to rewrite large swathes of code to get the benefits of Vert.x.

Our legacy Spring application is a contrived simple example of a REST API using Spring Boot, Spring Data JPA, and Spring REST. The source code can be found in the “master” branch HERE. There are other branches which we will use to demonstrate the progression as we go, so it should be simple for anyone with a little experience with git and Java 8 to follow along. Let’s start by examining the Spring Configuration class for the stock Spring application.


@SpringBootApplication
@EnableJpaRepositories
@EnableTransactionManagement
@Slf4j
public class Application {
    public static void main(String[] args) {
        ApplicationContext ctx = SpringApplication.run(Application.class, args);

        System.out.println("Let's inspect the beans provided by Spring Boot:");

        String[] beanNames = ctx.getBeanDefinitionNames();
        Arrays.sort(beanNames);
        for (String beanName : beanNames) {
            System.out.println(beanName);
        }
    }

    @Bean
    public DataSource dataSource() {
        EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
        return builder.setType(EmbeddedDatabaseType.HSQL).build();
    }

    @Bean
    public EntityManagerFactory entityManagerFactory() {
        HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
        vendorAdapter.setGenerateDdl(true);

        LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
        factory.setJpaVendorAdapter(vendorAdapter);
        factory.setPackagesToScan("com.zanclus.data.entities");
        factory.setDataSource(dataSource());
        factory.afterPropertiesSet();

        return factory.getObject();
    }

    @Bean
    public PlatformTransactionManager transactionManager(final EntityManagerFactory emf) {
        final JpaTransactionManager txManager = new JpaTransactionManager();
        txManager.setEntityManagerFactory(emf);
        return txManager;
    }
}

As you can see at the top of the class, we have some pretty standard Spring Boot annotations. You’ll also see an @Slf4j annotation, which is part of the Lombok library and is designed to help reduce boilerplate code. We also have @Bean annotated methods for providing access to the JPA EntityManagerFactory, the TransactionManager, and the DataSource. Each of these methods provides an injectable object for the other classes to use. The remaining classes in the project are similarly simplistic. There is a Customer POJO which is the Entity type used in the service. There is a CustomerDAO which is created via Spring Data. Finally, there is a CustomerEndpoints class which is the JAX-RS annotated REST controller.

As explained earlier, this is all standard fare in a Spring Boot application. The problem with this application is that, for the most part, it has limited scalability. You would either run this application inside of a Servlet container, or with an embedded server like Jetty or Undertow. Either way, each request ties up a thread and thus wastes resources while it waits for I/O operations.

Switching over to the Convert-To-Vert.x-Web branch, we can see that the Application class has changed a little. We now have some new @Bean annotated methods for injecting the Vertx instance itself, as well as an instance of ObjectMapper (part of the Jackson JSON library). We have also replaced the CustomerEndpoints class with a new CustomerVerticle. Pretty much everything else is the same.
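
Those new beans aren’t reproduced in this article, but they presumably look something like the following sketch (method names here are illustrative; the exact code is in the branch):

    @Bean
    public Vertx vertx() {
        // A single Vertx instance shared by the whole Spring context
        return Vertx.vertx();
    }

    @Bean
    public ObjectMapper objectMapper() {
        // Jackson mapper used to serialize/deserialize Customer objects
        return new ObjectMapper();
    }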

The CustomerVerticle class is annotated with @Component, which means that Spring will instantiate that class on startup. It also has its start method annotated with @PostConstruct so that the Verticle is launched on startup. Looking at the actual content of the code, we see our first bit of Vert.x code: the Router.

The Router class is part of the vertx-web library and allows us to use a fluent API to define HTTP URLs, methods, and header filters for our request handling. Adding the BodyHandler instance to the default route allows a POST/PUT body to be processed and converted to a JSON object which Vert.x can then process as part of the RoutingContext. The order of routes in Vert.x CAN be significant. If you define a route which has some sort of glob matching (* or regex), it can swallow requests for routes defined after it unless you implement chaining, as in the short sketch below. After that sketch, you can see the 3 routes our example defines initially.
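
For example, a catch-all route can do some work and then hand the request on to whichever route matches next by calling next() (a minimal sketch, not part of the example project):

Router router = Router.router(vertx);

// This wildcard route matches everything under /v1/ and would normally
// swallow later routes, but calling next() chains to the next match.
router.route("/v1/*").handler(rc -> {
    rc.response().putHeader("X-Audit", "seen");
    rc.next();
});

router.get("/v1/customer/:id").handler(rc -> rc.response().end("..."));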


    @PostConstruct
    public void start() throws Exception {
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.get("/v1/customer/:id")
                .produces("application/json")
                .blockingHandler(this::getCustomerById);
        router.put("/v1/customer")
                .consumes("application/json")
                .produces("application/json")
                .blockingHandler(this::addCustomer);
        router.get("/v1/customer")
                .produces("application/json")
                .blockingHandler(this::getAllCustomers);
        vertx.createHttpServer().requestHandler(router::accept).listen(8080);
    }

Notice that the HTTP method is defined, the request “Content-Type” header is matched (via consumes), and the “Accept” header is matched (via produces). We also see that we are passing the handling of the request off via a call to the blockingHandler method. A blocking handler for a Vert.x route accepts a RoutingContext object as its only parameter. The RoutingContext holds the Vert.x Request object, Response object, and any parameters/POST body data (like “:id”). You’ll also see that I used method references rather than lambdas to insert the logic into the blockingHandler (I find it more readable). Each handler for the 3 request routes is defined in a separate method further down in the class. These methods basically just call the methods on the DAO, serialize or deserialize as needed, set some response headers, and end() the request by sending a response. Overall, pretty simple and straightforward.


    private void addCustomer(RoutingContext rc) {
        try {
            String body = rc.getBodyAsString();
            Customer customer = mapper.readValue(body, Customer.class);
            Customer saved = dao.save(customer);
            if (saved!=null) {
                rc.response().setStatusMessage("Accepted").setStatusCode(202).end(mapper.writeValueAsString(saved));
            } else {
                rc.response().setStatusMessage("Bad Request").setStatusCode(400).end("Bad Request");
            }
        } catch (IOException e) {
            rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");
            log.error("Server error", e);
        }
    }

    private void getCustomerById(RoutingContext rc) {
        log.info("Request for single customer");
        Long id = Long.parseLong(rc.request().getParam("id"));
        try {
            Customer customer = dao.findOne(id);
            if (customer==null) {
                rc.response().setStatusMessage("Not Found").setStatusCode(404).end("Not Found");
            } else {
                // Reuse the entity we already loaded instead of querying the database a second time
                rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(customer));
            }
        } catch (JsonProcessingException jpe) {
            rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");
            log.error("Server error", jpe);
        }
    }

    private void getAllCustomers(RoutingContext rc) {
        log.info("Request for all customers");
        List<Customer> customers = StreamSupport.stream(dao.findAll().spliterator(), false).collect(Collectors.toList());
        try {
            rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(customers));
        } catch (JsonProcessingException jpe) {
            rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");
            log.error("Server error", jpe);
        }
    }

“But this is more code and messier than my Spring annotations and classes”, you might say. That CAN be true, but it really depends on how you implement the code. This is meant to be an introductory example, so I left the code very simple and easy to follow. I COULD use an annotation library for Vert.x to implement the endpoints in a manner similar to JAX-RS. In addition, we have gained a massive scalability improvement. Under the hood, Vert.x Web uses Netty for low-level asynchronous I/O operations, thus providing us the ability to handle MANY more concurrent requests (limited by the size of the database connection pool).

We’ve already made some improvement to the scalability and concurrency of this application by using the Vert.x Web library, but we can improve things a little more by implementing the Vert.x EventBus. By separating the database operations into Worker Verticles instead of using blockingHandler, we can handle request processing more efficiently. This is shown in the Convert-To-Worker-Verticles branch. The Application class has remained the same, but we have changed the CustomerVerticle class and added a new class called CustomerWorker. In addition, we added a new library called Spring Vert.x Extension which provides Spring dependency injection support to Vert.x Verticles. Start off by looking at the new CustomerVerticle class.


    @PostConstruct
    public void start() throws Exception {
        log.info("Successfully create CustomerVerticle");
        DeploymentOptions deployOpts = new DeploymentOptions().setWorker(true).setMultiThreaded(true).setInstances(4);
        vertx.deployVerticle("java-spring:com.zanclus.verticles.CustomerWorker", deployOpts, res -> {
            if (res.succeeded()) {
                Router router = Router.router(vertx);
                router.route().handler(BodyHandler.create());
                router.get("/v1/customer/:id")
                        .produces("application/json")
                        .handler(rc -> {
                            // Build fresh DeliveryOptions per request so headers from one
                            // request never accumulate into the next
                            DeliveryOptions opts = new DeliveryOptions()
                                    .setSendTimeout(2000)
                                    .addHeader("method", "getCustomer")
                                    .addHeader("id", rc.request().getParam("id"));
                            vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));
                        });
                router.put("/v1/customer")
                        .consumes("application/json")
                        .produces("application/json")
                        .handler(rc -> {
                            DeliveryOptions opts = new DeliveryOptions()
                                    .setSendTimeout(2000)
                                    .addHeader("method", "addCustomer");
                            vertx.eventBus().send("com.zanclus.customer", rc.getBodyAsJson(), opts, reply -> handleReply(reply, rc));
                        });
                router.get("/v1/customer")
                        .produces("application/json")
                        .handler(rc -> {
                            DeliveryOptions opts = new DeliveryOptions()
                                    .setSendTimeout(2000)
                                    .addHeader("method", "getAllCustomers");
                            vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));
                        });
                vertx.createHttpServer().requestHandler(router::accept).listen(8080);
            } else {
                log.error("Failed to deploy worker verticles.", res.cause());
            }
        });
    }

The routes are the same, but the implementation code is not. Instead of using calls to blockingHandler, we have now implemented proper async handlers which send out events on the event bus. None of the database processing is happening in this Verticle anymore. We have moved the database processing to a Worker Verticle which has multiple instances to handle multiple requests in parallel in a thread-safe manner. We are also registering a callback for when those events are replied to so that we can send the appropriate response to the client making the request. Now, in the CustomerWorker Verticle we have implemented the database logic and error handling.

@Override
public void start() throws Exception {
    vertx.eventBus().consumer("com.zanclus.customer").handler(this::handleDatabaseRequest);
}

public void handleDatabaseRequest(Message<Object> msg) {
    String method = msg.headers().get("method");

    DeliveryOptions opts = new DeliveryOptions();
    try {
        String retVal;
        switch (method) {
            case "getAllCustomers":
                retVal = mapper.writeValueAsString(dao.findAll());
                msg.reply(retVal, opts);
                break;
            case "getCustomer":
                Long id = Long.parseLong(msg.headers().get("id"));
                retVal = mapper.writeValueAsString(dao.findOne(id));
                msg.reply(retVal);
                break;
            case "addCustomer":
                retVal = mapper.writeValueAsString(
                                    dao.save(
                                            mapper.readValue(
                                                    ((JsonObject)msg.body()).encode(), Customer.class)));
                msg.reply(retVal);
                break;
            default:
                log.error("Invalid method '" + method + "'");
                opts.addHeader("error", "Invalid method '" + method + "'");
                msg.fail(1, "Invalid method");
        }
    } catch (IOException | NullPointerException e) {
        log.error("Problem parsing JSON data.", e);
        msg.fail(2, e.getLocalizedMessage());
    }
}

The CustomerWorker verticles register a consumer for messages on the event bus. The string which represents the address on the event bus is arbitrary, but it is recommended to use a reverse-TLD style naming structure so that it is simple to ensure that the addresses are unique (“com.zanclus.customer”). Whenever a new message is sent to that address, it will be delivered to one, and only one, of the worker verticles. The worker verticle then calls handleDatabaseRequest to do the database work, JSON serialization, and error handling.
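
The handleReply callback used by the routes above isn’t shown in this article. A minimal sketch, assuming the worker always replies with an already-serialized JSON string (as handleDatabaseRequest does), might look like this:

    private void handleReply(AsyncResult<Message<Object>> reply, RoutingContext rc) {
        if (reply.succeeded()) {
            // Pass the worker's JSON reply straight through to the HTTP client
            rc.response()
                    .putHeader("Content-Type", "application/json")
                    .setStatusCode(200)
                    .end((String) reply.result().body());
        } else {
            rc.response().setStatusCode(500).end(reply.cause().getLocalizedMessage());
        }
    }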

There you have it. You’ve seen that Vert.x can be integrated into your legacy applications to improve concurrency and efficiency without having to rewrite the entire application. We could have done something similar with an existing Google Guice or JavaEE CDI application. All of the business logic could remain relatively untouched while we tied in Vert.x to add reactive capabilities. The next steps are up to you. Some ideas for where to go next include Clustering, WebSockets, and VertxRx for ReactiveX sugar.

Spring’s @Primary annotation in action

Spring is a framework that never ceases to amaze me. That’s because it offers plenty of different solutions that allow us, developers, to complete our tasks without writing millions of lines of code, and instead do the same in a much more readable, standardized manner. In this post I will try to describe one of its features that is most likely well known to all of you, but whose importance is, in my opinion, undervalued. The feature that I’ll be talking about is the @Primary annotation.

The problem

On a couple of projects that I was working on, we came across a common business problem: we had a single point of entry to some more complex logic, a container that would gather the results of several other processors into a single output (something like the map-filter-reduce functions from functional programming). To some extent it resembled the Composite pattern. Putting it all together, our approach was as follows:

  1. We had a container that had an autowired list of processors implementing a common interface
  2. Our container implemented the same interface as the elements of the autowired list
  3. We wanted the whole processing to be transparent to the client class using the container; it is interested only in the result
  4. Each processor has some logic (a predicate) based on which it decides whether it is applicable to the current set of input data
  5. The results of the processing were combined into a list and then reduced to a single output

There are numerous ways of dealing with this issue; I’ll present one that uses Spring with the @Primary annotation.

The solution

Let’s start by defining how our use case fits the aforementioned preconditions. Our set of data is a Person class that looks as follows:

Person.java

package com.blogspot.toomuchcoding.person.domain;

public final class Person {
    private final String name;
    private final int age;
    private final boolean stupid;

    public Person(String name, int age, boolean stupid) {
        this.name = name;
        this.age = age;
        this.stupid = stupid;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    public boolean isStupid() {
        return stupid;
    }
}

Nothing out of the ordinary. Now let us define the contract:

PersonProcessingService.java

package com.blogspot.toomuchcoding.person.service;

import com.blogspot.toomuchcoding.person.domain.Person;

public interface PersonProcessingService {
    boolean isApplicableFor(Person person);
    String process(Person person);
}

As stated in the preconditions, each implementation of the PersonProcessingService has to define two points of the contract:

  1. whether it is applicable for the current Person
  2. how it processes a Person.

Now let’s take a look at some of the processors that we have. I’ll not post their code here because it’s pointless; you can check it out later on GitHub or on Bitbucket (a rough sketch of one of them follows the list below). We have the following @Component annotated implementations of PersonProcessingService:

  • AgePersonProcessingService
    • is applicable if the Person’s age is greater than or equal to 18
    • returns a String containing “AGE” when processing takes place (that’s kind of silly, but it’s just a demo, right? 🙂)
  • IntelligencePersonProcessingService
    • is applicable if a Person is stupid
    • returns a String containing “STUPID” when processing takes place
  • NamePersonProcessingService
    • is applicable if a Person has a name
    • returns a String containing “NAME” when processing takes place
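
To give a flavour of these classes without reproducing them all, one of the processors might look roughly like the following sketch (the actual implementations, and their exact package placement, are in the repository):

package com.blogspot.toomuchcoding.person.service;

import org.springframework.stereotype.Component;

import com.blogspot.toomuchcoding.person.domain.Person;

@Component
class AgePersonProcessingService implements PersonProcessingService {

    @Override
    public boolean isApplicableFor(Person person) {
        // only adults are processed by this service
        return person.getAge() >= 18;
    }

    @Override
    public String process(Person person) {
        return "AGE";
    }
}
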
The logic is fairly simple. For a given Person, our container of PersonProcessingServices iterates over the processors, checks whether the current processor is applicable (filter), and if so adds the String resulting from processing the Person to the list of responses (map: a function converting a Person to a String), and finally joins those responses with a comma (reduce). Let’s check out how it’s done:

PersonProcessingServiceContainer.java

package com.blogspot.toomuchcoding.person.service;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

import com.blogspot.toomuchcoding.person.domain.Person;

@Component
@Primary
class PersonProcessingServiceContainer implements PersonProcessingService {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonProcessingServiceContainer.class);

    @Autowired
    private List<PersonProcessingService> personProcessingServices = new ArrayList<PersonProcessingService>();

    @Override
    public boolean isApplicableFor(Person person) {
        return person != null;
    }

    @Override
    public String process(Person person) {
        List<String> output = new ArrayList<String>();
        for (PersonProcessingService personProcessingService : personProcessingServices) {
            if (personProcessingService.isApplicableFor(person)) {
                output.add(personProcessingService.process(person));
            }
        }
        String result = StringUtils.join(output, ",");
        LOGGER.info(result);
        return result;
    }

    public List<PersonProcessingService> getPersonProcessingServices() {
        return personProcessingServices;
    }
}
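
As an aside, the same filter-map-reduce flow could also be expressed with Java 8 streams; this is just a sketch of an alternative, not what the repository does:

@Override
public String process(Person person) {
    // filter the applicable processors, map each to its String result,
    // and reduce by joining with a comma (requires java.util.stream.Collectors)
    return personProcessingServices.stream()
            .filter(service -> service.isApplicableFor(person))
            .map(service -> service.process(person))
            .collect(Collectors.joining(","));
}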


As you can see, we have a container that is annotated with @Primary, which means that whenever an implementation of the PersonProcessingService has to be injected, Spring will pick the PersonProcessingServiceContainer. The cool thing is that we have an autowired list of PersonProcessingServices, which means that all other implementations of that interface will get autowired there (the container will not autowire itself into the list!).

Now let’s check out the Spock tests that prove that I’m not telling any lies. If you aren’t using Spock already in your project, then you should move to it straight away 🙂

PersonProcessingServiceContainerIntegrationSpec.groovy

package com.blogspot.toomuchcoding.person.service

import com.blogspot.toomuchcoding.configuration.SpringConfiguration
import com.blogspot.toomuchcoding.person.domain.Person
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.ContextConfiguration
import spock.lang.Specification
import spock.lang.Unroll

import static org.hamcrest.CoreMatchers.notNullValue

@ContextConfiguration(classes = [SpringConfiguration])
class PersonProcessingServiceContainerIntegrationSpec extends Specification {

    @Autowired
    PersonProcessingService personProcessingService

    def "should autowire container even though there are many implementations of service"() {
        expect:
        personProcessingService instanceof PersonProcessingServiceContainer
    }

    def "the autowired container should not have itself in the list of autowired services"() {
        expect:
        personProcessingService instanceof PersonProcessingServiceContainer
        and:
        !(personProcessingService as PersonProcessingServiceContainer).personProcessingServices.findResult {
            it instanceof PersonProcessingServiceContainer
        }
    }

    def "should not be applicable for processing if a person doesn't exist"() {
        given:
        Person person = null
        expect:
        !personProcessingService.isApplicableFor(person)
    }

    def "should return an empty result for a person not applicable for anything"() {
        given:
        Person person = new Person("", 17, false)
        when:
        def result = personProcessingService.process(person)
        then:
        result notNullValue()
        result.isEmpty()
    }

    @Unroll("For name [#name], age [#age] and being stupid [#stupid] the result should contain keywords #keywords")
    def "should perform different processing depending on input"() {
        given:
        Person person = new Person(name, age, stupid)
        when:
        def result = personProcessingService.process(person)
        then:
        keywords.every {
            result.contains(it)
        }
        where:
        name  | age | stupid || keywords
        "jan" | 20  | true   || ['NAME', 'AGE', 'STUPID']
        ""    | 20  | true   || ['AGE', 'STUPID']
        ""    | 20  | false  || ['AGE']
        null  | 17  | true   || ['STUPID']
        "jan" | 17  | true   || ['NAME']
    }
}

The tests are pretty straightforward:

  1. We prove that the autowired field is in fact our container, the PersonProcessingServiceContainer.
  2. Then we show that we can’t find an object of the PersonProcessingServiceContainer type in the collection of autowired implementations of the PersonProcessingService
  3. In the next two tests we prove that the logic behind our processors is working
  4. Last but not least is Spock’s finest: the where clause, which allows us to create beautiful parameterized tests.

Per module feature

Imagine the situation in which you have an implementation of the interface that is defined in your core module.

@Component
class CoreModuleClass implements SomeInterface {
    // ...
}

What if you decide, in another module that depends on the core module, that you don’t want to use this CoreModuleClass and would rather have some custom logic wherever the SomeInterface is autowired? Well, use @Primary!

@Component
@Primary
class CountryModuleClass implements SomeInterface {
    // ...
}

In that way you can be sure that wherever the SomeInterface has to be autowired, it will be your CountryModuleClass that gets injected into the field.

Conclusion

In this post you could see how to:

  • use the @Primary annotation to create a composite-like container of interface implementations
  • use the @Primary annotation to provide a per-module implementation of an interface that will take precedence over other @Components in terms of autowiring
  • write cool Spock tests 🙂

The code

You can find the code presented here on Too Much Coding’s GitHub repository or on Too Much Coding’s Bitbucket repository.