Inner World: Java NIO

Note: this article is not a how-to guide or a collection of best-practices. It is aimed primarily at an overview of the existing NIO channels in Java and the operating principle of non-blocking I/O.

Thread is waiting to be read

Thread is waiting to be read


At the beginning of the article I will give a quote describing the difference between the approaches in Java IO and Java NIO:

“The main difference between the two approaches to organizing I/O is that Java IO is thread-oriented, while Java NIO is buffer-oriented. Let's take a closer look.

Thread-oriented I/O involves reading/writing from/to a stream one or more bytes per unit of time, one at a time. This information is not cached anywhere. Thus, it is not possible to arbitrarily move forward or backward through the data stream. If you want to do this kind of manipulation, you will have to cache the data in a buffer first.

The approach on which Java NIO is based is slightly different. The data is read into a buffer for further processing. You can move forward and backward on the buffer. This gives you a little more flexibility when processing data. At the same time, you need to check whether the buffer contains the amount of data necessary for correct processing. You also need to make sure that when reading data into the buffer, you do not destroy the unprocessed data in the buffer.”

Java NIO contains three entities that are important for understanding it: Buffer, Channel And Selector. Let's look at them in order.

Buffer is a container for primitive type data. It is a more functional and convenient replacement for arrays of primitives. In Java, NIO is used as an object that stores a fixed amount of data to be sent to or received from an I/O service. It sits between the application and the channel that writes data to or reads data from the buffer.

Channel – a link for input/output operations. Represents an open connection to an object, such as a hardware device, file, network socket, or software component, that is capable of performing one or more different I/O operations, such as reading or writing. Let's look at them in more detail.

Java NIO has many channel implementations. Below is the hierarchy of interfaces.

Rice.  1: Hierarchy of Channel family interfaces in Java NIO.

Rice. 1: Hierarchy of Channel family interfaces in Java NIO.

Brief description of the features of the Channel family interfaces.
  • Channel – parent class for the entire family

  • ReadableByteChannel – a channel that reads bytes from a data source.

  • ScatteringByteChannel – a channel that reads bytes from a data source into an array of buffers.

  • WritableByteChannel – channel that writes bytes to the data receiver.

  • GatheringByteChannel – a channel that writes bytes to the data receiver from the buffer array.

  • ByteChannel – a channel that can both read and write data. Interface that connects ReadableByteChannel And WritableByteChannel.

  • SeekableByteChannel – a channel that remembers the current reading position and has the ability to change it. In other words, it allows you to navigate through the data source.

  • AsynchronousChannel – a channel that supports asynchronous I/O operations.

  • AsynchronousByteChannel – a channel that supports asynchronous byte read/write operations.

  • NetworkChannel – a channel that uses a network socket.

  • MulticastChannel – a channel that supports multicasting over the IP protocol.

  • InterruptibleChannel – a channel that can be closed asynchronously and interrupt the operation.

Note: Streams from Java OI are always unidirectional: InputStream/OutputStream. Channels from Java NIO can be bidirectional.

The following diagram shows the class hierarchy with abstract implementations.

Rice.  2: Hierarchy of interfaces and abstract implementations of the Channel family in Java NIO.

Rice. 2: Hierarchy of interfaces and abstract implementations of the Channel family in Java NIO.

In the diagram, interfaces (implementations) are divided into 3 groups:

  • red – blocking channels;

  • purple – asynchronous channels;

  • green – non-blocking channels.

Yes, it turns out that not all Java NIO channels are non-blocking! Perhaps this is why NIO is not Non-blockable I/O, but New I/O. You can also notice that asynchronous and non-blocking channels have different implementations (and concepts). Let's look at all three groups in order.

Blocking channels

This group of channels works in the standard paradigm – after calling the read/write function, the calling thread is blocked until the operation is completed. The main difference from standard IO here is the buffer-oriented approach.

