
The reactor pattern and non-blocking IO

Our previous blog articles gave an overview of CELUM’s reactive stack and the graph query language. This time, it is about the core concept of a reactive architecture and a comparison of a blocking and a non-blocking implementation. But first, some theory:

Reactor pattern

The reactor pattern is not a recent invention: it was described in the book “Pattern Languages of Program Design” by Jim Coplien and Douglas C. Schmidt, published in 1995. In short, the pattern handles concurrent service requests, which are demultiplexed and dispatched synchronously to the associated request handlers.

Ok, let us take a closer look at what that means. It says concurrent, so the pattern makes the most sense when many requests arrive at the same time and compete for limited resources, for example CPU time.

Furthermore, it is about services and request handlers. In a reactor-pattern-based application, a separate request handler is introduced for each offered service and processes that type of request. Incoming requests are registered and queued for processing.

The task of demultiplexing and dispatching is typically done in a so-called event loop. The event loop runs in a single thread and waits for events that signal when the underlying resources are ready to be consumed without a blocking call, for example when a network connection is available or a file is ready to be read from disk or network. In such a case, the event loop dispatches the event to the associated request handler and invokes the right method. As a result, the whole operation of that request is executed in an asynchronous, non-blocking way.

[Figure 1: clients, event loop and worker threads - https://github.com/robbie-cao/note/blob/master/eventloop.md]

Summing up, the benefit of the reactor pattern is to avoid the creation of a thread for each request that needs to be handled.
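To make the demultiplex-and-dispatch idea more tangible, here is a minimal, heavily simplified sketch of such an event loop built on Java NIO’s Selector (plain JDK, no framework). It is illustrative only: it omits error handling and the actual request handlers, and is not how Vert.x is implemented internally.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class MiniReactor {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(8081));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {                         // the event loop, running in a single thread
            selector.select();                 // wait until at least one channel is ready
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isAcceptable()) {      // new connection: register it for reads
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) { // data is ready: read without blocking
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    if (client.read(buffer) == -1) {
                        client.close();
                    }
                    // ... here the buffer would be dispatched to the associated request handler
                }
            }
            selector.selectedKeys().clear();
        }
    }
}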

 

Vert.x

Vert.x is a reactive toolkit that runs on the Java Virtual Machine. It is based on Netty, a low-level non-blocking, asynchronous, event-driven network framework. Thanks to its polyglot approach, you can use Vert.x from several languages including Java, JavaScript, Scala, Groovy and more. Tim Fox, while employed by VMware, started Vert.x in 2011. The project moved to the Eclipse Foundation in 2013 and is now available in a mature 3.4.x version.

The reactor pattern describes a single thread that runs in a loop and delivers the events to the handlers. As today’s computers have multiple cores, Vert.x extends this pattern and implements a multi-reactor pattern: each Vert.x instance can maintain several event loops. By default, it attaches two event loops per CPU core thread.
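If you want to make this multi-reactor idea visible or tune it, the size of the event loop pool can be set when creating the Vert.x instance. A small sketch, assuming the Vert.x 3.x VertxOptions API:

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;

public class VertxBootstrap {
    public static void main(String[] args) {
        // By default Vert.x sizes the event loop pool based on the available cores;
        // here we set it explicitly just to illustrate the multi-reactor pattern.
        VertxOptions options = new VertxOptions().setEventLoopPoolSize(8);
        Vertx vertx = Vertx.vertx(options);
        System.out.println("Vert.x started: " + vertx);
    }
}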

Because this pattern is so important, Vert.x will also help you to implement it correctly. Blocking the event loop is a no-go for Vert.x and you will receive warnings in the log when this happens, reminding you to respect this threading and programming model.

io.vertx.core.impl.BlockedThreadChecker

WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 13223 ms, time limit is 2000

io.vertx.core.VertxException: Thread blocked
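If you really do have to call blocking code (legacy libraries, JDBC drivers and the like), Vert.x offers an escape hatch so that the event loop itself stays unblocked. A short sketch using executeBlocking from the Vert.x 3.x API: the blocking part runs on a worker thread and the result is delivered back asynchronously on the event loop.

import io.vertx.core.AbstractVerticle;

public class BlockingBridgeVerticle extends AbstractVerticle {
    @Override
    public void start() {
        vertx.executeBlocking(future -> {
            // runs on a worker thread, so blocking here is allowed
            try {
                Thread.sleep(1000);              // stand-in for some blocking call
            } catch (InterruptedException ignored) {
            }
            future.complete("done");
        }, asyncResult -> {
            // runs back on the event loop once the blocking part has finished
            if (asyncResult.succeeded()) {
                System.out.println("Blocking work finished: " + asyncResult.result());
            }
        });
    }
}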

The C10k problem

Now back to the problem of blocking code and the term C10k, which is nowadays often used in combination with this topic. C10k is an acronym for the problem of handling ten thousand network socket connections concurrently.

First, let us look at how an HTTP request is handled in a typical web server/servlet container. When accepting the incoming request, the web server establishes a TCP connection, then reads and parses the content. After that, the request is handed over to the application-specific logic, which, in an I/O-bound application, accesses the file system, network resources or a database. As we all know, I/O operations are extremely slow compared to the computational processing of data. To bring this back to mind, here are some numbers by Jeff Dean from Google Research (https://gist.github.com/jboner/2841832):

o   100 ns: main memory reference
o   250,000 ns: read 1 MB sequentially from memory
o   10,000,000 ns: disk seek
o   10,000,000 ns: read 1 MB sequentially from network
o   20,000,000 ns: read 1 MB sequentially from disk
o   writes are 40 times more expensive than reads

Back to our request, which is currently waiting (blocking) because the CPU has to wait for the I/O device to load all the data. It has to wait and can do nothing else. Does this mean idle CPU resources? Unfortunately, yes. This is not what we want: we want to utilize the CPU as much as possible throughout the operation and finally write the response to the client.

And this was just one request, one thread we were looking at. With concurrent requests, the operating system tries to avoid long wait times (busy-waiting) on the CPU by using multitasking and context switches: when one thread blocks waiting for IO, it switches to another thread. However, this comes at an additional cost, as the threads themselves also carry an overhead in terms of memory and the context switches they require.

[Figure 2: multitasking and context switching - https://www.bryanbraun.com/2012/06/25/multitasking-and-context-switching/]

With this, we have now explored the core of the C10k problem: a server handling thousands of clients and threads simultaneously needs to leverage non-blocking IO in order to scale.
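To get a feeling for what a thread per connection means at C10k scale, here is a small, admittedly naive experiment (a sketch, nothing more): every simulated request gets its own thread that blocks for one second, and every one of those threads reserves its own stack (roughly 1 MB by default on typical 64-bit JVMs, configurable with -Xss).

public class ThreadPerRequestDemo {
    public static void main(String[] args) throws InterruptedException {
        int requests = 10_000;                   // C10k-scale: 10k concurrent "requests"
        Thread[] threads = new Thread[requests];
        for (int i = 0; i < requests; i++) {
            threads[i] = new Thread(() -> {
                try {
                    Thread.sleep(1000);          // simulated blocking I/O
                } catch (InterruptedException ignored) {
                }
            });
            threads[i].start();                  // each thread allocates its own stack
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("Handled " + requests + " blocking 'requests'");
    }
}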

Reactive vs. blocking implementation

Let us see whether we can experience the difference between a reactive non-blocking implementation and a blocking implementation with an example.

The example is a simple HTTP-based service that calculates the checksum of a file identified by an id and simulates a blocking call by sleeping for 1 second. The files are around 200 kilobytes and all roughly the same size, so it does not really matter for which file the checksum is calculated.

The call looks like this:

http://localhost:8080/hasher?id=17

The result of such a request is simply the SHA512 digest as HEX string of this file:

90491c182f1f9ac6a86845c9b34c6f41e1485e8dfc5f4bbfd646dc6f661ce7986eed287c3a7314a368e55a2167b8597c2a974ef2c7c536e44455b1316474bcd4

Servlet Implementation

First, we look at the traditional servlet-based implementation:


import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.PrintWriter;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.codec.digest.DigestUtils;

@WebServlet(name = "hashServlet", urlPatterns = {"/hasher"}, loadOnStartup = 1)
public class HashingServlet extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        // Simulate a blocking remote call
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // ignore
        }
        String id = req.getParameter("id");
        String pathToFile = getFileBasedOnId(id);
        try (FileInputStream fileInputStream = new FileInputStream(new File(pathToFile));
             PrintWriter writer = resp.getWriter()) {
            String sha512Hex = DigestUtils.sha512Hex(fileInputStream);
            resp.setContentType("text/plain");
            writer.println(sha512Hex);
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
    }

    private String getFileBasedOnId(String id) {
        return "E:/temp/hasher/stillframes - world/stillframe" + id + ".png";
    }
}
 

We extend the HttpServlet base class and provide an implementation of the doGet method. First, we simulate a remote call by sleeping for 1 second, which blocks the thread executing this request. Next, we read the requested file from the file system and calculate its checksum using DigestUtils from the commons-codec library. The result is written as plain text into the response. This code will later be deployed in a Servlet container to handle these requests.

Vert.x Implementation

Next, we look at the Vert.x based implementation.


import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;

import org.apache.commons.codec.digest.DigestUtils;

public class HashingVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        Router router = Router.router(vertx);

        router.route().handler(BodyHandler.create());
        router.get("/hasher").handler(this::handleHash);

        vertx.createHttpServer().requestHandler(router::accept).listen(8080);
    }

    private void handleHash(RoutingContext routingContext) {
        String id = routingContext.request().getParam("id");
        String pathToFile = getFileBasedOnId(id);

        HttpServerResponse response = routingContext.response();
        // Simulate the 1-second delay without blocking the event loop
        vertx.setTimer(1000, timer ->
            vertx.fileSystem().readFile(pathToFile, handler -> {
                if (handler.succeeded()) {
                    byte[] bytes = handler.result().getBytes();
                    String hash = DigestUtils.sha512Hex(bytes);
                    response.putHeader("content-type", "text/plain").end(hash);
                }
            })
        );
    }

    private String getFileBasedOnId(String id) {
        return "E:/temp/hasher/stillframes - world/stillframe" + id + ".png";
    }
}

Vert.x comes with an optional deployment model in which you write your application as a set of Verticles. This concept gives you a scalable infrastructure and modularization boundaries, as Verticles communicate with each other over an event bus in a message-oriented way. The concept goes so far that you can even use separate classloaders, which would allow you to deploy different versions of a Verticle. In a book about Microservices, Eberhard Wolff described Vert.x Verticles as Nanoservices, due to the similarities with Microservices; since they can run side by side in a single JVM, they are even smaller.
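As a small illustration of this deployment model, deploying several instances of a Verticle is a one-liner; this is also how the 10 instances used later in the load test can be started. A sketch, assuming the Vert.x 3.x API and a made-up fully qualified class name:

import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;

public class HasherLauncher {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        // Deploy 10 instances of the Verticle; Vert.x spreads them over its event loops.
        // "com.example.HashingVerticle" is a placeholder for the actual class name.
        vertx.deployVerticle("com.example.HashingVerticle",
                new DeploymentOptions().setInstances(10));
    }
}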

In Vert.x there are different kinds of Verticles, but for our simple use case a standard Verticle is sufficient. A Verticle is assigned to an event loop thread, and its start method, which you implement when extending AbstractVerticle, is executed on that thread. Vert.x also guarantees that all the code of a Verticle instance is executed on the same event loop. As a consequence, you do not have to worry about synchronized blocks and other threading and scaling topics.

In the start method, we create a router and define which request handler handles which route. After that, we simply create an HTTP server listening on port 8080. No Servlet container or the like is required.

The handleHash method uses a timer as the counterpart to the blocking Thread.sleep implementation. The timer does not block the event loop thread; it fires after 1 second and calls the handler, which in our case reads the file, calculates the checksum and writes the result into the response using the end method. Due to the asynchronous handling, you will see the typical lambda callbacks that are used very intensively in reactive implementations.

Load test

In order to test and compare these two HTTP service implementations, we will use Gatling as a load-testing framework. Gatling gives you a powerful DSL to express your tests in code form. It is developed in Scala and based on Netty for non-blocking HTTP, which also makes it a good fit for this blog post.

The load test for our service is also simple to write using Gatling:


import io.gatling.core.Predef._
import io.gatling.http.Predef._

import scala.concurrent.duration._
import scala.util.Random

class LoadRunner extends Simulation {

  // Feeder providing a random file id for each virtual user
  val idFeeder = Iterator.continually(
    Map("randomId" -> Random.nextInt(4000))
  )

  val httpConf = http
    .baseURL("http://localhost:8080")

  val scn = scenario("Hashing LoadTest")
    .feed(idFeeder)
    .exec(http("Hasher").get("/hasher?id=${randomId}")
    .check(status.is(200)))

  setUp(
    scn.inject(rampUsersPerSec(10) to 300 during (60 seconds))
  ).protocols(httpConf)
}

We extend the Simulation base class and call setUp with a scenario (scn) running on a specific protocol configuration (httpConf). The scenario describes our use case and executes our HTTP GET call with a random id. The random id comes from a feeder that produces a random integer, restricted to 4000 because there are only 4000 files in the directory for which we can calculate checksums. Finally, by using inject, we define the number of users running the stress test. Gatling is very flexible here and provides a broad range of injection possibilities. In our example, we start with 10 users per second and ramp up to 300 users per second over a timeframe of 60 seconds.

The gatling-maven-plugin makes it easy to execute such a simulation by running mvn gatling:execute.

 

Results

The servlet-based implementation was running on an Apache Tomcat 8.5.5 with a configuration setting of 100 max threads for the HTTP connector.

Running the test with the load scenario from above, we get the following results: 9300 requests executed, no failed requests. The minimum response time was 1002 ms and the mean was around 16 seconds.

Executions
                      Total       OK          KO
  Count               9300        9300        0
  Mean req/s          90.291      90.291      -

Response Time (ms)
                      Total       OK          KO
  Min                 1002        1002        -
  50th percentile     14273       14214       -
  75th percentile     27748       27745       -
  95th percentile     39557       39557       -
  99th percentile     42030       42030       -
  Max                 42685       42685       -
  Mean                16489       16489       -
  Std Deviation       13158       13158       -

A look at the charts reveals some more details.

[Chart: response times and active users over time]

The orange line shows the number of active users, which climbed to just above 4,000; at that point we also reached the maximum response time of around 40 seconds. Initially, the response times are, as expected, slightly above 1 second. As the number of requests rises, this turns into a dramatic increase.

[Chart: number of requests per second and active users]

In this chart, we can see that starting at around 100 requests per second the servlet approach cannot keep up anymore due to our 1-second blocking call. On a much smaller scale, we are facing exactly what the C10k problem is about.

[Chart: number of responses per second and active users]

Here we see that the number of responses is limited to 100 per second and it takes nearly 45 seconds to finish all users. This is simply the arithmetic of the blocking model: 100 Tomcat threads that each spend at least 1 second blocked per request can never produce more than about 100 responses per second, while up to 300 new users arrive every second.

The same scenario is executed against a Vert.x server running 10 instances of our Verticle. This means 10 event loop threads, a tenth of the Tomcat threads. The result table shows the same number of requests, but much better response times.

Executions
                      Total       OK          KO
  Count               9300        9300        0
  Mean req/s          152.459     152.459     -

Response Time (ms)
                      Total       OK          KO
  Min                 1001        1001        -
  50th percentile     1004        1004        -
  75th percentile     1007        1007        -
  95th percentile     1012        1012        -
  99th percentile     1035        1035        -
  Max                 1252        1252        -
  Mean                1006        1006        -
  Std Deviation       8           8           -

[Chart: response time percentiles over time]

Looking at the chart of response times, it is practically a straight line with only minor bumps, which also confirms the low standard deviation of 8 ms. Accordingly, the next two charts show that the requests and responses keep up very well with the rising load.

[Chart: number of requests per second]

[Chart: number of responses per second]

Before people start to complain: using a timer here is an idealized solution and not 100% fair compared to Thread.sleep, but in our artificial example we wanted to focus on the difference between blocking and non-blocking execution. To make the comparison equal for both implementations, we now remove the Thread.sleep and the vertx.setTimer call and load test both once more.
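For reference, after removing the artificial delay the Vert.x handler shrinks to the non-blocking file read plus the digest calculation. A sketch of the modified handleHash method; the rest of the Verticle stays as shown above:

    private void handleHash(RoutingContext routingContext) {
        String id = routingContext.request().getParam("id");
        String pathToFile = getFileBasedOnId(id);

        HttpServerResponse response = routingContext.response();
        // No timer anymore: read the file asynchronously and hash it right away
        vertx.fileSystem().readFile(pathToFile, handler -> {
            if (handler.succeeded()) {
                String hash = DigestUtils.sha512Hex(handler.result().getBytes());
                response.putHeader("content-type", "text/plain").end(hash);
            }
        });
    }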

From a configuration point of view, we also give Tomcat 500 max threads in the HTTP connector and stick to our 10 instances of the Verticle, which is a little more than the eight CPU cores/threads of our machine. The Gatling test now injects from 10 to 200 users per second over a time frame of 30 seconds, and every user now issues 100 requests to put the system a bit more under stress.

Results

The numbers of the servlet-based implementation are:

Executions
                      Total        OK           KO
  Count               315000       315000       0
  Mean req/s          2460.938     2460.938     -

Response Time (ms)
                      Total        OK           KO
  Min                 0            0            -
  50th percentile     903          905          -
  75th percentile     1204         1204         -
  95th percentile     1890         1890         -
  99th percentile     2592         2592         -
  Max                 22319        22319        -
  Mean                897          897          -
  Std Deviation       584          584          -

As we can see, the mean time is slightly below 1 second, almost every request finishes in under 2.6 seconds, and there is a maximum value of over 22 seconds.

[Chart: response time distribution indicators]

Around 40% finished in under 800 ms, while about 25% took longer than 1200 ms.

[Chart: response times and active users over time]

In the response times, we see that apart from two major spikes the chart is rather flat and delivers consistently similar times.

[Chart: number of responses per second]

Nothing special in the responses-per-second chart either. Let us look at the reactive implementation.

 

The numbers of the Vert.x implementation are:

Executions
                      Total        OK           KO    Servlet-Impl.    Diff
  Count               315000       315000       0     315000           0
  Mean req/s          2863.636     2863.636     -     2460.938         -402.698

Response Time (ms)
                      Total        OK           KO    Servlet-Impl. (OK)    Diff
  Min                 0            0            -     0                     0
  50th percentile     751          751          -     905                   154
  75th percentile     935          935          -     1204                  269
  95th percentile     1263         1263         -     1890                  627
  99th percentile     1528         1528         -     2592                  1064
  Max                 2085         2085         -     22319                 20234
  Mean                694          694          -     897                   203
  Std Deviation       367          367          -     584                   217

We receive better results than with the servlet-based implementation: a mean time about 200 ms lower, a 99th percentile more than 1 second faster, and less deviation.

[Chart: response time distribution indicators]

As this indicator chart shows, more than 50% are faster than 800 ms and only a small percentage is slower than 1200 ms.

[Chart: response time percentiles and active users]

The response times show no spikes this time, and the percentile lines from the 50th up to the 90th percentile closely follow the active users line, delivering good performance.

[Chart: number of responses per second]

The responses are also very stable over time.

From these charts alone, we cannot really tell why the numbers differ. All we know is that the servlet-based implementation cannot keep up in this scenario. We need more data to analyse the results, for example the thread and stack telemetry data, which we captured with a profiler in both runs.

[Profiler thread view: servlet-based implementation]

The thread view of the servlet-based implementation reveals 500 http-nio threads that are not only in runnable state (green) but also very often in blocked state (red). Looking at the thread dumps of some of them, they are all blocked in the digest calculation, waiting for the underlying resources, because all 500 threads compete for them concurrently.

[Profiler thread view: reactive implementation]

The thread view of the reactive implementation looks completely different. Instead of 500 threads, we have 10 event loop threads in constant use, leveraging the available CPUs. Additionally, we see that Vert.x maintains an internal pool of, by default, 20 worker threads for blocking operations. This visualizes very well why the non-blocking implementation outperforms the blocking version, exactly as the theory of the reactor pattern describes it.

Summary

We have made a journey from the theory of the reactor pattern to Vert.x as one implementation providing a high-level API for this reactive approach. With the C10k problem in mind, we looked at two load test scenarios: first we compared an extreme version of a long-lasting blocking operation to its non-blocking counterpart; in the second scenario, we switched to a much more realistic example, where requests do a lot of IO and have to fight for CPU resources, showing that the reactive non-blocking approach can play its cards right.

We hope you enjoyed this blog article. Stay tuned.