Reactive Development Using Vert.x

Lately, it seems like we’re always hearing about the latest and greatest frameworks for Java: tools like Ninja, SparkJava, and Play. But each one is opinionated and makes you feel like you need to redesign your entire application to make use of its wonderful features. That’s why I was so relieved when I discovered Vert.x. Vert.x isn’t a framework; it’s a toolkit, it’s un-opinionated, and it’s liberating. Vert.x doesn’t want you to redesign your entire application to make use of it; it just wants to make your life easier. Can you write your entire application in Vert.x? Sure! Can you add Vert.x capabilities to your existing Spring/Guice/CDI applications? Yep! Can you use Vert.x inside of your existing JavaEE applications? Absolutely! And that’s what makes it amazing.

Background

Vert.x was born when Tim Fox decided that he liked a lot of what was being developed in the NodeJS ecosystem, but he didn’t like some of the trade-offs of working in V8: Single-threadedness, limited library support, and JavaScript itself. Tim set out to write a toolkit which was unopinionated about how and where it is used, and he decided that the best place to implement it was on the JVM. So, Tim and the community set out to create an event-driven, non-blocking, reactive toolkit which in many ways mirrored what could be done in NodeJS, but also took advantage of the power available inside of the JVM. Node.x was born and it later progressed to become Vert.x.

Overview

Vert.x is built around an event bus, which is how different parts of the application communicate in a non-blocking, thread-safe manner. Parts of it were modeled after the Actor methodology exhibited by Erlang and Akka. It is also designed to take full advantage of today’s multi-core processors and highly concurrent programming demands. As such, all Vert.x VERTICLES are single-threaded by default. Unlike NodeJS though, Vert.x can run MANY verticles in MANY threads. Additionally, you can specify that some verticles are “worker” verticles and CAN be multi-threaded. And to really add some icing on the cake, Vert.x has low-level support for multi-node clustering of the event bus via Hazelcast. It has gone on to include many other amazing features which are too numerous to list here, but you can read more in the official Vert.x docs.

The first thing you need to know about Vert.x is, similar to NodeJS, never block the current thread. Everything in Vert.x is set up, by default, to use callbacks/futures/promises. Instead of doing synchronous operations, Vert.x provides async methods for most I/O and processor-intensive operations which might block the current thread. Now, callbacks can be ugly and painful to work with, so Vert.x optionally provides an API based on RxJava which implements the same functionality using the Observer pattern. Finally, Vert.x makes it easy to use your existing classes and methods by providing the executeBlocking(Function f) method on many of its asynchronous APIs. This means you can choose how you prefer to work with Vert.x instead of the toolkit dictating to you how it must be used.
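To make the "never block" rule a bit more concrete, here is a minimal sketch that uses only the JDK. It is an analogy, not the Vert.x API: the class name, the two executors, and the simulated slow lookup are all my own assumptions. A slow call runs on a worker pool, and the continuation is handed back to a single "event-loop" thread, which is roughly the shape of what executeBlocking gives you.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Plain-JDK analogy of "never block the event loop"; this is NOT the Vert.x API. */
class BlockingOffloadSketch {
    // Stand-in for the single event-loop thread that must never block.
    final ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> daemon(r, "event-loop"));
    // Stand-in for a pool of worker threads that are allowed to block.
    final ExecutorService workers = Executors.newFixedThreadPool(4, r -> daemon(r, "worker"));

    private static Thread daemon(Runnable task, String name) {
        Thread t = new Thread(task, name);
        t.setDaemon(true); // do not keep the JVM alive for this sketch
        return t;
    }

    /** Runs the blocking call on a worker thread and exposes the result as a future. */
    <T> CompletableFuture<T> offload(Supplier<T> blockingCall) {
        return CompletableFuture.supplyAsync(blockingCall, workers);
    }

    /** Simulates slow I/O (hypothetical, e.g. a JDBC query) and reports which thread ran it. */
    private static String slowLookup() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    static String demo() {
        BlockingOffloadSketch sketch = new BlockingOffloadSketch();
        return sketch.offload(BlockingOffloadSketch::slowLookup)
                // The continuation is scheduled back onto the event-loop thread.
                .thenApplyAsync(ranOn -> ranOn + " -> " + Thread.currentThread().getName(), sketch.eventLoop)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "worker -> event-loop"
    }
}
```

The point of the split is that the event-loop thread only ever runs short continuations, so it stays free to pick up the next event while the worker is waiting.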

The second thing to know about Vert.x is that it is composed of verticles, modules, and nodes. Verticles are the smallest unit of logic in Vert.x, and are usually represented by a single class. Verticles should be simple and single-purpose, following the UNIX Philosophy. A group of verticles can be put together into a module, which is usually packaged as a single JAR file. A module represents a group of related functionality which, when taken together, could represent an entire application or just a portion of a larger distributed application. Lastly, nodes are single instances of the JVM which are running one or more modules/verticles. Because Vert.x has clustering built in from the ground up, Vert.x applications can span nodes either on a single machine or across multiple machines in multiple geographic locations (though latency can hinder performance).

Example Project

Now, I’ve been to a number of Meetups and conferences lately where the first thing they show you when talking about reactive programming is to build a chat room application. That’s all well and good, but it doesn’t really help you to completely understand the power of reactive development. Chat room apps are simple and simplistic. We can do better. In this tutorial, we’re going to take a legacy Spring application and convert it to take advantage of Vert.x. This has multiple purposes: It shows that the toolkit is easy to integrate with existing Java projects, it allows us to take advantage of existing tools which may be entrenched parts of our ecosystem, and it also lets us follow the DRY principle in that we don’t have to rewrite large swathes of code to get the benefits of Vert.x.

Our legacy Spring application is a contrived simple example of a REST API using Spring Boot, Spring Data JPA, and Spring REST. The source code can be found in the “master” branch HERE. There are other branches which we will use to demonstrate the progression as we go, so it should be simple for anyone with a little experience with git and Java 8 to follow along. Let’s start by examining the Spring Configuration class for the stock Spring application.


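import java.util.Arrays;
import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@SpringBootApplication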
@EnableJpaRepositories
@EnableTransactionManagement
@Slf4j
public class Application {
    public static void main(String[] args) {
        ApplicationContext ctx = SpringApplication.run(Application.class, args);

        System.out.println("Let's inspect the beans provided by Spring Boot:");

        String[] beanNames = ctx.getBeanDefinitionNames();
        Arrays.sort(beanNames);
        for (String beanName : beanNames) {
            System.out.println(beanName);
        }
    }

    @Bean
    public DataSource dataSource() {
        EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
        return builder.setType(EmbeddedDatabaseType.HSQL).build();
    }

    @Bean
    public EntityManagerFactory entityManagerFactory() {
        HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
        vendorAdapter.setGenerateDdl(true);

        LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
        factory.setJpaVendorAdapter(vendorAdapter);
        factory.setPackagesToScan("com.zanclus.data.entities");
        factory.setDataSource(dataSource());
        factory.afterPropertiesSet();

        return factory.getObject();
    }

    @Bean
    public PlatformTransactionManager transactionManager(final EntityManagerFactory emf) {
        final JpaTransactionManager txManager = new JpaTransactionManager();
        txManager.setEntityManagerFactory(emf);
        return txManager;
    }
}

As you can see at the top of the class, we have some pretty standard Spring Boot annotations. You’ll also see an @Slf4j annotation, which is part of the lombok library and is designed to help reduce boilerplate code. We also have @Bean annotated methods for providing access to the JPA EntityManagerFactory, the TransactionManager, and the DataSource. Each of these provides an injectable object for the other classes to use. The remaining classes in the project are similarly simplistic. There is a Customer POJO which is the Entity type used in the service. There is a CustomerDAO which is created via Spring Data. Finally, there is a CustomerEndpoints class which is the JAX-RS annotated REST controller.

As explained earlier, this is all standard fare in a Spring Boot application. The problem with this application is that, for the most part, it has limited scalability. You would either run this application inside of a Servlet container, or with an embedded server like Jetty or Undertow. Either way, each request ties up a thread and thus wastes resources while it waits for I/O operations.

Switching over to the Convert-To-Vert.x-Web branch, we can see that the Application class has changed a little. We now have some new @Bean annotated methods for injecting the Vertx instance itself, as well as an instance of ObjectMapper (part of the Jackson JSON library). We have also replaced the CustomerEndpoints class with a new CustomerVerticle. Pretty much everything else is the same.

The CustomerVerticle class is annotated with @Component, which means that Spring will instantiate that class on startup. It also has its start method annotated with @PostConstruct so that the Verticle is launched on startup. Looking at the actual content of the code, we see our first bits of Vert.x code: Router.

The Router class is part of the vertx-web library and allows us to use a fluent API to define HTTP URLs, methods, and header filters for our request handling. Adding the BodyHandler instance to the default route allows a POST/PUT body to be processed and converted to a JSON object which Vert.x can then process as part of the RoutingContext. The order of routes in Vert.x CAN be significant. If you define a route which has some sort of glob matching (* or regex), it can swallow requests for routes defined after it unless you implement chaining. Our example shows 3 routes initially.


    @PostConstruct
    public void start() throws Exception {
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.get("/v1/customer/:id")
                .produces("application/json")
                .blockingHandler(this::getCustomerById);
        router.put("/v1/customer")
                .consumes("application/json")
                .produces("application/json")
                .blockingHandler(this::addCustomer);
        router.get("/v1/customer")
                .produces("application/json")
                .blockingHandler(this::getAllCustomers);
        vertx.createHttpServer().requestHandler(router::accept).listen(8080);
    }

Notice that the HTTP method is defined, the request’s “Content-Type” header is matched (via consumes), and the request’s “Accept” header is matched against the type the route can return (via produces). We also see that we are passing the handling of the request off via a call to the blockingHandler method. A blocking handler for a Vert.x route accepts a RoutingContext object as its only parameter. The RoutingContext holds the Vert.x Request object, Response object, and any parameters/POST body data (like “:id”). You’ll also see that I used method references rather than lambdas to insert the logic into the blockingHandler (I find it more readable). Each handler for the 3 request routes is defined in a separate method further down in the class. These methods basically just call the methods on the DAO, serialize or deserialize as needed, set some response headers, and end() the request by sending a response. Overall, pretty simple and straightforward.


    private void addCustomer(RoutingContext rc) {
        try {
            String body = rc.getBodyAsString();
            Customer customer = mapper.readValue(body, Customer.class);
            Customer saved = dao.save(customer);
            if (saved!=null) {
                rc.response().setStatusMessage("Accepted").setStatusCode(202).end(mapper.writeValueAsString(saved));
            } else {
                rc.response().setStatusMessage("Bad Request").setStatusCode(400).end("Bad Request");
            }
        } catch (IOException e) {
            rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");
            log.error("Server error", e);
        }
    }

    private void getCustomerById(RoutingContext rc) {
        log.info("Request for single customer");
        try {
            Long id = Long.parseLong(rc.request().getParam("id"));
            Customer customer = dao.findOne(id);
            if (customer==null) {
                rc.response().setStatusMessage("Not Found").setStatusCode(404).end("Not Found");
            } else {
                // Reuse the entity we already loaded instead of querying the DAO a second time
                rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(customer));
            }
        } catch (NumberFormatException nfe) {
            // A non-numeric ":id" is a client error, not a server error
            rc.response().setStatusMessage("Bad Request").setStatusCode(400).end("Bad Request");
        } catch (JsonProcessingException jpe) {
            rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");
            log.error("Server error", jpe);
        }
    }

    private void getAllCustomers(RoutingContext rc) {
        log.info("Request for all customers");
        List<Customer> customers = StreamSupport.stream(dao.findAll().spliterator(), false).collect(Collectors.toList());
        try {
            rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(customers));
        } catch (JsonProcessingException jpe) {
            rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");
            log.error("Server error", jpe);
        }
    }

“But this is more code and messier than my Spring annotations and classes”, you might say. That CAN be true, but it really depends on how you implement the code. This is meant to be an introductory example, so I left the code very simple and easy to follow. I COULD use an annotation library for Vert.x to implement the endpoints in a manner similar to JAX-RS. In addition, we have gained a massive scalability improvement. Under the hood, Vert.x Web uses Netty for low-level asynchronous I/O operations, thus providing us the ability to handle MANY more concurrent requests (limited by the size of the database connection pool).

We’ve already made some improvement to the scalability and concurrency of this application by using the Vert.x Web library, but we can improve things a little more by implementing the Vert.x EventBus. By separating the database operations into Worker Verticles instead of using blockingHandler, we can handle request processing more efficiently. This is shown in the Convert-To-Worker-Verticles branch. The application class has remained the same, but we have changed the CustomerEndpoints class and added a new class called CustomerWorker. In addition, we added a new library called Spring Vert.x Extension which provides Spring dependency injection support to Vert.x Verticles. Start off by looking at the new CustomerEndpoints class.


    @PostConstruct
    public void start() throws Exception {
        log.info("Successfully created CustomerVerticle");
        DeploymentOptions deployOpts = new DeploymentOptions().setWorker(true).setMultiThreaded(true).setInstances(4);
        vertx.deployVerticle("java-spring:com.zanclus.verticles.CustomerWorker", deployOpts, res -> {
            if (res.succeeded()) {
                Router router = Router.router(vertx);
                router.route().handler(BodyHandler.create());
                router.get("/v1/customer/:id")
                        .produces("application/json")
                        .handler(rc -> {
                            // Fresh options per request: a shared instance would accumulate headers across requests
                            DeliveryOptions opts = new DeliveryOptions()
                                    .setSendTimeout(2000)
                                    .addHeader("method", "getCustomer")
                                    .addHeader("id", rc.request().getParam("id"));
                            vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));
                        });
                router.put("/v1/customer")
                        .consumes("application/json")
                        .produces("application/json")
                        .handler(rc -> {
                            DeliveryOptions opts = new DeliveryOptions()
                                    .setSendTimeout(2000)
                                    .addHeader("method", "addCustomer");
                            vertx.eventBus().send("com.zanclus.customer", rc.getBodyAsJson(), opts, reply -> handleReply(reply, rc));
                        });
                router.get("/v1/customer")
                        .produces("application/json")
                        .handler(rc -> {
                            DeliveryOptions opts = new DeliveryOptions()
                                    .setSendTimeout(2000)
                                    .addHeader("method", "getAllCustomers");
                            vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));
                        });
                vertx.createHttpServer().requestHandler(router::accept).listen(8080);
            } else {
                log.error("Failed to deploy worker verticles.", res.cause());
            }
        });
    }

The routes are the same, but the implementation code is not. Instead of using calls to blockingHandler, we have now implemented proper async handlers which send out events on the event bus. None of the database processing is happening in this Verticle anymore. We have moved the database processing to a Worker Verticle which has multiple instances to handle multiple requests in parallel in a thread-safe manner. We are also registering a callback for when those events are replied to so that we can send the appropriate response to the client making the request. Now, in the CustomerWorker Verticle we have implemented the database logic and error handling.

@Override
public void start() throws Exception {
    vertx.eventBus().consumer("com.zanclus.customer").handler(this::handleDatabaseRequest);
}

public void handleDatabaseRequest(Message<Object> msg) {
    String method = msg.headers().get("method");

    DeliveryOptions opts = new DeliveryOptions();
    try {
        String retVal;
        switch (method) {
            case "getAllCustomers":
                retVal = mapper.writeValueAsString(dao.findAll());
                msg.reply(retVal, opts);
                break;
            case "getCustomer":
                Long id = Long.parseLong(msg.headers().get("id"));
                retVal = mapper.writeValueAsString(dao.findOne(id));
                msg.reply(retVal);
                break;
            case "addCustomer":
                retVal = mapper.writeValueAsString(
                                    dao.save(
                                            mapper.readValue(
                                                    ((JsonObject)msg.body()).encode(), Customer.class)));
                msg.reply(retVal);
                break;
            default:
                log.error("Invalid method '" + method + "'");
                msg.fail(1, "Invalid method '" + method + "'");
        }
    } catch (IOException | NullPointerException | NumberFormatException e) {
        // Covers JSON problems, a missing "method" header, and a non-numeric "id" header
        log.error("Problem handling database request.", e);
        msg.fail(2, e.getLocalizedMessage());
    }
}

The CustomerWorker worker verticles register a consumer for messages on the event bus. The string which represents the address on the event bus is arbitrary, but it is recommended to use a reverse-tld style naming structure so that it is simple to ensure that the addresses are unique (“com.zanclus.customer”). Whenever a new message is sent to that address, it will be delivered to one, and only one, of the worker verticles. The worker verticle then calls handleDatabaseRequest to do the database work, JSON serialization, and error handling.
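If the "one, and only one" delivery rule feels abstract, the following toy event bus may help. It is a plain-JDK sketch under my own assumptions (the ToyEventBus class, String-only messages, and strict round-robin selection are all invented for illustration) and it is not how Vert.x implements its event bus. It only shows the point-to-point idea: several consumers register on the same address, and each sent message reaches exactly one of them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Toy, in-process illustration of point-to-point delivery; NOT the Vert.x EventBus. */
class ToyEventBus {
    private final Map<String, List<Consumer<String>>> consumers = new ConcurrentHashMap<>();
    private final Map<String, AtomicInteger> cursors = new ConcurrentHashMap<>();

    /** Registers one more handler on the given address. */
    void consumer(String address, Consumer<String> handler) {
        consumers.computeIfAbsent(address, a -> new CopyOnWriteArrayList<>()).add(handler);
    }

    /** Delivers the message to exactly one registered handler; returns false if there is none. */
    boolean send(String address, String message) {
        List<Consumer<String>> handlers = consumers.get(address);
        if (handlers == null || handlers.isEmpty()) {
            return false;
        }
        int next = cursors.computeIfAbsent(address, a -> new AtomicInteger()).getAndIncrement();
        handlers.get(Math.floorMod(next, handlers.size())).accept(message);
        return true;
    }

    static List<String> demo() {
        ToyEventBus bus = new ToyEventBus();
        List<String> log = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            int worker = i;
            bus.consumer("com.zanclus.customer", msg -> log.add("worker-" + worker + ":" + msg));
        }
        bus.send("com.zanclus.customer", "a");
        bus.send("com.zanclus.customer", "b");
        bus.send("com.zanclus.customer", "c");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [worker-0:a, worker-1:b, worker-0:c]
    }
}
```

Because no message is ever handed to two workers, adding more worker instances spreads the load without any coordination in the sender.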

There you have it. You’ve seen that Vert.x can be integrated into your legacy applications to improve concurrency and efficiency without having to rewrite the entire application. We could have done something similar with an existing Google Guice or JavaEE CDI application. All of the business logic could remain relatively untouched while we tied in Vert.x to add reactive capabilities. The next steps are up to you. Some ideas for where to go next include Clustering, WebSockets, and VertxRx for ReactiveX sugar.

A persistent KeyValue Server in 40 lines and a sad fact

Advent time again .. picking up Peter’s well-written overview on the uses of Unsafe, I’ll do a short fly-by on how low level techniques in Java can save development effort by enabling a higher level of abstraction, or allow for Java performance levels probably unknown to many.

My major point is to show that conversion of Objects to bytes and vice versa is an important fundamental, affecting virtually any modern java application.

Hardware likes to process streams of bytes, not object graphs connected by pointers, as “All memory is tape” (M. Thompson, if I remember correctly ..).

Many basic technologies are therefore hard to use with vanilla Java heap objects:

  • Memory Mapped Files – a great and simple technology to persist application data safe, fast & easy.
  • Network communication is based on sending packets of bytes
  • Interprocess communication (shared memory)
  • Large main memory of today’s servers (64GB to 256GB). (GC issues)
  • CPU caches work best on data stored as a continuous stream of bytes in memory

So use of the Unsafe class in most cases boils down to helping transform a Java object graph into a continuous memory region and vice versa, using either

  • [performance enhanced] object serialization or
  • wrapper classes to ease access to data stored in a continuous memory region.

(source of examples used in this post can be found here, messaging latency test here)


    Serialization based Off-Heap

    Consider a retail web application where there might be millions of registered users. We are actually not interested in representing data in a relational database, as all that is needed is quick retrieval of user-related data once a user logs in. Additionally one would like to traverse the social graph quickly.

    Let’s take a simple user class holding some attributes and a list of ‘friends’ making up a social graph.

    The easiest way to store this on heap is a simple huge HashMap.

    Alternatively one can use off heap maps to store large amounts of data. An off heap map stores its keys and values inside the native heap, so garbage collection does not need to track this memory. In addition, native heap can be told to automagically get synchronized to disk (memory mapped files). This even works in case your application crashes, as the OS manages write back of changed memory regions.
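To illustrate just the memory-mapping part, here is a minimal JDK-only sketch. It is not the off heap map used in this post; the class name and the single 8-byte counter layout are assumptions made for the example. Each call maps the file anew, so the value can only survive because the OS wrote the mapped region back to the file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Sketch: a counter persisted through a memory-mapped file (hypothetical layout: one long at offset 0). */
class MappedCounterSketch {

    /** Maps the first 8 bytes of the file, increments the long stored there, and returns the new value. */
    static long increment(Path file) {
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            if (ch.size() < Long.BYTES) {
                ch.write(ByteBuffer.allocate(Long.BYTES), 0); // zero-initialise a brand-new file
            }
            MappedByteBuffer region = ch.map(FileChannel.MapMode.READ_WRITE, 0, Long.BYTES);
            long next = region.getLong(0) + 1;
            region.putLong(0, next);
            region.force(); // ask the OS to write the mapped region back to the file now
            return next;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static long demo() {
        try {
            Path file = Files.createTempFile("mapped-counter", ".bin");
            file.toFile().deleteOnExit();
            increment(file);
            increment(file);
            return increment(file); // third mapping still sees the earlier writes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 3
    }
}
```

The explicit force() is only there to make the write-back deterministic for the example; the OS would eventually flush the changed pages on its own.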

    There are some open source off heap map implementations out there with various feature sets (e.g. ChronicleMap), for this example I’ll use a plain and simple implementation featuring fast iteration (optional full scan search) and ease of use.

    Serialization is used to store objects, deserialization is used in order to pull them to the java heap again. Pleasantly I have written the (afaik) fastest fully JDK compliant object serialization on the planet, so I’ll make use of that.

     Done:

    • persistence by memory mapping a file (map will reload upon creation). 
    • Java Heap still empty to serve real application processing with Full GC < 100ms. 
    • Significantly less overall memory consumption. A user record serialized is ~60 bytes (roughly 18GB of raw payload for 300 million records), so in theory 300 million records fit comfortably into 180GB of server memory. No need to raise the big data flag and run 4096 hadoop nodes on AWS ;).

    Comparing a regular in-memory Java HashMap and a fast-serialization based persistent off heap map, each holding 15 million user records, shows the following results (on an older 3GHz XEON 2×6):

    Map implementation                 Consumed Java heap (MB)   Full GC (s)   Native heap (MB)   get/put ops per s   Required VM size (MB)
    HashMap                                              6,865        26.039                  0           3,800,000                  12,000
    OffheapMap (serialization based)                        63         0.026              3,050             750,000                     500

    [test source / blog project] Note: You’ll need at least 16GB of RAM to execute them.

    As one can see, even with fast serialization there is a heavy penalty (~factor 5) in access performance. Still, compared to other persistence alternatives, it’s superior (1-3 microseconds per “get” operation, “put()” very similar).

    Use of JDK serialization would perform at least 5 to 10 times slower (direct comparison below) and therefore render this approach useless.
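To make the store/load cycle concrete, here is a minimal sketch that uses plain JDK serialization (the slow baseline just mentioned, standing in for fast-serialization) and a direct ByteBuffer as the off heap region. The User class and the method names are hypothetical; the classes used in the actual test are not shown in this post.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: serialize a record into off heap memory and pull it back onto the Java heap. */
class OffHeapRecordSketch {

    /** Hypothetical user record with a list of friends. */
    static class User implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final List<String> friends;

        User(String name, List<String> friends) {
            this.name = name;
            this.friends = new ArrayList<>(friends);
        }
    }

    /** Encodes the value with JDK serialization and copies the bytes into a direct (off heap) buffer. */
    static ByteBuffer store(Serializable value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            byte[] encoded = bytes.toByteArray();
            ByteBuffer offHeap = ByteBuffer.allocateDirect(encoded.length);
            offHeap.put(encoded);
            offHeap.flip(); // make the written bytes readable
            return offHeap;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Copies the bytes back to the heap and decodes them into a fresh object. */
    static Object load(ByteBuffer offHeap) {
        byte[] encoded = new byte[offHeap.remaining()];
        offHeap.duplicate().get(encoded); // duplicate() leaves the caller's position untouched
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(encoded))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    static String demo() {
        ByteBuffer offHeap = store(new User("alice", Arrays.asList("bob", "carol")));
        User copy = (User) load(offHeap);
        return copy.name + ":" + copy.friends;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints alice:[bob, carol]
    }
}
```

Every get and put pays for one such encode or decode, which is why the speed of the serialization implementation dominates the numbers in the table above.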

    Trading performance gains against higher level of abstraction: “Serverize me”

    A single server won’t be able to serve (hundreds of) thousands of users, so we somehow need to share data amongst processes, even better: across machines.

    Using a fast implementation, it’s possible to generously use (fast-) serialization for over-the-network messaging. Again: if this ran 5 to 10 times slower, it just wouldn’t be viable. Alternative approaches require an order of magnitude more work to achieve similar results.

    By wrapping the persistent off heap hash map in an Actor implementation (async ftw!), a few lines of code make up a persistent KeyValue server with a TCP-based and an HTTP interface (uses kontraktor actors). Of course the Actor can still be used in-process if one decides so later on.

    Now that’s a micro service. Given that it lacks any attempt at optimization and is single threaded, it’s reasonably fast [same XEON machine as above]:

    • 280_000 successful remote lookups per second 
    • 800_000 in case of failed lookups (key not found)
    • serialization based TCP interface (1 liner)
    • a stringy webservice for the REST-of-us (1 liner).

    [source: KVServer, KVClient] Note: You’ll need at least 16GB of RAM to execute the test.

    A real world implementation might want to double performance by directly putting received serialized object byte[] into the map instead of encoding it twice (encode/decode once for transmission over wire, then decode/encode for offheaping map).
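The actor idea itself can be sketched with the JDK alone. This is not kontraktor and not the KVServer from the linked source; the class name and the String-to-String store are assumptions. All access to the map goes through a single-threaded mailbox, and callers only ever get futures back, so the map needs no locking.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch of an actor-style key/value store: one mailbox thread owns the map. NOT kontraktor. */
class KeyValueActorSketch {
    // Only ever touched from the mailbox thread, so a plain HashMap is sufficient.
    private final Map<String, String> store = new HashMap<>();

    private final ExecutorService mailbox = Executors.newSingleThreadExecutor(task -> {
        Thread t = new Thread(task, "kv-actor");
        t.setDaemon(true); // do not keep the JVM alive for this sketch
        return t;
    });

    /** Enqueues a write; the future completes once the mailbox thread has applied it. */
    CompletableFuture<Void> put(String key, String value) {
        return CompletableFuture.runAsync(() -> store.put(key, value), mailbox);
    }

    /** Enqueues a read; completes with null when the key is unknown. */
    CompletableFuture<String> get(String key) {
        return CompletableFuture.supplyAsync(() -> store.get(key), mailbox);
    }

    static String demo() {
        KeyValueActorSketch actor = new KeyValueActorSketch();
        actor.put("user:1", "alice"); // queued first, so it is applied before the reads below
        return actor.get("user:1").join() + "/" + actor.get("missing").join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints alice/null
    }
}
```

Since callers already deal in futures, swapping the in-process mailbox for a network transport later does not change the calling code, which is the appeal of the approach.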

    “RestActorServer.Publish(..);” is a one liner to also expose the KVActor as a webservice in addition to raw tcp:

    C like performance using flyweight wrappers / structs

    With serialization, regular Java Objects are transformed to a byte sequence. One can do the opposite: create wrapper classes which read data from fixed or computed positions of an underlying byte array or native memory address. (E.g. see this blog post).

    By just moving the wrapper’s base offset it’s possible to access different records. Copying such a “packed object” boils down to a memory copy. In addition, it’s pretty easy to write allocation free code this way. One downside is that reading/writing single fields has a performance penalty compared to regular Java Objects. This can be made up for by using the Unsafe class.

    “Flyweight” wrapper classes can be implemented manually as shown in the blog post cited, however as code grows this starts getting unmaintainable.
    Fast-serialization provides a byproduct, “struct emulation”, supporting creation of flyweight wrapper classes from regular Java classes at runtime. Low level byte fiddling in application code can be avoided for the most part this way.
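For reference, a hand-written flyweight of the manual kind described above could look like the following sketch. The record layout, field names, and class name are invented for illustration, and it uses ByteBuffer rather than Unsafe or fst-structs. A single wrapper instance is moved across records, so reading all of them allocates nothing.

```java
import java.nio.ByteBuffer;

/** Sketch of a manually coded flyweight over flat memory (hypothetical layout, not fst-structs). */
class PriceFlyweightSketch {
    // Assumed fixed layout per record: long instrumentId | double price | int quantity
    static final int ID_OFFSET = 0;
    static final int PRICE_OFFSET = 8;
    static final int QUANTITY_OFFSET = 16;
    static final int RECORD_SIZE = 20;

    private final ByteBuffer memory;
    private int base; // byte offset of the record this wrapper currently points at

    PriceFlyweightSketch(ByteBuffer memory) {
        this.memory = memory;
    }

    /** Re-points the wrapper at another record; no object is created. */
    PriceFlyweightSketch moveTo(int recordIndex) {
        base = recordIndex * RECORD_SIZE;
        return this;
    }

    long instrumentId() { return memory.getLong(base + ID_OFFSET); }
    double price()      { return memory.getDouble(base + PRICE_OFFSET); }
    int quantity()      { return memory.getInt(base + QUANTITY_OFFSET); }

    void set(long instrumentId, double price, int quantity) {
        memory.putLong(base + ID_OFFSET, instrumentId);
        memory.putDouble(base + PRICE_OFFSET, price);
        memory.putInt(base + QUANTITY_OFFSET, quantity);
    }

    static double demo() {
        ByteBuffer memory = ByteBuffer.allocateDirect(3 * RECORD_SIZE);
        PriceFlyweightSketch view = new PriceFlyweightSketch(memory);
        for (int i = 0; i < 3; i++) {
            view.moveTo(i).set(100 + i, 10.0 * (i + 1), i + 1);
        }
        double turnover = 0;
        for (int i = 0; i < 3; i++) {
            view.moveTo(i);
            turnover += view.price() * view.quantity();
        }
        return turnover;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 140.0
    }
}
```

Keeping the offsets in sync with the fields by hand is exactly the part that becomes unmaintainable as the number of record types grows.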

    How a regular Java class can be mapped to flat memory (fst-structs):

    Of course there are simpler tools out there to help reduce manual programming of encoding (e.g. Slab) which might be more appropriate for many cases and use less “magic”.

    What kind of performance can be expected using the different approaches (sad fact incoming) ?

    Let’s take the following struct-class consisting of a price update and an embedded struct denoting a tradable instrument (e.g. stock) and encode it using various methods:

    [Listing: a ‘struct’ in code]

    Pure encoding performance:

    Structs      fast-Ser (no shared refs)   fast-Ser    JDK Ser (no shared)   JDK Ser
    26,315,000   7,757,000                   5,102,000   649,000               644,000

    Real world test with messaging throughput:

    In order to get a basic estimate of the differences in a real application, I ran an experiment on how different encodings perform when used to send and receive messages at a high rate via reliable UDP messaging:

    The Test:
    A sender encodes messages as fast as possible and publishes them using reliable multicast, a subscriber receives and decodes them.

    Structs      fast-Ser (no shared refs)   fast-Ser    JDK Ser (no shared)   JDK Ser
    6,644,107    4,385,118                   3,615,584   81,582                79,073

    (Tests done on I7/Win8, XEON/Linux scores slightly higher, msg size ~70 bytes for structs, ~60 bytes serialization).

    Slowest compared to fastest: a factor of 82. The test highlights an issue not covered by micro-benchmarking: encoding and decoding should perform similarly, as actual throughput is determined by Min(Encoding performance, Decoding performance). For unknown reasons JDK serialization manages to encode the tested message about 500_000 times per second, while decoding performance is only 80_000 per second, so in the test the receiver gets dropped quickly:



    ***** Stats for receive rate:   80351   per second *********
    ***** Stats for receive rate:   78769   per second *********
    SUB-ud4q has been dropped by PUB-9afs on service 1
    fatal, could not keep up. exiting

    (Creating backpressure here probably isn’t the right way to address the issue 😉)
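The Min(...) rule and the reason the subscriber gets dropped can be written down as a few lines of arithmetic. This sketch only plugs in the rounded rates quoted above (about 500_000 encodes and 80_000 decodes per second); the class and method names are made up for the example.

```java
/** Sketch: sustained messaging rate and receiver backlog for mismatched encode/decode speeds. */
class ThroughputSketch {

    /** A pipeline can only sustain the rate of its slower side. */
    static long sustainedRate(long encodePerSecond, long decodePerSecond) {
        return Math.min(encodePerSecond, decodePerSecond);
    }

    /** Messages per second piling up at the receiver when the sender is not throttled. */
    static long backlogGrowthPerSecond(long encodePerSecond, long decodePerSecond) {
        return Math.max(0, encodePerSecond - decodePerSecond);
    }

    public static void main(String[] args) {
        System.out.println(sustainedRate(500_000, 80_000));          // prints 80000
        System.out.println(backlogGrowthPerSecond(500_000, 80_000)); // prints 420000
    }
}
```

With roughly 420_000 undecoded messages accumulating every second, any bounded receive buffer overflows almost immediately, which matches the "could not keep up" log above.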

    Conclusion:

    • a fast serialization allows for a level of abstraction in distributed applications impossible if serialization implementation is either
      – too slow
      – incomplete. E.g. cannot handle any serializable object graph
      – requires manual coding/adaptations (would put many restrictions on actor message types, Futures, Spores; a maintenance nightmare)
    • Low Level utilities like Unsafe enable different representations of data resulting in extraordinary throughput or guaranteed latency boundaries (allocation free main path) for particular workloads. These are impossible to achieve by a large margin with JDK’s public tool set.
    • In distributed systems, communication performance is of fundamental importance. Removing Unsafe is not the biggest fish to fry looking at the numbers above .. JSON or XML won’t fix this ;-).
    • While the HotSpot VM has reached an extraordinary level of performance and reliability, CPU is wasted in some parts of the JDK like there’s no tomorrow. Given we are living in the age of distributed applications and data, moving stuff over the wire should be easy to achieve (not manually coded) and as fast as possible.

    Addendum: bounded latency

    A quick Ping Pong RTT latency benchmark showing that Java can compete with C solutions easily, as long as the main path is allocation free and techniques like those described above are employed:

    [credits: charts+measurement done with HdrHistogram]

    This is an “experiment” rather than a benchmark (so do not read: ‘Proven: Java faster than C’), it shows low-level-Java can compete with C in at least this low-level domain.
    Of course it’s not exactly idiomatic Java code, however it’s still easier to handle, port and maintain compared to a JNI or pure C(++) solution. Low latency C(++) code won’t be that idiomatic either 😉

    About me: I am a solution architect freelancing at an exchange company in the area of realtime GUIs, middleware, and low latency CEP (Complex Event Processing).
    I am blogging at http://java-is-the-new-c.blogspot.de/,
    hacking at https://github.com/RuedigerMoeller.

    This post is part of the Java Advent Calendar and is licensed under the Creative Commons 3.0 Attribution license. If you like it, please spread the word by sharing, tweeting, FB, G+ and so on!