Sunday, September 18, 2011

Suspending Blocking Queues

Sometimes you might find it necessary to suspend a queue, or more precisely, the handling of queued items (usually messages).
In a multiple-queue environment, you might need to synchronize between queues in order to handle items in chronological order.
E.g. a request is sent to queue 1, which generates two messages, one for queue 1 and one for queue 2; however, the queue 2 response is received before the queue 1 response.
This can cause a problem if the queue 2 item requires a resource that is only available after the queue 1 message has been handled.

So, one way to avoid such problems is to suspend the queues that you know need to wait until the active queue has finished its job.
If we take the code from my previous post, we only need to add a small amount of code.
We need an aggregation counter.

private int m_suspendCounter = 0;   // counts outstanding Suspend() calls

public bool Suspended
{
    get
    {
        return m_suspendCounter > 0;
    }
}

We need a method to suspend the queue.
public void Suspend()
{
    lock (m_syncRoot)
    {
        m_suspendCounter++;
    }
}

We also need a method to release the queue.
public void Resume()
{
    lock (m_syncRoot)
    {
        m_suspendCounter--;

        // Only wake up waiting consumers once every Suspend() has been
        // matched by a Resume().
        if (m_suspendCounter == 0)
            Monitor.PulseAll(m_syncRoot);
    }
}

Notice that the queue is only released once the counter reaches zero. This is because you might suspend the queue in a nested fashion, e.g. if queue 1 needs to handle a nested query.
We also need to remember to always check whether we are in suspended mode before pulsing the queue.

public void Enqueue(T item)
{
    lock (m_syncRoot)
    {
        m_queue.Enqueue(item);

        // Don't signal consumers while the queue is suspended; Resume() will
        // pulse them once the suspend counter drops back to zero.
        if (!Suspended)
            Monitor.Pulse(m_syncRoot);
    }
}

Note: In a multithreaded environment, one might consider adding the volatile modifier to the counter variable.
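
To make the nesting point concrete, here is a minimal usage sketch (the class name BlockingQueue<T> and the Request and HandleQueue1Message names are placeholders of my own): queue 2 is suspended while the queue 1 handler runs, and a try/finally makes sure every Suspend() is eventually matched by a Resume().

BlockingQueue<Request> queue2 = new BlockingQueue<Request>();

// While the queue 1 message is being handled, items enqueued on queue 2
// will not signal its consumer.
queue2.Suspend();
try
{
    // This handler may itself call queue2.Suspend()/Resume() again for a
    // nested query; consumers are only pulsed once the counter is back at zero.
    HandleQueue1Message();
}
finally
{
    queue2.Resume();
}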

Wednesday, August 24, 2011

Blocking queues

I needed a blocking queue to synchronize between my producer and consumer threads.
This is, in my opinion, the best way to do it.

Assume for this example that we are either inheriting from Queue<T> or wrapping a Queue<T> inside our new class.
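
For reference, here is a minimal sketch of the shape I'm assuming below (the class name BlockingQueue<T> is my own; the field names m_queue and m_syncRoot are the ones used throughout the snippets):

using System.Collections.Generic;
using System.Threading;

public class BlockingQueue<T>
{
    // Underlying storage plus a private object to lock and wait on.
    private readonly Queue<T> m_queue = new Queue<T>();
    private readonly object m_syncRoot = new object();

    // Enqueue, DequeueAll and Dequeue below go here.
}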

We need a method to enqueue items as fast as possible.

public void Enqueue(T item)
{
    lock (m_syncRoot)
    {
        m_queue.Enqueue(item);

        // Wake up a consumer that may be waiting for items.
        Monitor.Pulse(m_syncRoot);
    }
}

So we'll enqueue the item and pulse the waiting threads.
We lock the scope while doing this of course.

Now we need a consumer. For my needs I prefer to dequeue all the items from the queue at once, but you can change the method to dequeue them one by one.

public T[] DequeueAll()
{
    lock (m_syncRoot)
    {
        if (m_queue.Count == 0)
        {
            // Nothing to consume: wait until a producer pulses us, then return
            // an empty array so the caller's loop re-enters the method and
            // re-acquires the lock cleanly.
            Monitor.Wait(m_syncRoot);
            return new T[] { };
        }

        // Drain everything currently in the queue into a local copy.
        Queue<T> localQueue = new Queue<T>(m_queue.Count);

        while (m_queue.Count > 0)
            localQueue.Enqueue(m_queue.Dequeue());

        return localQueue.ToArray();
    }
}

Usually I tend to avoid returning from the middle of a method, but this time it's imperative.
Notice that we exit the method when we are signaled; this makes sure that we re-enter it while properly locking the sync object (m_syncRoot) and then return the waiting queue items.
Assume that we have an endless loop surrounding DequeueAll().
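
In other words, something along these lines on the consumer thread (queue, Message and Handle are placeholders of my own):

Thread consumer = new Thread(() =>
{
    while (true)
    {
        // Blocks (inside DequeueAll) while the queue is empty; an empty array
        // just sends us around the loop and back into the method.
        foreach (Message item in queue.DequeueAll())
            Handle(item);
    }
});
consumer.IsBackground = true;
consumer.Start();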

For a single dequeue, you could do this:

public T Dequeue()
{
    lock (m_syncRoot)
    {
        if (m_queue.Count == 0)
        {
            // Wait for a producer to pulse us. This assumes a pulse is only
            // ever raised right after an Enqueue, so an item is available here.
            Monitor.Wait(m_syncRoot);
            return m_queue.Dequeue();
        }

        T localItem = m_queue.Dequeue();

        return localItem;
    }
}

To sum up:
We have a consumer that waits when there are no items available in the queue.
When another thread puts items in the queue, it signals the consumer, which then elegantly exits the method only to re-enter it and re-lock the queue properly.

Note: For some reason, .NET does not expose a SyncRoot object on Queue<T>. In .NET 4.0 there is a whole new class called ConcurrentQueue<T>.
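
For comparison, ConcurrentQueue<T> handles the locking internally, but it is non-blocking, so you poll with TryDequeue instead of waiting:

using System;
using System.Collections.Concurrent;

ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
queue.Enqueue(42);

int item;
if (queue.TryDequeue(out item))   // returns false immediately when the queue is empty
    Console.WriteLine(item);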


Next post I'll talk about adding suspend/resume states to the blocking queue.

First post

cout << "Hello World!"


My first post. Wow, I haven't programmed in C++ in years.
C# is really like a soft cushion of a language; it's a nanny for lazy programmers.
It takes good care of you. It's an amazing achievement on Microsoft's behalf.
It's always growing and developing; you can hardly keep up.