Daily Archives: December 9, 2013

Concurrent collections (basics)

Accessing collections can be tricky in a multithreaded environment. You can use the lock statement to access data from different threads. .NET Framework 4.0 (released together with C# 4.0) introduced concurrent collections, which do not require explicit synchronization. Be careful using them, though. I conducted many performance tests, and the concurrent collections turned out to be slower quite often. In practice, you have to benchmark your own approach with the amount and frequency of data that you expect. Only then can you tell which kind of collection should be used.
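A measurement along these lines can serve as a starting point for comparing both approaches on your own workload. It is a minimal sketch under some assumptions. The class QueueBenchmark, its methods, and the item count are hypothetical. A single Stopwatch run is also only a rough indicator, because JIT warm-up and thread scheduling distort short measurements.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical benchmark sketch: lock-protected Queue<int> versus ConcurrentQueue<int>.
// Replace the Parallel.For workload with the amount and frequency of data you expect.
public static class QueueBenchmark {
    // Enqueues 'count' items from parallel workers into a Queue<int> guarded by a lock.
    public static (int Count, TimeSpan Elapsed) TimeLockedQueue(int count) {
        var queue = new Queue<int>();
        var sync = new object();
        var watch = Stopwatch.StartNew();
        Parallel.For(0, count, i => {
            lock (sync) queue.Enqueue(i);
        });
        watch.Stop();
        return (queue.Count, watch.Elapsed);
    }

    // Same workload with ConcurrentQueue<int>, which needs no explicit lock.
    public static (int Count, TimeSpan Elapsed) TimeConcurrentQueue(int count) {
        var queue = new ConcurrentQueue<int>();
        var watch = Stopwatch.StartNew();
        Parallel.For(0, count, i => queue.Enqueue(i));
        watch.Stop();
        return (queue.Count, watch.Elapsed);
    }
}
```

Call both methods several times and compare the Elapsed values. For reliable numbers, a dedicated harness such as BenchmarkDotNet is the better tool.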

BlockingCollection

Imagine some data is arriving at your PC and you would like to add that data to a queue, so that the buffer does not overflow. The data is then processed as soon as the CPUs have free capacity. BlockingCollection is a wrapper for other collections that implement IProducerConsumerCollection<T>. The default collection type is ConcurrentQueue. When the BlockingCollection is empty, Take() waits (blocks) until data arrives.
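You can see the wrapper idea by handing BlockingCollection a different underlying collection. The sketch below uses the constructor that takes an IProducerConsumerCollection<T> plus a bounded capacity. The class WrapperDemo is hypothetical, and the capacity of 3 is arbitrary. With a ConcurrentStack, the items come out in LIFO order. Once the capacity is reached, Add() would block, while TryAdd() simply returns false.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical demo class: BlockingCollection wrapping a ConcurrentStack instead of the default queue.
public static class WrapperDemo {
    public static (bool FourthAccepted, List<int> Items) TakeAllFromStack() {
        // Bounded capacity of 3 (arbitrary): the buffer holds at most three items.
        var collection = new BlockingCollection<int>(new ConcurrentStack<int>(), boundedCapacity: 3);
        collection.Add(1);
        collection.Add(2);
        collection.Add(3);

        // A fourth Add() would block until a consumer takes an item;
        // TryAdd() without a timeout returns false immediately because the buffer is full.
        bool accepted = collection.TryAdd(4);
        collection.CompleteAdding();

        var items = new List<int>();
        foreach (int item in collection.GetConsumingEnumerable()) items.Add(item);
        return (accepted, items); // items: 3, 2, 1 (last in, first out)
    }
}
```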

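As you know, I prefer writing short pieces of code to explaining a lot. Looking at the results makes the mechanism easy to understand. The examples are static methods and need the namespaces System, System.Collections.Concurrent, System.Threading and System.Threading.Tasks.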

public static void BlockingCollection1() {
    BlockingCollection<int> lCollection = new BlockingCollection<int>();

    Task.Factory.StartNew(() => {
        for (int i = 0; i < 5; i++) {
            lCollection.Add(i);
            Thread.Sleep(200);
        }

        lCollection.CompleteAdding();
    });

    Task.Factory.StartNew(() => {
        try {
            while (true) Console.WriteLine(lCollection.Take());
        }
        catch (InvalidOperationException e) {
            // thrown by Take() once the collection is empty and CompleteAdding() was called
            Console.WriteLine("exception thrown: " + e.Message);
        }
    });

    Console.ReadLine();
} //

example output:
0
1
2
3
4
exception thrown: The collection argument is empty and has been marked as complete with regards to additions.

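Each number appears roughly 200 milliseconds after the previous one. After the last number, lCollection.CompleteAdding() marks the collection as complete, which tells consumers that no more elements are going to follow. The next Take() on the now empty collection cannot wait for new data anymore, so it throws an InvalidOperationException instead. This can be useful to end a task that is waiting in Take().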
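TryTake() with a timeout, combined with the IsCompleted property, lets you end a consumer without relying on an exception. This is a minimal sketch that assumes a hypothetical TryTakeDemo class and a 100 ms timeout. TryTake() returns false instead of throwing when it times out or when the collection is empty and marked as complete.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class: draining a BlockingCollection without catching an exception.
public static class TryTakeDemo {
    public static List<int> Drain() {
        var collection = new BlockingCollection<int>();
        Task producer = Task.Run(() => {
            for (int i = 0; i < 5; i++) {
                collection.Add(i);
                Thread.Sleep(50);
            }
            collection.CompleteAdding();
        });

        var received = new List<int>();
        // IsCompleted turns true once CompleteAdding() was called AND the collection is empty.
        while (!collection.IsCompleted) {
            // Waits at most 100 ms; returns false on timeout or after completion instead of throwing.
            if (collection.TryTake(out int item, 100)) received.Add(item);
        }

        producer.Wait();
        return received; // 0, 1, 2, 3, 4
    }
}
```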

public static void BlockingCollection2() {
    BlockingCollection<int> lCollection = new BlockingCollection<int>();

    Task.Factory.StartNew(() => {
        for (int i = 0; i < 5; i++) {
            lCollection.Add(i);
            Thread.Sleep(200);
        }

        lCollection.CompleteAdding();  // comment it out for testing purposes
    });


    foreach (int i in lCollection.GetConsumingEnumerable()) Console.WriteLine(i);    

    Console.ReadLine();
} //

The above code produces the same output, just without the exception.
When you comment out “lCollection.CompleteAdding();”, the output does not visibly change either. But there is one big difference: the foreach does not know that you have finished adding to the BlockingCollection. After the last number, “foreach” (in combination with .GetConsumingEnumerable()) keeps waiting for more elements like an endless loop, and the program never reaches Console.ReadLine().
Telling your program that you have completed adding to the collection is vital.
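A timeout makes the difference visible. In this sketch, a consumer task loops over GetConsumingEnumerable(), and Task.Wait(milliseconds) reports whether the loop has ended. The class and method names are hypothetical. Without CompleteAdding(), the wait times out, and the demo then calls CompleteAdding() itself so the stuck consumer can finish.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical demo class: does the consuming loop end with or without CompleteAdding()?
public static class CompleteAddingDemo {
    public static bool ConsumerFinishes(bool completeAdding, int waitMilliseconds) {
        var collection = new BlockingCollection<int>();
        Task consumer = Task.Run(() => {
            foreach (int i in collection.GetConsumingEnumerable()) Console.WriteLine(i);
        });

        for (int i = 0; i < 5; i++) collection.Add(i);
        if (completeAdding) collection.CompleteAdding();

        // true if the foreach ended within the given time
        bool finished = consumer.Wait(waitMilliseconds);

        // Release a consumer that is still waiting so it does not block forever.
        if (!finished) collection.CompleteAdding();
        return finished;
    }
}
```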

Let’s avoid “.GetConsumingEnumerable()” and see what happens:

public static void BlockingCollection3() {
    BlockingCollection<int> lCollection = new BlockingCollection<int>();

    Task.Factory.StartNew(() => {
        for (int i = 0; i < 5; i++) {
            lCollection.Add(i);
            Thread.Sleep(200);
        }

        lCollection.CompleteAdding();
    });


    //Thread.Sleep(2000);
    foreach (int i in lCollection) Console.WriteLine(i);

    Console.ReadLine();
} //

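Holy cow! The program does not wait for the collection to be filled. Enumerating the BlockingCollection directly only reads the elements that are in it at that moment. It neither waits for new ones nor removes any. As the producer task has barely started, the program prints nothing (or at most the first number). Uncomment “Thread.Sleep(2000);” and the collection is completely filled by the time the program prints the output. The numbers 0 to 4 are then printed in one shot, with no delay between them.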
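The following sketch contrasts both ways of iterating over the same filled and completed collection. The class EnumerationDemo is hypothetical. The plain foreach leaves all five items in place, while GetConsumingEnumerable() removes them, so Count drops to 0.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class: plain enumeration versus consuming enumeration.
public static class EnumerationDemo {
    public static (int AfterPlainForeach, int AfterConsuming) Compare() {
        var collection = new BlockingCollection<int>();
        for (int i = 0; i < 5; i++) collection.Add(i);
        collection.CompleteAdding();

        // Plain foreach: reads the current items, removes nothing and never waits.
        foreach (int i in collection) Console.Write(i + " ");
        int afterPlain = collection.Count;      // still 5

        // GetConsumingEnumerable(): removes every item it yields.
        foreach (int i in collection.GetConsumingEnumerable()) Console.Write(i + " ");
        int afterConsuming = collection.Count;  // now 0

        Console.WriteLine();
        return (afterPlain, afterConsuming);
    }
}
```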


ConcurrentBag

ConcurrentBag implements IEnumerable<T>, so you can iterate over it with foreach. TryTake() removes and returns an element. The method uses an “out” parameter, which is pretty smart: you can test whether the call succeeded and get the result itself at the same time. That is quite convenient in multithreading. Otherwise you would have to test first and then take the value, and to do that safely you would have to lock the collection. This is not needed here.

ConcurrentBag allows duplicate entries. The order is arbitrary, so do not expect it to behave like a list. You can add the numbers 0 to 5 and they may, e.g., be printed in reverse order. But this does not necessarily happen.
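Here is a minimal sketch of the TryTake() pattern that also shows duplicates being kept. The class BagDemo is hypothetical. Because the bag makes no ordering promise, the drained items are sorted before being returned.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical demo class: duplicates and the atomic "test and take" of TryTake().
public static class BagDemo {
    public static List<int> DrainSorted() {
        var bag = new ConcurrentBag<int>();
        bag.Add(1);
        bag.Add(1); // duplicates are allowed
        bag.Add(2);

        var items = new List<int>();
        // TryTake() checks for an item and removes it in one call; false means the bag is empty.
        while (bag.TryTake(out int item)) items.Add(item);

        items.Sort(); // the bag gives no order guarantee
        return items; // 1, 1, 2
    }
}
```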

There is one method that is not very useful in a multithreaded environment: TryPeek(), which exists in some of the concurrent collections. When you use it, you cannot be sure that the element is still there when you try to take it. You would have to use lock() again, and that contradicts the idea of concurrent collections.
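You can reproduce the TryPeek() problem deterministically by simulating the other thread. This sketch uses a ConcurrentQueue so that the order is predictable. An extra TryDequeue() between the peek and the take plays the role of a second consumer. The class PeekRaceDemo is hypothetical. Calling TryDequeue() (or TryTake() on a bag) directly avoids the gap, because checking and removing happen in one call.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class: why "peek, then take" is unreliable with several consumers.
public static class PeekRaceDemo {
    public static (int Peeked, int Taken) SimulateRace() {
        var queue = new ConcurrentQueue<int>();
        queue.Enqueue(1);
        queue.Enqueue(2);

        queue.TryPeek(out int peeked);   // we see 1 ...
        queue.TryDequeue(out _);         // ... but "another thread" takes 1 in between (simulated)
        queue.TryDequeue(out int taken); // so we get 2, not the value we peeked

        return (peeked, taken);
    }
}
```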

public static void ConcurrentBag1() {
    ConcurrentBag<int> lCollection = new ConcurrentBag<int>();

    Task.Factory.StartNew(() => {
        for (int i = 0; i < 5; i++) {
            lCollection.Add(i);
            Thread.Sleep(200);
        }
    });

    //Thread.Sleep(500);
    //Thread.Sleep(2000);

    int lResult;
    while (lCollection.TryTake(out lResult)) {
        Console.WriteLine(lResult);
    }
            
    Console.ReadLine();
} //

The program does not wait for the collection to be filled. If you are lucky, you will see a “0” printed on screen. TryTake() does not block (wait) at all; it returns false as soon as the bag is empty. Play with the comments and have a look at the results. The longer you wait, the more numbers show up.
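Instead of guessing a Thread.Sleep() duration, you can keep the Task returned for the producer and wait for it explicitly before draining the bag. This minimal sketch assumes a hypothetical BagWaitDemo class and uses Task.Run() in place of Task.Factory.StartNew(). The approach stays this simple only because the consumer starts after the producer has finished. For a consumer that runs in parallel with the producer, BlockingCollection is the better fit.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class: wait for the producer task instead of sleeping.
public static class BagWaitDemo {
    public static List<int> TakeAfterProducerFinished() {
        var bag = new ConcurrentBag<int>();
        Task producer = Task.Run(() => {
            for (int i = 0; i < 5; i++) {
                bag.Add(i);
                Thread.Sleep(200);
            }
        });

        producer.Wait(); // blocks until all five items have been added

        var results = new List<int>();
        while (bag.TryTake(out int item)) results.Add(item);
        results.Sort(); // bag order is arbitrary
        return results; // 0, 1, 2, 3, 4
    }
}
```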

public static void ConcurrentBag2() {
    ConcurrentBag<int> lCollection = new ConcurrentBag<int>();
    for (int i = 0; i < 10; i++) lCollection.Add(i);
    foreach (int r in lCollection) Console.WriteLine(r);
 
    Console.ReadLine();
} //

Above is a quick demonstration of IEnumerable. Note that the order of the output is arbitrary.
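Enumerating a ConcurrentBag works on a moment-in-time snapshot of its contents. The sketch below adds items inside the foreach. The class BagSnapshotDemo is hypothetical. A List<T> would throw an InvalidOperationException here. The bag does not complain, and it does not include the new items in the running loop either.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class: the bag's enumerator is a snapshot.
public static class BagSnapshotDemo {
    public static (int Seen, int Total) AddWhileEnumerating() {
        var bag = new ConcurrentBag<int>();
        for (int i = 0; i < 3; i++) bag.Add(i);

        int seen = 0;
        // Adding inside the loop neither throws nor extends the loop.
        foreach (int item in bag) {
            bag.Add(item + 100);
            seen++;
        }
        return (seen, bag.Count); // 3 items seen, 6 items in the bag afterwards
    }
}
```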

The basic behavior of concurrent collections has now been explained. Stacks and queues are not new ideas in C#, so I will just list the main points:

ConcurrentStack
– Push()/PushRange() to add data
– TryPop()/TryPopRange() to get data
– Last in first out (LIFO)
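Here is a short sketch of the stack methods, using a hypothetical class named StackDemo. PushRange() pushes the array elements in order, so the last one ends up on top. TryPopRange() fills the buffer starting with the topmost item and returns how many items it popped.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical demo class: Push/PushRange and TryPop/TryPopRange on a ConcurrentStack.
public static class StackDemo {
    public static List<int> PushAndPop() {
        var stack = new ConcurrentStack<int>();
        stack.Push(1);
        stack.PushRange(new[] { 2, 3, 4 }); // pushed as 2, then 3, then 4 (4 is on top)

        var popped = new List<int>();
        if (stack.TryPop(out int top)) popped.Add(top); // 4

        var buffer = new int[2];
        int count = stack.TryPopRange(buffer);          // pops 3, then 2
        for (int i = 0; i < count; i++) popped.Add(buffer[i]);

        return popped; // 4, 3, 2; the 1 stays on the stack
    }
}
```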

ConcurrentQueue
– Enqueue() to add data
– TryDequeue() to get data
– First in first out (FIFO)
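The queue counterpart follows, again as a minimal sketch with a hypothetical class name. Items leave in the same order in which they were enqueued.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical demo class: Enqueue and TryDequeue on a ConcurrentQueue.
public static class QueueDemo {
    public static List<int> EnqueueAndDequeue() {
        var queue = new ConcurrentQueue<int>();
        for (int i = 1; i <= 3; i++) queue.Enqueue(i);

        var dequeued = new List<int>();
        // TryDequeue() returns false once the queue is empty.
        while (queue.TryDequeue(out int item)) dequeued.Add(item);
        return dequeued; // 1, 2, 3: first in, first out
    }
}
```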

ConcurrentDictionary

The Dictionary is also well known in C#. Its concurrent version can atomically add, get and update entries. Atomic means that an operation takes effect as a single, indivisible step, without interference from any other thread.
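Atomic updates matter when many threads change the same entry. In this sketch, Parallel.For increments one counter via AddOrUpdate(), and the final value equals the number of calls. The class CounterDemo is hypothetical. The update delegate runs outside the dictionary's internal lock and may be invoked more than once under contention, so it should not have side effects. The stored result is still correct, because a value is only replaced if it has not changed in the meantime.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical demo class: a thread-safe counter with AddOrUpdate().
public static class CounterDemo {
    public static int CountHits(int hits) {
        var counters = new ConcurrentDictionary<string, int>();
        Parallel.For(0, hits, _ => {
            // Adds "hits" with value 1, or replaces the current value with value + 1.
            counters.AddOrUpdate("hits", 1, (key, current) => current + 1);
        });
        return counters["hits"];
    }
}
```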

TryAdd returns true if the new entry was added successfully. If the key already exists, it returns false.
TryUpdate compares the existing value for the specified key with a specified comparison value and, if they are equal, replaces it with a third value.
AddOrUpdate adds an entry if the key does not exist yet, or updates the value if the key already exists.
GetOrAdd adds an entry if the key does not exist yet, and returns the existing value otherwise.
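The false cases of these methods are easy to overlook, so here is a minimal sketch of them. The class DictionaryMethodsDemo is hypothetical. TryAdd() on an existing key fails. TryUpdate() fails when the comparison value does not match the current one. GetOrAdd() returns the existing value without overwriting it.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class: what happens when the key already exists.
public static class DictionaryMethodsDemo {
    public static (bool AddedAgain, bool UpdatedWithWrongComparand, int GetOrAddResult, int Final) Run() {
        var dict = new ConcurrentDictionary<string, int>();
        dict.TryAdd("a", 1);

        bool addedAgain = dict.TryAdd("a", 7);     // false: key already exists
        bool updated = dict.TryUpdate("a", 5, 42); // false: current value is 1, not 42
        int existing = dict.GetOrAdd("a", 99);     // returns 1, entry stays unchanged

        return (addedAgain, updated, existing, dict["a"]); // (false, false, 1, 1)
    }
}
```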

public static void ConcurrentDictionary1() {
    ConcurrentDictionary<string, int> lCollection = new ConcurrentDictionary<string, int>();
    if (lCollection.TryAdd("a", 1)) Console.WriteLine("successfully added entry: a/1");
    PrintConcurrentDictionary(lCollection);
    if (lCollection.TryUpdate("a", 2, 1)) Console.WriteLine("updated value to 2");
    PrintConcurrentDictionary(lCollection);
    lCollection["a"] = 3;
    PrintConcurrentDictionary(lCollection);
    int x = lCollection.AddOrUpdate("a", 4, (s, i) => i * i);  // "a" exists: 3 * 3 = 9, the 4 is ignored
    PrintConcurrentDictionary(lCollection);
    int y = lCollection.GetOrAdd("b", 5);  // "b" is missing: adds 5 and returns it
    PrintConcurrentDictionary(lCollection);

    Console.ReadLine();
} //

example output:
successfully added entry: a/1
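-----------------------------------------
a = 1
updated value to 2
-----------------------------------------
a = 2
-----------------------------------------
a = 3
-----------------------------------------
a = 9
-----------------------------------------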
b = 5
a = 9
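The example relies on a PrintConcurrentDictionary() helper that is not shown. Below is a plausible version, reconstructed from the printed output, so it is an assumption. It is placed in a hypothetical DictionaryPrinter class here, whereas in the example it would sit next to ConcurrentDictionary1(). The enumeration order of a ConcurrentDictionary is not guaranteed, which is why "b = 5" can appear before "a = 9", as in the output above.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical reconstruction of the helper used by ConcurrentDictionary1().
public static class DictionaryPrinter {
    public static void PrintConcurrentDictionary(ConcurrentDictionary<string, int> dictionary) {
        Console.WriteLine(new string('-', 41)); // separator line
        // Enumerating is safe while other threads modify the dictionary, but the order is not guaranteed.
        foreach (var pair in dictionary) Console.WriteLine(pair.Key + " = " + pair.Value);
    }
}
```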