
I am new to concurrent programming and am trying to implement a simple ThreadPool myself. I found this implementation on a learning website (jenkov.com), and it seems to work fine.

However, I think the thread and isStopped fields in the PoolThreadRunnable class should be at least volatile, or atomic variables, because they are shared by two threads: the one calling doStop() on the object (where isStopped = true is set and this.thread.interrupt() is called) and the one actually executing run() (where this.thread = Thread.currentThread() is assigned and while(!isStopped()) is evaluated).

Is my understanding correct or am I missing something?

public class ThreadPool {

    private BlockingQueue taskQueue = null;
    private List<PoolThreadRunnable> runnables = new ArrayList<>();
    private boolean isStopped = false;

    public ThreadPool(int noOfThreads, int maxNoOfTasks){
        taskQueue = new ArrayBlockingQueue(maxNoOfTasks);

        for(int i=0; i<noOfThreads; i++){
            PoolThreadRunnable poolThreadRunnable =
                    new PoolThreadRunnable(taskQueue);

            runnables.add(poolThreadRunnable);
        }
        for(PoolThreadRunnable runnable : runnables){
            new Thread(runnable).start();
        }
    }

    public synchronized void  execute(Runnable task) throws Exception{
        if(this.isStopped) throw
                new IllegalStateException("ThreadPool is stopped");

        this.taskQueue.offer(task);
    }

    public synchronized void stop(){
        this.isStopped = true;
        for(PoolThreadRunnable runnable : runnables){
            runnable.doStop();
        }
    }

    public synchronized void waitUntilAllTasksFinished() {
        while(this.taskQueue.size() > 0) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public class PoolThreadRunnable implements Runnable {

    private Thread        thread    = null;
    private BlockingQueue taskQueue = null;
    private boolean       isStopped = false;

    public PoolThreadRunnable(BlockingQueue queue){
        taskQueue = queue;
    }

    public void run(){
        this.thread = Thread.currentThread();
        while(!isStopped()){
            try{
                Runnable runnable = (Runnable) taskQueue.take();
                runnable.run();
            } catch(Exception e){
                //log or otherwise report exception,
                //but keep pool thread alive.
            }
        }
    }

    public synchronized void doStop(){
        isStopped = true;
        //break pool thread out of dequeue() call.
        this.thread.interrupt();
    }

    public synchronized boolean isStopped(){
        return isStopped;
    }
}
  • Rather than reviewing and/or supporting random code found on the Internet, I would suggest using a ThreadPoolExecutor when a Java thread pool is needed.
    Commented Jul 6 at 18:18
  • @ElliottFrisch – For production code I agree. But for learning purposes there is nothing better than implementing this kind of stuff yourself.
    – tquadrat
    Commented Jul 6 at 18:30
  • The isStopped fields seem to be properly guarded by synchronized methods (note while (!isStopped()) is calling the method, not reading the field directly). The thread field, however, has a questionable assignment (the field is not volatile and the write to it in run is not guarded by a synchronized (this) block to match the synchronized doStop method where the field is read).
    – Slaw
    Commented Jul 6 at 18:32
  • Thanks @ElliottFrisch for the suggestion! I am aware of the Executors API in Java and would definitely use it in production code. This question, however, is for learning purposes; I am not using this code in production.
    – Siddharth
    Commented Jul 6 at 18:33
  • The code in my comment would be before the loop (in the run method).
    – Slaw
    Commented Jul 9 at 21:52

1 Answer


Both of the isStopped fields look to be properly guarded by synchronized methods. Note the while loop in the run() method calls the isStopped() method instead of reading the field directly. But the thread field has a questionable assignment. That field is not volatile and the write is not guarded by a synchronized block to match the synchronized doStop() method.

With how the rest of the class is implemented, the following would be a good fix:

@Override
public void run(){
    synchronized (this) {
        if (isStopped) {
            return;
        }
        this.thread = Thread.currentThread();
    }

    while(!isStopped()){
        try{
            Runnable runnable = (Runnable) taskQueue.take();
            runnable.run();
        } catch(Exception e){
            //log or otherwise report exception,
            //but keep pool thread alive.
        }
    }
}

Note: It's recommended to always use @Override when you think you are overriding a method. Adding that annotation forces the compiler to verify the method actually overrides something.

Why synchronize on this? Because a synchronized instance method implicitly synchronizes on this, and it's important to synchronize on the same object if you want to create a proper happens-before relationship. Also, to avoid a potential NullPointerException, you'll probably want to change doStop() to:

public synchronized void doStop(){
    isStopped = true;
    if (thread != null) {
        //break pool thread out of dequeue() call.
        thread.interrupt();
    }
}
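
Putting the two snippets above together, the worker could look roughly like the sketch below. The names SafePoolWorker and SafePoolWorkerDemo are made up for illustration, the queue is typed as BlockingQueue<Runnable>, and the single catch(Exception) is split into two catch blocks; none of that comes from the original tutorial code, so treat it as one possible arrangement rather than the definitive one.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical name; a sketch of the worker with both fixes applied.
class SafePoolWorker implements Runnable {

    private final BlockingQueue<Runnable> taskQueue;
    // Both fields below are guarded by 'this'.
    private Thread thread;
    private boolean stopped;

    SafePoolWorker(BlockingQueue<Runnable> taskQueue) {
        this.taskQueue = taskQueue;
    }

    @Override
    public void run() {
        synchronized (this) {
            if (stopped) {
                return; // doStop() was called before this thread got here
            }
            thread = Thread.currentThread();
        }
        while (!isStopped()) {
            try {
                taskQueue.take().run();
            } catch (InterruptedException e) {
                // Interrupted (normally by doStop()); the loop condition decides whether to exit.
            } catch (RuntimeException e) {
                // A task failed; log or report it, but keep the worker alive.
            }
        }
    }

    public synchronized void doStop() {
        stopped = true;
        if (thread != null) {
            // Break the worker out of a blocking take() call.
            thread.interrupt();
        }
    }

    public synchronized boolean isStopped() {
        return stopped;
    }
}

// Hypothetical demo class: runs one task, stops the worker, and waits for it to exit.
class SafePoolWorkerDemo {

    static int runOneTaskThenStop() throws InterruptedException {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(4);
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch taskRan = new CountDownLatch(1);

        SafePoolWorker worker = new SafePoolWorker(queue);
        Thread workerThread = new Thread(worker);
        workerThread.start();

        queue.put(() -> {
            counter.incrementAndGet();
            taskRan.countDown();
        });
        taskRan.await();      // wait until the task has actually run
        worker.doStop();      // sets the flag and interrupts the worker
        workerThread.join();  // returns once run() has exited
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOneTaskThenStop());
    }
}
```

Because both the write in run() and the read in doStop() happen while holding the same monitor, whichever of the two runs second is guaranteed to see what the first one did.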

"I had another question (albeit tangential) on what changes will be there for executing Callable instead. Would appreciate if you can point me to any reference if you know of any."

I don't know of an introductory tutorial off the top of my head. But the purpose of a Callable is to be able to return a result, which a Runnable cannot do. You would need your thread pool to return some object that represents the asynchronous result. In the Java Executor Framework, that is a Future. You can look at the implementations of ThreadPoolExecutor and FutureTask to see how they do it.
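
For orientation, here is roughly what the Callable/Future pairing looks like when using the JDK's own executor. This is only a small sketch; the class and method names (CallableFutureDemo, sumOnPool, failureMessage) are invented for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class showing how a Callable's result comes back through a Future.
class CallableFutureDemo {

    // Submits a Callable and blocks until its value is available.
    static int sumOnPool(int a, int b) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> task = () -> a + b;
            Future<Integer> future = pool.submit(task);
            return future.get(); // blocks until the task has completed
        } finally {
            pool.shutdown();
        }
    }

    // An exception thrown by the Callable surfaces as the cause of an ExecutionException.
    static String failureMessage() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> failing = () -> {
                throw new IllegalStateException("boom");
            };
            pool.submit(failing).get();
            return null; // not reached, because get() throws
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOnPool(2, 3));   // prints 5
        System.out.println(failureMessage());  // prints boom
    }
}
```

A hand-written pool needs the same two behaviours: a blocking way to fetch the value, and a way to report a failure of the task to whoever asks for the result.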

So, using the Java Executor Framework for inspiration, first you should create an interface akin to Future. This interface can be simpler since you are only trying to learn, not implement a fully featured API. You can add features as you desire.

import java.util.concurrent.ExecutionException;

public interface Result<V> {

  /**
   * Gets the value of this {@code Result}, blocking until the background task
   * is done if necessary.
   *
   * @return the value of the background task
   * @throws ExecutionException if the background task throws an exception; the
   * thrown exception will be the cause of the {@code ExecutionException}
   * @throws InterruptedException if the calling thread is interrupted
   */
  V get() throws ExecutionException, InterruptedException;

  /**
   * Tests if the background task is complete.
   *
   * @return {@code true} if the background task is complete, {@code false}
   * otherwise
   */
  boolean isDone();
}

Then you should create an implementation that also implements Runnable. This implementation will accept a Callable and be responsible for invoking it.

import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ResultTask<V> implements Result<V>, Runnable {

  private final AtomicBoolean executed = new AtomicBoolean();

  private final Lock lock = new ReentrantLock();
  private final Condition isDone = lock.newCondition();

  private final Callable<V> callable;
  // these three fields are guarded by 'lock'
  private boolean done;
  private V result;
  private Throwable error;

  public ResultTask(Callable<V> callable) {
    this.callable = Objects.requireNonNull(callable);
  }

  @Override
  public V get() throws ExecutionException, InterruptedException {
    lock.lockInterruptibly();
    try {
      while (!done) {
        // Called in a loop to handle a so-called "spurious wakeup" of the thread
        isDone.await();
      }

      if (error != null) {
        throw new ExecutionException(error);
      }
      return result;
    } finally {
      lock.unlock();
    }
  }

  @Override
  public boolean isDone() {
    lock.lock();
    try {
      return done;
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void run() {
    if (!executed.compareAndSet(false, true)) {
      throw new IllegalStateException("task already executed");
    }

    try {
      // don't hold the lock while invoking the Callable
      V result = callable.call();
      complete(result, null);
    } catch (Throwable error) {
      complete(null, error);
    }
  }

  private void complete(V result, Throwable error) {
    lock.lock();
    try {
      this.result = result;
      this.error = error;
      done = true;
      // Signal all threads blocked on 'isDone.await()' to wake them up
      isDone.signalAll();
    } finally {
      lock.unlock();
    }
  }
}

Then your ThreadPool would take a Callable and wrap it in a ResultTask. Since ResultTask implements Runnable, you can enqueue it in the task queue to be executed like normal. Then you return the ResultTask as a Result to the caller so the task's result can be queried at some point in the future.

public <V> Result<V> execute(Callable<V> callable) throws Exception {
  Objects.requireNonNull(callable);
  synchronized (this) {
    if (isStopped)
      throw new IllegalStateException("ThreadPool is stopped");

    ResultTask<V> task = new ResultTask<>(callable);
    taskQueue.offer(task);
    return task;
  }
}

Warning: The result of offer should probably be checked. See end of answer.

You can even change your execute(Runnable) method to be:

public Result<Void> execute(Runnable runnable) throws Exception {
  Objects.requireNonNull(runnable);
  return execute(() -> {
    runnable.run();
    return null;
  });
}

This would let the caller know when the Runnable completes, even though there won't be any value.


As an aside, if BlockingQueue is a java.util.concurrent.BlockingQueue, then note the following things:

  1. That interface is generic. You haven't parameterized it, which means you're using a raw type. Don't use raw types; use BlockingQueue<Runnable> and new ArrayBlockingQueue<>(...) (note the <>). This will also remove the need to cast to Runnable in your PoolThreadRunnable#run() method.

  2. The offer(E) method will return false if the element could not be added, such as because the queue is filled to capacity. You don't check this, which means a caller of execute could think the task was successfully enqueued even though it wasn't. If offer returns false, you should probably throw an exception. In the Java Executor Framework, it would be a RejectedExecutionException.
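
Both points can be illustrated with a small sketch. BoundedTaskQueue is a hypothetical stand-in for the relevant part of ThreadPool (it is not the full pool), and throwing RejectedExecutionException is just one reasonable way to report a full queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical class: only the queue-handling part of a thread pool.
class BoundedTaskQueue {

    // Parameterized type, so no cast is needed when taking tasks out.
    private final BlockingQueue<Runnable> taskQueue;

    BoundedTaskQueue(int maxNoOfTasks) {
        taskQueue = new ArrayBlockingQueue<>(maxNoOfTasks);
    }

    // Enqueues the task, or throws if the queue is already at capacity.
    void execute(Runnable task) {
        if (!taskQueue.offer(task)) {
            throw new RejectedExecutionException("task queue is full");
        }
    }

    // What a worker thread would call; returns a Runnable without casting.
    Runnable next() throws InterruptedException {
        return taskQueue.take();
    }

    int pendingTasks() {
        return taskQueue.size();
    }

    public static void main(String[] args) {
        BoundedTaskQueue queue = new BoundedTaskQueue(1);
        queue.execute(() -> System.out.println("first task"));
        try {
            queue.execute(() -> System.out.println("second task"));
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: " + e.getMessage()); // prints rejected: task queue is full
        }
    }
}
```

If blocking the caller is preferable to rejecting the task, put(E) could be used instead of offer(E); which behaviour is right depends on how the pool is meant to be used.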

  • Thanks for the elaborate answer @Slaw! This pretty much sums up what I wanted to know. Have a good day!
    – Siddharth
    Commented Jul 12 at 17:58
