Implementación de suscriptores en C++: bailemos un poco más lejos de la estufa / Sudo Null IT News

En el último artículo, después de descartar todo lo que sabíamos del mundo de la “maldita empresa de TI” y vaciar la copa, “escribimos de rodillas” un simple notificador. Un objeto que llama a una cadena de clientes (suscriptores) prerregistrados cuando ocurre algún evento.

¿Qué pasa con los argumentos?

El notificador no admitía el envío de parámetros personalizados a los suscriptores; es hora de solucionar este problema convirtiéndolo en una plantilla. Deje que la lista de tipos de parámetros se especifique directamente al declarar el objeto notificador. No haremos ningún truco para determinar la forma óptima de pasar argumentos. Si el programador transfiere uno char cargado const char&habiendo usado un puntero completo de 8 bytes en el interior (para arquitectura de 64 bits) en lugar de un byte, bueno… Escríbale en la reseña. Nuevamente, puede especificar una referencia no constante como tipo de argumento, pero tenga en cuenta que ahora cualquier suscriptor de la cadena puede cambiar el valor del argumento y el siguiente verá el valor modificado. Mantengámoslo simple. Y espero que no se nos acuse aquí de que la simplicidad es peor que el robo.

E incluso un poco más rápido…

Enloquecido por la velocidad… Los programadores que desarrollan código de alto rendimiento pueden notar que usar un mutex por contenedor, incluso para iterar sobre él, es excesivo. Durante la iteración, nadie cambia el contenedor. ¿Por qué hay un candado aquí? Sí, el bucle cambia el contador de referencia para usar el objeto de suscripción, pero ¿tal vez este contador pueda volverse atómico? Luego, mutex se puede reemplazar con r/w lock o, en términos de C++, share_mutex. Y use un bloqueo compartido en el ciclo de entrega de eventos notify(), pero establezca un bloqueo exclusivo cuando sea necesario cambiar el contenedor, en los métodos subscribe() / unsubscribe(). ¿Cuándo podría notarse dicha optimización? En el caso de una gran cantidad de eventos para entregar a suscriptores y suscriptores muy rápidos, que, digamos, no hacen casi nada por dentro y, por lo tanto, no toman tiempo. Es decir, si la CPU fueraohLa mayoría de las veces es el bucle en notify() el que se ejecuta, no el código del suscriptor.

Ok, ¿en qué se convertirá el código? Como puede ver a continuación, no se ha hecho mucho más grande:

template <class ... Args> class notifier {
    ... // as before
private:
    struct subscription {
        std::function<void(Args ...)> m_callback;
        const sub_id_t m_id;
        std::atomic<unsigned> m_refs = 0;    // note! <- atomic
        std::condition_variable_any m_waiter;

        subscription(std::function<void(Args ...)> c, sub_id_t id)
            : m_callback(std::move(c)), m_id(id)
        {}
    };

public:
    sub_id_t subscribe(std::function<void(Args ...)> callback) { ... }

    ... // as before

    void notify(Args ... args) {
        std::shared_lock l{m_list_mtx};

        for (auto &s : m_list) {
            s.m_refs.fetch_add(1, std::memory_order_relaxed);
            l.unlock();

            try {
                s.m_callback(args ...);
            } catch (...) {
            }

            l.lock();
            // shared lock defences 's' object from deletion but not from
            // modifying the 'm_refs' counter from multiple threads
            if (s.m_refs.fetch_sub(1, std::memory_order_relaxed) == 1)
                s.m_waiter.notify_all();
        }
    }

private:
    sub_id_t m_next_id = 0;
    std::shared_mutex m_list_mtx;    // note! <- shared mutex
    std::list<subscription> m_list;
};

Código completo aquí – https://github.com/Corosan/subscribers/blob/main/src/notifs-2-1.cpp

Facilidad de uso

El artículo anterior ya mencionó el inconveniente de la implementación existente: la imposibilidad de cancelar la suscripción(…) directamente desde el suscriptor. Es decir, todo es posible en el mundo, pero este código simplemente se congelará. Es hora de arreglar esto.

