Concurrent Collections in .NET: ConcurrentQueue - Part Two

This post continues the series on Concurrent Collections in .NET. Today, we will look at another concurrent collection, ConcurrentQueue, in detail: what it is, when to use it, and how to use it.

If you want to understand Concurrent Collections in general and ConcurrentDictionary in detail, please go through my previous post in this series.

So, let’s begin talking about ConcurrentQueue.

ConcurrentQueue is a thread-safe FIFO data structure. It is a specialized collection that can be used when we want to process data in a First In, First Out manner.

Before jumping directly to ConcurrentQueue, we will start with the basic Queue and then move step by step towards the final solution. Along the way, we will evaluate the pros and cons of the different approaches.

  • Regular Queue with single thread

    In the example below, a generic Queue<string> is used to store order information, and the GetOrders method is called in the regular synchronous way.
        // Requires: using System; using System.Collections.Generic; using System.Threading;
        public static void TestQueueRegular()
        {
            var phoneOrders = new Queue<string>();
            GetOrders("Prakash", phoneOrders);
            GetOrders("Aradhana", phoneOrders);

            foreach (var order in phoneOrders)
            {
                Console.WriteLine("Phone Order: {0}", order);
            }
        }

        private static void GetOrders(string custName, Queue<string> phoneOrders)
        {
            for (int i = 0; i < 3; i++)
            {
                Thread.Sleep(100);
                string order = string.Format("{0} needs {1} phones", custName, i + 5);
                phoneOrders.Enqueue(order);
            }
        }
    Since the GetOrders calls run synchronously, one after another, the output is printed in the same order (i.e., first Prakash, then Aradhana).

    output

  • Regular Queue with more than one thread

    Now, let’s make a small change to the previous code and make it asynchronous. For that, we use two tasks (Task.Run) so that GetOrders is called from two different threads at the same time.
        // Requires: using System.Threading.Tasks; (in addition to the previous usings)
        public static void TestQueueAsync()
        {
            var phoneOrders = new Queue<string>();
            Task t1 = Task.Run(() => GetOrders("Prakash", phoneOrders));
            Task t2 = Task.Run(() => GetOrders("Aradhana", phoneOrders));
            Task.WaitAll(t1, t2);

            foreach (var order in phoneOrders)
            {
                Console.WriteLine("Phone Order: {0}", order);
            }
        }
        // Calls the same GetOrders method as in the previous example.
        // You may also refer to the attached demo project for the full source code.
    And, here is the result.

    output

    Exception? But why?

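    It is because Queue is not thread-safe: its Enqueue method is not designed to be called from more than one thread at the same time, so concurrent calls can leave the queue's internal state inconsistent.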

    Note: Multi-threading with a regular Queue is unpredictable. It may work in some runs, but if you try several times, you are likely to get an exception like the one above.

  • Regular Queue with manual lock and more than one thread

    I assume you have understood the problem from the previous example: the Queue class and its methods are not designed to work with parallel threads.

    So, what’s the solution?

    The solution is to have some sort of thread synchronization, either manually or out of the box.

    Let’s look at the manual way, first.

    What is the manual way?

    The famous lock keyword. OK, let’s see what and where to make a change, in the previous example.
        public static void TestLockedQueueAsync()
        {
            var phoneOrders = new Queue<string>();
            Task t1 = Task.Run(() => GetOrdersWithLock("Prakash", phoneOrders));
            Task t2 = Task.Run(() => GetOrdersWithLock("Aradhana", phoneOrders));
            Task.WaitAll(t1, t2);

            foreach (var order in phoneOrders)
            {
                Console.WriteLine("Phone Order: {0}", order);
            }
        }

        // A single shared lock object guards every access to the queue.
        private static readonly object lockObj = new object();

        private static void GetOrdersWithLock(string custName, Queue<string> phoneOrders)
        {
            for (int i = 0; i < 3; i++)
            {
                Thread.Sleep(100);
                string order = string.Format("{0} needs {1} phones", custName, i + 5);
                // Only one thread at a time may enter this block.
                lock (lockObj)
                {
                    phoneOrders.Enqueue(order);
                }
            }
        }
    And, here is the result.

    output

    Fine. So, there is no exception this time, after putting a lock around the Enqueue call. But what if Enqueue is called in many places? Would you put a lock statement everywhere? Think…
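
One way to avoid scattering lock statements is to hide the lock inside a small wrapper class. The following is only a minimal sketch: LockedQueue, LockedQueueDemo, and AddOrders are hypothetical names made up for this illustration and are not part of .NET or the demo project.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (not part of .NET): keeps the lock in one place.
public sealed class LockedQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _gate = new object();

    public void Enqueue(T item)
    {
        lock (_gate) { _items.Enqueue(item); }
    }

    // Check-and-remove happens under one lock, so it is a single atomic step.
    public bool TryDequeue(out T item)
    {
        lock (_gate)
        {
            if (_items.Count > 0)
            {
                item = _items.Dequeue();
                return true;
            }
            item = default(T);
            return false;
        }
    }

    public int Count
    {
        get { lock (_gate) { return _items.Count; } }
    }
}

public static class LockedQueueDemo
{
    // Two tasks enqueue three orders each; returns the final count.
    public static int Run()
    {
        var phoneOrders = new LockedQueue<string>();
        Task t1 = Task.Run(() => AddOrders("Prakash", phoneOrders));
        Task t2 = Task.Run(() => AddOrders("Aradhana", phoneOrders));
        Task.WaitAll(t1, t2);
        return phoneOrders.Count;
    }

    private static void AddOrders(string custName, LockedQueue<string> phoneOrders)
    {
        for (int i = 0; i < 3; i++)
        {
            phoneOrders.Enqueue(string.Format("{0} needs {1} phones", custName, i + 5));
        }
    }
}
```

Callers of such a wrapper never see the lock, which is roughly the convenience that a built-in thread-safe collection gives us without writing and maintaining this code ourselves.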

  • Concurrent Queue with more than one thread

    And, here we go. The collection that handles the concurrency for us automatically is ConcurrentQueue.

    Let’s have a look at what to change in order to work with it.
        // Requires: using System.Collections.Concurrent;
        public static void TestConcurrentQueueAsync()
        {
            var phoneOrders = new ConcurrentQueue<string>();
            Task t1 = Task.Run(() => GetOrdersForConcurrentQueue("Prakash", phoneOrders));
            Task t2 = Task.Run(() => GetOrdersForConcurrentQueue("Aradhana", phoneOrders));
            Task.WaitAll(t1, t2);

            foreach (var order in phoneOrders)
            {
                Console.WriteLine("Phone Order: {0}", order);
            }
        }

        private static void GetOrdersForConcurrentQueue(string custName, ConcurrentQueue<string> phoneOrders)
        {
            for (int i = 0; i < 3; i++)
            {
                Thread.Sleep(100);
                string order = string.Format("{0} needs {1} phones", custName, i + 5);
                phoneOrders.Enqueue(order);
            }
        }
    Here’s the output.

    output

    As you can see, we no longer need manual locking to manage concurrency. This is especially useful when, in a multi-threaded environment, we call Queue methods in several places, since placing manual locks everywhere may lead to unmaintainable code.
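
To illustrate the "several places" point, here is a small sketch in which one task adds orders while another task removes them at the same time, with no lock statement anywhere. ProducerConsumerDemo and its Run method are hypothetical names for this illustration; TryDequeue is the ConcurrentQueue method used to remove items.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ProducerConsumerDemo
{
    // One producer enqueues orderCount orders while one consumer dequeues them.
    public static List<string> Run(int orderCount)
    {
        var phoneOrders = new ConcurrentQueue<string>();
        var processed = new List<string>(); // touched only by the consumer task

        Task producer = Task.Run(() =>
        {
            for (int i = 0; i < orderCount; i++)
            {
                phoneOrders.Enqueue(string.Format("Order {0}", i + 1));
            }
        });

        Task consumer = Task.Run(() =>
        {
            while (processed.Count < orderCount)
            {
                string order;
                if (phoneOrders.TryDequeue(out order))
                {
                    processed.Add(order);
                }
                else
                {
                    Thread.Sleep(1); // queue is momentarily empty; wait for the producer
                }
            }
        });

        Task.WaitAll(producer, consumer);
        return processed;
    }
}
```

With a single producer and a single consumer, the returned list keeps the order in which the items were enqueued, because the queue is FIFO.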

    ConcurrentQueue exposes several other methods. Let’s look at some of the commonly used ones.

  • TryDequeue

    If the queue contains elements, TryDequeue removes the element at the beginning of the queue, stores the removed element in the out variable, and returns true; otherwise, it returns false.

    Let’s look at the code and how to use it.
        // Please refer to the attached demo project for the full source code.
        Console.WriteLine("Total orders before Dequeue are: {0}", phoneOrders.Count);

        // TryDequeue removes the item at the beginning of the queue.
        string myOrder;
        if (phoneOrders.TryDequeue(out myOrder))
        {
            Console.WriteLine("Order \"{0}\" has been removed", myOrder);
        }
        else
        {
            Console.WriteLine("Order queue is empty");
        }

        Console.WriteLine("Total orders after Dequeue are: {0}", phoneOrders.Count);
    Output

    output

    As you can see in the above output, the order has been removed.
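
A common use of TryDequeue is to drain a queue in a loop. The sketch below assumes a hypothetical helper named DrainAll (not part of .NET). It relies on the return value of TryDequeue instead of checking Count first, because in multi-threaded code another thread could empty the queue between the check and the removal.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class DrainDemo
{
    // Removes items until TryDequeue reports that the queue is empty.
    public static List<string> DrainAll(ConcurrentQueue<string> queue)
    {
        var drained = new List<string>();
        string item;
        while (queue.TryDequeue(out item))
        {
            drained.Add(item);
        }
        return drained;
    }
}
```

Each call to TryDequeue either removes exactly one item or returns false, so the loop ends as soon as the queue has nothing left to give.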

  • TryPeek

    If the queue contains elements, TryPeek fetches the element at the beginning of the queue without removing it, stores the fetched element in the out variable, and returns true; otherwise, it returns false.

    Let’s look at the code to see how to use it.
        // Please refer to the attached demo project for the full source code.
        Console.WriteLine("Total orders before TryPeek are: {0}", phoneOrders.Count);

        // TryPeek reads the item at the beginning of the queue without removing it.
        // myOrder is the string variable declared in the TryDequeue example.
        if (phoneOrders.TryPeek(out myOrder))
        {
            Console.WriteLine("Order \"{0}\" has been retrieved", myOrder);
        }
        else
        {
            Console.WriteLine("Order queue is empty");
        }

        Console.WriteLine("Total orders after TryPeek are: {0}", phoneOrders.Count);
    Output

    output

    As you can see in the above output, the order has not been removed; it has only been retrieved.
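
The difference from TryDequeue can be shown with a short sketch. PeekDemo and Describe are hypothetical names used only for this illustration. Calling Describe twice in a row on the same queue gives the same result, because TryPeek does not change the queue.

```csharp
using System.Collections.Concurrent;

public static class PeekDemo
{
    // Reports the next item and the current count without removing anything.
    public static string Describe(ConcurrentQueue<string> queue)
    {
        string next;
        if (!queue.TryPeek(out next))
        {
            return "empty";
        }
        return string.Format("next={0}, count={1}", next, queue.Count);
    }
}
```

Keep in mind that in multi-threaded code the item seen by TryPeek may already have been removed by another thread by the time you act on it, so a TryPeek followed by a TryDequeue is not guaranteed to return the same item.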

Conclusion

In this article, we saw what ConcurrentQueue is and how it works. We started from a very basic Queue example and moved towards ConcurrentQueue. We looked at the problem with a regular Queue in a multi-threaded environment and saw how the behavior of ConcurrentQueue can be mimicked with manual locking. In the end, we went through some of the other commonly used methods of ConcurrentQueue.

You can also download the attached demo project (ConcurrentQueueDemo.zip) to go through the full source code used in the article.

I hope you liked the article. I look forward to your comments and suggestions.