Wednesday, August 24, 2011

Blocking queues

I needed a blocking queue to synchronize between my producer and consumer threads.
This is the best way to do this in my opinion.

Assume for the this example that either we are inheriting from Queue<T> or that we are using Queue<T> within our new class.

We need a method to enqueue items as fast as possible.

public void Enqueue(T item)
{    
            lock (m_syncRoot)
            {
                m_queue.Enqueue(item);               
                Monitor.Pulse(m_syncRoot);
            }          
}

So we'll enqueue the item and pulse the waiting threads.
We lock the scope while doing this of course.

Now, we need a consumer, for my needs I prefer to dequeue all the items from the queue, but you can change the method to dequeue one by one.

public T[] DequeueAll()
        {   
                lock (m_syncRoot)
                {
                    if (m_queue.Count == 0)
                    {
                        Monitor.Wait(m_syncRoot);
                        return new T[] {};
                    }

                    Queue<T> localQueue = new Queue<T>(m_queue.Count);                 
                  
                    while (m_queue.Count > 0)
                        localQueue.Enqueue(m_queue.Dequeue());

                    return localQueue.ToArray();
                }
            
        }

Usually I tend to avoid inner method "returns", but this time it's imperative.
Notice that we exit our method when we are signaled, this will make sure that we enter the method again while properly locking the sync object (m_queue.syncRoot) and returning the waiting queue items.
Assume that we have an endless loop surrounding DequeueAll().

For a single dequeue, you could do this:

public T Dequeue()
        {
            lock (m_syncRoot)
            {
                if (m_queue.Count == 0)
                {
                    Monitor.Wait(m_syncRoot);
                    return m_queue.Dequeue();
                }

                T localItem = m_queue.Dequeue();

                return localItem;
            }

        }

To sum up:
We have a consumer waiting when there are no available items in the queue.
When another thread puts items in the queue it signals the consumer and it then elegantly exits the method only to reenter it to re-lock the queue properly.

Note: .net does not provide a syncRoot object in Queue<T> for some reason. In .net 4.0 they provide a whole new object called ConcurrentQueue<T>.


Next post I'll talk about adding suspend/resume states to the blocking queue.

First post

cout << "Hello World!"


My first post, wow I haven't programmed in c++ in years.
C# is really like a soft cushion of a language, it's a Nanny for lazy programmers.
It takes good care of you, it's an amazing achievement on Microsoft's behalf.
It's always growing and developing you can hardly keep up.