Sería bueno, en lugar de un contador entero de subprocesos que ejecutan el código del suscriptor, almacenar una lista de todos los identificadores de los subprocesos correspondientes. Luego, al cancelar la suscripción, puede verificar si el identificador del hilo actual está en esta lista. En caso afirmativo, el suscriptor llama a cancelar la suscripción (…). ¿Qué hacer en este caso? En primer lugar, establezca alguna bandera que indique que ya no es necesario tocar a este suscriptor (por cierto, esto se puede hacer en la implementación anterior; de lo contrario, puede resultar que con un flujo denso de notificaciones sea imposible cancelar la suscripción). En segundo lugar, espere hasta que la lista de ID de subprocesos se reduzca a un elemento. Es decir, en él solo quedará nuestro hilo, en el que se llama cancelar suscripción(…). En tercer lugar… puede hacer lo mismo que en el caso anterior: eliminar el objeto de suscripción. Sin embargo, si el usuario lo usó para controlar la vida útil del suscriptor, tendremos un problema difícil de diagnosticar: usarlo después de la eliminación. Es decir, eliminaremos el objeto de suscripción, eliminará el funtor de suscriptor, lo que activará la eliminación de algunos otros objetos involucrados en el procesamiento del evento. ¡Pero aún no hemos abandonado el functor! No, no es necesario que hagas eso. Es mejor delegar la tarea de eliminar el objeto de suscripción en este caso al ciclo de notificación en notificar(…). Y sal tú mismo.

Reescribamos la implementación para que se ajuste a la lógica que acabamos de comentar. Sí, tendremos que decir adiós a la aceleración descrita anteriormente: ¿qué tipo de contadores de referencia atómica existen si en lugar de un contador ahora tenemos un contenedor completo de identificadores?

template <class ... Args> class notifier {
public:
    typedef int sub_id_t;

private:
    struct subscription {
        std::function<void(Args ...)> m_callback;
        const sub_id_t m_id;
        // ids of threads which currently execute this subscription's callback
        std::vector<std::thread::id> m_active_cycle_threads;
        std::condition_variable m_waiter;
        bool m_unsubscribe_from_callback = false;

        subscription(std::function<void(Args ...)> c, sub_id_t id)
            : m_callback(std::move(c)), m_id(id)
        {}
    };   

public:
    ... // as earlier

    bool unsubscribe(sub_id_t id) {
        std::unique_lock l{m_list_mtx};

        auto it = find_if(m_list.begin(), m_list.end(),
            (id)(auto& v){ return v.m_id == id; });

        if (it != m_list.end()) {
            auto& threads = it->m_active_cycle_threads;
            // Looking for current thread ID in a list of
            // currently executing the subscriber threads
            auto thread_it = find(threads.begin(), threads.end(),
                std::this_thread::get_id());

            if (thread_it == threads.end()) {
                // Trivial case when the unsubscribe operation is called not
                // from some subscriber's callback - as earlier
                it->m_waiter.wait(l, (&it, &threads){ return threads.empty(); });
                m_list.erase(it);
                return true;
            } else {
                // This subscription object will be removed by a notification
                // delivery cycle eventually, which has originated a call chain
                // yielded to this unsubscribe call.
                it->m_unsubscribe_from_callback = true;
                it->m_waiter.wait(l, (&it, &threads){ return threads.size() <= 1; });
                return true;
            }
        }
        return false;
    }

    void notify(Args ... args) {
        // using temporary list of items to be deleted - it allows to
        // defer object destruction to be executed not under lock
        std::list<subscription> garbage;
        std::unique_lock l{m_list_mtx};

        for (auto it = m_list.begin(); it != m_list.end(); ) {
            if (it->m_unsubscribe_from_callback) {
                // somebody tries to remove the subscription - don't touch it
                ++it;
                continue;
            }
            auto& threads = it->m_active_cycle_threads;

            // It's not a good to touch a heap allocator at this fast delivery
            // cycle. But an allocation inside this container is expected at
            // beginning phase only - the active threads list not going to grow
            // in future usually
            threads.push_back(std::this_thread::get_id());
            l.unlock();

            try {
                // Note that no std::forward<> optimization here because
                // arguments can't be forwarded into more than one subscriber -
                // all but the first one will get giglets
                it->m_callback(args ...);
            } catch (...) {
            }

            l.lock();
            threads.erase(
                find(threads.begin(), threads.end(), std::this_thread::get_id()));

            // If all callbacks have gone (no active threads registered inside
            // the subscription), issue a notification on the condition variable
            // for somebody who may wait on it inside an unsubscribe()
            // operation.
            // If the only thread is registered and a flag about pending
            // unsubscription is set, issue a notification for the only live
            // callback so it can return from the unsubscribe operation.
            if (threads.empty() || (threads.size() == 1 && it->m_unsubscribe_from_callback))
                it->m_waiter.notify_all();
            if (threads.empty() && it->m_unsubscribe_from_callback)
                garbage.splice(garbage.begin(), m_list, it++);
            else
                ++it;
        }
        // Note that garbage will be cleared after the m_list_mtx is unlocked
    }
    ... // as earlier
};

