Understanding non-blocking I/O in Linux: writing a server on bare system calls

How do you usually write a server if you don’t really care about performance? The program starts, begins accepting incoming connections from clients, and starts a new thread to serve each client. If you use, God forgive me, Spring, Flask or Poco, the framework does much the same internally, except that threads may be reused, that is, taken from a pool. This is all quite convenient, but not very efficient. Chances are your client-serving threads are short-lived and spend most of their time waiting either to receive data from the client or to send it, that is, waiting for system calls to return. Creating an OS thread is quite expensive, and so is switching context between OS threads. If you want to serve many clients efficiently, you need something else: callbacks, for example. But callbacks are quite inconvenient.

Another option is to combine non-blocking I/O with an implementation of user-space threads (also called fibers). In this article, I will show how to write all of this from scratch.

All the code is in the repository. It has several branches: good-old-one-thread-impl contains the original implementation, hand-context is a version with manual context switching and fiber-local variables, and the remaining two branches contain an attempt to make this implementation work on several OS threads. All code is provided only as a proof of concept and contains bugs.

Introductory concepts

What are user-space threads? They are threads that the OS knows nothing about and does not take part in switching. They can all run in one OS thread or be spread over several (like goroutines in Go or virtual threads in Java 19). These threads implement cooperative multitasking: a thread is switched out only when it asks for it. In our case, a thread gives up control when it waits for input or output, and while it waits, other threads run.
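To make "cooperative" concrete, here is a deliberately simplified sketch. It is not the approach this article takes, because the tasks here are stackless: each one keeps its progress in captured variables instead of on its own stack. Each task does one step per call and returns whether it has finished, and a round-robin loop keeps giving every unfinished task a turn. `Task`, `run_round_robin` and `demo_round_robin` are hypothetical names used only here.

```cpp
#include <cassert>
#include <functional>
#include <list>
#include <string>

// A task performs one step per call and returns true once it has finished.
// Returning is the only way a task gives control back: nothing preempts it.
using Task = std::function<bool()>;

// Hypothetical round-robin scheduler: calls every task in turn and drops
// each task as soon as it reports that it is finished.
void run_round_robin(std::list<Task>& tasks) {
    while (!tasks.empty()) {
        for (auto it = tasks.begin(); it != tasks.end();) {
            if ((*it)()) {
                it = tasks.erase(it);  // finished: remove it from the list
            } else {
                ++it;                  // yielded: give the next task a turn
            }
        }
    }
}

// Two tasks that interleave their output one step at a time.
std::string demo_round_robin() {
    std::string trace;
    int a = 0, b = 0;
    std::list<Task> tasks;
    tasks.push_back([&] { trace += "a" + std::to_string(a); return ++a == 3; });
    tasks.push_back([&] { trace += "b" + std::to_string(b); return ++b == 2; });
    run_round_robin(tasks);
    return trace;
}
```

`demo_round_robin()` returns "a0b0a1b1a2". Nothing preempts a task, so the interleaving is decided entirely by where the tasks return. User-space threads with their own stacks remove the need to slice the work into steps by hand.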

What does the non-blocking I/O interface look like in Linux? Input and output are usually done with the read and write system calls. There are others, but we will consider these two because they are the most common: they work with files, sockets and pipes, and can even be used to read signals (see signalfd). Normally these calls block until data has been read or written, but a file descriptor can be switched to non-blocking mode. Then read and write never block: if no data can be read or written right now, they return -1 and set errno to EAGAIN. What remains is to find out which file descriptors are ready for I/O, so that we don’t have to try them blindly.
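Here is a minimal sketch of that behavior on a pipe, assuming Linux and glibc. The read end is switched to non-blocking mode with fcntl by adding O_NONBLOCK to its existing flags. A read from the empty pipe then fails with EAGAIN instead of blocking, and it succeeds once data has been written. `set_nonblocking` and `demo_nonblocking_read` are hypothetical helper names.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>

// Switches an existing descriptor to non-blocking mode, keeping its other flags.
bool set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return false;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}

// Returns true if read() on an empty non-blocking pipe fails with EAGAIN
// instead of blocking, and then succeeds once data has been written.
bool demo_nonblocking_read() {
    int fds[2];
    if (pipe(fds) == -1) return false;
    bool ok = set_nonblocking(fds[0]);

    char buf[16];
    ssize_t n = read(fds[0], buf, sizeof(buf));    // nothing written yet
    ok = ok && n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK);

    ok = ok && write(fds[1], "hi", 2) == 2;
    n = read(fds[0], buf, sizeof(buf));            // now data is available
    ok = ok && n == 2;

    close(fds[0]);
    close(fds[1]);
    return ok;
}
```

POSIX also allows EWOULDBLOCK here. On Linux it has the same value as EAGAIN, so the check accepts either.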

Linux has several such mechanisms: select(2), poll(2) and epoll(7). Each of them lets us tell the kernel which file descriptors we are interested in, and the kernel tells us which of them are ready for I/O. select(2) is considered obsolete (among other things, it cannot handle descriptors numbered FD_SETSIZE or higher), so I will use poll(2).
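Here is a sketch of asking poll(2) about a single descriptor. It again uses a pipe, so it runs without a network. A zero timeout makes poll return immediately. An empty pipe is not readable but its write end is writable, and the read end reports POLLIN once data arrives. The helpers `poll_one`, `is_ready` and `demo_poll` are hypothetical.

```cpp
#include <cassert>
#include <poll.h>
#include <unistd.h>

// Asks the kernel whether fd is ready for `events`, waiting at most
// timeout_ms milliseconds (0 means "just check"). Returns the reported
// revents, 0 if the timeout expired, or -1 if poll itself failed.
int poll_one(int fd, short events, int timeout_ms) {
    pollfd pfd{};
    pfd.fd = fd;
    pfd.events = events;
    int ret = poll(&pfd, 1, timeout_ms);
    if (ret < 0) return -1;
    return ret == 0 ? 0 : pfd.revents;
}

// True if poll immediately reports `bit` for the descriptor.
bool is_ready(int fd, short bit) {
    int revents = poll_one(fd, bit, 0);
    return revents > 0 && (revents & bit) != 0;
}

bool demo_poll() {
    int fds[2];
    if (pipe(fds) == -1) return false;
    bool ok = !is_ready(fds[0], POLLIN);      // empty pipe: nothing to read
    ok = ok && is_ready(fds[1], POLLOUT);     // but there is room to write
    ok = ok && write(fds[1], "x", 1) == 1;
    ok = ok && is_ready(fds[0], POLLIN);      // now the read end is readable
    close(fds[0]);
    close(fds[1]);
    return ok;
}
```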

Thread Implementation

At first, we will implement threads with boost::context, but later we will get rid of it. The boost::context library lets us switch contexts. All we have to add is a wrapper around the context and a simple thread scheduler.

An execution context (which boost::context represents) is the state of a computation: the values of the registers and the stack.

A thread will be implemented as a class that stores the context itself and the function the thread runs. It also stores flags saying whether execution has started, whether it has finished, and whether the thread is ready to run.

FiberImpl class
#include <functional>
#include <memory>
#include <boost/context/continuation.hpp>

using boost::context::callcc;
using boost::context::continuation;

class FiberImpl: public std::enable_shared_from_this<FiberImpl> {
// everything inside is private, because for convenience I will write
// a wrapper over this implementation
private:
    explicit FiberImpl(const std::function<void()>& func);
    // a constructor taking function&& is, of course, also needed here
    void join();
    bool isFinished() const;
    void start();
    bool isReady() const;
    friend class Fiber;
    friend class FiberManager;
    friend class CondVar;
    friend void sched_execution();
    void continue_executing();
    void suspend();

    std::function<void()> func;
    // context describing the fiber's own state at the moment it switched out
    continuation this_context;
    // context that was running before the fiber was resumed
    continuation previous_context;
    // condition variable on which one can wait for the fiber to finish
    // (CondVar must be a complete type at this point)
    CondVar finish_cv;
    bool launched = false;
    bool finished = false;
    bool is_ready = false;
};

The most interesting part is how a thread is started. The first time the thread runs, we call the callcc function from boost::context. It takes a lambda, starts executing it in a new context, and returns once the lambda switches back. The return value is the context in which the lambda was running, captured at the moment of the switch. The lambda receives the context it was called from and can use it to switch back.

When the thread has to run again, we simply call resume() on its saved context. Execution continues in that context, and when it switches back, resume() returns the new state of that context.

starting a thread
void FiberImpl::continue_executing() {
    if (!launched) {
        launched = true;
        this_context = callcc([&](auto sink) {
            cerr << "starting func in new fiber\n";
            previous_context = std::move(sink);
            func();
            finished = true;
            finish_cv.notify_all();
            return std::move(previous_context);
        });
    } else {
        this_context = this_context.resume();
    }
}

The other FiberImpl methods are pretty trivial.

trivial functions
extern FiberManager fiberManager;

extern std::shared_ptr<FiberImpl> current_fiber;

FiberImpl::FiberImpl(const std::function<void()> &func) {
    this->func = func;
}

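void FiberImpl::join() {
    while (!finished) {
        finish_cv.wait();
    }
}

bool FiberImpl::isFinished() const {
    return finished;
}

bool FiberImpl::isReady() const {
    return is_ready;
}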

void FiberImpl::start() {
    fiberManager.registerFiber(this->shared_from_this());
    is_ready = true;
}

void FiberImpl::suspend() {
    previous_context = previous_context.resume();
}

void sched_execution() {
    current_fiber->suspend();
}

void startFiberManager() {
    fiberManager.work();
}

The thread manager must be able to store a list of threads that are ready to run and start those threads.

thread manager
#include <list>
#include <memory>

using std::list;
using std::shared_ptr;

void startFiberManager();

class FiberManager {
    friend class Fiber;
    friend class FiberImpl;
    friend class CondVar;
    friend void sched_execution();
    void work();
    void registerFiber(const shared_ptr<FiberImpl>& fiber_ptr);

    list<shared_ptr<FiberImpl>> ready_fibers;

    friend void startFiberManager();
};

// note: every registered fiber stays in ready_fibers until it finishes;
// work() skips fibers that are waiting (is_ready == false), and the
// CondVar they wait on keeps its own list of them
FiberManager fiberManager;

std::shared_ptr<FiberImpl> current_fiber;

void FiberManager::work() {
    while (!ready_fibers.empty()) {
        auto iterator = this->ready_fibers.begin();
        while (iterator != ready_fibers.end()) {
            current_fiber = *iterator;
            if (current_fiber->isReady()) {
                current_fiber->continue_executing();
            }
            if (current_fiber->isFinished()) {
                iterator = ready_fibers.erase(iterator);
            } else {
                iterator++;
            }
        }
    }
}

void FiberManager::registerFiber(const shared_ptr<FiberImpl>& fiber_ptr) {
    ready_fibers.push_back(fiber_ptr);
}

The FiberImpl class is not very convenient to use. You have to create a shared_ptr, the function must have type void(), and you still need to call .start(). Let’s write a simple wrapper around it.

wrapper
class Fiber {
public:
    template<typename Callable, typename... Args>
    explicit Fiber(const Callable& function, const Args&... args) {
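        // capture the callable and the arguments by value: the fiber runs
        // later, when the constructor's reference parameters no longer exist
        fiber_ptr = shared_ptr<FiberImpl>(new FiberImpl([function, args...] () {
            function(args...);
        }));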
        fiber_ptr->start();
    }

    void join() {
        fiber_ptr->join();
    }

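    void detach() {
        // the manager holds its own shared_ptr, so the fiber keeps running
        // after this handle lets go of it
        fiber_ptr.reset();
    }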

    bool isFinished() {
        return fiber_ptr->isFinished();
    }

    bool isReady() {
        return fiber_ptr->isReady();
    }

private:
    shared_ptr<FiberImpl> fiber_ptr;
};

Finally, we need a synchronization primitive: a condition variable.

condition variable
#include <memory>
#include <vector>

using std::shared_ptr;

class FiberImpl;

class CondVar {
public:
    void wait();
    void notify_one();
    void notify_all();

private:
    std::vector<shared_ptr<FiberImpl>> waiters;
};

The interface is trivial. Inside we store a list of threads that are waiting on this condition variable.

extern FiberManager fiberManager;
extern std::shared_ptr<FiberImpl> current_fiber;

void CondVar::wait() {
    waiters.push_back(current_fiber);
    current_fiber->is_ready = false;
    current_fiber->suspend();
}

void CondVar::notify_one() {
    if (!waiters.empty()) {
        auto fiber_ptr = *waiters.rbegin();
        fiber_ptr->is_ready = true;
        waiters.pop_back();
    }
}

void CondVar::notify_all() {
    for (auto& fiber_ptr : waiters) {
        fiber_ptr->is_ready = true;
    }
    waiters.clear();
}

When a thread wants to wait, it is added to the list, marked as not ready to run, and suspended. When someone wants to wake up one or more threads, they are marked as ready to run and removed from the list.
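The following self-contained sketch shows the scheduler and the condition variable working together without Boost. It swaps in POSIX ucontext (getcontext, makecontext, swapcontext) for boost::context. It mirrors the ideas above: a list of fibers with ready and finished flags, a round-robin work loop, and a condition variable that marks its waiters as not ready. `MiniFiber`, `MiniCondVar`, `fiber_spawn`, `fiber_yield`, `run_all` and `demo_fibers` are hypothetical names, not the article's classes.

```cpp
#include <cassert>
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <ucontext.h>
#include <vector>

// Hypothetical mini fiber runtime on top of POSIX ucontext.
struct MiniFiber {
    std::function<void()> func;
    ucontext_t ctx{};
    std::vector<char> stack = std::vector<char>(64 * 1024);
    bool ready = true;      // false while waiting on a MiniCondVar
    bool finished = false;
};

static ucontext_t scheduler_ctx;                      // fibers switch back here
static MiniFiber* current = nullptr;                  // fiber running right now
static std::list<std::unique_ptr<MiniFiber>> fibers;  // all unfinished fibers

// Entry point of every fiber; when it returns, uc_link resumes the scheduler.
static void trampoline() {
    current->func();
    current->finished = true;
}

void fiber_spawn(std::function<void()> f) {
    auto fiber = std::make_unique<MiniFiber>();
    fiber->func = std::move(f);
    getcontext(&fiber->ctx);
    fiber->ctx.uc_stack.ss_sp = fiber->stack.data();
    fiber->ctx.uc_stack.ss_size = fiber->stack.size();
    fiber->ctx.uc_link = &scheduler_ctx;
    makecontext(&fiber->ctx, trampoline, 0);
    fibers.push_back(std::move(fiber));
}

// Gives control back to the scheduler, like sched_execution() above.
void fiber_yield() {
    swapcontext(&current->ctx, &scheduler_ctx);
}

struct MiniCondVar {
    std::vector<MiniFiber*> waiters;
    void wait() {
        waiters.push_back(current);
        current->ready = false;
        fiber_yield();
    }
    void notify_all() {
        for (MiniFiber* f : waiters) f->ready = true;
        waiters.clear();
    }
};

// Round-robin loop like FiberManager::work(): runs ready fibers, drops finished ones.
void run_all() {
    while (!fibers.empty()) {
        for (auto it = fibers.begin(); it != fibers.end();) {
            current = it->get();
            if (current->ready) swapcontext(&scheduler_ctx, &current->ctx);
            if (current->finished) it = fibers.erase(it);
            else ++it;
        }
    }
    current = nullptr;
}

std::string demo_fibers() {
    std::string trace;
    MiniCondVar cv;
    bool flag = false;
    fiber_spawn([&] {
        trace += "A1 ";
        while (!flag) cv.wait();
        trace += "A2 ";
    });
    fiber_spawn([&] {
        trace += "B1 ";
        fiber_yield();
        flag = true;
        cv.notify_all();
        trace += "B2 ";
    });
    run_all();
    return trace;
}
```

`demo_fibers()` returns "A1 B1 B2 A2 ". A blocks on the condition variable, then B runs, sets the flag and wakes A. A resumes only on the scheduler’s next pass. As in the article’s version, if every fiber is waiting, the loop spins forever: that is a deadlock.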

Note that so far all our threads run in a single OS thread and switch only at points we control, so the condition variable needs no internal synchronization.

Second, one normally waits on a condition variable while holding a mutex: wait releases the mutex and re-acquires it before returning. Under cooperative multitasking we don’t need a mutex at all. Incidentally, the futex system call, on which mutexes are built, is itself essentially a kind of condition variable. Also, unlike std::condition_variable, ours cannot have spurious wakeups.
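For comparison, here is a sketch of the usual pattern with OS threads and std::condition_variable. The waiter holds a std::unique_lock, and wait() releases the mutex while sleeping and re-acquires it before returning. The condition is re-checked in a loop because spurious wakeups are allowed. `demo_os_condvar` is a hypothetical name; older toolchains may need -pthread to build it.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Standard OS-thread pattern: mutex + condition variable + predicate loop.
int demo_os_condvar() {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;
    int value = 0;

    std::thread producer([&] {
        {
            std::lock_guard<std::mutex> lock(m);
            value = 42;
            ready = true;
        }
        cv.notify_one();
    });

    std::unique_lock<std::mutex> lock(m);
    while (!ready) {        // same as cv.wait(lock, [&] { return ready; });
        cv.wait(lock);      // unlocks m while sleeping, relocks before returning
    }
    int result = value;
    lock.unlock();
    producer.join();
    return result;
}
```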

Nonblocking I/O

Now we need to implement waiting for file descriptors to become ready for I/O. A separate thread will call poll to find out which file descriptors are ready.

A thread that wants to wait for a file descriptor to be ready will create a condition variable, save its I/O wait request, and sleep on that condition variable.

implementation
#include <poll.h>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <unordered_map>
#include <vector>

struct FdRequest {
    CondVar cv;
    int fd;
    short events;       // what the fiber waits for (POLLIN, POLLOUT, ...)
    short revents = 0;  // what poll reported; filled in by Waiter::loop
};

class Waiter {
public:
    Waiter();

    static int wait(int fd, short events);

    static void stop();

    static void loop();

private:

    CondVar cv;
    std::unordered_map<int, FdRequest*> map;
    bool stopped = false;
};
Waiter waiter;

int Waiter::wait(int fd, short events) {
    FdRequest fdRequest{.cv = CondVar(), .fd = fd, .events = events};
    waiter.map[fd] = &fdRequest;
    // wake the polling fiber in case it is sleeping on an empty map
    waiter.cv.notify_one();
    fdRequest.cv.wait();
    // return the events poll actually reported, not the requested ones
    return fdRequest.revents;
}

Waiter::Waiter() {
    std::cout << "waiter initialising" << std::endl;
}

[[maybe_unused]] Fiber fiber(Waiter::loop);

void Waiter::stop() {
    waiter.stopped = true;
}

void Waiter::loop() {
    while (!waiter.stopped) {
        while (waiter.map.empty()) {
            waiter.cv.wait();
        }
        std::vector<pollfd> request;
        request.reserve(waiter.map.size());
        for (auto& elem : waiter.map) {
            request.push_back(pollfd{.fd = elem.first, 
              .events = elem.second->events, 
              .revents = 0});
        }
        // note: this blocks the whole OS thread, and therefore every fiber
        // in it, for up to 100 ms
        int ret = poll(request.data(), request.size(), 100);
        if (ret < 0) {
            printf("poll returned with error %s\n", strerror(errno));
        } else {
            for (auto& elem : request) {
                if (elem.revents != 0) {
                    FdRequest* fdRequest = waiter.map[elem.fd];
                    fdRequest->revents = elem.revents;
                    fdRequest->cv.notify_one();
                    waiter.map.erase(elem.fd);
                }
            }
        }
        sched_execution();
    }
}

Writing the server

Now we can write the server. It works like this: at startup, a thread is launched that creates a socket, binds it to a port and starts accepting client connections on it. Before each call to accept, however, it uses the Waiter to wait for an incoming connection.

When a new connection arrives, a new thread is launched. It waits for input from the client and writes exactly what it read back to the socket.

implementation
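#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>

void worker(int fd) {
    printf("work called with fd: %d\n", fd);
    char buf[1024];
    while (true) {
        Waiter::wait(fd, POLLIN);
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n == 0) {
            printf("client finished, leaving\n");
            close(fd);  // release the client socket
            return;
        }
        if (n < 0) {
            if (errno == EAGAIN || errno == EINTR) {
                continue;  // not actually readable yet: wait again
            }
            printf("in worker error: %s\n", strerror(errno));
            close(fd);
            return;
        }
        ssize_t wrote = 0;
        while (wrote < n) {
            Waiter::wait(fd, POLLOUT);
            ssize_t m = write(fd, buf + wrote, n - wrote);
            if (m < 0) {
                if (errno == EAGAIN || errno == EINTR) {
                    continue;
                }
                printf("in worker error: %s\n", strerror(errno));
                close(fd);
                return;
            }
            wrote += m;
        }
    }
}

int main() {
    Fiber main_fiber([] () {
        std::cout << "main entered" << std::endl;
        Fiber global_fiber([]() {
            int socket_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
            if (socket_fd < 0) {
                printf("socket error: %s\n", strerror(errno));
                exit(EXIT_FAILURE);
            }
            // add O_NONBLOCK while keeping the descriptor's other flags
            int flags = fcntl(socket_fd, F_GETFL);
            if (flags == -1 || fcntl(socket_fd, F_SETFL, flags | O_NONBLOCK) == -1) {
                printf("fcntl error: %s\n", strerror(errno));
                exit(EXIT_FAILURE);
            }
            sockaddr_in sin{};
            sin.sin_family = AF_INET;
            sin.sin_port = htons(8001);
            sin.sin_addr.s_addr = htonl(INADDR_ANY);
            if (bind(socket_fd, reinterpret_cast<const sockaddr *>(&sin), sizeof(sin)) < 0) {
                printf("bind error: %s\n", strerror(errno));
                exit(EXIT_FAILURE);
            }
            if (listen(socket_fd, 10) < 0) {
                printf("listen error: %s\n", strerror(errno));
                exit(EXIT_FAILURE);
            }
            while (true) {
                printf("accepting\n");
                Waiter::wait(socket_fd, POLLIN);
                int client_fd = accept4(socket_fd, nullptr, nullptr, SOCK_NONBLOCK);
                if (client_fd < 0) {
                    // e.g. EAGAIN if the client disconnected before accept
                    printf("accept4 error: %s\n", strerror(errno));
                    continue;
                }
                Fiber thread(worker, client_fd);
            }
        });
    });
    startFiberManager();
    main_fiber.join();
}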

We launch it, and it works! There is only one OS thread, yet we serve many clients concurrently and write the code as if we had many threads.

Multithreading

Running all user-space threads in a single OS thread has two drawbacks. First, it does not scale beyond one CPU core. Second, if serving one client takes a lot of CPU time, every other client is delayed.

I tried to overcome this limitation by running the thread manager on several OS threads and adapting all the classes accordingly.

On the one hand, this didn’t require writing anything fundamentally new. On the other hand, I didn’t manage to debug it completely, so we won’t go into detail here.

Getting rid of boost::context

Now it’s time to do everything ourselves and get rid of boost::context. To do this, we need a structure that stores the context, plus code to create a context and to switch between contexts.

code
#include <sys/types.h>  // ssize_t

class Context {
public:
    static const ssize_t STACK_SIZE = 4096 * 2;

    Context() = default;
    Context(const Context&) = delete;
    Context& operator = (const Context&) = delete;
    Context(Context&& other);
    Context& operator = (Context&& other) noexcept;
    static Context create_context();
    ~Context();

    void setRip(unsigned long rip);

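private:
    // switch_context accesses these fields at fixed offsets (rbx at 0,
    // rsp at 8, rbp at 16, r12..r15 at 24..48, rip at 56), so their order
    // must not change. Only the callee-saved registers plus rsp and rip are
    // stored: the caller of switch_context already assumes the rest are clobbered.
    unsigned long rbx;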
    unsigned long rsp;
    unsigned long rbp;
    unsigned long r12;
    unsigned long r13;
    unsigned long r14;
    unsigned long r15;
    unsigned long rip;
};

extern "C" {
/*
 * saves current context into old_context_dest and loads new_context
 */
extern void switch_context(Context* old_context_dest, Context* new_context, unsigned long first_arg = 0);
}

The switch_context function takes two contexts by pointer. It saves the current context into the first one and then starts executing the context from the second one. Just before that, it writes the third argument (first_arg) into the %rdi register, so that a function started in a fresh context receives it as its first argument.

The function is implemented in assembly (AT&T syntax).

    .text
    .globl switch_context
    .type switch_context, @function
switch_context:
// saving current context
    mov     %rbx, (%rdi)
    mov     %rsp, 8(%rdi)
    mov     %rbp, 16(%rdi)
    mov     %r12, 24(%rdi)
    mov     %r13, 32(%rdi)
    mov     %r14, 40(%rdi)
    mov     %r15, 48(%rdi)
// the address switch_context was called from (its return address) is
// stored in the rip field, so that switching back to this context
// resumes execution right after the call to switch_context
    mov     (%rsp), %rax
    mov     %rax, 56(%rdi)

    mov     %rsi, %rdi
// restoring other context
    mov     (%rdi), %rbx
    mov     8(%rdi), %rsp
    mov     16(%rdi), %rbp
    mov     24(%rdi), %r12
    mov     32(%rdi), %r13
    mov     40(%rdi), %r14
    mov     48(%rdi), %r15
    mov     56(%rdi), %rax
    mov     %rax, (%rsp)
    mov     %rdx, %rdi
// the rip field has been written to the top of the stack, so ret
// jumps exactly there; %rdi now holds first_arg
    ret

Creating a new context is trivial. Allocate memory for the stack and store the address of its top in rsp (the stack grows downward). Then set rip to the function the context should start with.
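Here is a sketch of the stack side of a hypothetical create_context(), assuming x86-64 Linux and the switch_context above. The stack is mmap-ed with an inaccessible guard page at its low end. The initial rsp is chosen so that, after switch_context’s ret, the entry function sees the stack alignment the System V ABI requires: (%rsp + 8) divisible by 16 at function entry. `Stack`, `allocate_stack`, `free_stack` and `initial_rsp` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <sys/mman.h>
#include <unistd.h>

struct Stack {
    void* mem = nullptr;  // start of the mapping (guard page included)
    size_t size = 0;      // total mapping size in bytes
};

// Allocates `size` bytes of stack plus one inaccessible guard page below it,
// so that a stack overflow faults instead of silently corrupting memory.
Stack allocate_stack(size_t size) {
    size_t page = static_cast<size_t>(sysconf(_SC_PAGESIZE));
    size = (size + page - 1) / page * page;             // round up to pages
    void* mem = mmap(nullptr, size + page, PROT_READ | PROT_WRITE,
                     MAP_PRIVATE | MAP_ANONYMOUS | MAP_STACK, -1, 0);
    if (mem == MAP_FAILED) return {};
    if (mprotect(mem, page, PROT_NONE) != 0) {          // the stack grows down,
        munmap(mem, size + page);                       // so the guard page
        return {};                                      // goes at the bottom
    }
    return {mem, size + page};
}

void free_stack(Stack& s) {
    if (s.mem != nullptr) munmap(s.mem, s.size);
    s = {};
}

// Value for Context::rsp of a fresh context. switch_context stores rip at
// (%rsp) and executes ret, so the entry function starts with rsp + 8. The ABI
// wants (entry rsp + 8) % 16 == 0, hence this value must be 16-byte aligned,
// and the 8-byte rip slot at this address must fit below the top of the stack.
uintptr_t initial_rsp(const Stack& s) {
    uintptr_t top = reinterpret_cast<uintptr_t>(s.mem) + s.size;
    return (top - 8) & ~static_cast<uintptr_t>(15);
}
```

Nothing valid sits above the entry point on the new stack, so the entry function must never return. When it is done, it has to switch to another context.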

Fiber local variables

Initially, the server code could only run on Linux. Getting rid of boost also tied us to the x86-64 architecture. Now we will restrict ourselves even further: the code will require a processor that supports the FSGSBASE instructions and a fairly recent Linux kernel.

I have not yet fully figured out how thread-local variables work, so I apologize in advance for any inaccuracies.

How do thread-local variables work? I already wrote about this in a bit more detail in an earlier article, which also gives code for adding thread-local variable support to a teaching OS. Here I’ll just say that, by default, the compiler accesses thread-local variables at an offset from the base address of the %fs segment register. So for them to work, the code that starts a thread must allocate memory for thread-local storage, initialize it, and write the address of that memory to %fs.
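The following sketch makes this visible. It is for x86-64 Linux with glibc only. It reads the %fs base with the arch_prctl system call and checks the x86-64 TLS convention that the word at %fs:0 holds the thread pointer itself. It also shows that a thread_local variable of the main program lives a short distance below that address. The helper names and `tls_counter` are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <asm/prctl.h>      // ARCH_GET_FS
#include <sys/syscall.h>    // SYS_arch_prctl
#include <unistd.h>         // syscall

thread_local int tls_counter = 0;

// Base address of %fs for the calling thread, as the kernel reports it.
uintptr_t fs_base_via_syscall() {
    unsigned long base = 0;
    if (syscall(SYS_arch_prctl, ARCH_GET_FS, &base) != 0) return 0;
    return base;
}

// The x86-64 TLS ABI stores the thread pointer itself at %fs:0, so user code
// can read the base with an ordinary mov.
uintptr_t fs_base_via_self_pointer() {
    uintptr_t self;
    asm volatile("mov %%fs:0, %0" : "=r"(self));
    return self;
}

// Offset of a thread_local variable from the thread pointer; the main
// program's TLS block sits just below the address held in %fs.
intptr_t tls_offset_of_counter() {
    return static_cast<intptr_t>(reinterpret_cast<uintptr_t>(&tls_counter)) -
           static_cast<intptr_t>(fs_base_via_syscall());
}
```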

But to a first approximation, nothing stops us from doing the same for each fiber, so that all variables declared thread_local behave as fiber-local.

Previously, user programs could not change the %fs base directly. Only the kernel could, so a user program had to make a system call (arch_prctl) for it. Then processors gained the rdfsbase, wrfsbase, rdgsbase and wrgsbase instructions. They only work if the OS kernel explicitly enables them, so using them requires a fairly recent kernel.
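Here is a sketch of checking for and using these instructions, assuming x86-64 Linux. The kernel advertises user-space FSGSBASE support through bit 1 of AT_HWCAP2, which is the value of HWCAP2_FSGSBASE in the kernel’s <asm/hwcap2.h>. rdfsbase and wrfsbase must not be executed when that bit is clear, because the CPU then raises an invalid-opcode fault (SIGILL). The helper names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <sys/auxv.h>   // getauxval, AT_HWCAP2

// Mirrors HWCAP2_FSGSBASE from <asm/hwcap2.h> (x86 only).
constexpr unsigned long kHwcap2Fsgsbase = 1ul << 1;

bool fsgsbase_enabled() {
    return (getauxval(AT_HWCAP2) & kHwcap2Fsgsbase) != 0;
}

// Reads the %fs base. Only call this when fsgsbase_enabled() is true.
uint64_t read_fs_base() {
    uint64_t base;
    asm volatile("rdfsbase %0" : "=r"(base));
    return base;
}

// Writes the %fs base. A fiber switch would do this to install the
// new fiber's TLS block. Only call this when fsgsbase_enabled() is true.
void write_fs_base(uint64_t base) {
    asm volatile("wrfsbase %0" : : "r"(base) : "memory");
}
```

Inside switch_context, the same two instructions would save the old context’s %fs base and load the new one.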

So we need:

  • add the %fs base to the Context class

  • save and restore %fs on every context switch in the switch_context function

  • read the executable’s ELF file to find out the size of the TLS block and how to initialize it

  • when creating a context, allocate memory for TLS, initialize it, and write the address of that memory to %fs
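Here is a sketch of the third and fourth steps, with one swapped-in technique. Instead of opening and parsing the executable’s ELF file, it asks the dynamic loader for the main program’s program headers with dl_iterate_phdr (glibc, <link.h>) and takes the PT_TLS entry. That entry’s first p_filesz bytes are the initial image (.tdata), and the remaining p_memsz - p_filesz bytes must be zeroed (.tbss). The sketch only builds the initialized block; placing it below a TCB and writing %fs is left out. `TlsTemplate`, `main_program_tls`, `make_tls_block` and the two example variables are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <link.h>       // dl_iterate_phdr, ElfW, PT_TLS
#include <vector>

thread_local int tls_initialized[4] = {11, 22, 33, 44};  // ends up in .tdata
thread_local char tls_zeroed[256];                        // ends up in .tbss

struct TlsTemplate {
    const unsigned char* image = nullptr;  // initial contents of the block
    size_t filesz = 0;                     // bytes to copy from image (.tdata)
    size_t memsz = 0;                      // full block size (.tdata + .tbss)
    size_t align = 0;                      // required alignment of the block
};

// Callback for dl_iterate_phdr: the first object it visits is the main program.
static int find_tls(dl_phdr_info* info, size_t, void* data) {
    auto* out = static_cast<TlsTemplate*>(data);
    for (size_t i = 0; i < info->dlpi_phnum; ++i) {
        const ElfW(Phdr)& ph = info->dlpi_phdr[i];
        if (ph.p_type == PT_TLS) {
            out->image = reinterpret_cast<const unsigned char*>(info->dlpi_addr + ph.p_vaddr);
            out->filesz = ph.p_filesz;
            out->memsz = ph.p_memsz;
            out->align = ph.p_align;
        }
    }
    return 1;  // non-zero stops the iteration after the main program
}

TlsTemplate main_program_tls() {
    TlsTemplate tls;
    dl_iterate_phdr(find_tls, &tls);
    return tls;
}

// A fresh TLS block for a new fiber: copy .tdata, leave .tbss zeroed.
std::vector<unsigned char> make_tls_block(const TlsTemplate& tls) {
    std::vector<unsigned char> block(tls.memsz, 0);
    if (tls.filesz != 0) {
        std::memcpy(block.data(), tls.image, tls.filesz);
    }
    return block;
}
```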

Note, however, that thread_local variables are used not only by our code but also by the standard libraries (libstdc++, libc). So we would need to work out in detail how to support fiber-local variables in a way that keeps thread_local variables in the libraries our binary links against working too. Alternatively, it would be nice to declare our fiber-local variables not as thread_local but as fiber_local, and have the compiler address them through %gs instead of %fs. Variables in the standard libraries may well need to stay thread-local rather than fiber-local.

Still, if you carefully follow the steps above, fiber-local variables will work. Isn’t that a miracle?
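errno is a familiar example of such a library variable: every OS thread has its own copy, reached through %fs. The following small sketch (hypothetical helper name, Linux/glibc assumed) shows that two threads get different errno addresses. If each fiber switched %fs, errno would quietly become per-fiber as well.

```cpp
#include <cassert>
#include <cerrno>
#include <thread>

// True if errno in a freshly started thread lives at a different address
// than errno in the calling thread, i.e. errno is thread-local.
bool errno_is_per_thread() {
    int* here = &errno;
    int* there = nullptr;
    std::thread t([&] { there = &errno; });
    t.join();
    return there != nullptr && there != here;
}
```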
