Updating system state via Server-Sent Events (SSE) without fuss

In any multi-user project, sooner or later you need to promptly notify clients about changes to objects in the system. In this article I will not cover anything complex, such as collaborative editing of documents, but will write about a simpler situation that comes up everywhere.

Let’s say we have a list of objects, for example a list of episodes of a show. One client is looking at it while another adds an episode to the list. It would be nice to show the new episode to the first client immediately. The same applies to viewing an individual episode: if one client is viewing it and another is editing it, it would be great to show the result of the edit to the first one right away.

To solve this problem, it is very convenient to use the Server-Sent Events (SSE) mechanism. I want to talk about how to do this in projects that have Node on the back and React on the front.

Before writing code…

…let’s remember that we usually use CRUD operations when working with objects in the system. To notify clients about a change in an object's state, we will most likely need only three of them: creation, update and deletion. These three operations can be divided into two groups:

  1. Creation

  2. Update and deletion

The difference is that update and deletion apply to an existing object, so the client knows its identifier. The client can pass this identifier to the server when it asks to be notified about changes in the object's state. Having received such a request, the server can decide whether it is permitted under the current access model, because both the client and the object are explicitly identified.

When an object is created, the most the client can tell the server is its type. But the object type alone will most likely not be enough to check access, because it is unlikely that all users of your system have access to all objects. Nor are you likely to want any user (other than a superuser) to learn about the creation of every object in the system.

Fortunately, objects in the system usually do not hang around on their own but can be grouped by some characteristic. In the example from the introduction, episodes are grouped by show, and shows are grouped by the user who created them. The same objects can often be grouped by different criteria, depending on the semantics of the system. That is, in general we can say that objects live in containers that can be identified in the system in some way. In the example above, the container for episodes is the show (which has its own ID), and the container for shows is the user who created them.

Then, to track the creation of an object, the client can tell the system the object's type and the ID of the container in which it expects the object to appear. With that, the server can check access rights. And since you usually want to track the appearance of objects within some container anyway, this scheme is a natural one.
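To make the two kinds of subscription concrete, here is a minimal sketch. The names `TUser`, `TSubscription`, `canSubscribe` and `targetId` are my own illustrations and do not come from the project; the access rule (owner or superuser) is the same assumption used in the controllers later in the article.

```typescript
// Hypothetical types for illustration; they are not part of the project code.
type TUser = { id: number; is_root: boolean };

// A client either watches one existing object (it knows the object's ID)
// or watches a container for objects of a given type appearing in it.
type TSubscription =
    | { kind: 'item'; objectType: 'episode'; objectId: number }
    | { kind: 'list'; objectType: 'episode'; containerId: number };

// The ID the server has to look up differs per subscription kind:
// the object itself, or the container the object will appear in.
const targetId = (sub: TSubscription): number =>
    sub.kind === 'item' ? sub.objectId : sub.containerId;

// Assumed rule: only the owner of the target and the superuser may subscribe.
const canSubscribe = (user: TUser, ownerId: number): boolean =>
    user.is_root || user.id === ownerId;
```

In both cases the server ends up with something that has an owner, which is what makes the access check possible.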

Why Server-Sent Events (SSE)?

When this task first came up in my project, I immediately implemented the dumbest option: periodically polling for information about objects. My heart bled when I looked at the logs and realized how much traffic and server resources were being wasted. The only consolation was that the overall load was not high enough to start worrying. Still, it was clear that I could not go on like this and something had to be done.

In the modern world, there are three ways to solve this issue:

  1. Polling: Short or Long

  2. Server-Sent Events (SSE)

  3. WebSockets

As a real hipsta pogromist, of course, the first thing I decided was that I needed to do something big and immediately grabbed WebSockets. I advise you to do the same (no!).

At first it was quite exciting to write my own protocol, authorization, subscription system, dispatching, connection recovery and all that. However, it soon became clear that the result was starting to look like a home-grown message broker, and that marrying it with the frontend would take a lot of effort. And I became depressed… for about a day, until I remembered that there were alternatives, or rather, one alternative.

Honestly, if you choose between Polling and SSE, it’s not very clear why you should choose the first one. This is an older method, which is now recommended only as a fallback option. Therefore, we will not even focus on this point.

Making a server

I’m going to add two endpoints: one for notifications about changes in the state of a single object and one for a list of objects. Before writing the controllers, let’s add the following file, sse.ts, which manages the corresponding lists of clients. It’s pretty simple. Let me just draw your attention to the fact that I set a custom event name on every notification and duplicate it in the data. This lets the client handle events in different ways depending on its needs. But more on that later.

import { Response } from 'express';
import { TEpisode } from '../../../types';
import logger from '../../../utils/logger';

export type TSseClient = {
    res: Response;
    id: number; // ID of the object or container
};

const itemClients: TSseClient[] = []; // clients listening to a specific object
const listClients: TSseClient[] = []; // clients listening to the list of objects in a container

export const add = (type: 'item'|'list', client: TSseClient) => {
    const clients = type === 'item' ? itemClients : listClients;
    clients.push(client);
    logger.debug(`New client added to the "${type}" list. List length: ${clients.length}`);
};

export const remove = (type: 'item'|'list', res: Response) => {
    const clients = type === 'item' ? itemClients : listClients;
    const index = clients.findIndex(client => client.res === res);
    if(index >= 0) {
        clients.splice(index, 1);
        logger.debug(`Client removed from the "${type}" list. List length: ${clients.length}`);
    }
};

export const notify = (event: 'create'|'update'|'remove', episode: TEpisode) => {
    try {
        // Notify all listeners of the list
        listClients.forEach(client => {
            if(client.id === episode.show_id) {
                client.res.write(`event: ${event}\n`);
                client.res.write(`data: ${JSON.stringify({ event, episode })}\n\n`);
            }
        });
        // Object creation is only sent to list listeners
        if(event === 'create') return;
        // Notify all listeners of the episode
        itemClients.forEach(client => {
            if(client.id === episode.id) {
                client.res.write(`event: ${event}\n`);
                client.res.write(`data: ${JSON.stringify({ event, episode })}\n\n`);
            }
        });
    } catch(error) {
        logger.error('SSE notify error\n%o', error);
    }
};

export default { add, remove, notify };

As you can see, this code is written to send out information about episodes, which are grouped in exactly one way. Similar code will be needed for other objects and other groupings, and it will be 99% identical to the code above. It therefore makes sense to modify the code slightly so that it works with any type of object and container, instead of resorting to mindless copy-paste. Here, however, I keep the episode-specific version so as not to complicate the presentation.
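Purely as an illustration, such a generalization might look roughly like the sketch below. Everything in it is an assumption of mine rather than the project's real code: the factory name `createSseRegistry`, the `TSink` type (anything with a `write` method, which an Express `Response` satisfies), and the `item` key in the payload, which replaces the `episode` key used above.

```typescript
// Anything we can write text to; an Express Response has this shape.
type TSink = { write(chunk: string): unknown };
type TSseClient = { res: TSink; id: number };
type TChannel = 'item' | 'list';
type TEvent = 'create' | 'update' | 'remove';

// Hypothetical factory: one registry per object type and grouping.
const createSseRegistry = <T>(
    getItemId: (item: T) => number,
    getContainerId: (item: T) => number,
) => {
    const clients: Record<TChannel, TSseClient[]> = { item: [], list: [] };

    const add = (channel: TChannel, client: TSseClient): void => {
        clients[channel].push(client);
    };

    const remove = (channel: TChannel, res: TSink): void => {
        const index = clients[channel].findIndex(client => client.res === res);
        if (index >= 0) clients[channel].splice(index, 1);
    };

    const notify = (event: TEvent, item: T): void => {
        // One SSE message: an "event:" line, a "data:" line and a blank line
        const message = `event: ${event}\ndata: ${JSON.stringify({ event, item })}\n\n`;
        // Listeners of the container hear about every change
        clients.list
            .filter(client => client.id === getContainerId(item))
            .forEach(client => client.res.write(message));
        // Creation is only announced to list listeners
        if (event === 'create') return;
        clients.item
            .filter(client => client.id === getItemId(item))
            .forEach(client => client.res.write(message));
    };

    return { add, remove, notify };
};

// Usage: a registry for episodes grouped by show
type TEpisode = { id: number; show_id: number; name: string };
const episodeSse = createSseRegistry<TEpisode>(e => e.id, e => e.show_id);
```

The two accessor functions are the only episode-specific part, so a registry for shows grouped by owner would differ only in the arguments passed to the factory.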

This is what a controller might look like to notify about a change in the state of an object (in our case, an episode).

import { NextFunction, Request, Response } from 'express';
import HttpException from '../../../exceptions/http.exception';
import pool from "../../../db/postgres";
import logger from '../../../utils/logger';
import { TEpisode } from '../../../types';
import sse, { TSseClient } from './sse';
/**
 * Requests to listen for changes to an episode
 */
export const sseItem = async (req: Request, res: Response, next: NextFunction) => {
    // A logged-in user is required
    if(!req.user) {
        return next(new HttpException(401));
    }
    // Take the ID from the request
    const episode_id = parseInt(req.params.id, 10);
    // Validate the ID
    if(isNaN(episode_id)) {
        return next(new HttpException(400));
    }
    // Go to the database, etc.
    try {
        const result = await pool.query('SELECT * FROM episodes WHERE id=$1', [ episode_id ]);
        if(result.rows.length === 0) {
            return next(new HttpException(404));
        } 
        
        const episode = result.rows[0] as TEpisode;
        // Only the superuser and the owner may read this information
        if(!req.user.is_root && req.user.id !== episode.owner_id) {
            return next(new HttpException(403));
        }

        // Send the SSE headers
        const headers = {
            'Content-Type': 'text/event-stream',
            'Connection': 'keep-alive',
            'Cache-Control': 'no-cache',
        };
        res.writeHead(200, headers);
        
        // Set the reconnection delay to 1 second
        res.write(`retry: 1000\n`);
        // Send the current state of the object
        res.write(`event: update\n`);
        res.write(`data: ${JSON.stringify({ event: 'update', episode })}\n\n`);

        // Add the client to the list
        const newClient: TSseClient = {
            res,
            id: episode.id,
        };
        sse.add('item', newClient);

        // When the connection drops, remove the client from the list
        req.on('close', () => {
            logger.debug(`SSE connection closed for ${req.user?.email}`);
            sse.remove('item', res);
        });

    } catch(error) {
        logger.error('%o', error);
        return next(new HttpException(500, 'Server error', 'Ошибка сервера', error));
    }
};

export default sseItem;

One of the big advantages of SSE is that you can use the same client authorization mechanism as for other requests. For this purpose, I wrote a small middleware that pulls an authorization token from the request header, requests information about the logged-in user from Redis and places it in the user field in the request. You can then use this data to check access rights: in this case, the object can be accessed by its owner and superuser.
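The middleware itself is not shown in this article, so the following is only a sketch of how it might be structured. The names `extractBearerToken`, `makeAuthMiddleware` and `findUserByToken` are hypothetical, and `findUserByToken` stands in for the Redis lookup; the request type is reduced to the two fields the sketch needs.

```typescript
// Hypothetical shapes; the real middleware and session store are not shown here.
type TUser = { id: number; email: string; is_root: boolean };
type TRequestLike = { headers: Record<string, string | undefined>; user?: TUser };

// Pull the token out of an "Authorization: Bearer <token>" header value.
const extractBearerToken = (header: string | undefined): string | null => {
    if (!header) return null;
    const parts = header.trim().split(/\s+/);
    if (parts.length !== 2) return null;
    const [scheme, token] = parts;
    if (!scheme || !token) return null;
    return scheme.toLowerCase() === 'bearer' ? token : null;
};

// `findUserByToken` is an assumed stand-in for the lookup in Redis.
const makeAuthMiddleware = (findUserByToken: (token: string) => Promise<TUser | null>) =>
    async (req: TRequestLike, _res: unknown, next: (error?: unknown) => void): Promise<void> => {
        try {
            // Node lowercases incoming header names
            const token = extractBearerToken(req.headers['authorization']);
            const user = token ? await findUserByToken(token) : null;
            if (user) req.user = user;
            // No user found: req.user stays unset and the controller answers 401
            next();
        } catch (error) {
            // A failed lookup is passed on to the error handler
            next(error);
        }
    };
```

Because the SSE endpoints are ordinary HTTP routes, a middleware of this shape can be mounted in front of them exactly as in front of any other route.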

Please note that immediately after connecting, the client is sent the current state of the object. This makes it easier for the frontend to cope with the connection being reset when the user switches browser tabs. If the user leaves the tab with your application, the SSE connections are closed (by default, the Fetch Event Source library used on the client side does this when the page becomes hidden), and any updates that occur before the user returns to the tab are lost. As soon as the tab is reactivated, the connection is restored automatically and the application immediately receives the current state of the object. With this approach, we don’t need to bother with event identifiers or with storing the history of object changes.

The controller that notifies about changes to objects in a container is very similar to the previous one. The difference is that on connection the current list of objects is sent right away. After that, events are sent with changes to the objects in the list.

import { NextFunction, Request, Response } from 'express';
import HttpException from '../../../exceptions/http.exception';
import pool from "../../../db/postgres";
import logger from '../../../utils/logger';
import { TEpisode, TShow } from '../../../types';
import sse, { TSseClient } from './sse';
/**
 * Requests to listen for changes to the episodes of a specific show
 */
export const sseList = async (req: Request, res: Response, next: NextFunction) => {
    // A logged-in user is required
    if(!req.user) {
        return next(new HttpException(401));
    }
    // Take the ID from the request
    const show_id = parseInt(req.params.id, 10);
    // Validate the ID
    if(isNaN(show_id)) {
        return next(new HttpException(400));
    }
    // Go to the database, etc.
    try {
        let result = await pool.query('SELECT * FROM shows WHERE id=$1', [ show_id ]);
        if(result.rows.length === 0) {
            return next(new HttpException(404));
        } 
        
        const show = result.rows[0] as TShow;
        // Only the superuser and the owner may read this information
        if(!req.user.is_root && req.user.id !== show.owner_id) {
            return next(new HttpException(403));
        }

        // Read the current list of objects before any headers are sent,
        // so that a failed query can still be answered with a regular 500
        result = await pool.query('SELECT * FROM episodes WHERE show_id=$1 ORDER BY name', [ show_id ]);
        const episodes = result.rows as TEpisode[];

        // Send the SSE headers
        const headers = {
            'Content-Type': 'text/event-stream',
            'Connection': 'keep-alive',
            'Cache-Control': 'no-cache',
        };
        res.writeHead(200, headers);

        // Set the reconnection delay to 1 second
        res.write(`retry: 1000\n`);

        // Send the current list to the client
        res.write(`event: list\n`);
        res.write(`data: ${JSON.stringify({ event: 'list', episodes })}\n\n`);

        // Add the client to the list
        const newClient: TSseClient = {
            res,
            id: show.id,
        };
        sse.add('list', newClient);

        // When the connection drops, remove the client from the list
        req.on('close', () => {
            logger.debug(`SSE connection closed for ${req.user?.email}`);
            sse.remove('list', res);
        });

    } catch(error) {
        logger.error('%o', error);
        return next(new HttpException(500, 'Server error', 'Ошибка сервера', error));
    }
};

export default sseList;

That is practically all that needs to be done on the server side. Only a few small things remain. First, add notification calls to the controllers that create, delete and modify objects:

import sse from './sse/sse';
import { TEpisode } from '../../types';
let episode: TEpisode;
...
// In the creation controller
sse.notify('create', episode);
...
// In the deletion controller
sse.notify('remove', episode);
...
// In the modification controller
sse.notify('update', episode);

Second, register the routes. And that’s all, you can move on to the frontend.

Making a client

On the React side, this is just begging for a hook, which might look something like this:

import { useEffect, useState } from 'react';
import { useAppSelector } from './store';
import { API_URL } from '../config';
import { fetchEventSource } from '@microsoft/fetch-event-source';
import { TEpisode } from '../types';

type TSseMessage = {
	event: 'list'|'create'|'update'|'remove',
	episode?: TEpisode,
	episodes?: TEpisode[],
};

const useSse = (method: string) => {
	const token = useAppSelector(store => store.auth.token);
	const [ message, setMessage ] = useState<TSseMessage|null>(null);

	useEffect(() => {
		const controller = new AbortController();
		const { signal } = controller;

		const fetchData = async () => {
			await fetchEventSource(`${API_URL}/${method}`, {
				headers: {
					'Authorization': `Bearer ${token}`,
				},
				onmessage(event) {
					try {
						const parsedData = JSON.parse(event.data);
						setMessage(parsedData as TSseMessage);
					} catch(error) {
						console.error('useSse parsing error', error);
					}	
				},
				signal,
			});
		};

		fetchData();
		return () => controller.abort();

	}, [ method, token ]);

	return message;
};

export default useSse;

The important thing here is that Fetch Event Source is used instead of the standard EventSource, and here’s why: EventSource does not let you set request headers, and without that it is impossible to use standard token-based authorization.

As on the server side, we assume here that we are not going to track anything other than episodes. It is not difficult to adapt the code to the general case (and to add error handling while you are at it).

The component displaying the episode might look something like this:

import { useEffect } from 'react';
import { useParams } from 'react-router-dom';
import { useAppDispatch, useAppSelector } from '../hooks/store';
import { episodeSet } from '../store/episodeSlice';
import useSse from '../hooks/sse';

const Episode = () => {
	const dispatch = useAppDispatch();
	const episode = useAppSelector(store => store.episode);
	const { episode_id } = useParams();
	const message = useSse(`sse/episodes/${episode_id}`);

	useEffect(() => {
		if(!message || message.event !== 'update' || !message.episode) return;
		dispatch(episodeSet(message.episode));
	}, [ message, dispatch ]);

	if(!episode) return null;

	return (
		<div>
			{episode.name}
		</div>
	);
};

export default Episode;

Very simple! It is practically no different from a typical API request, but at the same time it always displays the current state of the object.

To display a list of objects, the component is not much more complicated. Please note that it is convenient to parse the event in the component itself, and that is why we added it to the data when we wrote the server.

import { FC, useEffect } from 'react';
import { useAppDispatch, useAppSelector } from '../../hooks/store';
import { episodesAdd, episodesDel, episodesPut, episodesSet } from '../../store/episodesSlice';
import useSse from '../../hooks/sse';

type EpisodeListProps = {
    showId: number;
};

const EpisodeList: FC<EpisodeListProps> = ({ showId }) => {
	const dispatch = useAppDispatch();
	const episodes = useAppSelector(store => store.episodes);
	const message = useSse(`sse/episodes/shows/${showId}`);

	useEffect(() => {
		if(!message) return;
		switch(message.event) {
			case 'list':
				if(message.episodes) dispatch(episodesSet(message.episodes));
				break;
			case 'create':
				if(message.episode) dispatch(episodesAdd(message.episode));
				break;
			case 'remove':
				if(message.episode) dispatch(episodesDel(message.episode.id));
				break;
			case 'update':
				if(message.episode) dispatch(episodesPut(message.episode));
				break;
			default:
				return;
		}
	}, [ message, dispatch ]);

	return (
		<ul>
			{episodes.map(episode => <li key={episode.id}>{episode.name}</li>)}
		</ul>
	);
};

export default EpisodeList;
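The slice actions imported above (`episodesSet`, `episodesAdd`, `episodesDel`, `episodesPut`) are not shown in this article. As a guess at their combined effect, here is a sketch of a pure function that applies one message to the current list. The name `applyMessage` is hypothetical, the types are repeated so that the block stands on its own, and ignoring duplicate `create` events is my own addition.

```typescript
type TEpisode = { id: number; show_id: number; name: string };

type TSseMessage = {
    event: 'list' | 'create' | 'update' | 'remove';
    episode?: TEpisode;
    episodes?: TEpisode[];
};

// Returns the next list without mutating the previous one.
const applyMessage = (current: TEpisode[], message: TSseMessage): TEpisode[] => {
    const { event, episode, episodes } = message;
    switch (event) {
        case 'list':
            // The full list replaces whatever we had
            return episodes ? [...episodes] : current;
        case 'create':
            // Ignore duplicates, e.g. when the initial list already contains the episode
            if (!episode || current.some(e => e.id === episode.id)) return current;
            return [...current, episode];
        case 'update':
            return episode ? current.map(e => (e.id === episode.id ? episode : e)) : current;
        case 'remove':
            return episode ? current.filter(e => e.id !== episode.id) : current;
        default:
            return current;
    }
};
```

Since the server sends a fresh `list` event on every reconnect, a function like this never needs to reconcile missed events; the next `list` simply overwrites the state.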

Hooray! Hooray! Is that all? No 🙁

Unfortunately, there are nuances

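The most unpleasant of them is the greed of browsers. Over HTTP/1.1 they typically do not allow more than 6 connections to one server, and every open SSE stream occupies one of them (with HTTP/2 the limit is negotiated and is much higher by default). And that limit is not per tab but for the whole browser. Six connections is extremely few.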

To solve this problem, two ways are usually proposed:

  1. Using LocalStorage

  2. Using Service Workers

The second method personally seems preferable to me.

Another nuance is that when the connection is lost, the browser will diligently try to restore it, and sometimes for some reason it completely ignores the reconnection delay specified by the server. Solving this takes several steps. First, you need to understand why the connection broke. If it is a client error, then most likely the connection was never established in the first place (and, by the way, the reconnection delay may never have been transmitted). In this case, the client should stop knocking on a closed door and take action. Perhaps the authorization token is no longer valid and the user needs to log in again, perhaps an incorrect identifier was sent, and so on.

If the cause was a sudden server crash, it is worth increasing the reconnection delay yourself, perhaps even making it progressive if the server does not respond for a long time. Or you may simply show the user an error and refuse to continue working. It all depends on the system.
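The two decisions described above can be sketched as a pair of small pure functions. The names `classifyOpenStatus` and `retryDelay`, the status groupings and the 30-second cap are my own assumptions, not something taken from the project. If I read the Fetch Event Source documentation correctly, its `onopen` callback receives the response and its `onerror` callback may return a delay in milliseconds or throw to stop retrying, so helpers like these could be called from there; treat that wiring as something to verify.

```typescript
// What the client should do after trying to open the stream.
type TOpenDecision = 'ok' | 'reauthorize' | 'give-up' | 'retry';

const classifyOpenStatus = (status: number): TOpenDecision => {
    if (status >= 200 && status < 300) return 'ok';
    // Token expired or missing: the user has to log in again
    if (status === 401) return 'reauthorize';
    // Timeouts and rate limiting are worth another attempt
    if (status === 408 || status === 429) return 'retry';
    // Other client errors (bad ID, no access): retrying will not help
    if (status >= 400 && status < 500) return 'give-up';
    // 5xx and anything else: the server is in trouble, try again later
    return 'retry';
};

// Progressive delay: 1s, 2s, 4s, ... capped at 30s. `attempt` starts at 0.
const retryDelay = (attempt: number, baseMs = 1000, maxMs = 30000): number =>
    Math.min(maxMs, baseMs * 2 ** Math.max(0, attempt));
```

The attempt counter would be reset to zero after every successful open, so that a single hiccup does not leave the client waiting 30 seconds the next time.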

But now, perhaps, that’s all.

The picture for the article was generated by the Kandinsky 2.2 bot
