\$\begingroup\$

A generic blocking queue has the following properties:

  1. It is thread-safe.
  2. It allows queuing and de-queuing of items of a certain type (T).
  3. If a de-queue operation is performed and the queue is empty, the de-queuing thread waits indefinitely for an item to be queued and then retrieves it (i.e. a de-queue operation cannot fail or return no item).
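The three properties above can be sketched with a lock and `Monitor.Wait`/`Monitor.Pulse`. This is a minimal illustration under stated assumptions, not the implementation from the code base: the name `SimpleBlockingQueue<T>` is hypothetical.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical name; not a type from the question's code base.
public sealed class SimpleBlockingQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _sync = new object();

    // Adds an item and wakes one waiting consumer, if there is one.
    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _items.Enqueue(item);
            Monitor.Pulse(_sync);
        }
    }

    // Blocks until an item is available, then removes and returns it.
    public T Dequeue()
    {
        lock (_sync)
        {
            // Re-check in a loop: another consumer may have taken the item
            // between the pulse and this thread re-acquiring the lock.
            while (_items.Count == 0)
            {
                Monitor.Wait(_sync);
            }
            return _items.Dequeue();
        }
    }
}
```

The framework's `BlockingCollection<T>` in `System.Collections.Concurrent` provides the same blocking behaviour through `Add` and `Take`, so a hand-written version like this is mostly useful for seeing how the waiting works.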

Suppose we want to implement a client application that simulates the operation of such a queue.


The application allows the user to do the following:

  1. Start a number of de-queuing worker threads by doing the following:

    a. Specify the number of worker threads (w).

    b. Press the "Start" button.

    By pressing the "Start" button, w worker threads are created and start de-queuing and executing the tasks in the queue. Completed tasks display their results in the results list. Each result shows the executing worker thread id and the result itself.

  2. Queue tasks by doing the following:

    a. Choose a task type.

    b. Initialize its parameters.

    c. Specify the number of queuing threads (n) and the number of queued tasks (t) for each thread.

    d. Press the "Queue" button.

    The user can queue 3 types of tasks (many more could, of course, be created):

    • A task that receives 2 integers as inputs, adds them and returns the result.

    • A task that receives 2 strings as inputs, concatenates them and returns the result.

    • A task that receives a string as an input and returns whether the string is a palindrome.

    Upon pressing the button, n threads are created, each creates t instances of the task and queues them all.

Some useful guidelines:

  1. Design the UI in WinForms or WPF, or whatever technology you're comfortable with.
  2. Separate the different application layers (BL, UI, etc.)
  3. Write code that allows easy addition of other task types to the system with minimal change to existing code.

This question is a request for a code review of the design of the classes, interfaces and logic of the queue. The GUI design is not part of the question, but it is supplied as part of the code base, which contains about 800 lines of code and can be found here.

I've chosen to use WinForms, since I'm not very familiar with other ways to design UI.

Still, the GUI is not the real question here, and it is off-topic for this review.

My idea was to divide the project into a few building blocks:

Contracts assembly: general interfaces defining tasks, their parameters and events.

ITask.cs

namespace Contracts
{
    public class SingleOperandInput 
    {
        public Object Operand;
    }

    public class TwoOperandsInput 
    {
        public Object Operand1;
        public Object Operand2;
    }

    public interface ITask
    {
        ResultBase ResultAction { get; set; }
        void Run();
    }
}

IResult.cs

namespace Contracts
{
    public interface IResult
    {
        void NotifyResult(Object result);
    }
}

ResultBase.cs

namespace Contracts
{
    public abstract class ResultBase : IResult
    {
        // A delegate type for hooking up change notifications.
        public delegate void ResultEventHandler(object sender, GeneralTaskEventArgs e);

        // An event that clients can use to be notified whenever
        // there's a result.
        public event ResultEventHandler Changed;
        public abstract void NotifyResult(object result);

        // Invoke the Changed event
        protected virtual void OnChanged(GeneralTaskEventArgs e)
        {
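            // Copy to a local so a concurrent unsubscribe cannot null the
            // delegate between the check and the call.
            ResultEventHandler handler = Changed;
            if (handler != null)
            {
                handler(this, e);
            }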
        }
    }
}

GeneralTaskEventArgs.cs

namespace Contracts
{
    public abstract class GeneralTaskEventArgs : EventArgs
    {
        public Int32 SourceThreadId;
        public Object Operand1;
        public Object Operand2;
    }
}

Tasks assembly: concrete task implementations.

Here's just one example, since all the other implementations look the same:

AddIntegers.cs

namespace Tasks
{
    public class AddIntegers : ITask
    {
        public ResultBase ResultAction { get; set; }
        private TwoOperandsInput _input;

        public AddIntegers(Object op1, Object op2, Object resultAction)
        {
            ResultAction = resultAction as ResultBase;
            _input = new TwoOperandsInput() { Operand1 = op1, Operand2 = op2 };
        }

        public void Run()
        {
            Int32 sum = Int32.Parse(_input.Operand1 as String) + Int32.Parse(_input.Operand2 as String);
            ResultAction.NotifyResult(sum);
        }
    }
}

Callbacks assembly - concrete task callback implementations.

These define what to do when a task finishes. The mapping between concrete tasks and concrete callbacks can be done at run time, so the callbacks aren't coupled to the Tasks assembly in any way!

AddIntegersResult.cs

namespace Callbacks
{
    public class AddIntegersEventArgs : GeneralTaskEventArgs
    {
        public readonly Int32 Result;
        public AddIntegersEventArgs(Int32 sourceThreadId, Int32 result)
        {
            SourceThreadId = sourceThreadId;
            Result = result;
        }
        public override string ToString()
        {
            return String.Format("Sum is {0}", Result);
        }
    }

    public class AddIntegersResult : ResultBase
    {
        public override void NotifyResult(object result)
        {
            AddIntegersEventArgs args = new AddIntegersEventArgs(Int32.Parse(Thread.CurrentThread.Name), (Int32)result);
            OnChanged(args);
        }
    }
}

Blocking queue assembly - implements the task blocking queue.

TaskQueue.cs

namespace BlockingTaskQueue
{
    public static class TaskQueue
    {
        private static Int32 _threadCount = 0;
        private static ConcurrentQueue<ITask> _queue = new ConcurrentQueue<ITask>();
        private static List<Thread> _dequeueThreads = new List<Thread>(100);

        public static void Enqueue(ITask task)
        {
            if (task == null)
            {
                // The single-string constructor expects the parameter name, not a message.
                throw new ArgumentNullException("task");
            }

            _queue.Enqueue(task);
        }


        public static void Dequeue(Int32 workers)
        {
            for (Int32 i = 0; i < workers; i++)
            {
                Thread worker = new Thread(new ThreadStart(DequeueThreadFunc));
                // Worker threads will not keep an application running after all foreground threads have exited.
                worker.IsBackground = true;
                worker.Name = Interlocked.Increment(ref _threadCount).ToString();
                _dequeueThreads.Add(worker);
                worker.Start();
            }
        }

        public static void DequeueThreadFunc()
        {
            ITask task;

            while (true)
            {
                if (_queue.IsEmpty)
                { // Wait indefinitely until there is something to de-queue.
                    Thread.Sleep(100);
                }
                else if (_queue.TryDequeue(out task))
                {
                    task.Run();
                }
            }
        }
    }
}

GUI assembly - The designer code to create the form isn't included.

SimulatorForm.cs

namespace BlockingTaskQueue
{
    public partial class MainForm : Form
    {
        public delegate void NewResultDelegate(GeneralTaskEventArgs args);
        public NewResultDelegate _resultDelegate;

        // Maps task descriptions to their concrete class from the tasks assembly.
        private Dictionary<String, TaskObject> _taskDescriptionToObject;

        // Keyed by the concrete EventArgs type, matching the constructor and AddResultToList.
        private Dictionary<Type, String> _concreteEventArgsToTaskDescription;

        //// TODO: extend to many assemblies in the configuration.
        private Assembly _tasksAssembly;
        private Assembly _callbacksAssembly;

        public MainForm()
        {
            InitializeComponent();

            _tasksAssembly = Assembly.LoadFrom(ConfigurationManager.AppSettings["TasksAssembly"]);
            _callbacksAssembly = Assembly.LoadFrom(ConfigurationManager.AppSettings["CallbacksAssembly"]);

            // TODO: implement a prefix for each task description, don't use Count!
            _taskDescriptionToObject = new Dictionary<string, TaskObject>(ConfigurationManager.AppSettings.Count);
            _concreteEventArgsToTaskDescription = new Dictionary<Type, string>(ConfigurationManager.AppSettings.Count);

            foreach (var key in ConfigurationManager.AppSettings.AllKeys)
            {
                if (key != "TasksAssembly" && key != "CallbacksAssembly")
                {
                    String[] concreteTypeNamesAndOperandsRequired = ConfigurationManager.AppSettings[key].Split(',');
                    Type taskConcreteType = _tasksAssembly.GetType(concreteTypeNamesAndOperandsRequired[0]);
                    Type taskCallbackType = _callbacksAssembly.GetType(concreteTypeNamesAndOperandsRequired[1]);
                    Type taskConcreteEventArgsType = _callbacksAssembly.GetType(concreteTypeNamesAndOperandsRequired[2]);

                    _taskDescriptionToObject.Add(key, new TaskObject(taskConcreteType, taskCallbackType, Int32.Parse(concreteTypeNamesAndOperandsRequired[3]),
                        "true" == concreteTypeNamesAndOperandsRequired[4]));
                    _concreteEventArgsToTaskDescription.Add(taskConcreteEventArgsType, key);
                    tasksComboBox.Items.Add(key);
                }
            }

            _resultDelegate = new NewResultDelegate(AddResultToList);
        }

        private void startDequeueButton_Click(object sender, EventArgs e)
        {
            Int32 dequeueThreads;

            // Sanity checks.
            if (!Int32.TryParse(dequeueThreadsTextBox.Text, out dequeueThreads))
            {
                throw new ArgumentException("Expected a number as number of de-queuing worker threads.");
            }

            if (dequeueThreads < 1)
            {
                throw new ArgumentException("Expected at least 1 de-queuing worker threads.");
            }

            BlockingTaskQueue.TaskQueue.Dequeue(dequeueThreads);
        }

        private void queueButton_Click(object sender, EventArgs e)
        {
            Int32 queueingThreads;
            Int32 tasksPerThread;

            // Sanity checks.
            if (!Int32.TryParse(queueThreadsTextBox.Text, out queueingThreads))
            {
                throw new ArgumentException("Expected a number as number of queuing worker threads.");
            }
            if (queueingThreads < 1)
            {
                throw new ArgumentException("Expected at least 1 queuing worker threads.");
            }
            if (!Int32.TryParse(tasksPerThreadTextBox.Text, out tasksPerThread))
            {
                throw new ArgumentException("Expected a number as the number of tasks each queuing thread will enqueue.");
            }
            if (tasksPerThread < 1)
            {
                throw new ArgumentException("Expected at least 1 tasks to be enqueued by each thread.");
            }

            // Create instances of the task and the result-action.
            Object taskInstance = null;
            Object resultInstance = null;

            TaskObject taskObj = _taskDescriptionToObject[tasksComboBox.Text];

            // Instantiate the concrete result class
            if (!taskObj.RequiresInput)
            {
                resultInstance = Activator.CreateInstance(taskObj.ConcreteActionAndResult.Value);
            }

            if (taskObj.OperandsRequired == 1)
            {
                if (taskObj.RequiresInput)
                {
                    SingleOperandInput soi = new SingleOperandInput() { Operand = operand1TextBox.Text };
                    resultInstance = Activator.CreateInstance(taskObj.ConcreteActionAndResult.Value, new Object[] { soi });
                }
                taskInstance = Activator.CreateInstance(taskObj.ConcreteActionAndResult.Key, new Object[] { operand1TextBox.Text, resultInstance });
            }
            else if (taskObj.OperandsRequired == 2)
            {
                if (taskObj.RequiresInput)
                {
                    TwoOperandsInput toi = new TwoOperandsInput() { Operand1 = operand1TextBox.Text, Operand2 = operand2TextBox.Text };
                    resultInstance = Activator.CreateInstance(taskObj.ConcreteActionAndResult.Value, new Object[] { operand1TextBox.Text, operand2TextBox.Text });
                }
                taskInstance = Activator.CreateInstance(taskObj.ConcreteActionAndResult.Key, new Object[] { operand1TextBox.Text, operand2TextBox.Text, resultInstance });
            }
            else
            {
                throw new NotImplementedException("Not supporting more than 2 operands.");
            }

            ((ITask)taskInstance).ResultAction.Changed += NewResult;
            // Enqueue the task the amount of times specified, using the amount of threads specified.
            EnqueueConcurrent(queueingThreads, tasksPerThread, taskInstance);
        }

        private void NewResult(object state, GeneralTaskEventArgs args)
        {
            if (InvokeRequired)
            {
                Invoke(_resultDelegate, new object[] { args });
            }
            else
            {
                _resultDelegate(args);
            }
        }

        private void AddResultToList(GeneralTaskEventArgs args)
        {
            outputListBox.Items.Add(String.Format("{0} Result (Worker thread #{1})", _concreteEventArgsToTaskDescription[args.GetType()], args.SourceThreadId));
            outputListBox.Items.Add(args.ToString());
        }

        private void EnqueueConcurrent(Int32 queueingThreads, Int32 tasksPerThread, Object task)
        {
            for (Int32 i = 0; i < queueingThreads; i++)
            {
                Thread queueingThread = new Thread(new ParameterizedThreadStart(EnqueueTasksThreadFunc));
                EnqueingObject enqueueObj = new EnqueingObject() { Task = task, TasksToEnqueue = tasksPerThread };
                queueingThread.Start(enqueueObj);
            }
        }

        public static void EnqueueTasksThreadFunc(Object state)
        {
            EnqueingObject enqueueObj = state as EnqueingObject;
            if (enqueueObj == null)
            {
                throw new ArgumentNullException();
            }

            for (Int32 i = 0; i < enqueueObj.TasksToEnqueue; i++)
            {
                // If this isn't a real ITask, a run-time exception will be thrown.
                BlockingTaskQueue.TaskQueue.Enqueue((ITask)enqueueObj.Task);
            }
        }

        private void tasksComboBox_SelectedIndexChanged(object sender, EventArgs e)
        {
            Int32 operandsRequired = _taskDescriptionToObject[tasksComboBox.Text].OperandsRequired;
            if (operandsRequired == 1)
            {
                if (operand2Label.Visible)
                {
                    operand2Label.Visible = false;
                    operand2TextBox.Visible = false;
                }
            }
            else if (operandsRequired == 2)
            {
                if (!operand2Label.Visible)
                {
                    operand2Label.Visible = true;
                    operand2TextBox.Visible = true;
                }
            }
        }
    }
}

The complete working code base can be found in my implementation's GitHub repository.

Consumers of the Contracts assembly may define their own tasks, callbacks and specific events as they wish.

The Tasks assembly contains the concrete task implementations, whilst the Callbacks assembly contains the concrete callback implementations, but no concrete task is coupled to a particular concrete callback.

That is to say, whoever maps a Task implementation to a Callback implementation at run time may choose a different mapping in the configuration.

All the consumer has to do in order to provide tasks with callback ability using a blocking queue is to bring a compiled Tasks assembly and Callbacks assembly, and provide their assembly file paths/assembly-signature in the configuration.

This is what I did in the SimulatorView form.

So the GUI's configuration looks like this:

App.config

<configuration>
    <appSettings>
        <add key="TasksAssembly" value="C:\Code\BlockingTaskQueue\TasksImplementation\bin\Debug\Tasks.dll"/>
        <add key="CallbacksAssembly" value="C:\Code\BlockingTaskQueue\Callbacks\bin\Debug\Callbacks.dll"/>
        <!-- key: defines the description of the task.
             value: constructed as TASK_CLASS_NAME, CALLBACK_CLASS_NAME, EVENTARGS_CLASS_NAME, NUM_OPERANDS, REQUIRES_INPUT_BOOLEAN -->
        <add key="Add Numbers" value="Tasks.AddIntegers,Callbacks.AddIntegersResult,Callbacks.AddIntegersEventArgs,2,false"/>
        <add key="Concatenate Strings" value="Tasks.ConcatenateStrings,Callbacks.ConcatenateStringsResult,Callbacks.ConcatenateStringsEventArgs,2,false"/>
        <add key="Is Palindrome" value="Tasks.PalindromChecker,Callbacks.PalindromCheckerResult,Callbacks.PalindromCheckerEventArgs,1,true"/>
    </appSettings>
</configuration>

This simply means that both assemblies (Tasks and Callbacks) will be loaded at run time, and a mapping will be created such that the "Add Numbers" operation activates a Tasks.AddIntegers task, which ends up calling a Callbacks.AddIntegersResult callback and uses a Callbacks.AddIntegersEventArgs to supply data about the result.
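To make the five-field value format concrete, here is a small sketch that parses one appSettings value into a typed object before any reflection happens. The class `TaskRegistration` and its property names are assumptions made for illustration; the code base itself splits the string inline in the form's constructor.

```csharp
using System;
using System.Globalization;

// Hypothetical typed view of one appSettings value; names are illustrative only.
public sealed class TaskRegistration
{
    public string TaskTypeName { get; private set; }
    public string CallbackTypeName { get; private set; }
    public string EventArgsTypeName { get; private set; }
    public int OperandsRequired { get; private set; }
    public bool RequiresInput { get; private set; }

    // Parses "TASK,CALLBACK,EVENTARGS,NUM_OPERANDS,REQUIRES_INPUT".
    public static TaskRegistration Parse(string value)
    {
        if (value == null)
        {
            throw new ArgumentNullException("value");
        }

        string[] parts = value.Split(',');
        if (parts.Length != 5)
        {
            throw new FormatException(
                "Expected 5 comma-separated fields but found " + parts.Length + ".");
        }

        return new TaskRegistration
        {
            TaskTypeName = parts[0].Trim(),
            CallbackTypeName = parts[1].Trim(),
            EventArgsTypeName = parts[2].Trim(),
            OperandsRequired = int.Parse(parts[3].Trim(), CultureInfo.InvariantCulture),
            // Unlike a plain comparison with "true", bool.Parse throws on anything
            // that is not "true" or "false", so typos in the config fail loudly.
            RequiresInput = bool.Parse(parts[4].Trim())
        };
    }
}
```

With something like this, a malformed entry is reported when the configuration is read instead of surfacing later as an `IndexOutOfRangeException` or a silently wrong `false`.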

Whoever subscribes to this event may use its overridden ToString method to get the actual result of the whole operation.

I've chosen to use reflection in my implementation, although many IoC implementations such as

  • Spring.NET
  • Castle Windsor
  • StructureMap
  • Autofac
  • Unity
  • Ninject

(the list is far from complete; look there for more)

could probably save a programmer the time it takes to get going with reflection.

\$\endgroup\$
  • \$\begingroup\$ It looks like your question is “How would you do this?” That's not what this site is for, so your question is off-topic here. \$\endgroup\$
    – svick
    Commented May 22, 2013 at 23:11
  • \$\begingroup\$ No, my question isn't "How would you do this?", since I've provided my code: github.com/rycle/BlockingTaskQueue \$\endgroup\$
    – rycle
    Commented May 23, 2013 at 8:19
  • \$\begingroup\$ It's a pure code-review question; it just takes some 15 different source files to paste here, so I linked to my implementation for review. Of course, if you want to think of other ways to accomplish this, please do, but share them as another answer. \$\endgroup\$
    – rycle
    Commented May 23, 2013 at 8:31
  • \$\begingroup\$ That's not how this works. If you want some code reviewed, you need to include the code in your question. And a link isn't enough, you need to actually include your code there. If you have too much code to put it there, then it's too much code to review. \$\endgroup\$
    – svick
    Commented May 23, 2013 at 11:29
  • \$\begingroup\$ Is there a policy on how long it should be? Anyone who looks at this project and understands it can see that it's the division into projects and logic that matters, rather than "how to implement a palindrome-checking method". I respect that there are code-review questions about single methods, of course, but I think a code-review site should allow slightly bigger projects, which might even contain a GUI. Of course, as users you may ignore/discourage/hate and move on, but as admins of this community, you're explicitly saying that you disallow that. How rude of you! \$\endgroup\$
    – rycle
    Commented May 23, 2013 at 11:55

2 Answers

\$\begingroup\$

This is a 10-in-1 question. This answer is taking a quick look at SimulatorForm.cs.

What you mention at the end of your [epic] post, that you've chosen reflection but that you could have used an enumeration of IoC containers instead, leaves me perplexed.

Inversion of Control implies letting go of control. Your MainForm has an empty constructor that loads assemblies, fetches static configurations and populates a combobox. I think that's a lot to do for a constructor. I don't think usage of reflection is warranted here, at all. In fact I find it just obfuscates the intent. Go ahead, inject your dependencies! Your usage of reflection isn't too far off from some all-powerful Service Locator that can instantiate just about anything.

Also maybe it's because I'm getting quite fond of commands, but I find you have a lot going on in those event handlers, beyond what would be the job of a presentation layer - i.e. deal with presentation concerns, delegating the actual work elsewhere. As a result, your code-behind feels bloated with mixed concerns:

  • tasksComboBox_SelectedIndexChanged is purely presentation stuff. That's good.
  • EnqueueTasksThreadFunc clearly belongs somewhere else, and it being public and static is kind of scary.
  • EnqueueConcurrent probably belongs in the same type as EnqueueTasksThreadFunc.
  • queueButton_Click and startDequeueButton_Click actually implement your "business logic".

I realize event handlers are named after the objects whose events they handle, but I would still rename them to keep consistent PascalCasing across all methods. The objectName_EventName convention feels like VB6; you're free to name your handlers as you like, e.g. "OnQueueButtonClick".

\$\endgroup\$
  • \$\begingroup\$ definitely a 10-IN-1 question \$\endgroup\$
    – Malachi
    Commented Nov 26, 2013 at 19:38
\$\begingroup\$

About TaskQueue.cs

I'm not a big multithreading guru (actually not a guru at anything), so maybe it's playing a part here, but I don't get why this class has to be static, along with all of its members.

I think I would have tackled this class as a non-static class derived from ConcurrentQueue (which I believe has all the plumbing for thread-safety built-in).

There's something about static classes that just raises a flag in my mind.
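For illustration, a non-static variant might look like the sketch below. It swaps in composition over `BlockingCollection<T>` rather than deriving from `ConcurrentQueue<T>`, which is a different technique from the one suggested above. The names `IRunnable`, `DelegateTask` and `InstanceTaskQueue` are hypothetical; `IRunnable` merely stands in for the question's `ITask`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Stand-in for the question's ITask, reduced to the one member needed here.
public interface IRunnable
{
    void Run();
}

// Tiny helper so a lambda can be queued as a task in examples.
public sealed class DelegateTask : IRunnable
{
    private readonly Action _action;

    public DelegateTask(Action action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }
        _action = action;
    }

    public void Run()
    {
        _action();
    }
}

// Hypothetical non-static counterpart of TaskQueue.
public sealed class InstanceTaskQueue
{
    // By default BlockingCollection<T> is backed by a ConcurrentQueue<T>.
    private readonly BlockingCollection<IRunnable> _queue =
        new BlockingCollection<IRunnable>();

    public void Enqueue(IRunnable task)
    {
        if (task == null)
        {
            throw new ArgumentNullException("task");
        }
        _queue.Add(task);
    }

    // Starts the requested number of background worker threads.
    public void StartWorkers(int workers)
    {
        for (int i = 0; i < workers; i++)
        {
            Thread worker = new Thread(Consume);
            worker.IsBackground = true;
            worker.Start();
        }
    }

    private void Consume()
    {
        // GetConsumingEnumerable blocks while the queue is empty,
        // so no polling with Thread.Sleep is required.
        foreach (IRunnable task in _queue.GetConsumingEnumerable())
        {
            task.Run();
        }
    }
}
```

Because the queue is an ordinary object, each test or form can create its own instance, and two queues can coexist in one process.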

About AddIntegers.cs

I find that's a rather bad name for a class, it looks like a method name (starts with a verb) - I know it's a task because I see AddIntegers : ITask, but it seems AddIntegersTask would be a [slightly] better name, especially if you make it a convention to end all classes that implement ITask with the word "Task".

\$\endgroup\$
