Jet bike

On one of the projects I came across Spring Reactor. A good technology of asynchronous streams. Many spears were broken about its use. But now is not about that. But about how I invented the wheel.

Or like this:

No, reactive.

Let's start in order:

the task is to search for addresses by elastic index, said – done. Using the elastic reactive driver, we make a request and get the result, if necessary, we make a transformation, take the required number. All is well. But then the customer comes and says: – listen, we need to make it possible when we request an address to a certain level of detail (city, street, house), and if there are fewer answers than we requested, is it possible to add the search result of the next level of detail to the result?

What a problem! In flask there is a method concat() which is used to combine two streams. But there was a problem, the method concat() uses a sequential subscription to streams. And it always starts the second stream, since it simply combines streams, and the results can be limited by a separate method take(). But how to make the launch of the second stream optional? And also make the second stream dependent on the first result?

We tried different combinations, but we couldn't get a stone flower. Then we decided to reinvent the wheel: make our own concatenation class.

The desired behavior of the new class is:

  • must keep track of the amount of data;

  • must detect the first emitted object from the original stream;

  • must emit the received data asynchronously;

  • and upon receiving a signal about the end of the first stream, it must start the second stream and subscribe to it, also emitting the received data.

    They called it BridgeFlux, a name like a name.

Here are its properties

int capacity;   -- количество объектов, которые могут быть излучены
BlockingQueue<AddressHintResponseDto> queue; -- буффер из которого будем излучать
AtomicBoolean done;   -- флаг окончания генерации
AtomicBoolean doneParent; -- флаг работы родительского потока.
AtomicInteger counter; -- счетчик излученных объектов
AtomicReference<AddressHintResponseDto> atomicReference; -- хранитель первого излученного объекта

The constructor will look like this:

public BridgeFlux (int capacity) {
this.capacity = capacity;
queue = new ArrayBlockingQueue<>(capacity);
done = new AtomicBoolean(true);
doneParent = new AtomicBoolean(true);
atomicReference = new AtomicReference<>(null);
counter = new AtomicInteger(0);
}

This is how it is created:

new BridgeFlux(requestDto.getMaxResultSize())

We just initialized the buffer and set the flags to the starting state.

Since it was more convenient to do the signing and processing of the parent stream from the outside, in this class we only saved the data to the buffer. Generally speaking, it could have been done more universally, but it turned out the way it did. This is how the data from the stream is put into the buffer:

public void add(AddressHintResponseDto address) {
    try {
        queue.put(address);
        counter.incrementAndGet();
        atomicReference.compareAndSet(null,address);
    } catch (InterruptedException e) {
        throw new RuntimeException(e.getMessage());
    }
}

This is how a subscription to a stream is made in an external class to receive data:

getFlux(query, requestDto, lang).distinct()
        .take(requestDto.getMaxResultSize())
        .subscribe(
                bridgeFlux::add, // onNext
                err -> log.error("onError: Exception occurred: " + err.getMessage(), err),  // onError
                () -> complete(bridgeFlux, requestDto, lang)// onComplete
        );

All is well, now we successfully put data into the buffer. Next, the received data must be immediately emitted!

public Flux<AddressHintResponseDto> createFlux() {
    return Flux.<AddressHintResponseDto>create(sink ->
                            sink.onRequest(n -> {
                                while (!sink.isCancelled() && done.get()) {
                                    try {
                                        var address = queue.poll(10, TimeUnit.MILLISECONDS);
                                        if (!doneParent.get() && queue.isEmpty()) {
                                            done.compareAndSet(true,false);
                                        }
                                        if (address != null) {
                                            sink.next(address);
                                        }
                                    } catch (InterruptedException e) {
                                        sink.error(e);
                                    }
                                }
                                if (!doneParent.get() && !sink.isCancelled()) {
                                    sink.complete();
                                }
                            }),
                    FluxSink.OverflowStrategy.IGNORE)
            .subscribeOn(Schedulers.boundedElastic());
}

here is an infinite loop, constantly spinning, and if there is data in the buffer, it takes it from the buffer and sends it to the stream, stops according to the set flags. It emits like this:

bridgeFlux.createFlux().cache();

At this stage, we have ensured continuous transfer from the processed streams to the resulting stream.

But according to the conditions, it is also necessary to output how many objects passed through the buffer and output the first emitted object.

/**
 * выдать количество данных прошедших через буфер
 */
public int getCount() {
    return counter.get();
}

/**
 * выдать первый пришедший в буффер элемент
 */
