JavaScript Multithreading with SharedArrayBuffer and Atomics: Basics

They help here SharedArrayBuffer And Atomics.

SharedArrayBuffer

SharedArrayBuffer – this is special a type of buffer that allows multiple threads to share the same block of memory. Unlike the same default ArrayBufferwhich is only accessible by one thread, SharedArrayBuffer gives a shared memory area that can be accessed by multiple worker threads.

Internally SharedArrayBuffer is a fixed memory block. size that can be used to store binary data. It is used in conjunction with typed arrays such as Uint8Array or Int32Array.

But before working with SharedArrayBuffer, it is very important to use certain requirements for setting up the environment – setting up HTTP headers.

COOP The header allows you to protect the site from cross-origin attacks. It must be set to the value same-origin:

Cross-Origin-Opener-Policy: same-origin

COEP The header prevents the site from being embedded in other documents. It should be set to the value require-corp or credentialless:

Cross-Origin-Embedder-Policy: require-corp

Now let's look at how to create and pass a SharedArrayBuffer between threads.

Creating a SharedArrayBuffer

Constructor SharedArrayBuffer is used to create a buffer of a certain size in bytes. It takes two parameters:

  • length: Buffer size in bytes.

  • options (optional): an object with parameters, such as maxByteLengthwhich defines the maximum buffer size.

const sab = new SharedArrayBuffer(1024);
const growableSab = new SharedArrayBuffer(8, { maxByteLength: 16 });

SharedArrayBuffer Methods

grow allows you to increase the size SharedArrayBufferif it was created with the parameter maxByteLength. The new size must be less than or equal to maxByteLength:

const buffer = new SharedArrayBuffer(8, { maxByteLength: 16 });

if (buffer.growable) {
  buffer.grow(12);
}

slice returns new SharedArrayBuffercontaining a copy of the bytes from the original buffer at index start (inclusive) up to end (exclusively).

const sab = new SharedArrayBuffer(1024);
const slicedSab = sab.slice(2, 100);

This allows you to create buffers based on subsets of data from the original buffer without changing the original.

SharedArrayBuffer Properties

byteLength returns length SharedArrayBuffer in bytes. This value is set when the buffer is created and cannot be changed.

const sab = new SharedArrayBuffer(1024);
console.log(sab.byteLength); // 1024

Property growable indicates whether it can SharedArrayBuffer change your size. If the buffer was created with the parameter maxByteLengththis property will be equal true.

const buffer = new SharedArrayBuffer(8, { maxByteLength: 16 });
console.log(buffer.growable); // true

maxByteLength returns the maximum buffer size in bytes that was set when the buffer was created.

const buffer = new SharedArrayBuffer(8, { maxByteLength: 16 });
console.log(buffer.maxByteLength); // 16

Atomics

Atomics is a built-in object in JS that provides a set of static methods for performing atomic operations. Atomic operations are performed as a single, indivisible action, this is how atomics avoids data races.

Basic Atomics methods:

Atomics.add(typedArray, index, value)
Atomically adds a value to an array element at the given index and returns the previous value:

const sab = new SharedArrayBuffer(1024);
const int32 = new Int32Array(sab);
Atomics.add(int32, 0, 5); // добавляет 5 к int32[0]

Atomics.sub(typedArray, index, value)
Atomically subtracts a value from an array element at a given index and returns the previous value:

Atomics.sub(int32, 0, 2); // вычитает 2 из int32[0]

Atomics.load(typedArray, index)
Atomically reads the value of an array element at a given index:

let value = Atomics.load(int32, 0); // считывает значение int32[0]

Atomics.store(typedArray, index, value)
Atomically writes a value to an array element at a given index:

Atomics.store(int32, 0, 10); // Записывает 10 в int32[0]

Atomics.compareExchange(typedArray, index, expectedValue, replacementValue)
Atomically compares the current value of an array element at a given index with the expected value. If they are equal, then writes the new value and returns the old one:

Atomics.compareExchange(int32, 0, 10, 20); // если int32[0] равно 10, то записывает 20 и возвращает старое значение

Atomics.exchange(typedArray, index, value)
Atomically writes a value to an array element at a given index and returns the old value:

Atomics.exchange(int32, 0, 15); // записывает 15 в int32[0] и возвращает старое значение

Atomics.wait(typedArray, index, value, timeout)
Checks the value of an array element and waits for it to change until the specified value or timeout expires. Returns “ok”, “not-equal” or “timed-out”:

let result = Atomics.wait(int32, 0, 10, 1000); // ожидает изменения int32[0] до 10 или 1000 мс

Atomics.notify(typedArray, index, count)
Notifies threads waiting to change the array element at the given index. Returns the number of notified threads:

let notified = Atomics.notify(int32, 0, 1); // уведомляет один поток, ожидающий изменения int32[0]

Atomics can be used for synchronous operations. For example, to synchronize the work of threads:

const sab = new SharedArrayBuffer(1024);
const int32 = new Int32Array(sab);

// в главном потоке
const worker = new Worker('worker.js');
worker.postMessage(sab);

Atomics.store(int32, 0, 0);
Atomics.wait(int32, 0, 0); // ожидание изменения int32[0]

// В рабочем потоке (worker.js)
self.onmessage = function(event) {
    const int32 = new Int32Array(event.data);
    Atomics.store(int32, 0, 1);
    Atomics.notify(int32, 0, 1); // уведомление главного потока
};

Combining Atomics and SharedArrayBuffer

Real time request counter

Let's say there is a web service that processes a large number of requests, and we need to monitor the number of active requests in real time:

// главный поток
const buffer = new SharedArrayBuffer(4); // Создаем буфер на 4 байта
const counter = new Int32Array(buffer);

const worker = new Worker('worker.js');
worker.postMessage(buffer);

// обрабатываем новый запрос
function handleRequest() {
    Atomics.add(counter, 0, 1); // увеличиваем счетчик
    console.log(`Активные запросы: ${Atomics.load(counter, 0)}`);
    
    // симулируем завершение запроса через 2 секунды
    setTimeout(() => {
        Atomics.sub(counter, 0, 1); // уменьшаем счетчик
        console.log(`Активные запросы: ${Atomics.load(counter, 0)}`);
    }, 2000);
}

// рабочий поток (worker.js)
self.onmessage = function(event) {
    const counter = new Int32Array(event.data);
    setInterval(() => {
        console.log(`[Worker] Активные запросы: ${Atomics.load(counter, 0)}`);
    }, 1000);
};

We use SharedArrayBuffer to store the counter of active requests, and Atomics allows us to safely increment and decrement the counter from different threads.

Parallel data processing

Consider the problem of parallel processing of a large array of data, for example, applying a function to each element of the array:

/// главный поток
const buffer = new SharedArrayBuffer(1024 * 4); // буфер для 1024 чисел
const data = new Int32Array(buffer);

// инициализируем массив случайными числами
for (let i = 0; i < data.length; i++) {
    data[i] = Math.floor(Math.random() * 100);
}

const worker = new Worker('worker.js');
worker.postMessage(buffer);

function processData() {
    for (let i = 0; i < data.length; i++) {
        data[i] *= 2; // применяем функцию к каждому элементу
    }
    console.log('Обработка данных завершена');
}

processData();

// рабочий поток (worker.js)
self.onmessage = function(event) {
    const data = new Int32Array(event.data);
    for (let i = 0; i < data.length; i++) {
        Atomics.store(data, i, Atomics.load(data, i) * 2); // атомарное умножение на 2
    }
    console.log('Рабочий поток: обработка данных завершена');
};

State synchronization between threads

Let's say there is some kind of toy where several threads need to synchronize their state:

// главный поток
const buffer = new SharedArrayBuffer(4); // буфер на 4 байта для хранения состояния
const state = new Int32Array(buffer);

const worker = new Worker('worker.js');
worker.postMessage(buffer);

function updateState(newState) {
    Atomics.store(state, 0, newState); // обовляем состояние
    Atomics.notify(state, 0); // уведомляем рабочий поток об изменении состояния
    console.log(`Состояние обновлено: ${newState}`);
}

updateState(1);

// рабочий поток (worker.js)
self.onmessage = function(event) {
    const state = new Int32Array(event.data);

    function waitForStateChange() {
        Atomics.wait(state, 0, Atomics.load(state, 0)); // ожидаем изменения состояния
        console.log(`Рабочий поток: состояние изменено на ${Atomics.load(state, 0)}`);
        waitForStateChange(); // рекурсивно продолжаем ожидание изменений
    }

    waitForStateChange();
};

My colleagues from OTUS talk about popular programming languages ​​and practical tools as part of online courses. By this link you can view the full catalog of courses, as well as sign up for open lessons.

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *