So, you want to optimize gRPC. Part 1

A common question is how to make gRPC faster. gRPC makes high-performance RPC possible, but it is not always clear how to achieve that performance. In this post I will walk through my train of thought when optimizing a program.

Consider a simple key-value service that is used by multiple clients. The service must work correctly with parallel data change operations. It should also be scalable. And at the end of the day, it has to be fast. To implement such a service, gRPC would be ideal. Let’s see how to implement it.

I wrote an example client and server in Java. The example has three main classes and a protobuf file that describes the API:

  • KvClient – simulates a user of the key-value service. It randomly creates, retrieves, modifies, and deletes keys and values. The sizes of the keys and values are also chosen randomly, using an exponential distribution.

  • KvService – implementation of the “key-value” service. It handles requests from clients. To make the example behave like a real database that stores data on disk, small delays are added: 10 ms for reads and 50 ms for writes.

  • KvRunner – organizes the interaction between the client and the server. It is the main entry point that starts the client and server processes and waits for the client to do its job. The runner runs for 60 seconds and then displays the number of RPCs completed.

  • kvstore.proto – the Protocol Buffers definition of our service. It describes what clients can expect from the service. For simplicity, we use Create, Retrieve, Update, and Delete (commonly known as CRUD) as the operations. These operations work on keys and values made up of arbitrary bytes. While this is somewhat similar to REST, we keep the option of adding more complex operations in the future.
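As a rough illustration of how KvClient might pick exponentially distributed sizes, here is a minimal sketch using inverse transform sampling. The class and method names and the mean of 64 bytes are assumptions made for this example; the real client may compute sizes differently.

```java
import java.util.Random;

final class ExponentialSizes {
  // Draws a value from an exponential distribution with the given mean,
  // using inverse transform sampling: -mean * ln(1 - U), U uniform in [0, 1).
  static double sampleExponential(Random random, double mean) {
    return -mean * Math.log(1.0 - random.nextDouble());
  }

  // Rounds the sample to a whole number of bytes.
  static int randomSize(Random random, int meanBytes) {
    return (int) Math.round(sampleExponential(random, meanBytes));
  }

  public static void main(String[] args) {
    Random random = new Random(42);
    int meanBytes = 64; // hypothetical mean key size
    int samples = 100_000;
    long total = 0;
    for (int i = 0; i < samples; i++) {
      total += randomSize(random, meanBytes);
    }
    // The average should land close to the requested mean.
    System.out.println("average size: " + (double) total / samples);
  }
}
```

Most sizes come out small, with an occasional much larger key or value, which is a reasonable approximation of a mixed workload.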

gRPC does not require Protocol Buffers, but they are a convenient way to define service interfaces and generate client and server code. The generated code acts as the glue between the application logic and the gRPC library. The generated code that a gRPC client calls is what we call a stub.

Initial version

Client

Now that we know what the program should do, we can look at how it does it. As mentioned above, the client performs random RPCs. For example, the following code makes a Create request:

private void doCreate(KeyValueServiceBlockingStub stub) {
  ByteString key = createRandomKey();
  try {
    CreateResponse res = stub.create(
        CreateRequest.newBuilder()
            .setKey(key)
            .setValue(randomBytes(MEAN_VALUE_SIZE))
            .build());
    if (!res.equals(CreateResponse.getDefaultInstance())) {
      throw new RuntimeException("Invalid response");
    }
  } catch (StatusRuntimeException e) {
    if (e.getStatus().getCode() == Code.ALREADY_EXISTS) {
      knownKeys.remove(key);
      logger.log(Level.INFO, "Key already existed", e);
    } else {
      throw e;
    }
  }
}

A random key and a random value are generated. The request is sent to the server, and the client waits for the response. When the response arrives, the code checks that it matches what was expected and throws an exception if it does not. Although keys are generated randomly, they must be unique, so we keep track of the keys we have created to avoid using the same key twice. It is still possible that another client has already created a particular key; in that case we log it and move on. Any other error is rethrown.

Here we use the blocking gRPC API, which sends a request and waits for the response. This is the simplest gRPC stub, but it blocks the calling thread. As a result, the client can have at most one RPC in progress at a time.

Server

On the server side, the following code handles the request:

private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();

@Override
public synchronized void create(
    CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
  ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
  ByteBuffer value = request.getValue().asReadOnlyByteBuffer();
  simulateWork(WRITE_DELAY_MILLIS);
  if (store.putIfAbsent(key, value) == null) {
    responseObserver.onNext(CreateResponse.getDefaultInstance());
    responseObserver.onCompleted();
    return;
  }
  responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
}

The service retrieves the key and value from the request as ByteBuffers. To make sure that concurrent requests do not corrupt the store, the method is declared synchronized. After simulating the write to disk, the data is saved in the Map.

Unlike the client-side code, the server-side handler is non-blocking. To send a response, it calls onNext() on responseObserver. To signal that the response is complete, it calls onCompleted().
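One detail of the store is worth a small illustration: ByteBuffer compares and hashes by content, so it can serve as a HashMap key where a raw byte[] could not. The sketch below is a stand-alone approximation of the create logic without gRPC; the ByteBufferStore class and its boolean return value are assumptions made for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class ByteBufferStore {
  private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();

  // Returns true if the key was absent and the value was stored.
  // synchronized, like the handler in the article, so that concurrent
  // callers cannot corrupt the HashMap.
  synchronized boolean create(byte[] key, byte[] value) {
    ByteBuffer k = ByteBuffer.wrap(key.clone()).asReadOnlyBuffer();
    ByteBuffer v = ByteBuffer.wrap(value.clone()).asReadOnlyBuffer();
    return store.putIfAbsent(k, v) == null;
  }

  public static void main(String[] args) {
    ByteBufferStore s = new ByteBufferStore();
    byte[] key = "k".getBytes(StandardCharsets.UTF_8);
    System.out.println(s.create(key, new byte[] {1})); // prints true
    // Same bytes in a different array: treated as the same key.
    System.out.println(s.create(key.clone(), new byte[] {2})); // prints false
  }
}
```

The second call corresponds to the case where the real handler answers with ALREADY_EXISTS.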

Performance

The code looks correct and safe – let’s see how it works. I am using an Ubuntu computer with a 12-core processor and 32GB of memory. Let’s build and run:

$ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 1:10:07 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 1:11:07 PM io.grpc.examples.KvRunner runClient
INFO: Did 16.55 RPCs/s

real	1m0.927s
user	0m10.688s
sys	0m1.456s

Yikes! On such a powerful machine, we get only 16 RPCs per second. The processor is barely used, and we don’t know how much memory was consumed. Let’s find out why the result is so bad.

Optimization

Analysis

Before making any changes, let’s take a look at what the program does. We need to figure out where the code is spending its time in order to figure out what needs to be optimized. At this stage, the profiling tools are not needed yet, we can simply analyze the code.

The client starts up and issues RPCs sequentially for about a minute. On each iteration, it randomly decides which operation to perform:

void doClientWork(AtomicBoolean done) {
  Random random = new Random();
  KeyValueServiceBlockingStub stub = KeyValueServiceGrpc.newBlockingStub(channel);

  while (!done.get()) {
    // Pick a random CRUD action to take.
    int command = random.nextInt(4);
    if (command == 0) {
      doCreate(stub);
      continue;
    }
    /* ... */
    rpcCount++;
  }
}

This means that no more than one RPC can be active at a time. Each call must wait for the previous one to complete. How long does each RPC take? Looking at the server code, we see that the most expensive operation takes about 50 ms. At that latency, the client can complete at most 20 requests per second:

20 requests/s = 1000 ms/s / (50 ms/request)

Our code executes about 16 requests per second, which is consistent with this bound. We can verify the assumption by looking at the output of the time command used to run the code. While handling a request, the server just sleeps in the simulateWork method. This means the program is mostly idle, waiting for RPCs to complete.

You can confirm this by comparing, in the output above, the wall-clock time between start and termination (real) with the CPU time (user). One minute passed, but the processor spent only about 10 seconds working. My powerful multi-core processor was busy only 16% of the time. So if we can get the program to do more work during that time, we should be able to complete more RPCs.

Hypothesis

Now we can clearly see the problem and can offer a solution. One way to speed things up is to make sure the processor isn’t idle. To do this, we will do the work in parallel.

There are three types of stubs in the gRPC library for Java: blocking, non-blocking, and ListenableFuture. We have already seen the blocking stub on the client and the non-blocking interface on the server. The ListenableFuture API is a compromise between the two, offering both blocking and non-blocking behavior. As long as we do not block the thread waiting for completion, we can start new RPCs without waiting for the old ones to finish.
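To make the difference concrete without pulling in gRPC, here is a minimal sketch that contrasts the two styles. It uses the JDK's CompletableFuture as a stand-in for Guava's ListenableFuture, and fakeRpc is a hypothetical placeholder that only sleeps; neither is part of the article's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class BlockingVsFuture {
  // Hypothetical stand-in for an RPC: sleeps to simulate server latency.
  static void fakeRpc(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  // Blocking style: each call waits for the previous one. Returns elapsed ms.
  static long runBlocking(int calls, long latencyMillis) {
    long start = System.nanoTime();
    for (int i = 0; i < calls; i++) {
      fakeRpc(latencyMillis);
    }
    return (System.nanoTime() - start) / 1_000_000;
  }

  // Future style: start every call first, count completions in a callback,
  // and only then wait for all of them. Returns elapsed ms.
  static long runWithFutures(int calls, long latencyMillis, AtomicInteger completed) {
    ExecutorService pool = Executors.newFixedThreadPool(calls);
    try {
      long start = System.nanoTime();
      List<CompletableFuture<Void>> futures = new ArrayList<>();
      for (int i = 0; i < calls; i++) {
        futures.add(
            CompletableFuture.runAsync(() -> fakeRpc(latencyMillis), pool)
                .thenRun(completed::incrementAndGet));
      }
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
      return (System.nanoTime() - start) / 1_000_000;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    AtomicInteger completed = new AtomicInteger();
    System.out.println("blocking ms: " + runBlocking(10, 50));
    System.out.println("futures ms:  " + runWithFutures(10, 50, completed));
    System.out.println("completed:   " + completed.get());
  }
}
```

With the blocking loop the total time grows with the number of calls, while the future-based version overlaps the waits, which is the effect we hope to get from the ListenableFuture stub.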

Experiment

To test our hypothesis, let’s switch the client to the ListenableFuture API. This means we now need to think more carefully about concurrency in the code. For example, when tracking used keys on the client, we need to read, modify, and write the keys safely. We also need to make sure that we stop issuing new RPCs when an error occurs (proper error handling will be covered in the next post). Finally, we need to increment the count of completed RPCs while keeping in mind that RPCs complete on different threads.

All of these changes make the code more complex. This is the trade-off we accept to improve performance: simple code is often at odds with optimized code. Still, the code below is quite readable, and execution flows roughly from top to bottom. Here is the revised doCreate() method:

private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error) {
  ByteString key = createRandomKey();
  ListenableFuture<CreateResponse> res = stub.create(
      CreateRequest.newBuilder()
          .setKey(key)
          .setValue(randomBytes(MEAN_VALUE_SIZE))
          .build());
  res.addListener(() -> rpcCount.incrementAndGet(), MoreExecutors.directExecutor());
  Futures.addCallback(res, new FutureCallback<CreateResponse>() {
    @Override
    public void onSuccess(CreateResponse result) {
      if (!result.equals(CreateResponse.getDefaultInstance())) {
        error.compareAndSet(null, new RuntimeException("Invalid response"));
      }
      synchronized (knownKeys) {
        knownKeys.add(key);
      }
    }

    @Override
    public void onFailure(Throwable t) {
      Status status = Status.fromThrowable(t);
      if (status.getCode() == Code.ALREADY_EXISTS) {
        synchronized (knownKeys) {
          knownKeys.remove(key);
        }
        logger.log(Level.INFO, "Key already existed", t);
      } else {
        error.compareAndSet(null, t);
      }
    }
  });
}

The stub was changed to KeyValueServiceFutureStub, which returns a Future instead of returning the value directly. gRPC Java uses ListenableFuture, which allows you to add a callback that runs when the Future completes. Here we are not too concerned with the response itself. We care more about whether the RPC succeeded. Therefore, the code mostly handles errors rather than the response.

The first change is how we count the number of RPCs. Instead of incrementing the counter in the main loop, we increment it when the RPC completes.

Next, for each RPC we create a new callback object that handles both the success and the failure case. Because doCreate() will already have returned by the time the callback runs, we need a way to propagate errors other than throwing. Instead, we atomically update a shared reference. The main loop checks it from time to time and stops when it detects an error.
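The error hand-off described above can be sketched in isolation. The FirstErrorWins class and its method names are assumptions made for this example; the article's code passes the AtomicReference around directly.

```java
import java.util.concurrent.atomic.AtomicReference;

final class FirstErrorWins {
  private final AtomicReference<Throwable> error = new AtomicReference<>();

  // Called from callback threads. compareAndSet(null, t) only succeeds
  // while no error has been recorded, so the first failure is kept.
  boolean report(Throwable t) {
    return error.compareAndSet(null, t);
  }

  // Called by the main loop between RPCs; null means "keep going".
  Throwable firstError() {
    return error.get();
  }

  public static void main(String[] args) throws InterruptedException {
    FirstErrorWins errors = new FirstErrorWins();
    Thread callback =
        new Thread(() -> errors.report(new RuntimeException("Invalid response")));
    callback.start();
    callback.join();
    errors.report(new RuntimeException("later failure")); // ignored
    // The main loop would stop issuing new RPCs once this is non-null.
    System.out.println(errors.firstError().getMessage()); // prints "Invalid response"
  }
}
```

Keeping only the first error is a design choice: later failures are often consequences of the first one, so it is usually the most useful to report.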

Finally, the code adds a key to knownKeys only once the RPC has actually completed, and removes it only when the RPC is known to have failed. We synchronize on the variable to make sure two threads don’t conflict. Note: although access to knownKeys is thread-safe, race conditions are still possible. One thread can read a key from knownKeys, a second thread can then remove that key from knownKeys, and the first thread can then issue an RPC using the key it read. Synchronizing on knownKeys only guarantees consistency, not correctness. Fixing this is beyond the scope of this post, so we’ll just log it and move on. You will see several of these messages in the log when you run the program.

Run the code

If you run this program, you will see that it does not work:

WARNING: An exception was thrown by io.grpc.netty.NettyClientStream$Sink$1.operationComplete()
java.lang.OutOfMemoryError: unable to create new native thread
	at java.lang.Thread.start0(Native Method)
	at java.lang.Thread.start(Thread.java:714)
	...

What?! Why am I showing you code that doesn’t work? In real life, things rarely work on the first try. In this case, the program ran out of memory, and when a program runs out of memory, strange things start to happen. The root cause is often hard to find because there are many red herrings. The error message says “unable to create new native thread”, even though our code did not create any new threads. What helps me with problems like this is experience rather than debugging. I have seen many OOMs and have learned that Java tells us about the straw that broke the camel’s back: our program started using a lot of memory, and the final allocation that failed just happened to occur while creating a thread.

So what happened? In the blocking version, the next RPC was not started until the previous one had finished. That was slow, but it also kept us from creating so many RPCs that we ran out of memory. We need to account for this in the ListenableFuture version.

To solve this problem, we can limit the number of active RPCs. Before starting a new RPC, we try to acquire a permit. If we get one, we start the RPC. If not, we wait until one becomes available. When the RPC completes (successfully or not), the permit is returned. For this we use a semaphore:

private final Semaphore limiter = new Semaphore(100);

private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error)
    throws InterruptedException {
  limiter.acquire();
  ByteString key = createRandomKey();
  ListenableFuture<CreateResponse> res = stub.create(
      CreateRequest.newBuilder()
          .setKey(key)
          .setValue(randomBytes(MEAN_VALUE_SIZE))
          .build());
  res.addListener(() ->  {
    rpcCount.incrementAndGet();
    limiter.release();
  }, MoreExecutors.directExecutor());
  /* ... */
}

Now the code runs successfully and does not run out of memory.
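To see that the semaphore really bounds the number of outstanding calls, here is a minimal stand-alone sketch. It replaces the RPC with a sleeping task on a thread pool and records the highest number of tasks in flight; the InFlightLimiter class, its parameters, and the sleeping task are assumptions made for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class InFlightLimiter {
  // Starts `total` simulated RPCs with at most `limit` in flight and
  // returns the highest number of RPCs observed in flight at once.
  static int run(int total, int limit, long latencyMillis) {
    Semaphore limiter = new Semaphore(limit);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger maxInFlight = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(total);
    ExecutorService pool = Executors.newCachedThreadPool();
    try {
      for (int i = 0; i < total; i++) {
        limiter.acquire(); // blocks the issuing loop while `limit` calls are outstanding
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        pool.execute(() -> {
          try {
            Thread.sleep(latencyMillis); // hypothetical stand-in for the RPC
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            inFlight.decrementAndGet();
            limiter.release(); // return the permit on success or failure
            done.countDown();
          }
        });
      }
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } finally {
      pool.shutdown();
    }
    return maxInFlight.get();
  }

  public static void main(String[] args) {
    System.out.println("max in flight: " + run(1000, 100, 5));
  }
}
```

The counter is decremented before the permit is released, so the recorded maximum can never exceed the limit. The same ordering concern applies to any bookkeeping done alongside limiter.release() in the real client.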

Results

Everything looks much better after the changes made:

$ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 2:40:47 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 2:41:47 PM io.grpc.examples.KvRunner runClient
INFO: Did 24.283 RPCs/s

real	1m0.923s
user	0m12.772s
sys	0m1.572s

Our code now executes 46% more RPCs per second than before. We can also see that about 20% more CPU time is used. Our hypothesis turned out to be correct, and the change worked. All of this happened without any changes to the server and without any special profilers or tracers.

Do these numbers make sense? We expect each of the mutating operations (create, update, and delete) to be issued about 1/4 of the time, and reads also about 1/4 of the time. The average RPC time should be approximately the weighted average of the individual RPC times:

  .25 * 50ms (create)
  .25 * 10ms (retrieve)
  .25 * 50ms (update)
+ .25 * 50ms (delete)
------------
        40ms

With an average of 40ms per RPC, we expect the number of RPCs per second to be:

25 requests/s = 1000 ms/s / (40 ms/request)

This is roughly what we see. But the server still processes requests sequentially, so we have more work to do on it in the future. For now, though, our optimization seems to have worked.
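The back-of-the-envelope estimate above can be written down as a small calculation. The delays come from the article; the ExpectedThroughput class and the READ_DELAY_MILLIS constant name are assumptions made for this example.

```java
final class ExpectedThroughput {
  static final double WRITE_DELAY_MILLIS = 50; // create, update, delete
  static final double READ_DELAY_MILLIS = 10; // retrieve (assumed constant name)

  // Weighted average latency when each of the four operations is equally likely.
  static double meanLatencyMillis() {
    return 0.25 * WRITE_DELAY_MILLIS // create
        + 0.25 * READ_DELAY_MILLIS // retrieve
        + 0.25 * WRITE_DELAY_MILLIS // update
        + 0.25 * WRITE_DELAY_MILLIS; // delete
  }

  // Requests per second for a server that handles one request at a time.
  static double rpcsPerSecond(double meanLatencyMillis) {
    return 1000.0 / meanLatencyMillis;
  }

  public static void main(String[] args) {
    double mean = meanLatencyMillis();
    System.out.println("mean latency (ms): " + mean); // prints 40.0
    System.out.println("expected RPCs/s:   " + rpcsPerSecond(mean)); // prints 25.0
  }
}
```

Because the server handler is synchronized, this per-request latency bounds the throughput no matter how many RPCs the client keeps in flight.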

Conclusion

There are many ways to optimize gRPC code. To do this, you need to understand what your code does and what it should do. This post shows only the very basics of how to approach optimization. Always measure performance before and after making changes and use these measurements as a guide for optimization.