public AddressHintResponseDto getSavedAddressHintResponseDto() {
    return atomicReference.get();
}

In addition, it is necessary to handle the situation when the external flow has finished emitting and the next flow needs to be launched:

/**
 * при получении команды окончания верхнего потока, запустить принятый поток и подписаться на него,
 * получая данные из него в прописывая их в буффер
 * при окончании потока выставить флаг окончания излучения
 */
public void stop(Flux<AddressHintResponseDto> addFlux) {
    addFlux.subscribe(
            this::add, // onNext
            err -> log.error("onError: Exception occurred: " + err.getMessage(), err),  // onError
            () -> doneParent.compareAndSet(true,false) // onComplete
    );
}

As a result, it became possible to get a list of streets by the query mask, and if the number of responses is less than requested, then make an additional request to get houses on the first received street. And with a reactive client, everything looks like this: first, the streets come and are drawn, and a little later, the houses located on the first street come. At the same time, we do not always need to run a second request, which greatly saves resources.

Here is the entire code of the class:

import com.address_hints.client.dto.response.AddressHintResponseDto;
import com.address_hints.fias.parser.LanguageParser;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;


/**
 * класс мост потоков
 * обрабатывает полученные данные из буфера
 * и генерирует поток данных из этого буфера.
 * так же запускает и обрабатывает дополнительный поток
 */
@Slf4j
public class BridgeFlux {

    int capacity;
    BlockingQueue<AddressHintResponseDto> queue;

    /**
     * флаг окончания генерации
     *
     * комбинация этих флагов нужна для тестирования, так как тестирование идет в один трэд
     * и потоки сразу финализируются
     */
    AtomicBoolean done;
    /**
     * флаг окончания работы порождаемого потока
     */
    AtomicBoolean doneParent;
    AtomicInteger counter;
    AtomicReference<AddressHintResponseDto> atomicReference;

    public BridgeFlux (int capacity) {
        this.capacity = capacity;
        queue = new ArrayBlockingQueue<>(capacity);
        done = new AtomicBoolean(true);
        doneParent = new AtomicBoolean(true);
        atomicReference = new AtomicReference<>(null);
        counter = new AtomicInteger(0);
    }

    /**
     * при получении команды окончания верхнего потока, запустить принятый поток и подписаться на него,
     * получая данные из него в прописывая их в буффер
     * при окончании потока выставить флаг окончания излучения
     */
    public void stop(Flux<AddressHintResponseDto> addFlux) {
        addFlux.subscribe(
                this::add, // onNext
                err -> log.error("onError: Exception occurred: " + err.getMessage(), err),  // onError
                () -> doneParent.compareAndSet(true,false) // onComplete
        );
    }

    /**
     * выдать количество данных прошедших через буфер
     */
    public int getCount() {
        return counter.get();
    }

    /**
     * выдать первый пришедший в буффер элемент
     */
    public AddressHintResponseDto getSavedAddressHintResponseDto() {
        return atomicReference.get();
    }

    /**
     * складывает в буфер, если не получилось, то падает с ошибкой
     */
    @SuppressWarnings({"java:S2142","java:S112"})
    public void add(AddressHintResponseDto address) {
        try {
            queue.put(address);
            counter.incrementAndGet();
            atomicReference.compareAndSet(null,address);
        } catch (InterruptedException e) {
            throw new RuntimeException(e.getMessage());
        }
    }

    /**
     * излучатель данных, излучает при налии данных в буфере, если есть в буфере вынимает из него и отправляет в поток
     * останавливается либо по прерыванию либо по выставленному флагу остановиться.
     */
    @SuppressWarnings({"java:S2142","java:S3776"})
    public Flux<AddressHintResponseDto> createFlux() {
        return Flux.<AddressHintResponseDto>create(sink ->
                                sink.onRequest(n -> {
                                    while (!sink.isCancelled() && done.get()) {
                                        try {
                                            var address = queue.poll(10, TimeUnit.MILLISECONDS);
                                            if (!doneParent.get() && queue.isEmpty()) {
                                                done.compareAndSet(true,false);
                                            }
                                            if (address != null) {
                                                sink.next(address);
                                            }
                                        } catch (InterruptedException e) {
                                            sink.error(e);
                                        }
                                    }
                                    if (!doneParent.get() && !sink.isCancelled()) {
                                        sink.complete();
                                    }
                                }),
                        FluxSink.OverflowStrategy.IGNORE)
                .subscribeOn(Schedulers.boundedElastic());
    }
}

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *