Sunday, September 18, 2011

Suspending Blocking Queues

Sometimes you might find it necessary to suspend a queue, or more precisely,  the handeling of queued items (usually messages).
In a multiple queue environment, you might need to synchronize between queues in order to handle items in a chronological fashion.
E.g. A request is being sent for queue 1, which generates to messages one for queue 1 and one for queue 2, however, the queue 2 response is received before the other one.
This might cause a problem if the queue 2 item requies a resource that is only available after the queue 1 message is handled.

So, one way to avoid such problems is to suspend the queues you know that need to wait until the active queue has finished it's job.
If we take the code from my previous post all we need to add is very little code.
We need an aggregation counter.

private int m_suspendCounter = 0;

public bool Suspended
        {
            get
            {
                return m_suspendCounter > 0;
            }
        }

We need a method to suspend the queue.
public void Suspend()
        {
            lock (m_syncRoot)
            {
                m_suspendCounter++;
            }
        }

We also need a method to release the queue.
public void Resume()
        {
            lock (m_syncRoot)
            {
                m_suspendCounter--;
                if (m_suspendCounter == 0)
                    Monitor.PulseAll(m_syncRoot);
            }
        }

Notice, that the queue is only released once the counter reaches Zero. This is because you might suspend the queue in a nested fashion, i.e. If Queue 1 needs to handle a nested query.
Also we need to remember to always check, before releasing the queue, if we are in a suspended mode.

public void Enqueue(T item)
        {
            lock (m_syncRoot)
            {
                m_queue.Enqueue(item);

                if (!Suspended)
                    Monitor.Pulse(m_syncRoot);
            }
        }

Note: In a multithreaded environment, one might consider adding the volatile modifier to the counter variable.