I am continuing the series on Concurrent Collections in .NET. Today, we will discuss another concurrent collection, ConcurrentQueue, in detail, and cover what it is, when to use it, and how to use it.
If you want to understand Concurrent Collections in general, and ConcurrentDictionary in detail, you can go through my previous post at the following link.
So, let’s begin talking about ConcurrentQueue.
ConcurrentQueue is a thread-safe FIFO data structure. It is a specialized collection for cases where several threads add and remove data that must be processed in a First In, First Out manner.
Before jumping directly to ConcurrentQueue, we will start with the basic Queue and then move towards the final solution. Along the way, we will evaluate the pros and cons of the different approaches.
- Regular Queue with single thread
In the example below, a generic Queue is used to store order information, and the GetOrders method is called in the regular synchronous way, one call after another.

    // Requires: using System; using System.Collections.Generic; using System.Threading;
    public static void TestQueueRegular()
    {
        var phoneOrders = new Queue<string>();
        GetOrders("Prakash", phoneOrders);
        GetOrders("Aradhana", phoneOrders);

        foreach (var order in phoneOrders)
        {
            Console.WriteLine("Phone Order: {0}", order);
        }
    }

    // Adds three orders for the given customer, pausing 100 ms before each one.
    private static void GetOrders(string custName, Queue<string> phoneOrders)
    {
        for (int i = 0; i < 3; i++)
        {
            Thread.Sleep(100);
            string order = string.Format("{0} needs {1} phones", custName, i + 5);
            phoneOrders.Enqueue(order);
        }
    }

Since the GetOrders calls run synchronously, one after another, the output is printed in the same order (i.e. first all of Prakash's orders, then Aradhana's).
- Regular Queue with more than one thread
Now, let's make a small change to the previous code so that it runs on more than one thread. For that, we use Task.Run to call GetOrders from two tasks, which can run on two different threads at the same time.

    // Requires: using System.Threading.Tasks;
    public static void TestQueueAsync()
    {
        var phoneOrders = new Queue<string>();
        Task t1 = Task.Run(() => GetOrders("Prakash", phoneOrders));
        Task t2 = Task.Run(() => GetOrders("Aradhana", phoneOrders));
        Task.WaitAll(t1, t2);

        foreach (var order in phoneOrders)
        {
            Console.WriteLine("Phone Order: {0}", order);
        }
    }

And, here is the result.
Exception? But why?
It is because the Enqueue method of Queue is not designed to be called from more than one thread at the same time.
Note: Multi-threading with a regular Queue is unpredictable. It may work in some runs, but if you try several times, you are likely to get an exception, as above. It can also end up with missing or corrupted items without any exception at all.
- Regular Queue with manual lock and more than one thread
I assume the problem in the previous example is clear by now: the Queue class and its methods are not designed to be used by several threads at once.
So, what’s the solution?
The solution is to have some sort of thread synchronization, either manually or out of the box.
Let’s look at the manual way, first.
What is the manual way?
The famous lock keyword. OK, let's see what to change in the previous example, and where.

    public static void TestLockedQueueAsync()
    {
        var phoneOrders = new Queue<string>();
        Task t1 = Task.Run(() => GetOrdersWithLock("Prakash", phoneOrders));
        Task t2 = Task.Run(() => GetOrdersWithLock("Aradhana", phoneOrders));
        Task.WaitAll(t1, t2);

        foreach (var order in phoneOrders)
        {
            Console.WriteLine("Phone Order: {0}", order);
        }
    }

    // One shared lock object; readonly so it can never be swapped for another instance.
    private static readonly object lockObj = new object();

    private static void GetOrdersWithLock(string custName, Queue<string> phoneOrders)
    {
        for (int i = 0; i < 3; i++)
        {
            Thread.Sleep(100);
            string order = string.Format("{0} needs {1} phones", custName, i + 5);
            // Only one thread at a time may touch the queue.
            lock (lockObj)
            {
                phoneOrders.Enqueue(order);
            }
        }
    }

And, here is the result.
Fine. So, no exception this time, after putting a lock around the Enqueue call. But what if the queue is used in many places in your code? Would you put a lock statement around every one of them? Think…
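One way to avoid repeating the lock statement everywhere is to put the Queue and its lock inside a small wrapper class, so that callers never touch the lock directly. The sketch below only illustrates that idea: LockedQueue, LockedQueueDemo, EnqueueFromTwoTasks and AddOrders are hypothetical names made up for this example, not framework types, and the Thread.Sleep from the earlier code is left out to keep it short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical wrapper (not a framework type): every access to the inner
// Queue<T> goes through the same private lock.
public sealed class LockedQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _sync = new object();

    public void Enqueue(T item)
    {
        lock (_sync) { _items.Enqueue(item); }
    }

    // Checks for an item and removes it under one lock, so no other
    // thread can empty the queue between the check and the removal.
    public bool TryDequeue(out T item)
    {
        lock (_sync)
        {
            if (_items.Count > 0)
            {
                item = _items.Dequeue();
                return true;
            }
            item = default(T);
            return false;
        }
    }

    public int Count
    {
        get { lock (_sync) { return _items.Count; } }
    }
}

public static class LockedQueueDemo
{
    // Two tasks each add 'ordersPerCustomer' orders; returns the final count.
    public static int EnqueueFromTwoTasks(int ordersPerCustomer)
    {
        var phoneOrders = new LockedQueue<string>();
        Task t1 = Task.Run(() => AddOrders("Prakash", phoneOrders, ordersPerCustomer));
        Task t2 = Task.Run(() => AddOrders("Aradhana", phoneOrders, ordersPerCustomer));
        Task.WaitAll(t1, t2);
        return phoneOrders.Count;
    }

    private static void AddOrders(string custName, LockedQueue<string> phoneOrders, int count)
    {
        for (int i = 0; i < count; i++)
        {
            phoneOrders.Enqueue(string.Format("{0} needs {1} phones", custName, i + 5));
        }
    }

    public static void Main()
    {
        Console.WriteLine("Orders queued: {0}", EnqueueFromTwoTasks(1000));
    }
}
```

With two tasks adding 1000 orders each, the count should come out as 2000 on every run. It works, but you now have one more class to write, test and maintain yourself.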
- Concurrent Queue with more than one thread
And, here we go. The collection that handles the concurrency for us is ConcurrentQueue.
Let's have a look at what to change in order to work with it.

    // Requires: using System.Collections.Concurrent;
    public static void TestConcurrentQueueAsync()
    {
        var phoneOrders = new ConcurrentQueue<string>();
        Task t1 = Task.Run(() => GetOrdersForConcurrentQueue("Prakash", phoneOrders));
        Task t2 = Task.Run(() => GetOrdersForConcurrentQueue("Aradhana", phoneOrders));
        Task.WaitAll(t1, t2);

        foreach (var order in phoneOrders)
        {
            Console.WriteLine("Phone Order: {0}", order);
        }
    }

    private static void GetOrdersForConcurrentQueue(string custName, ConcurrentQueue<string> phoneOrders)
    {
        for (int i = 0; i < 3; i++)
        {
            Thread.Sleep(100);
            string order = string.Format("{0} needs {1} phones", custName, i + 5);
            // No lock needed: Enqueue on ConcurrentQueue is thread-safe.
            phoneOrders.Enqueue(order);
        }
    }

Here’s the output.
As you can see, we no longer need manual locking to manage concurrency. This is most useful when, in a multi-threaded environment, the queue is used in several places, and placing manual locks everywhere would lead to unmaintainable code.
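Keep in mind that the two tasks still interleave, so the overall order of Prakash's and Aradhana's orders can differ from run to run. What stays fixed is the relative order of the items added by the same thread. Here is a minimal sketch to check that; ProducerOrderDemo, Produce and PerProducerOrderIsKept are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ProducerOrderDemo
{
    // Two tasks each enqueue 'perProducer' numbered items. Returns true when
    // nothing was lost and each producer's items appear in ascending order.
    public static bool PerProducerOrderIsKept(int perProducer)
    {
        var queue = new ConcurrentQueue<KeyValuePair<string, int>>();
        Task t1 = Task.Run(() => Produce("Prakash", queue, perProducer));
        Task t2 = Task.Run(() => Produce("Aradhana", queue, perProducer));
        Task.WaitAll(t1, t2);

        // Remember the last sequence number seen for each producer.
        var lastSeen = new Dictionary<string, int>();
        foreach (var entry in queue)
        {
            int previous;
            if (lastSeen.TryGetValue(entry.Key, out previous) && entry.Value <= previous)
            {
                return false; // an item overtook an earlier one from the same producer
            }
            lastSeen[entry.Key] = entry.Value;
        }
        return queue.Count == 2 * perProducer;
    }

    private static void Produce(string name, ConcurrentQueue<KeyValuePair<string, int>> queue, int count)
    {
        for (int i = 0; i < count; i++)
        {
            queue.Enqueue(new KeyValuePair<string, int>(name, i));
        }
    }

    public static void Main()
    {
        Console.WriteLine("Per-producer order kept: {0}", PerProducerOrderIsKept(10000));
    }
}
```

So if the order across customers matters to you, the queue alone will not decide it; only the order within each producer is something you can rely on.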
ConcurrentQueue exposes several other methods. Let's look at some of the commonly used ones.
- TryDequeue
If the queue contains elements, TryDequeue removes the element at the beginning of the queue, puts the removed element into the out variable, and returns true. Otherwise, it returns false.
Let's look at the code to see how to use it.

    Console.WriteLine("Total orders before Dequeue are: {0}", phoneOrders.Count);

    string myOrder;
    if (phoneOrders.TryDequeue(out myOrder))
    {
        Console.WriteLine("Order \"{0}\" has been removed", myOrder);
    }
    else
    {
        // Nothing was dequeued, so there is no order to print here.
        Console.WriteLine("Order queue is empty");
    }

    Console.WriteLine("Total orders after Dequeue are: {0}", phoneOrders.Count);

Output
As you can see in the above output, the order has been removed.
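Because TryDequeue checks and removes in a single call, it fits nicely into a loop when several threads drain the same queue: there is no separate "is it empty?" check that another thread could invalidate in between. The sketch below assumes the queue is filled up front and that four consumers share it; DrainDemo and DrainWithConsumers are hypothetical names for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class DrainDemo
{
    // Fills a queue with 'itemCount' items, lets 'consumerCount' tasks drain it,
    // and returns how many items were dequeued in total.
    public static int DrainWithConsumers(int itemCount, int consumerCount)
    {
        var queue = new ConcurrentQueue<int>();
        for (int i = 0; i < itemCount; i++)
        {
            queue.Enqueue(i);
        }

        int processed = 0;
        var consumers = new Task[consumerCount];
        for (int c = 0; c < consumerCount; c++)
        {
            consumers[c] = Task.Run(() =>
            {
                int item;
                // Each successful TryDequeue hands the item to exactly one consumer.
                while (queue.TryDequeue(out item))
                {
                    Interlocked.Increment(ref processed);
                }
            });
        }

        Task.WaitAll(consumers);
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine("Items processed: {0}", DrainWithConsumers(10000, 4));
    }
}
```

Every item is counted exactly once, so the total should match the number of items enqueued. Note that this loop stops as soon as the queue is found empty, which is fine here only because nothing is being added while the consumers run.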
- TryPeek
If the queue contains elements, TryPeek fetches the element at the beginning of the queue without removing it, puts the fetched element into the out variable, and returns true. Otherwise, it returns false.
Let's look at the code to see how to use it.

    Console.WriteLine("Total orders before TryPeek are: {0}", phoneOrders.Count);

    // myOrder is the variable declared in the TryDequeue example.
    if (phoneOrders.TryPeek(out myOrder))
    {
        Console.WriteLine("Order \"{0}\" has been retrieved", myOrder);
    }
    else
    {
        Console.WriteLine("Order queue is empty");
    }

    Console.WriteLine("Total orders after TryPeek are: {0}", phoneOrders.Count);

Output
As you can see in the above output, the order has not been removed; it has only been retrieved.
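To make the behaviour of TryPeek easy to check without the screenshot, here is a small single-threaded sketch. PeekDemo and PeekLeavesQueueUnchanged are hypothetical names, and the order strings simply reuse the format from the earlier examples.

```csharp
using System;
using System.Collections.Concurrent;

public static class PeekDemo
{
    // Returns true when TryPeek fails on an empty queue, reports the head item
    // on a non-empty queue, and leaves Count unchanged.
    public static bool PeekLeavesQueueUnchanged()
    {
        var phoneOrders = new ConcurrentQueue<string>();
        string order;

        // Empty queue: TryPeek returns false and 'order' is set to null.
        if (phoneOrders.TryPeek(out order) || order != null)
        {
            return false;
        }

        phoneOrders.Enqueue("Prakash needs 5 phones");
        phoneOrders.Enqueue("Aradhana needs 5 phones");

        int before = phoneOrders.Count;
        bool found = phoneOrders.TryPeek(out order);

        return found
            && order == "Prakash needs 5 phones"   // the head, i.e. the first item enqueued
            && phoneOrders.Count == before;        // nothing was removed
    }

    public static void Main()
    {
        Console.WriteLine("Peek left the queue unchanged: {0}", PeekLeavesQueueUnchanged());
    }
}
```

One caution for multi-threaded code: a TryPeek followed by a TryDequeue is two separate operations, so another thread may remove the peeked item in between. If you need the item, call TryDequeue directly and use what it hands back.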
Conclusion
In this article, we saw what ConcurrentQueue is and how it works. We started from a very basic Queue example and moved towards ConcurrentQueue. We looked at the problem with a regular Queue in a multi-threaded environment and saw how ConcurrentQueue can be mimicked with manual locking. In the end, we went through some of the other commonly used methods of ConcurrentQueue.
You can also download the attached demo project (ConcurrentQueueDemo.zip) for going through the full source code used in the article.
I hope you liked the article. I look forward to your comments and suggestions.