Many times we come across situations where we need to
continuously execute a block of code.
For instance, reading data from a network port (socket programming)
or from an RFID reader, where we create a secondary thread that reads continuously and
another thread that manipulates the same data.
Let me demonstrate with a basic read from the keyboard (on the
primary thread) and a write (on a secondary thread) in a console application, using a
queue.
using System;
using System.Collections.Generic;
using System.Threading;
namespace ConsoleApplication2
{
    /// <summary>
    /// Naive producer/consumer demo: the primary thread reads lines from the
    /// console and enqueues them, while a secondary thread polls the queue and
    /// prints whatever it finds.
    /// </summary>
    class Program
    {
        // Items handed from the primary thread to the secondary thread.
        static Queue<string> q = new Queue<string>();

        // Guards every access to q: it is touched by both threads, and
        // Queue<T> does not synchronize concurrent readers and writers itself.
        static readonly object QueueLock = new object();

        // Secondary thread that runs ReadMethod.
        static Thread ThreadObj;

        /// <summary>
        /// Runs on the secondary thread: polls the queue forever and prints
        /// each item it manages to dequeue.
        /// </summary>
        static void ReadMethod()
        {
            // Deliberately a polling loop: it keeps iterating even while the
            // queue is empty.
            while (true)
            {
                string str = null;
                bool dequeued = false;

                // Check and dequeue under the same lock so the count cannot
                // change between the test and the removal.
                lock (QueueLock)
                {
                    if (q.Count > 0)
                    {
                        str = q.Dequeue();
                        dequeued = true;
                    }
                }

                if (dequeued)
                {
                    // here we would manipulate data on the secondary thread;
                    // done outside the lock so the producer is not held up
                    Console.WriteLine("Dequeued item {0}", str);
                }
            }
        }

        /// <summary>
        /// Starts the secondary thread, then reads console lines on the
        /// primary thread and enqueues each one.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            ThreadObj = new Thread(new ThreadStart(ReadMethod));
            ThreadObj.Start();

            // here read data on the primary thread, for instance from a network port.
            // Console.ReadLine returns null once input is exhausted; stop
            // producing then instead of enqueuing null forever.
            string input;
            while ((input = Console.ReadLine()) != null)
            {
                lock (QueueLock)
                {
                    q.Enqueue(input);
                }
            }
        }
    }
}
Here ReadMethod executes its while loop indefinitely; on each iteration it
checks whether the queue holds a string item and, if so, writes it out before
continuing with the next iteration.
Obviously we want to stop the while loop from
spinning while the queue is empty and have it pause until an item is enqueued. To do that,
consider the example below.
/// <summary>
/// Producer/consumer demo using Monitor.Wait and Monitor.Pulse: the secondary
/// thread sleeps while the queue is empty and is woken by the primary thread
/// after each enqueue.
/// </summary>
class Program
{
    // Items handed from the primary thread to the secondary thread.
    static Queue<string> q = new Queue<string>();

    // Secondary thread that runs ReadMethod.
    static Thread ThreadObj;

    // Lock object: guards q and is the object waited on / pulsed.
    static object LockObj = new object();

    /// <summary>
    /// Runs on the secondary thread: waits until the queue has an item,
    /// dequeues it and prints it, forever.
    /// </summary>
    static void ReadMethod()
    {
        while (true)
        {
            string str;
            lock (LockObj)
            {
                // The emptiness check happens while holding the lock, so an
                // enqueue-and-pulse cannot slip in between the check and the
                // wait. Re-check in a loop after waking rather than assuming
                // an item is present.
                while (q.Count == 0)
                {
                    // releases LockObj while waiting and re-acquires it before returning
                    Monitor.Wait(LockObj);
                }

                str = q.Dequeue();
            }

            // Print outside the lock so the producer is not blocked by console output.
            Console.WriteLine("Dequeued item {0}", str);
        }
    }

    /// <summary>
    /// Starts the secondary thread, then reads console lines on the primary
    /// thread, enqueues each one and signals the waiting reader.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        ThreadObj = new Thread(new ThreadStart(ReadMethod));
        ThreadObj.Start();

        // Console.ReadLine returns null once input is exhausted; stop
        // producing then instead of enqueuing null forever.
        string input;
        while ((input = Console.ReadLine()) != null)
        {
            lock (LockObj)
            {
                // Enqueue and signal under the same lock the reader uses.
                q.Enqueue(input);
                // signal to continue on LockObj
                Monitor.Pulse(LockObj);
            }
        }
    }
}
In the code above,
the ReadMethod while loop blocks and waits until an item is enqueued.
ReadMethod calls Monitor.Wait(LockObj), which
blocks the current thread on the static object LockObj until it receives a
signal sent with Monitor.Pulse(LockObj).
Note: the object
synchronization methods Monitor.Pulse and Monitor.Wait must be called from a synchronized
block of code, hence the use of the lock keyword (similarly,
you can use the Monitor.Enter and Monitor.Exit methods, or the Synchronization
attribute, to synchronize between threads).
Let us package the above code in a
ready-made form by extending the .NET library class Queue<T>.
Take 1
/// <summary>
/// A queue that owns a secondary thread: whenever the queue is non-empty the
/// thread invokes the callback supplied to the constructor, and it sleeps
/// while the queue is empty.
/// </summary>
/// <typeparam name="T">Type of the queued items.</typeparam>
sealed class AsyncQ<T> : Queue<T>
{
    #region Members
    // Delegate invoked on the secondary thread while items are available.
    Action ActnMethod;

    // Lock object: guards the underlying queue and is the object waited on / pulsed.
    object LockerObj = new object();

    // Secondary thread running RunDequeue.
    Thread ThreadObj;
    #endregion

    #region Constructors
    /// <summary>
    /// Initializes the queue and starts the secondary thread.
    /// </summary>
    /// <param name="actmethodsignature">Method to execute on Enqueue</param>
    public AsyncQ(Action actmethodsignature)
    {
        this.ActnMethod = actmethodsignature;
        // The thread is left as a foreground thread and its loop never exits.
        ThreadObj = new Thread(new ThreadStart(this.RunDequeue));
        ThreadObj.Start();
    }
    #endregion

    #region User Defined Methods
    /// <summary>
    /// Adds an object to the end of the queue and signals the secondary thread.
    /// </summary>
    /// <param name="item">Item</param>
    internal new void Enqueue(T item)
    {
        lock (LockerObj)
        {
            // Mutate the queue and signal under the same lock the consumer
            // uses for its emptiness check, so the wake-up cannot be missed.
            base.Enqueue(item);
            // signal to continue on LockerObj
            Monitor.Pulse(LockerObj);
        }
    }

    /// <summary>
    /// Removes and returns the object at the beginning of the queue.
    /// </summary>
    /// <returns>The item that was at the head of the queue.</returns>
    internal new T Dequeue()
    {
        // Serialized against Enqueue, which may run on another thread.
        lock (LockerObj)
        {
            return base.Dequeue();
        }
    }

    /// <summary>
    /// Runs on the secondary thread: waits for an item, then invokes the callback.
    /// </summary>
    void RunDequeue()
    {
        while (true)
        {
            lock (LockerObj)
            {
                // Check under the lock and re-check after every wake-up.
                while (this.Count == 0)
                {
                    // wait for a signal if queue is empty
                    Monitor.Wait(LockerObj);
                }
            }

            // Invoke the callback without holding the lock so producers are
            // not blocked while it runs. If the callback does not remove an
            // item, Count stays above zero and it is invoked again at once.
            ActnMethod();
        }
    }
    #endregion
}
/// <summary>
/// Console driver for AsyncQ: lines typed on the primary thread are enqueued,
/// and Readmethod is the callback handed to the queue for consuming them.
/// </summary>
class Program
{
    // Queue whose constructor receives Readmethod as its callback.
    static AsyncQ<string> q = new AsyncQ<string>(Readmethod);

    /// <summary>
    /// Callback passed to the queue: removes one item and prints it.
    /// </summary>
    static void Readmethod()
    {
        string str = q.Dequeue();
        Console.WriteLine("Dequeued item {0}", str);
    }

    /// <summary>
    /// Reads console lines on the primary thread and enqueues each one.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        // Console.ReadLine returns null once input is exhausted; stop
        // producing then instead of enqueuing null forever.
        string input;
        while ((input = Console.ReadLine()) != null)
        {
            q.Enqueue(input);
        }
    }
}
Take 2 with Asynchronous Callback
/// <summary>
/// A queue whose Enqueue runs through an asynchronous delegate invocation and
/// passes the callback supplied to the constructor as the completion callback.
/// </summary>
/// <typeparam name="T">Type of the queued items.</typeparam>
sealed class AsyncCallBackQ<T> : Queue<T>
{
    #region Members
    // Completion callback handed to BeginInvoke on every enqueue.
    AsyncCallback TargetMethod;

    // Guards the underlying queue: the enqueue runs via BeginInvoke while a
    // completion callback from an earlier enqueue may be calling Dequeue.
    readonly object SyncObj = new object();
    #endregion

    // Delegate type used to invoke the enqueue asynchronously.
    delegate void EnqueueDeleg(T item);

    #region Constructors
    /// <summary>
    /// Initializes the queue with its completion callback.
    /// </summary>
    /// <param name="targetmethod">callback method</param>
    public AsyncCallBackQ(AsyncCallback targetmethod)
    {
        this.TargetMethod = targetmethod;
    }
    #endregion

    #region User Defined Methods
    /// <summary>
    /// Adds an object to the end of the queue via an asynchronous delegate
    /// call; the completion callback is the one passed to the constructor.
    /// </summary>
    /// <param name="item">Item</param>
    internal new void Enqueue(T item)
    {
        // NOTE(review): Delegate.BeginInvoke is, to my knowledge, only
        // supported on .NET Framework - confirm the target runtime.
        EnqueueDeleg enqdel = new EnqueueDeleg(this.EnqueueLocked);

        // Add item asynchronously and call back the read method on finish.
        IAsyncResult Iresult = enqdel.BeginInvoke(item, this.TargetMethod, null);

        // Wait for the enqueue itself to finish before returning to the caller.
        enqdel.EndInvoke(Iresult);
    }

    /// <summary>
    /// Removes and returns the object at the beginning of the queue.
    /// </summary>
    /// <returns>The item that was at the head of the queue.</returns>
    internal new T Dequeue()
    {
        lock (SyncObj)
        {
            return base.Dequeue();
        }
    }

    /// <summary>
    /// Target of the asynchronous delegate: appends the item while holding the lock.
    /// </summary>
    /// <param name="item">Item to append.</param>
    void EnqueueLocked(T item)
    {
        lock (SyncObj)
        {
            base.Enqueue(item);
        }
    }
    #endregion
}
/// <summary>
/// Console driver for AsyncCallBackQ: lines typed on the primary thread are
/// enqueued, and Readmethod is the completion callback that consumes them.
/// </summary>
class Program
{
    // Queue whose constructor receives Readmethod as its completion callback.
    static AsyncCallBackQ<string> q = new AsyncCallBackQ<string>(Readmethod);

    // Serializes the dequeue-and-print step across callback invocations.
    static object Lockobj = new object();

    /// <summary>
    /// Completion callback: removes one item from the queue and prints it.
    /// </summary>
    /// <param name="result">Asynchronous result supplied to the callback (unused).</param>
    static void Readmethod(IAsyncResult result)
    {
        lock (Lockobj)
        {
            string str = q.Dequeue();
            Console.WriteLine("Dequeued item {0}", str);
        }
    }

    /// <summary>
    /// Reads console lines on the primary thread and enqueues each one.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        // Console.ReadLine returns null once input is exhausted; stop
        // producing then instead of enqueuing null forever.
        string input;
        while ((input = Console.ReadLine()) != null)
        {
            q.Enqueue(input);
        }
    }
}