Algunos lectores pueden bufar con desdén (uf, justo en el código crítico, trabajando con la búsqueda lineal para eliminar un elemento), el identificador del hilo. Y una búsqueda lineal similar en cancelar suscripción (…) para ver si se llama en el contexto de entrega de notificación de notificar (…). Se anima a estos lectores a optimizar el almacenamiento y la recuperación de ID de subprocesos como ejercicio casero. Y escriba un punto de referencia para comparar. Algo me dice que no será más rápido. Después de todo, el contenedor m_active_cycle_threads no almacena una lista de todos los subprocesos en el sistema, y ​​no todos los subprocesos en los que se está ejecutando este proceso, sino solo los subprocesos que, por casualidad, están sirviendo al mismo suscriptor en este momento. No creo que haya más de una docena de ellos. Sólo si el propio suscriptor no es un peso pesado en el uso de la CPU (y en este caso, ¿de qué tipo de optimización estamos hablando?). Y las operaciones lineales en matrices de una docena de elementos, cuando estos elementos están ubicados uno al lado del otro en la memoria, son las operaciones más rápidas. La localidad de los datos desde el punto de vista de la memoria caché de la CPU juega aquí el papel más importante.

Entre bastidores todavía existe la cuestión de qué hacer si uno o más suscriptores fallan. Y lanzando una excepción. Mientras se traga tranquilamente. Quizás sería mejor hacerlo personalizable. Pero no estoy seguro de que exista una buena “solución general” además de registrar el error. Por lo tanto, la “personalización” aquí nuevamente puede generar un exceso de código sin un beneficio obvio.

¿Cómo usarlo?

Como en el artículo anterior, ¡es elemental! Pero escribamos una prueba que demuestre que cancelar la suscripción desde dentro de un suscriptor también funciona:

int main() {
    // no any mean in specifying int and char arguments except to check
    // that the code is correct. At least from compilation point of view
    notifier<int, char> s;

    // MT test where the same subscription is called from two threads and one of them tries to
    // unsubscribe while other works for some time
    int id1;
    const auto main_thread_id = std::this_thread::get_id();
    id1 = s.subscribe((&id1, &s, main_thread_id)(int, char){
        if (main_thread_id == std::this_thread::get_id()) {
            g_sync_logger() << "subscriber 3 started from thread 1";
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            g_sync_logger() << "subscriber 3 - try to unsubscribe";
            s.unsubscribe(id1);
            g_sync_logger() << "subscriber 3 finished on thread 1";
        } else {
            g_sync_logger() << "subscriber 3 started from thread 2";
            std::this_thread::sleep_for(std::chrono::seconds(1));
            g_sync_logger() << "subscriber 3 finished on thread 2";
        }});

    std::thread t = std::thread{(&s){ s.notify(4, 'd'); }};
    s.notify(5, 'e');

    t.join();

    verify(! s.unsubscribe(id1));
    verify(s.count() == 0);
}

Salida de la utilidad (tenga en cuenta que la cancelación de la suscripción se inicia en el primer hilo, pero espera a que el segundo termine de funcionar dentro del suscriptor):

$ ./notifs-2-2
86393.515 (51812) subscriber 3 started from thread 1
86393.515 (51813) subscriber 3 started from thread 2
86393.715 (51812) subscriber 3 - try to unsubscribe
86394.515 (51813) subscriber 3 finished on thread 2
86394.516 (51812) subscriber 3 finished on thread 1

Código completo para esta opción: https://github.com/Corosan/subscribers/blob/main/src/notifs-2-2.cpp

Bueno, resultó ser una implementación bastante aceptable sin un montón de programación de metaplantillas (toda la clase de notificación junto con los comentarios) en 130 líneas. Esto debería ser suficiente para la mayoría de las tareas típicas de código de producción. Y es importante señalar que este código es relativamente simple y comprensible para la mayoría de los especialistas. Desventaja: aquí no hay seguridad laboral :).

En el próximo artículo saltaremos esta línea y la haremos un poco más “moderna” y “académica”. Al mismo tiempo, elevando el listón para aquellos que quieran cambiarlo :). A menudo resulta difícil detenerse en esta dirección. Espero que el tono sarcástico de la última palabra sugiera que este camino es resbaladizo y que es mejor no seguirlo. Al menos en código real, que se escribe en grupo con colegas que también lo leen y lo apoyan. Bueno, en casa para la promoción. ChSV calificaciones – Recomiendo ampliamente los libros:

  1. Ivan Čukić. Programación funcional en C++. https://www.manning.com/books/functional-programming-in-c-plus-plus

  2. David Abrahams, Aleksey Gurtovoy. Metaprogramación de plantillas de C++: conceptos, herramientas y técnicas de Boost y más allá. ISBN 0-321-22725-5.

Publicaciones Similares

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *