ConcurrentBag in C#

How ConcurrentBag works

Thread-local storage: local queues for each thread

ConcurrentBag is built on an interesting mechanism: a local queue for each thread. Every thread that adds data to the collection gets its own local queue. These queues are not completely isolated, since items can move between threads, but the basic idea is that each thread works mostly with the items it added itself.

Example:

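using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var bag = new ConcurrentBag<int>();

// Writer: each Add goes to the local queue of the thread running this task.
Task writer = Task.Run(() =>
{
    for (int i = 0; i < 5; i++)
    {
        bag.Add(i);
        Console.WriteLine($"Task {Task.CurrentId} added {i}");
    }
});

// Reader: foreach works on a snapshot of the bag, so depending on timing
// it may see anywhere from zero to five items.
Task reader = Task.Run(() =>
{
    foreach (var item in bag)
    {
        Console.WriteLine($"Task {Task.CurrentId} accessed {item}");
    }
});

Task.WaitAll(writer, reader);  // Wait for both tasks to finish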

Here the first task adds elements to the local queue of its thread, while the second enumerates whatever the bag contains at that moment. When elements are added by one thread and retrieved by another, ConcurrentBag uses a work-stealing algorithm.
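A minimal sketch of the snapshot behaviour of enumeration: the documentation describes the enumerator as a moment-in-time snapshot, so adding items inside the loop should neither throw nor extend the loop. The class and method names here are made up for illustration.

```csharp
using System.Collections.Concurrent;

public static class SnapshotDemo
{
    // Returns (items seen by the loop, items in the bag afterwards).
    public static (int Seen, int Final) AddWhileEnumerating()
    {
        var bag = new ConcurrentBag<int> { 1, 2, 3 };
        int seen = 0;

        // foreach iterates over a snapshot taken when the loop starts.
        foreach (int item in bag)
        {
            bag.Add(item * 10); // modifies the bag, not the snapshot
            seen++;
        }

        return (seen, bag.Count);
    }
}
```

The loop visits only the three original items, while the bag itself ends up with six.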

Work-stealing

The work-stealing algorithm is what makes this collection stand out in terms of multi-threaded performance. The idea is that each thread adds elements to its own queue, but if a thread runs out of work and its local queue is empty, it can steal data from another thread's queue.

Let's imagine there is a pool of threads: one thread has just become free, while the others still have unfinished work. Instead of sitting idle, the free thread steals data from another thread, taking items from the opposite end of that thread's local queue to the one its owner uses. All of this happens inside the collection, and the calling code does not need any additional synchronization.
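To make the idea concrete, here is a deliberately simplified toy model. It is not how ConcurrentBag is actually implemented: it guards everything with a single lock, and the name ToyWorkStealingBag is hypothetical. It only illustrates the rule "take from your own queue first, otherwise steal from someone else's".

```csharp
using System;
using System.Collections.Generic;

// Toy model only: one queue per thread, all guarded by one plain lock.
public sealed class ToyWorkStealingBag<T>
{
    private readonly object _gate = new object();
    private readonly Dictionary<int, LinkedList<T>> _queues =
        new Dictionary<int, LinkedList<T>>();

    public void Add(T item)
    {
        int id = Environment.CurrentManagedThreadId;
        lock (_gate)
        {
            if (!_queues.TryGetValue(id, out var local))
            {
                local = new LinkedList<T>();
                _queues[id] = local;
            }
            local.AddLast(item); // the owner pushes at the tail
        }
    }

    public bool TryTake(out T item)
    {
        int id = Environment.CurrentManagedThreadId;
        lock (_gate)
        {
            // 1. Prefer the caller's own queue: pop from the tail.
            if (_queues.TryGetValue(id, out var local) && local.Count > 0)
            {
                item = local.Last!.Value;
                local.RemoveLast();
                return true;
            }

            // 2. Otherwise steal from the head of any non-empty queue.
            foreach (var other in _queues.Values)
            {
                if (other.Count > 0)
                {
                    item = other.First!.Value;
                    other.RemoveFirst();
                    return true;
                }
            }
        }

        item = default!;
        return false;
    }
}
```

In this toy, a thread that added 1, 2, 3 gets 3 back from its own TryTake, while a different thread calling TryTake steals 1. The real collection makes no such ordering promise.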

Example:

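using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var bag = new ConcurrentBag<int>();

// Fill the bag first so that the consumer has something to take.
Task producer = Task.Run(() =>
{
    for (int i = 0; i < 10; i++)
    {
        bag.Add(i);
    }
});
producer.Wait();

// TryTake returns false as soon as the bag is empty, which ends the loop.
Task consumer = Task.Run(() =>
{
    while (bag.TryTake(out int result))
    {
        Console.WriteLine($"Task {Task.CurrentId} took {result}");
    }
});
consumer.Wait();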

If the second task runs on a different thread from the first, it steals the data that the first thread added. Here we see the TryTake method in action: it takes elements from the collection, giving priority to the current thread's local queue. If that queue is empty, the thread looks for elements in other threads' queues and steals them.
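Because tasks may share a thread-pool thread, a sketch that wants to be sure the items really cross threads can use two dedicated Thread objects instead. The helper name below is made up for illustration.

```csharp
using System.Collections.Concurrent;
using System.Threading;

public static class StealDemo
{
    // Adds `count` items on one dedicated thread, then drains the bag on
    // another. Returns how many items the second thread managed to take.
    public static int AddThenTakeOnAnotherThread(int count)
    {
        var bag = new ConcurrentBag<int>();

        var producer = new Thread(() =>
        {
            for (int i = 0; i < count; i++) bag.Add(i);
        });
        producer.Start();
        producer.Join(); // every item now sits in the producer's local queue

        int taken = 0;
        var consumer = new Thread(() =>
        {
            // The consumer has no local items, so every take is a steal.
            while (bag.TryTake(out _)) taken++;
        });
        consumer.Start();
        consumer.Join();

        return taken;
    }
}
```

The second thread never added anything itself, yet it can take every item the first thread left behind.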

When problems arise

But not everything is as rosy as it might seem at first glance. Since ConcurrentBag is based on local queues and a stealing algorithm, race conditions and conflicts can occur. For example, when one thread tries to steal elements from another, both threads may try to access the same element at the same time. To handle this, ConcurrentBag uses several synchronization mechanisms that keep conflicts to a minimum.

When a thread is working with three or more elements in its local queue, no locking is required. However, if there are fewer than three elements in the queue, a locking mechanism is enabled to prevent races when other threads access the same data. These thresholds are an implementation detail and may differ between .NET versions.

When you need to perform global operations, e.g. ToArray or Count, ConcurrentBag takes a global lock at the collection level, freezing the structure and preventing it from being changed while these operations are in progress.

When a thread tries to steal data, it uses SpinWait – a small loop waiting for another thread to complete its operations on the local queue if there is a risk of conflict.
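For readers who have not used it, this is roughly what waiting with System.Threading.SpinWait looks like in ordinary code. It is a generic illustration of the primitive, not a copy of what ConcurrentBag does internally; the class and method names are invented.

```csharp
using System.Threading;

public static class SpinWaitDemo
{
    // Spins until a worker thread publishes a value, then returns it.
    public static int WaitForValue()
    {
        int ready = 0;
        int value = 0;

        var worker = new Thread(() =>
        {
            value = 42;
            Volatile.Write(ref ready, 1); // publish: the value is now visible
        });
        worker.Start();

        var spinner = new SpinWait();
        while (Volatile.Read(ref ready) == 0)
        {
            // Busy-waits for a few iterations, then starts yielding the CPU.
            spinner.SpinOnce();
        }

        worker.Join();
        return value;
    }
}
```

Spinning like this makes sense only when the expected wait is very short, which is the case for brief operations on a local queue.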

Operations on data in ConcurrentBag

Add

The Add operation is the basis of everything else: it puts an item into the calling thread's local queue.

Example:

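using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var bag = new ConcurrentBag<int>();

// Each worker adds to the local queue of whichever thread runs it.
Parallel.For(0, 10, i =>
{
    bag.Add(i);
    Console.WriteLine($"Task {Task.CurrentId} added {i}");
});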

Here each thread simply adds elements to its own local queue, so the threads rarely have to wait for each other. This is the key feature of ConcurrentBag: local queues reduce the amount of synchronization.

TryTake

The TryTake operation attempts to take an element from the collection. When a thread tries to pick up an element, it first looks in its own local queue. If there is nothing there, the work-stealing mechanism described above kicks in: the thread starts taking elements from the local queues of other threads.

Example:

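using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var bag = new ConcurrentBag<int>();

Parallel.For(0, 10, i => bag.Add(i));

// TryTake returns false if the bag is empty; which item comes out is unspecified.
if (bag.TryTake(out int result))
{
    Console.WriteLine($"Thread {Environment.CurrentManagedThreadId} took {result}");
}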

So, if the current thread has elements in its local queue, they will be taken from there. If not, the thread will start checking whether it can steal elements from other threads.

TryPeek

TryPeek is similar to TryTake, with one difference: it does not remove the element from the collection. This method is useful when you just need to look into the collection without changing its contents.

Example:

// Continues the previous example: bag is a ConcurrentBag<int>.
if (bag.TryPeek(out int peekedItem))
{
    Console.WriteLine($"Thread {Environment.CurrentManagedThreadId} peeked and saw {peekedItem}");
}

It is worth noting that TryPeek also gives no guarantee that elements will be returned in the order in which they were added.
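A small sketch of the difference between the two calls, using a bag with a single item so that the result does not depend on ordering. The class and method names are illustrative only.

```csharp
using System.Collections.Concurrent;

public static class PeekVsTakeDemo
{
    // Returns the bag's Count after a peek and after a take.
    public static (int AfterPeek, int AfterTake) Run()
    {
        var bag = new ConcurrentBag<string>();
        bag.Add("only item");

        bag.TryPeek(out _);          // looks at the item, leaves it in place
        int afterPeek = bag.Count;   // still 1

        bag.TryTake(out _);          // removes the item
        int afterTake = bag.Count;   // now 0

        return (afterPeek, afterTake);
    }
}
```

Keep in mind that under concurrency an item you peeked may already have been taken by another thread by the time you act on it.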

ToArray and Count Methods: Why Avoid Them

The ToArray and Count methods of ConcurrentBag take global locks at the collection level. This gives a consistent view of the data, but such operations can significantly slow down the program.

When you call ToArray, the collection must be frozen to ensure that the data does not change while it is being copied into the array. The same goes for Count: to count the elements accurately, all threads have to be kept from changing the collection during the calculation.

Example:

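using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var bag = new ConcurrentBag<int>();

Parallel.For(0, 10, i => bag.Add(i));

// Take the snapshot once, after all parallel additions have finished.
int[] items = bag.ToArray();
Console.WriteLine($"Items in bag: {string.Join(", ", items)}");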

The ToArray method is useful for collecting the final data once the parallel work is done, but it should be avoided in hot paths and tight loops.
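If you need a running total without calling Count, one possible workaround is to keep your own counter next to the bag. This is a sketch of that idea rather than a feature of ConcurrentBag; CountedBag and ApproximateCount are hypothetical names.

```csharp
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical wrapper: tracks the item count with Interlocked
// instead of asking the bag for Count.
public sealed class CountedBag<T>
{
    private readonly ConcurrentBag<T> _bag = new ConcurrentBag<T>();
    private int _count;

    // May lag slightly behind the bag while other threads are mid-operation.
    public int ApproximateCount => Volatile.Read(ref _count);

    public void Add(T item)
    {
        _bag.Add(item);
        Interlocked.Increment(ref _count);
    }

    public bool TryTake(out T item)
    {
        if (_bag.TryTake(out item))
        {
            Interlocked.Decrement(ref _count);
            return true;
        }
        return false;
    }
}
```

The counter is exact once all threads are idle and approximate while they are running, which is often enough for metrics or throttling decisions.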

Application examples

Multi-threaded object caching

Let's say you have a high-load web application that constantly creates and destroys objects – be they database connections, file descriptors, or just objects in memory. Creating objects from scratch is always expensive in terms of performance, so we create a pool of objects that can be reused.

using System;
using System.Collections.Concurrent;

// DatabaseConnection is assumed to be defined elsewhere and to expose an Id property.
class ConnectionPool
{
    private readonly ConcurrentBag<DatabaseConnection> _connections =
        new ConcurrentBag<DatabaseConnection>();

    public DatabaseConnection GetConnection()
    {
        if (_connections.TryTake(out var connection))
        {
            Console.WriteLine($"Reusing connection {connection.Id}");
            return connection;
        }

        // No connections are available, so create a new one
        var newConnection = new DatabaseConnection();
        Console.WriteLine($"Creating new connection {newConnection.Id}");
        return newConnection;
    }

    public void ReturnConnection(DatabaseConnection connection)
    {
        _connections.Add(connection);
        Console.WriteLine($"Returned connection {connection.Id} back to the pool");
    }
}

In this code ConcurrentBag is used as a connection pool. When a thread requests a connection, it tries to take one from the bag. If there are no connections, a new one is created. When the work is completed, the connection is returned to the pool for reuse by other threads.
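The same pattern can be generalized to any type. Below is a minimal sketch of a generic pool with a factory delegate; ObjectPool<T> and the stand-in DatabaseConnection class are written for this illustration and are not taken from a library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical generic pool: takes from the bag, or creates via the factory.
public sealed class ObjectPool<T>
{
    private readonly ConcurrentBag<T> _items = new ConcurrentBag<T>();
    private readonly Func<T> _factory;

    public ObjectPool(Func<T> factory)
    {
        _factory = factory ?? throw new ArgumentNullException(nameof(factory));
    }

    public T Get() => _items.TryTake(out T item) ? item : _factory();

    public void Return(T item) => _items.Add(item);
}

// Stand-in for a real connection type, only so the sketch compiles.
public sealed class DatabaseConnection
{
    private static int _nextId;
    public int Id { get; } = Interlocked.Increment(ref _nextId);
}
```

A typical use would be `var pool = new ObjectPool<DatabaseConnection>(() => new DatabaseConnection());` followed by `Get()` and `Return(...)` around each unit of work. Note that this sketch has no upper bound on the pool size and does not validate returned objects.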

Asynchronous logging in a multi-threaded environment

You can use ConcurrentBag to store log messages and collect them later. Let's say there is an application that writes logs from different threads, and then they are collected and written to a file or database.

Example:

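using System;
using System.Collections.Concurrent;

class Logger
{
    private readonly ConcurrentBag<string> _logMessages = new ConcurrentBag<string>();

    public void LogMessage(string message)
    {
        _logMessages.Add($"{DateTime.Now}: {message}");
    }

    public void WriteLogsToFile()
    {
        // Take each message out of the bag so it is not written again on the
        // next flush. The bag is unordered; rely on the timestamps for order.
        while (_logMessages.TryTake(out var message))
        {
            Console.WriteLine($"Writing log: {message}");
            // Write to a file or database here
        }
    }
}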

Here ConcurrentBag stores log messages that are added concurrently from different threads. When it is time to collect the logs (for example, every 10 minutes), the messages are taken out of the ConcurrentBag and written to a file or database.
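Since the bag does not preserve insertion order, a flush that needs chronological output has to sort the batch itself. A possible sketch, assuming each entry carries its own timestamp (the tuple shape and the LogDrain name are choices made for this example):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class LogDrain
{
    // Empties the bag and returns the entries sorted by timestamp.
    public static List<(DateTime Time, string Text)> DrainInOrder(
        ConcurrentBag<(DateTime Time, string Text)> bag)
    {
        var batch = new List<(DateTime Time, string Text)>();

        // Entries added while draining are either picked up here
        // or left for the next flush.
        while (bag.TryTake(out var entry))
        {
            batch.Add(entry);
        }

        batch.Sort((a, b) => a.Time.CompareTo(b.Time));
        return batch;
    }
}
```

Sorting a batch once per flush is cheap compared with forcing an order on every concurrent write.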

Producer-Consumer Pattern

And now we come to perhaps the most important thing: Producer-Consumer. This is a classic pattern in multi-threaded applications, where one or more threads create data (Producers) and other threads process it (Consumers). ConcurrentBag can handle such tasks when the order of processing does not matter.

Example:

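using System;
using System.Collections.Concurrent;

class TaskProcessor
{
    private readonly ConcurrentBag<Action> _tasks = new ConcurrentBag<Action>();

    public void ProduceTasks()
    {
        for (int i = 0; i < 10; i++)
        {
            // Copy the loop variable: a lambda that captured i directly would
            // see its final value (10) by the time the task runs.
            int taskNumber = i;
            _tasks.Add(() => Console.WriteLine($"Processing task {taskNumber}"));
            Console.WriteLine($"Task {taskNumber} added");
        }
    }

    public void ConsumeTasks()
    {
        // The loop ends as soon as the bag is empty, even if producers are still running.
        while (_tasks.TryTake(out var task))
        {
            task();
            Console.WriteLine("Task executed");
        }
    }
}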

There are two methods here: ProduceTasks, which adds tasks to the ConcurrentBag, and ConsumeTasks, which picks up tasks and executes them.
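A TryTake loop stops the moment the bag is empty, so consumers cannot tell "empty for now" from "finished". One common way around this, shown here as a sketch rather than as the only option, is to wrap the bag in BlockingCollection<T>, which lets consumers block until producers call CompleteAdding. The class and method names are illustrative.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class ProducerConsumerDemo
{
    // Produces the numbers 1..itemCount and returns the sum computed by consumers.
    public static int Run(int itemCount, int consumerCount)
    {
        // BlockingCollection uses the ConcurrentBag as its underlying store.
        using (var queue = new BlockingCollection<int>(new ConcurrentBag<int>()))
        {
            int sum = 0;
            var consumers = new Task[consumerCount];

            for (int c = 0; c < consumerCount; c++)
            {
                consumers[c] = Task.Run(() =>
                {
                    // Blocks while the bag is empty; ends after CompleteAdding.
                    foreach (int item in queue.GetConsumingEnumerable())
                    {
                        Interlocked.Add(ref sum, item);
                    }
                });
            }

            for (int i = 1; i <= itemCount; i++)
            {
                queue.Add(i);
            }
            queue.CompleteAdding(); // signal that no more items will arrive

            Task.WaitAll(consumers);
            return sum;
        }
    }
}
```

With this wrapper the consumers can start before the producer and still process every item, in whatever order the bag hands them out.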

You can read more about ConcurrentBag here.


