A generic blocking queue has the following properties:
- It is thread-safe.
- It allows queuing and de-queuing of items of a certain type (T).
- If a de-queue operation is performed and the queue is empty, the de-queuing thread waits indefinitely for an item to be queued and then retrieves it (i.e. a de-queue operation cannot fail or return no item).
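The three properties above can be illustrated with a minimal sketch built on `Monitor.Wait` and `Monitor.Pulse`. The class name `SimpleBlockingQueue<T>` is hypothetical and is not part of the code base under review; it only shows one common way to obtain the "de-queue blocks until an item arrives" behaviour.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration; not taken from the reviewed repository.
public sealed class SimpleBlockingQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _sync = new object();

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _items.Enqueue(item);
            // Wake one thread that is blocked in Dequeue, if there is one.
            Monitor.Pulse(_sync);
        }
    }

    public T Dequeue()
    {
        lock (_sync)
        {
            // A loop rather than an 'if': a woken thread must re-check,
            // because another consumer may have taken the item first.
            while (_items.Count == 0)
            {
                Monitor.Wait(_sync); // releases the lock while waiting
            }
            return _items.Dequeue();
        }
    }
}
```

Because `Dequeue` only returns once an item is available, callers never have to handle an "empty" result.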
Suppose we want to implement a client application that simulates the operation of such a queue.
The application allows the user to do the following:
Start a number of de-queuing worker threads by doing the following:
a. Specify the number of worker threads (w).
b. Press the "Start" button.
By pressing the "Start" button, w worker threads are created and start de-queuing and executing the tasks in the queue. Completed tasks display their results in the results list. Each result shows the executing worker thread id and the result itself.
Queue tasks by doing the following:
a. Choose a task type.
b. Initialize its parameters.
c. Specify the number of queuing threads (n) and the number of tasks queued by each thread (t).
d. Press the "Queue" button.
The user can queue 3 types of tasks (of course, many more task types could be created):
A task that receives 2 integers as inputs, adds them and returns the result.
A task that receives 2 strings as inputs, concatenates them and returns the result.
A task that receives a string as an input and returns whether the string is a palindrome.
Upon pressing the button, n threads are created, each creates t instances of the task and queues them all.
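The "n threads, each creating and queuing t instances" step might be sketched as follows. This is an assumption-laden illustration: `ProducerSketch`, `EnqueueConcurrently` and the `factory` parameter are hypothetical names, and a plain `ConcurrentQueue<T>` stands in for the task queue. The factory is invoked once per queued item, so every queued task is a separate instance.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical illustration; not taken from the reviewed repository.
public static class ProducerSketch
{
    // Starts n threads; each one creates and enqueues t items.
    public static void EnqueueConcurrently<T>(
        ConcurrentQueue<T> queue, int n, int t, Func<T> factory)
    {
        var producers = new Thread[n];
        for (int i = 0; i < n; i++)
        {
            producers[i] = new Thread(() =>
            {
                for (int j = 0; j < t; j++)
                {
                    // One fresh instance per queued item.
                    queue.Enqueue(factory());
                }
            });
            producers[i].Start();
        }

        // Joining is only here so the result can be inspected afterwards;
        // a UI event handler would normally not block like this.
        foreach (Thread producer in producers)
        {
            producer.Join();
        }
    }
}
```

After the call returns, the queue holds n × t items.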
Some useful guidelines:
- Design the UI in WinForms or WPF, or whatever technology you're comfortable with.
- Separate the different application layers (BL, UI, etc.)
- Write code that allows easy addition of other task types to the system with minimal change to existing code.
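One possible reading of the last guideline is a catalog of task factories keyed by description, so that adding a task type is a single registration and existing code stays untouched. The sketch below is only an illustration under that assumption; `ISimpleTask`, `DelegateTask` and `TaskCatalog` are hypothetical names and are not the types used later in this question.

```csharp
using System;
using System.Collections.Generic;

// All names here are hypothetical; none come from the reviewed repository.
public interface ISimpleTask
{
    object Run();
}

// Wraps a delegate so that small tasks need no dedicated class.
public sealed class DelegateTask : ISimpleTask
{
    private readonly Func<object> _body;

    public DelegateTask(Func<object> body)
    {
        if (body == null) throw new ArgumentNullException("body");
        _body = body;
    }

    public object Run()
    {
        return _body();
    }
}

public sealed class TaskCatalog
{
    // Maps a task description to a factory that builds the task
    // from its (string) operands.
    private readonly Dictionary<string, Func<string[], ISimpleTask>> _factories =
        new Dictionary<string, Func<string[], ISimpleTask>>();

    public void Register(string description, Func<string[], ISimpleTask> factory)
    {
        _factories.Add(description, factory);
    }

    public ISimpleTask Create(string description, params string[] operands)
    {
        return _factories[description](operands);
    }
}
```

With this shape, registering "Add Numbers" is one `Register` call that parses the two operands and returns their sum, and the rest of the application only ever sees `ISimpleTask`.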
This is a request for a code review of the design of the classes, interfaces and logic of the queue. The GUI design is not part of the question; it is supplied only as part of the code base, which contains about 800 lines of code and can be found here.
I've chosen to use WinForms, since I'm not very familiar with other ways to design a UI.
Still, the GUI is not the real question here, and it is off-topic for the review.
My idea was to divide the project into a few building blocks:
Contracts assembly: general interfaces defining tasks, their parameters and their events.
ITask.cs
namespace Contracts
{
public class SingleOperandInput
{
public Object Operand;
}
public class TwoOperandsInput
{
public Object Operand1;
public Object Operand2;
}
public interface ITask
{
ResultBase ResultAction { get; set; }
void Run();
}
}
IResult.cs
namespace Contracts
{
public interface IResult
{
void NotifyResult(Object result);
}
}
ResultBase.cs
namespace Contracts
{
public abstract class ResultBase : IResult
{
// A delegate type for hooking up change notifications.
public delegate void ResultEventHandler(object sender, GeneralTaskEventArgs e);
// An event that clients can use to be notified whenever
// there's a result.
public event ResultEventHandler Changed;
public abstract void NotifyResult(object result);
// Invoke the Changed event
protected virtual void OnChanged(GeneralTaskEventArgs e)
{
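// Copy to a local first: results are raised from worker threads, and a
// concurrent unsubscribe could otherwise null the event between the check and the call.
ResultEventHandler handler = Changed;
if (handler != null)
{
handler(this, e);
}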
}
}
}
GeneralTaskEventArgs.cs
namespace Contracts
{
public abstract class GeneralTaskEventArgs : EventArgs
{
public Int32 SourceThreadId;
public Object Operand1;
public Object Operand2;
}
}
Tasks assembly: concrete task implementations.
Here's just one example, since all other implementations have the same look:
AddIntegers.cs
namespace Tasks
{
public class AddIntegers : ITask
{
public ResultBase ResultAction { get; set; }
private TwoOperandsInput _input;
public AddIntegers(Object op1, Object op2, Object resultAction)
{
ResultAction = resultAction as ResultBase;
_input = new TwoOperandsInput() { Operand1 = op1, Operand2 = op2 };
}
public void Run()
{
Int32 sum = Int32.Parse(_input.Operand1 as String) + Int32.Parse(_input.Operand2 as String);
ResultAction.NotifyResult(sum);
}
}
}
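The other task implementations are not shown here. As a rough idea of the logic the palindrome task needs, here is a standalone sketch. The name `PalindromeSketch.IsPalindrome` is hypothetical, and the comparison is assumed to be exact (case-sensitive, with no whitespace or punctuation stripped), since the question does not say otherwise.

```csharp
using System;

// Hypothetical illustration; not taken from the reviewed repository.
public static class PalindromeSketch
{
    // Assumption: characters are compared exactly as given.
    public static bool IsPalindrome(string text)
    {
        if (text == null) throw new ArgumentNullException("text");

        int left = 0;
        int right = text.Length - 1;
        while (left < right)
        {
            if (text[left] != text[right])
            {
                return false;
            }
            left++;
            right--;
        }
        // Empty and single-character strings count as palindromes.
        return true;
    }
}
```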
Callbacks assembly - concrete callback implementations for the tasks, i.e. what to do when a task finishes.
The mapping between concrete tasks and concrete callbacks can be done at run time, so the callbacks aren't coupled to the Tasks assembly in any way!
AddIntegersResult.cs
namespace Callbacks
{
public class AddIntegersEventArgs : GeneralTaskEventArgs
{
public readonly Int32 Result;
public AddIntegersEventArgs(Int32 sourceThreadId, Int32 result)
{
SourceThreadId = sourceThreadId;
Result = result;
}
public override string ToString()
{
return String.Format("Sum is {0}", Result);
}
}
public class AddIntegersResult : ResultBase
{
public override void NotifyResult(object result)
{
AddIntegersEventArgs args = new AddIntegersEventArgs(Int32.Parse(Thread.CurrentThread.Name), (Int32)result);
OnChanged(args);
}
}
}
Blocking queue assembly - implements the task blocking queue.
TaskQueue.cs
namespace BlockingTaskQueue
{
public static class TaskQueue
{
private static Int32 _threadCount = 0;
private static ConcurrentQueue<ITask> _queue = new ConcurrentQueue<ITask>();
private static List<Thread> _dequeueThreads = new List<Thread>(100);
public static void Enqueue(ITask task)
{
if (task == null)
{
// The single-string constructor takes the parameter name, not a message.
throw new ArgumentNullException("task");
}
_queue.Enqueue(task);
}
public static void Dequeue(Int32 workers)
{
for (Int32 i = 0; i < workers; i++)
{
Thread worker = new Thread(new ThreadStart(DequeueThreadFunc));
// Worker threads will not keep an application running after all foreground threads have exited.
worker.IsBackground = true;
worker.Name = Interlocked.Increment(ref _threadCount).ToString();
_dequeueThreads.Add(worker);
worker.Start();
}
}
public static void DequeueThreadFunc()
{
ITask task;
while (true)
{
if (_queue.IsEmpty)
{ // Wait indefinitely until there is something to de-queue.
Thread.Sleep(100);
}
else if (_queue.TryDequeue(out task))
{
task.Run();
}
}
}
}
}
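For comparison with the `Thread.Sleep(100)` polling loop above, here is a hedged sketch of the same worker pattern on top of `BlockingCollection<T>`, whose `GetConsumingEnumerable` blocks while the collection is empty. `BlockingWorkQueue` is a hypothetical name, and plain `Action` delegates stand in for `ITask` to keep the example self-contained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical illustration; not taken from the reviewed repository.
public sealed class BlockingWorkQueue : IDisposable
{
    // With no arguments, BlockingCollection<T> wraps a ConcurrentQueue<T> (FIFO).
    private readonly BlockingCollection<Action> _work = new BlockingCollection<Action>();

    public void Enqueue(Action work)
    {
        if (work == null) throw new ArgumentNullException("work");
        _work.Add(work);
    }

    public Thread[] StartWorkers(int count)
    {
        var workers = new Thread[count];
        for (int i = 0; i < count; i++)
        {
            workers[i] = new Thread(Consume)
            {
                IsBackground = true,
                Name = (i + 1).ToString()
            };
            workers[i].Start();
        }
        return workers;
    }

    // Lets the workers exit once the remaining items have been drained.
    public void CompleteAdding()
    {
        _work.CompleteAdding();
    }

    private void Consume()
    {
        // Blocks while the collection is empty; the loop ends only after
        // CompleteAdding has been called and the collection is empty.
        foreach (Action work in _work.GetConsumingEnumerable())
        {
            work();
        }
    }

    public void Dispose()
    {
        _work.Dispose();
    }
}
```

The workers neither spin nor sleep, so an item is picked up as soon as it is added rather than up to 100 ms later.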
GUI assembly - The designer code to create the form isn't included.
SimulatorForm.cs
namespace BlockingTaskQueue
{
public partial class MainForm : Form
{
public delegate void NewResultDelegate(GeneralTaskEventArgs args);
public NewResultDelegate _resultDelegate;
// Maps task descriptions to their concrete class from the tasks assembly.
private Dictionary<String, TaskObject> _taskDescriptionToObject;
// Keyed by the concrete event-args Type, matching the initialization and the lookup by args.GetType().
private Dictionary<Type, String> _concreteEventArgsToTaskDescription;
//// TODO: extend to many assemblies in the configuration.
private Assembly _tasksAssembly;
private Assembly _callbacksAssembly;
public MainForm()
{
InitializeComponent();
_tasksAssembly = Assembly.LoadFrom(ConfigurationManager.AppSettings["TasksAssembly"]);
_callbacksAssembly = Assembly.LoadFrom(ConfigurationManager.AppSettings["CallbacksAssembly"]);
// TODO: implement a prefix for each task description, don't use Count!
_taskDescriptionToObject = new Dictionary<string, TaskObject>(ConfigurationManager.AppSettings.Count);
_concreteEventArgsToTaskDescription = new Dictionary<Type, string>(ConfigurationManager.AppSettings.Count);
foreach (var key in ConfigurationManager.AppSettings.AllKeys)
{
if (key != "TasksAssembly" && key != "CallbacksAssembly")
{
String[] concreteTypeNamesAndOperandsRequired = ConfigurationManager.AppSettings[key].Split(',');
Type taskConcreteType = _tasksAssembly.GetType(concreteTypeNamesAndOperandsRequired[0]);
Type taskCallbackType = _callbacksAssembly.GetType(concreteTypeNamesAndOperandsRequired[1]);
Type taskConcreteEventArgsType = _callbacksAssembly.GetType(concreteTypeNamesAndOperandsRequired[2]);
_taskDescriptionToObject.Add(key, new TaskObject(taskConcreteType, taskCallbackType, Int32.Parse(concreteTypeNamesAndOperandsRequired[3]),
"true" == concreteTypeNamesAndOperandsRequired[4]));
_concreteEventArgsToTaskDescription.Add(taskConcreteEventArgsType, key);
tasksComboBox.Items.Add(key);
}
}
_resultDelegate = new NewResultDelegate(AddResultToList);
}
private void startDequeueButton_Click(object sender, EventArgs e)
{
Int32 dequeueThreads;
// Sanity checks.
if (!Int32.TryParse(dequeueThreadsTextBox.Text, out dequeueThreads))
{
throw new ArgumentException("Expected a number as number of de-queuing worker threads.");
}
if (dequeueThreads < 1)
{
throw new ArgumentException("Expected at least 1 de-queuing worker threads.");
}
BlockingTaskQueue.TaskQueue.Dequeue(dequeueThreads);
}
private void queueButton_Click(object sender, EventArgs e)
{
Int32 queueingThreads;
Int32 tasksPerThread;
// Sanity checks.
if (!Int32.TryParse(queueThreadsTextBox.Text, out queueingThreads))
{
throw new ArgumentException("Expected a number as number of queuing worker threads.");
}
if (queueingThreads < 1)
{
throw new ArgumentException("Expected at least 1 queuing worker threads.");
}
if (!Int32.TryParse(tasksPerThreadTextBox.Text, out tasksPerThread))
{
throw new ArgumentException("Expected a number as the number of tasks each queuing thread will enqueue.");
}
if (tasksPerThread < 1)
{
throw new ArgumentException("Expected at least 1 tasks to be enqueued by each thread.");
}
// Create instances of the task and the result-action.
Object taskInstance = null;
Object resultInstance = null;
TaskObject taskObj = _taskDescriptionToObject[tasksComboBox.Text];
// Instantiate the concrete result class
if (!taskObj.RequiresInput)
{
resultInstance = Activator.CreateInstance(taskObj.ConcreteActionAndResult.Value);
}
if (taskObj.OperandsRequired == 1)
{
if (taskObj.RequiresInput)
{
SingleOperandInput soi = new SingleOperandInput() { Operand = operand1TextBox.Text };
resultInstance = Activator.CreateInstance(taskObj.ConcreteActionAndResult.Value, new Object[] { soi });
}
taskInstance = Activator.CreateInstance(taskObj.ConcreteActionAndResult.Key, new Object[] { operand1TextBox.Text, resultInstance });
}
else if (taskObj.OperandsRequired == 2)
{
if (taskObj.RequiresInput)
{
TwoOperandsInput toi = new TwoOperandsInput() { Operand1 = operand1TextBox.Text, Operand2 = operand2TextBox.Text };
resultInstance = Activator.CreateInstance(taskObj.ConcreteActionAndResult.Value, new Object[] { operand1TextBox.Text, operand2TextBox.Text });
}
taskInstance = Activator.CreateInstance(taskObj.ConcreteActionAndResult.Key, new Object[] { operand1TextBox.Text, operand2TextBox.Text, resultInstance });
}
else
{
throw new NotImplementedException("Not supporting more than 2 operands.");
}
((ITask)taskInstance).ResultAction.Changed += NewResult;
// Enqueue the task the amount of times specified, using the amount of threads specified.
EnqueueConcurrent(queueingThreads, tasksPerThread, taskInstance);
}
private void NewResult(object state, GeneralTaskEventArgs args)
{
if (InvokeRequired)
{
Invoke(_resultDelegate, new object[] { args });
}
else
{
_resultDelegate(args);
}
}
private void AddResultToList(GeneralTaskEventArgs args)
{
outputListBox.Items.Add(String.Format("{0} Result (Worker thread #{1})", _concreteEventArgsToTaskDescription[args.GetType()], args.SourceThreadId));
outputListBox.Items.Add(args.ToString());
}
private void EnqueueConcurrent(Int32 queueingThreads, Int32 tasksPerThread, Object task)
{
for (Int32 i = 0; i < queueingThreads; i++)
{
Thread queueingThread = new Thread(new ParameterizedThreadStart(EnqueueTasksThreadFunc));
EnqueingObject enqueueObj = new EnqueingObject() { Task = task, TasksToEnqueue = tasksPerThread };
queueingThread.Start(enqueueObj);
}
}
public static void EnqueueTasksThreadFunc(Object state)
{
EnqueingObject enqueueObj = state as EnqueingObject;
if (enqueueObj == null)
{
throw new ArgumentNullException("state");
}
for (Int32 i = 0; i < enqueueObj.TasksToEnqueue; i++)
{
// If this isn't a real ITask, a run-time exception will be thrown.
BlockingTaskQueue.TaskQueue.Enqueue((ITask)enqueueObj.Task);
}
}
private void tasksComboBox_SelectedIndexChanged(object sender, EventArgs e)
{
Int32 operandsRequired = _taskDescriptionToObject[tasksComboBox.Text].OperandsRequired;
if (operandsRequired == 1)
{
if (operand2Label.Visible)
{
operand2Label.Visible = false;
operand2TextBox.Visible = false;
}
}
else if (operandsRequired == 2)
{
if (!operand2Label.Visible)
{
operand2Label.Visible = true;
operand2TextBox.Visible = true;
}
}
}
}
}
The complete working code base can be found in my implementation's GitHub repository.
Consumers of the Contracts assembly may define their own tasks, callbacks and specific events as they wish.
The Tasks assembly contains the concrete task implementations,
whilst the Callbacks assembly contains the concrete callback implementations,
but no concrete task is coupled to a particular concrete callback.
That is to say, the mapping between a task implementation and a callback implementation is read from the configuration at run time, so whoever configures the application may choose a different mapping.
All a consumer has to do to get tasks with callbacks running on the blocking queue is supply a compiled Tasks assembly and a compiled Callbacks assembly, and provide their file paths (or assembly signatures) in the configuration.
This is what I did in the SimulatorView form.
The GUI's configuration is as follows:
App.config
<configuration>
<appSettings>
<add key="TasksAssembly" value="C:\Code\BlockingTaskQueue\TasksImplementation\bin\Debug\Tasks.dll"/>
<add key="CallbacksAssembly" value="C:\Code\BlockingTaskQueue\Callbacks\bin\Debug\Callbacks.dll"/>
<!-- key: defines the description of the task.
value: constructed as TASK_CLASS_NAME, CALLBACK_CLASS_NAME, EVENTARGS_CLASS_NAME, NUM_OPERANDS, REQUIRES_INPUT_BOOLEAN -->
<add key="Add Numbers" value="Tasks.AddIntegers,Callbacks.AddIntegersResult,Callbacks.AddIntegersEventArgs,2,false"/>
<add key="Concatenate Strings" value="Tasks.ConcatenateStrings,Callbacks.ConcatenateStringsResult,Callbacks.ConcatenateStringsEventArgs,2,false"/>
<add key="Is Palindrome" value="Tasks.PalindromChecker,Callbacks.PalindromCheckerResult,Callbacks.PalindromCheckerEventArgs,1,true"/>
</appSettings>
</configuration>
This simply means that both assemblies (Tasks, Callbacks) are loaded
at run time, and a mapping is created such that the "Add Numbers" operation
activates a Tasks.AddIntegers task, which ends up calling a Callbacks.AddIntegersResult callback and uses a Callbacks.AddIntegersEventArgs to supply data about the result.
Whoever subscribes to this event may use its overridden ToString method to get the actual result of the whole operation.
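To make the value format concrete, a small parser for one such entry might look like the sketch below. `TaskMapping` and its members are hypothetical names (the form above uses its own `TaskObject` type, which is not shown); the only thing taken from the configuration is the five-field, comma-separated layout.

```csharp
using System;
using System.Globalization;

// Hypothetical illustration; not taken from the reviewed repository.
public sealed class TaskMapping
{
    public string TaskTypeName { get; private set; }
    public string CallbackTypeName { get; private set; }
    public string EventArgsTypeName { get; private set; }
    public int OperandsRequired { get; private set; }
    public bool RequiresInput { get; private set; }

    // Parses "TASK,CALLBACK,EVENTARGS,NUM_OPERANDS,REQUIRES_INPUT".
    public static TaskMapping Parse(string value)
    {
        if (value == null) throw new ArgumentNullException("value");

        string[] parts = value.Split(',');
        if (parts.Length != 5)
        {
            throw new FormatException(
                "Expected 5 comma-separated fields, got " + parts.Length + ".");
        }

        return new TaskMapping
        {
            TaskTypeName = parts[0].Trim(),
            CallbackTypeName = parts[1].Trim(),
            EventArgsTypeName = parts[2].Trim(),
            OperandsRequired = int.Parse(parts[3].Trim(), CultureInfo.InvariantCulture),
            // bool.Parse is case-insensitive and throws on anything
            // other than "true" or "false".
            RequiresInput = bool.Parse(parts[4].Trim())
        };
    }
}
```

Validating the field count up front turns a malformed entry into a clear `FormatException` instead of an `IndexOutOfRangeException` deep inside the form's constructor.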
I've chosen to use reflection in my implementation, although many IoC implementations such as
- Spring.NET
- Castle Windsor
- StructureMap
- Autofac
- Unity
- Ninject
(the list is far from complete; look there for more)
could probably save a programmer the time it takes to get going with reflection...