Listing 1: Reading a file using a pipe from Java NIO
void nio_blocking_readFile() throws IOException, URISyntaxException {
    URL fileUrl = NioTest.class.getResource(testFilePath);
    var filepath = Path.of(fileUrl.toURI());
  
    try (ReadableByteChannel inputChannel = FileChannel.open(filepath)) {
        var buffer = ByteBuffer.allocate(300_000);
        int readByteCount = inputChannel.read(buffer);
        var fileString = new String(buffer.array(), StandardCharsets.UTF_8);
        System.out.println(fileString);
    }
}

The code in Listing 1 is functionally equivalent to the following code:

Listing 2: Reading a file using a stream from Java IO
void io_readFile() throws IOException {
    try (
        InputStream fileStream = NioTest.class.getResourceAsStream(testFilePath);
        var inputStream = new BufferedInputStream(fileStream)
    ) {
        byte[] fileBytes = inputStream.readAllBytes();
        var fileString = new String(fileBytes, StandardCharsets.UTF_8);
        System.out.println(fileString);
    }
}

Asynchronous channels

This group of channels has the ability to asynchronously read/write operations. It is possible to read/write with a callback, or simply receive an object Futureand the read/write operation itself will take place in the background.

Listing 3: Asynchronously reading a file with Future
void nio_async_readFile() throws URISyntaxException, IOException {
    URL fileUrl = NioTest.class.getResource(testFilePath);
    var path = Path.of(fileUrl.toURI());
  
    try (var inputChannel = AsynchronousFileChannel.open(path)) {
        var buffer = ByteBuffer.allocate(300_000);
        Future<Integer> futureResult = inputChannel.read(buffer, 0);
      
        while (!futureResult.isDone()) {
            System.out.println("Файл еще не загружен в буффер");
        }
      
        var fileString = new String(buffer.array(), StandardCharsets.UTF_8);
        System.out.println(fileString);
    }
}
Listing 4: Asynchronously reading a file using a callback
void nio_async_readFile() throws URISyntaxException, IOException {
    URL fileUrl = NioTest.class.getResource(testFilePath);
    var path = Path.of(fileUrl.toURI());
  
    try (var inputChannel = AsynchronousFileChannel.open(path)) {
        var buffer = ByteBuffer.allocate(300_000);
        inputChannel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {

            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                var fileString = new String(buffer.array(), StandardCharsets.UTF_8);
                System.out.println(fileString);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                //do nothing
            }
        });
      
        try {
            Thread.sleep(3_000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

Non-blocking channels

This group of channels can switch between blocking and non-blocking mode. You may notice that all non-blocking channel implementations in Java NIO work with sockets.

The article will consider non-blocking channels using the example of two classes – ServerSocketChannel And SocketChannel.

ServerSockerChannel

The server socket is opened with the command ServerSocketChannel.open(). The created channel is public, but it is not bound to a specific socket. To associate it with a socket, you need to call serverSocketChannel.socket().bind().

By default, the channel is blocking. To switch it to non-blocking mode, you need to call serverSocketChannel.configureBlocking(false).

We catch connections through a call serverSocketChannel.accept(). If blocking mode is specified, the calling thread is blocked until the connection is accepted. Otherwise (non-blocking mode enabled), null is immediately returned if there are no pending connections. The channels returned by this method are always blocking, regardless of the channel type set (but they can be set to non-blocking mode). .

Listing 5: Blocked server
void nio_server_blockable() throws IOException {
    //Открытие канала. Под капотом вызывается SelectorProvider, реализация которого является платформозависимой
    var ssc = ServerSocketChannel.open();
    //Созданный канал является открытым, но не привязан к конкретному сокету. Что бы связать его с сокетом, необходимо вызвать код из следующей строки
    ssc.socket().bind(new InetSocketAddress(9999));
    //По дефолту канал является блокирующим. Что бы перевести его в неблокирующий режим, нужно в следующей строке передать false
    ssc.configureBlocking(true);
    var responseMessage = "Привет от сервера! : " + ssc.socket().getLocalSocketAddress();
    var sendBuffer = ByteBuffer.wrap(responseMessage.getBytes());
  
    while (true) {
        //Ловим соединения через вызов ssc.accept()
        //Поток блокируется до момента принятия соединения
        try (SocketChannel sc = ssc.accept()) {
            System.out.println("Принято соединение от  " + sc.socket().getRemoteSocketAddress());
            var receivedBuffer = ByteBuffer.allocate(100);
            sc.read(receivedBuffer);
            var requestMessage = new String(receivedBuffer.array());
            System.out.println(requestMessage);
          
            sendBuffer.rewind();
            sc.write(sendBuffer);
        }
    }
}

On line 14, the application stops waiting for a connection request.

Listing 6: Non-blocking server
void nio_server_non_blockable() throws IOException {
    var ssc = ServerSocketChannel.open();
    ssc.socket().bind(new InetSocketAddress(9999));
    //Включаем неблокирующий режим канала
    ssc.configureBlocking(false);
    var responseMessage = "Привет от сервера! : " + ssc.socket().getLocalSocketAddress();
    var sendBuffer = ByteBuffer.wrap(responseMessage.getBytes());
  
    while (true) {
        System.out.print(".");
        //Ловим соединения через вызов ssc.accept().
        //Т.к. стоит неблокирующий режим, метод accept немедленно вернет null, если нет ожидающих подключений
        try (SocketChannel sc = ssc.accept()) {
            if (sc != null) {
                System.out.println();
                System.out.println("Принято соединение от  " + sc.socket().getRemoteSocketAddress());
                var receivedBuffer = ByteBuffer.allocate(100);
                sc.read(receivedBuffer);
                var requestMessage = new String(receivedBuffer.array());
                System.out.println(requestMessage);
              
                sendBuffer.rewind();
                sc.write(sendBuffer);
            } else {
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

In case there are no pending connections, line 13 returns null.

As we can see, “unblockability” is not some kind of silver bullet. ServerSocketChannel.accept() simply does not wait for a connection, but returns immediately null.

SocketChannel

The client socket is opened with the command SocketChannel.open(). If we enable non-blocking mode, then almost the same thing happens as with ServerSocketChannel – the channel does not wait for “sent” or “received” data to appear and simply moves on. However, if there is data, the thread is blocked and the channel reads it. In fact, if the channel is not yet ready to be read or written when the operation is called, we simply skip the operation. The examples use the server written in Listing 6.

Listing 7: Blockable client
void nio_client_blockable() throws IOException {
    try (SocketChannel sc = SocketChannel.open()) {
        sc.configureBlocking(true);
        sc.connect(new InetSocketAddress("localhost", 9999));
      
        var requestMessage = "Привет от клиента! " + LocalDateTime.now();
        ByteBuffer buffer = ByteBuffer.wrap(requestMessage.getBytes());
        sc.write(buffer);
      
        var receivedBuffer = ByteBuffer.allocate(100);
        //Приложение останавливается в ожидании ответа
        sc.read(receivedBuffer);
        var responseMessage = new String(receivedBuffer.array());
        System.out.println(responseMessage);
    }
}

On line 10, the application stops waiting for a response.

Listing 8: Non-blocking client
void nio_client_non_blockable() throws IOException {
    try (SocketChannel sc = SocketChannel.open()) {
        //Включаем неблокирующий режим канала
        sc.configureBlocking(false);
        sc.connect(new InetSocketAddress("localhost", 9999));
      
        while (!sc.finishConnect()) {
            System.out.println("waiting to finish connection");
        }
      
        var requestMessage = "Привет от клиента! " + LocalDateTime.now();
        ByteBuffer buffer = ByteBuffer.wrap(requestMessage.getBytes());
        sc.write(buffer);
      
        var receivedBuffer = ByteBuffer.allocate(100);
        //Ответа еще нет, канал ничего не прочтет, буффер останется пустым
        sc.read(receivedBuffer);
        Thread.sleep(1_000);
        var responseMessage = new String(receivedBuffer.array());
        //Консоль выведет пустую строку
        System.out.println(responseMessage);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

On line 17, the client does not read anything from the socket because there is no response yet. The read operation is skipped. You may notice that nothing is being read in the background because an empty string is printed to the console despite waiting after calling the read operation.

Listing 9: Non-blocking client waiting for a response
void nio_client_non_blockable() throws IOException {
    try (SocketChannel sc = SocketChannel.open()) {
        sc.configureBlocking(false);
        sc.connect(new InetSocketAddress("localhost", 9999));

        while (!sc.finishConnect()) {
            System.out.println("waiting to finish connection");
        }

        ByteBuffer buffer = ByteBuffer.wrap(("Привет от клиента! " + LocalDateTime.now()).getBytes());
        sc.write(buffer);
        Thread.sleep(1_000);

        var receivedBuffer = ByteBuffer.allocate(100);
        sc.read(receivedBuffer);
        var responseMessage = new String(receivedBuffer.array());
        //Консоль выведет ответ
        System.out.println(responseMessage);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

Since we set a delay of one second between writing the request and reading the response, by the time the read operation is called, the channel already has data to read, and the response will be printed to the console.

At first glance, “non-blocking” doesn't look good – we simply don't wait until the operation is ready to execute, and return control to the calling function. The operation is skipped and we do not receive (write) any data.

Note: here you can notice the difference between “asynchronous” and “non-blocking”. In the asynchronous approach, the application still waits for the data, but in a background thread (i.e. the thread is still in the waiting state, but not the calling one), while in the non-blocking approach, we skip the operation if it is not ready to execute (socket is not ready to receive data, or the socket does not have data to read).

The question arises: what does this “unblockability” give us? It is unlikely that anyone will be satisfied with skipping an operation if the channel is not yet ready to give out the data. To answer this question we need to understand the class Selector.

Selector

Selector – this is an object that belongs to a group of channels and determines which channel is ready for writing/reading/connecting, etc. It allows one thread to manage multiple channels (connections). This allows you to reduce the cost of switching between threads.

I will not describe the methods of the classes in question; they are perfectly described in the documentation. But I will try to describe the process of interacting with the selector and using it.

After opening the selector, you must register the channel you are using. Only non-blocking channels can be used for use with the selector. Those. we will not be able to register, for example, FileChannel. The channel itself checks whether it supports the operations transmitted for monitoring and registers its SelectionKey in the selector (essentially, the selector adds the key to the list of observables). Selector.select() returns the number of channels ready to be used. How does he know how many channels are ready for use? SelectionKey contains a link to the channel. The selector goes through each channel and asks if it is ready to write/read/etc., and counts the number of such channels.

Next, if there are channels waiting to be processed, we pull out a lot of ready ones SelectedKey and process them. In addition, it is possible to directly pass the callback to the function select()and the selector will apply it to the channels that are ready for interaction.

To understand the operations available to a channel, there are constants that denote them. Instance SelectionKey contains operations ready to be executed in the channel. Below is a list of operations available to the channels. They can be combined in any way.

Available operations

OP_READ

The channel has readable data

OP_WRITE

The channel is available for recording

OP_CONNECT

The channel is ready to terminate the connection or is waiting for an error message

OP_ACCEPT

The channel is ready to receive connections (only for ServerSocketChannel)

Listing 10: Implementing a non-blocking server using a selector
void nio_non_blockable_selector_server() throws IOException {
    try (ServerSocketChannel channel = ServerSocketChannel.open();
         //Открытие селектора. Под капотом вызывается SelectorProvider, реализация которого является платформозависимой
         Selector selector = Selector.open()) {
        channel.socket().bind(new InetSocketAddress(9999));
        channel.configureBlocking(false);
        //Регистрируем серверный канал в селекторе с интересующим типом операции - принятие подключения
        SelectionKey registeredKey = channel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            //Получаем количество готовых к обработке каналов.
            int numReadyChannels = selector.select();
            if (numReadyChannels == 0) {
                continue;
            }
            //Получаем готовые к обработке каналы
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

            //Обрабатываем каналы в соответствии с типом доступной каналу операции
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();

                if (key.isAcceptable()) {
                    //Принятие подключения серверным сокетом
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel client = server.accept();
                    if (client == null) {
                        continue;
                    }
                    client.configureBlocking(false);
                    //Регистрируем принятое подключение в селекторе с интересующим типом операции - чтение
                    client.register(selector, SelectionKey.OP_READ);
                }

                if (key.isReadable()) {
                    //Тут происходит обработка принятых подключений
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer requestBuffer = ByteBuffer.allocate(100);
                    int r = client.read(requestBuffer);
                    if (r == -1) {
                        client.close();
                    } else {
                        //В этом блоке происходит обработка запроса
                        System.out.println(new String(requestBuffer.array()));
                        String responseMessage = "Привет от сервера! : " + client.socket().getLocalSocketAddress();
                        //Несмотря на то, что интересующая операция, переданная в селектор - чтение, мы все равно можем писать в сокет
                        client.write(ByteBuffer.wrap(responseMessage.getBytes()));
                    }
                }
                //Удаляем ключ после обработки. Если канал снова будет доступным, его ключ снова появится в selectedKeys
                keyIterator.remove();
            }
        }
    }
}
Listing 11: Implementing a non-blocking server using a selector and registering a callback
void nio_non_blockable_selector_server() throws IOException {
    try (ServerSocketChannel channel = ServerSocketChannel.open();
         Selector selector = Selector.open()) {
        channel.socket().bind(new InetSocketAddress(9999));
        channel.configureBlocking(false);
        SelectionKey registeredKey = channel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            //Обрабатываем доступные к ожиданию подключения с использованием каллбэка
            selector.select(key -> {

                if (key.isAcceptable()) {
                    try {
                        //Принятие подключения серверным сокетом
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        //Регистрируем принятое подключение в селекторе с интересующим типом операции - чтение
                        client.register(selector, SelectionKey.OP_READ);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }

                if (key.isReadable()) {
                    try {
                        //Тут происходит обработка принятых подключений
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer requestBuffer = ByteBuffer.allocate(100);
                        int r = client.read(requestBuffer);
                        if (r == -1) {
                            client.close();
                        } else {
                            //В этом блоке происходит обработка запроса
                            System.out.println(new String(requestBuffer.array()));
                            String responseMessage = "Привет от сервера! : " + client.socket().getLocalSocketAddress();
                            //Несмотря на то, что интересующая операция, переданная в селектор - чтение, мы все равно можем писать в сокет
                            client.write(ByteBuffer.wrap(responseMessage.getBytes()));
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }
    }
}

A non-blocking client works in much the same way: we send a message and, instead of waiting for a response, we register it in a selector. In doing so, we can create a separate thread that checks the selector and processes the ready-to-run threads.

Listing 12 – Non-blocking client using a selector and processing responses in a separate thread
void nio_clientSocket_non_blockable_selector_1() throws IOException {
    try (SocketChannel sc = SocketChannel.open();
         Selector selector = Selector.open()) {
        sc.configureBlocking(false);
        //Регистрируем канал в селекторе с интересующим типом операции - чтение
        SelectionKey registeredKey = sc.register(selector, SelectionKey.OP_READ);

        //Создаем поток, который будет опрашивать селектор и обрабатывать ответы на наши запросы
        var selectorThread = new Thread(() -> {
            while (true) {
                try {
                    int numReadyChannels = selector.select();
                    if (numReadyChannels == 0) {
                        continue;
                    }

                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                    
                    while (keyIterator.hasNext()) {
                        SelectionKey key = keyIterator.next();
                        if (key.isReadable()) {
                            //Этот тот канал, который мы открыли в начале функции
                            //Мы отловили его дя чтения ответа
                            SocketChannel client = (SocketChannel) key.channel();
                            var received = ByteBuffer.allocate(100);
                            client.read(received);
                            System.out.println(new String(received.array()));
                        }
                        keyIterator.remove();
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        selectorThread.setDaemon(true);
        selectorThread.start();

        sc.connect(new InetSocketAddress("localhost", 9999));
        while (!sc.finishConnect()) {
            System.out.println("waiting to finish connection");
        }
        
        String requestMessage = "Привет от клиента! " + LocalDateTime.now();
        ByteBuffer requestBuffer = ByteBuffer.wrap(requestMessage.getBytes());
        sc.write(requestBuffer);

        Thread.sleep(2000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

This way the main thread is not blocked while waiting for a response. However, we can create a separate limited pool of threads that will process our selectors and transfer processing of received requests/responses to executing threads.

Other articles from the series “Inner World”

Inner World: Project Reactor

useful links

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *