\$\begingroup\$

I have the class below, in which the add method will be called by multiple threads to populate the messageByChannelReference concurrent hash map in a thread-safe way.

For each Channel, I will have a list of Message objects, and I want to process all these channels in parallel. That is why I am using the thread-safe data structures ConcurrentHashMap and ConcurrentLinkedQueue. Here Channel is an enum in my code.

In the same class, I have a background thread which runs every 30 seconds and calls the sendAll method, which processes multiple channels in parallel. Basically, the idea is that every 30 seconds I want to send whatever is in the map.

public class Processor {
  private final ScheduledExecutorService executorService = Executors
      .newSingleThreadScheduledExecutor();
  // creating a ListeningExecutorService (Guava) by wrapping a normal ExecutorService (Java)
  private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors
      .newCachedThreadPool());        
  private final AtomicReference<ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>> messageByChannelReference =
                new AtomicReference<>(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>());

  private Processor() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        sendAll();
      }
    }, 0, 30, TimeUnit.SECONDS);
  }

  // called only by single background thread
  private void sendAll() {  
    ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> messageByChannels = messageByChannelReference
                        .getAndSet(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>());           

    List<ListenableFuture<Void>> list = new ArrayList<>();
    for (Entry<Channel, ConcurrentLinkedQueue<Message>> entry : messageByChannels
        .entrySet()) {
      final Channel channel = entry.getKey();
      final ConcurrentLinkedQueue<Message> messages = entry.getValue();
      ListenableFuture<Void> future = executor.submit(new Callable<Void>() {
        public Void call() throws Exception {
          send(channel, messages);
          return null;
        }
      });
      list.add(future);
    }

    ListenableFuture<List<Void>> combinedFutures = Futures.allAsList(list);
    try {
      List<Void> allChannels = combinedFutures.get();
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    } catch (ExecutionException ex) {
      // log error
    }
  }

  private void send(final Channel channel, final ConcurrentLinkedQueue<Message> messages) {
    while (!messages.isEmpty()) {
      Message message = messages.poll();
      // ....
      // process this and send to database
    }
  }

  // called by multiple threads to populate the map
  public void add(final Channel channel, final Message message) {
    ConcurrentMap<Channel, ConcurrentLinkedQueue<Message>> messageHolderByChannel =
        messageByChannelReference.get();
    ConcurrentLinkedQueue<Message> messageHolder = messageHolderByChannel.get(channel);
    if (messageHolder == null) {
      messageHolder = Queues.newConcurrentLinkedQueue();
      ConcurrentLinkedQueue<Message> currentMessageHolder =
          messageHolderByChannel.putIfAbsent(channel, messageHolder);
      if (currentMessageHolder != null)
        messageHolder = currentMessageHolder;
    }
    messageHolder.add(message);
  }
}

Question:

My problem is a multiple-producer, single-consumer one: multiple producer threads will call the add method to populate my CHM map, and a single background thread, my consumer, will read data from this map and work on multiple channels in parallel.

Is this the right way to execute multiple channels in parallel? Let me know if I have missed any thread-safety issues or race conditions in my code. I don't need to block in my case: if the queue is empty, I will return and start again after 30 seconds. Right now the Callable returns Void, but I will change that to Boolean in the future.

I am working with Java 7.

\$\endgroup\$

3 Answers

\$\begingroup\$

Let's tackle the Threading and synchronization. It's a big task.

Just one word beforehand: immutability is the king of synchronization.

Just putting objects in Thread-safe Java Collections does not protect the Objects. It only protects the Collection's functionality from Threading shenanigans (inserting, retrieving, removing will all keep the Collection in a valid state, but the Objects can be used and treated asynchronously).
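To make that concrete, here is a minimal sketch of an immutable message, assuming a hypothetical ImmutableMessage class with an id and a body (the question never shows what Message looks like). Because every field is final and there are no setters, a producer that keeps its reference after queuing the object cannot change what the consumer later reads.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

final class ImmutableMessageDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<ImmutableMessage> queue = new ConcurrentLinkedQueue<ImmutableMessage>();
        ImmutableMessage original = new ImmutableMessage("42", "hello");
        queue.add(original);

        // The producer still holds 'original', but "editing" it only creates a new object.
        ImmutableMessage edited = original.withBody("changed");

        ImmutableMessage polled = queue.poll();
        System.out.println(polled.getBody()); // the queued object is untouched
        System.out.println(edited.getBody());
    }
}

// Hypothetical stand-in for the question's Message class, which is not shown.
final class ImmutableMessage {
    private final String id;
    private final String body;

    ImmutableMessage(String id, String body) {
        this.id = id;
        this.body = body;
    }

    String getId() { return id; }

    String getBody() { return body; }

    // Returns a modified copy instead of mutating this instance.
    ImmutableMessage withBody(String newBody) {
        return new ImmutableMessage(id, newBody);
    }
}
```

If Message has to stay mutable, the alternative is to guard every access to its state yourself; the queue will not do that for you.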


AtomicReference

Its purpose is to hold an editable reference in such a way that any observer, at any time, sees the most up-to-date reference. I.e., while someone edits the reference using set(), no thread can use get() and obtain a wrong reference (without synchronization, it could obtain the old one, the new one, a bogus null ref, etc.). In fact, if you look at its source, it is just a volatile holder. It is handy when you need to edit a reference...

But you never do it!

So you can just drop it, and simply make messageByChannelReference an immutable (final, because it effectively is) ConcurrentHashMap. As a bonus we get rid of one AtomicReference<...> Layer, because those <> really started to stack up!


ConcurrentHashMap

In the same vein, ConcurrentHashMap protects its mapping so that it still holds when it is updated asynchronously. But neither the keys nor the values are protected from being used (i.e. having methods invoked on them) asynchronously. Now, if you're using a Concurrent map, you're expecting something to change the mapping Channel --> Queue.

Your Channels are a constant (they are in an enum). So you never modify the Map's keys. The keys point to some Queues. Why drop the queues? Why not empty their content? Would you throw away your glass when you drank all the wine it contained? I wouldn't - I usually reuse my glass. So you can keep the Queue reference in place.

If you never change the mapping, you don't need a Concurrent Map, you need an immutable one, so you must find another implementation. Thankfully you're already using Guava :D.

Just a word: to make the Map immutable, you have to fill it with data first. That means instantiating all Queues from the start. I see you've lazily initialized those. Lazy initialisation does not always bring a benefit, but it always results in obfuscated code (with lots of if(a==null) a=new A();). I recommend dropping the laziness here, as a few Queue references won't hurt anyone.

Quick recap: I'm proposing that you replace:

private final AtomicReference<ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>> messageByChannelReference = new AtomicReference<>(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>());

By:

private final ImmutableMap<Channel, ConcurrentLinkedQueue<Message>> messageByChannel;
private Processor(){
    // ...Other stuff...
    // Create every queue up front so the mapping never changes afterwards
    ImmutableMap.Builder<Channel, ConcurrentLinkedQueue<Message>> mapBuilder = ImmutableMap.builder();
    for(Channel channel : Channel.values()){
        mapBuilder.put(channel, new ConcurrentLinkedQueue<Message>());
    }
    this.messageByChannel = mapBuilder.build();
}

What needs to be Thread safe, then?

Well, anything that is modified asynchronously (as we've seen, this excludes the Map reference, and the Map itself).

The messages are added to the queues asynchronously. So those queues must be made Thread safe. And they are! So... we're good, I think.


How does it work now?

You build a map, assign queues to Channels, then start all Threads.

A thread comes up to the map asking for a Queue to put his payload...
...The map returns the corresponding Queue (there's only one, its mapping never changes, there's never a mistake to be made)...
...The Thread puts his Message in that Queue (The Queue is Thread safe, so it isn't broken by this unexpected action)...
...then continues its business.

From time to time an Executor Thread pops up...
...browses each of the Map's Queues (safely, no changes to the map or to the mapping)...
...and proceeds to take (poll) messages, one by one (safely, since those poll() calls are made on a Concurrent Queue)...
...and processes them.

Now, it's important to read the ConcurrentLinkedQueue's Javadoc, because a few points are important:

  • It is a thread-safe queue (good)
  • It does not permit the use of null elements (didn't see much null-checks! Some are required, maybe)
  • Its Iterators are weakly consistent (this is why I recommend using poll() repeatedly. poll() is guaranteed to never give the same object to two Threads at the same time)
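The flow described above can be sketched with the standard library only. This is an illustration under assumptions, not the Guava version from this answer: it swaps ImmutableMap for an EnumMap wrapped in Collections.unmodifiableMap, and the Channel values, the String payload and the FixedQueuesSketch name are all made up for the example.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class FixedQueuesSketch {
    // Hypothetical stand-in for the question's Channel enum.
    enum Channel { EMAIL, SMS, PUSH }

    // Fully populated in the constructor and never modified afterwards,
    // so threads can read it without any locking.
    private final Map<Channel, Queue<String>> queueByChannel;

    FixedQueuesSketch() {
        Map<Channel, Queue<String>> map = new EnumMap<Channel, Queue<String>>(Channel.class);
        for (Channel channel : Channel.values()) {
            map.put(channel, new ConcurrentLinkedQueue<String>());
        }
        this.queueByChannel = Collections.unmodifiableMap(map);
    }

    // Safe to call from many producer threads: only the thread-safe queue is modified.
    void add(Channel channel, String message) {
        queueByChannel.get(channel).add(message);
    }

    // Drains whatever is queued right now; poll() returns null once the queue is empty.
    int drain(Channel channel) {
        Queue<String> queue = queueByChannel.get(channel);
        int sent = 0;
        while (queue.poll() != null) {
            sent++; // a real implementation would process the message here
        }
        return sent;
    }

    public static void main(String[] args) {
        FixedQueuesSketch sketch = new FixedQueuesSketch();
        sketch.add(Channel.EMAIL, "a");
        sketch.add(Channel.EMAIL, "b");
        sketch.add(Channel.SMS, "c");
        System.out.println(sketch.drain(Channel.EMAIL));
        System.out.println(sketch.drain(Channel.PUSH));
    }
}
```

Since ConcurrentLinkedQueue rejects null elements, a null from poll() can only mean "empty", which is what makes the loop condition safe.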

Add Messages (Edit)

The method does not change much. One warning: do not call a Collection a Holder. Usually a Holder holds a singleton, like a wrapper for a piece of data or a reference.

// (Assuming messageByChannel is the map)
public void add(final Channel channel, final Message message) {
    messageByChannel.get(channel).add(message); // That's it!
    // No need for null-checking, we did the Queue instancing manually in Constructor
}

Send the messages: Which Thread? (Edit)

Right now the Executor Thread calls sendAll(), which hands the work to concurrent Futures and then halts until they finish. I don't really see why the Executor Thread couldn't prepare and send the packages itself.

Like:

private void sendAll() {
    for (Entry<Channel, ConcurrentLinkedQueue<Message>> entry : messageByChannel.entrySet()) {
        final Channel channel = entry.getKey();
        final ConcurrentLinkedQueue<Message> messages = entry.getValue();
        send(channel, messages);
    }
}
private void send(final Channel channel, final ConcurrentLinkedQueue<Message> messages) {
    Message message;
    // Don't use isEmpty() without synchronizing: the message could be gone between the isEmpty() and poll() calls!
    while ((message = messages.poll()) != null) {
        // ....
        // process this and send to database
    }
}

Send the messages: Channel by Channel (Edit)

You stated you want the Channels to be sent asynchronously. You've elected to have one Thread run at intervals from an Executor, which spawns a few sender Threads every time it runs.

Why not directly schedule one job per Channel? This way, you could interrupt one temporarily in production, or add more, or adjust the sending rate per channel.

(Edit 2) Here's how to do it from the Processor class :

private static final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
private static final int POLLING_DELAY = 30;
private Processor() {
    // Spread the first runs over the polling period so the jobs don't all start at once
    int staggerDelay = 1 + POLLING_DELAY / Channel.values().length;
    int channelNum = 0;
    for (final Channel targetChannel : Channel.values()) {
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                sendChannel(targetChannel);
            }
        }, channelNum++ * staggerDelay, POLLING_DELAY, TimeUnit.SECONDS);
    }
}

The send(Channel, Queue) method parameters are redundant, since the queues are uniquely assigned to the Channels (the mapping info is constant). I rewrote it as:

private void sendChannel(final Channel channel) {
    ConcurrentLinkedQueue<Message> messages = messageByChannel.get(channel);
    Message message;
    while ((message = messages.poll())  != null) { // Don't use isEmpty without synchronizing, the message could be gone between isEmpty() and poll() calls!
        // ....
        // process this and send to database
    }
}

There you go: you have one polling job per Channel (with an adjustable polling rate), plus the provider Threads. No other Threads are needed. The executor service is still available to manage those jobs. I staggered the polling job starts to prevent bottlenecking. Of course sendChannel could be made smarter (batching, etc.) but that is for later.
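One thing worth keeping in mind: an executor created with newSingleThreadScheduledExecutor() runs its jobs one at a time on a single thread, so the per-channel jobs take turns rather than overlap. If the channels really should drain in parallel, a pool with one thread per channel is one option. The sketch below assumes that choice; the class name, the Channel values, the String payload and the sentCount counter are invented for illustration.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PerChannelPollingSketch {
    enum Channel { EMAIL, SMS, PUSH } // hypothetical channels

    // Populated once in the constructor, read-only afterwards.
    private final Map<Channel, Queue<String>> queues =
            new EnumMap<Channel, Queue<String>>(Channel.class);
    // One worker per channel, so a slow channel does not delay the others.
    private final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(Channel.values().length);
    private final AtomicInteger sentCount = new AtomicInteger();

    PerChannelPollingSketch() {
        for (Channel channel : Channel.values()) {
            queues.put(channel, new ConcurrentLinkedQueue<String>());
        }
    }

    // Kept out of the constructor so 'this' is not handed to other threads too early.
    void start(long period, TimeUnit unit) {
        for (final Channel channel : Channel.values()) {
            scheduler.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    sendChannel(channel);
                }
            }, 0, period, unit);
        }
    }

    void add(Channel channel, String message) {
        queues.get(channel).add(message);
    }

    void sendChannel(Channel channel) {
        Queue<String> queue = queues.get(channel);
        while (queue.poll() != null) {
            sentCount.incrementAndGet(); // placeholder for the real send
        }
    }

    int sentCount() {
        return sentCount.get();
    }

    void stop() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        PerChannelPollingSketch processor = new PerChannelPollingSketch();
        processor.add(Channel.EMAIL, "a");
        processor.add(Channel.SMS, "b");
        processor.start(50, TimeUnit.MILLISECONDS);
        Thread.sleep(200); // give the jobs a few periods to run
        processor.stop();
        System.out.println(processor.sentCount());
    }
}
```

Note that scheduleAtFixedRate stops rescheduling a job once one of its runs throws, so the real sendChannel should catch and log its own exceptions.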

\$\endgroup\$
  • \$\begingroup\$ Thanks for your great feedback. So in short, I can drop the atomic reference and use a final concurrent hash map directly, and then my add method will be modified accordingly as well, right? Also, in your usage example, you didn't mention executing multiple channels in parallel. In my case, I want to execute multiple channels in parallel, so is what I am doing in my code right, and should I continue to use that? Or maybe I understood it wrong? \$\endgroup\$ Commented Feb 7, 2017 at 17:31
  • \$\begingroup\$ The hashmap doesn't need to be concurrent, I suggested an immutable map. The add method doesn't need much change (tho it must compile ^^) except to drop putIfAbsent thanks to the non-lazy init. For parallel channel polling I'll edit my answer later. \$\endgroup\$
    – MrBrushy
    Commented Feb 7, 2017 at 18:16
  • \$\begingroup\$ @user1950349 clarified Immutable map, added updated add function, and proposed Parallel Channel polling. Is that clear enough? \$\endgroup\$
    – MrBrushy
    Commented Feb 7, 2017 at 23:04
  • \$\begingroup\$ Yeah understood most of the things and looks very neat. Few follow up questions though: 1) Do I need both classes now - Processor and Channel as looks like you have mixed these two? In my case Channel was just simply an enum and Processor class I was using to populate message object for each channel and then in the same class I was starting a background thread to send all the messages for each channels. \$\endgroup\$ Commented Feb 8, 2017 at 1:43
  • \$\begingroup\$ After looking at your Channel class, I see you are starting a background thread there only, and within the sendChannel method you use the messageByChannels map, which is defined in the Processor class, so I got slightly confused. 2) If we just need one class, then what name should I use for the class so that it depicts what the class is doing? 3) Also, will the add method then go into the enum as well (assuming we keep only the enum class) if we decide to use one class? \$\endgroup\$ Commented Feb 8, 2017 at 1:44
\$\begingroup\$

There are 2 things that I am primarily concerned with here:

  1. messageByChannelReference
  2. ConcurrentLinkedQueue inside of a ConcurrentHashMap

Problem with #1

As I have interpreted your code, you seem to want to discard the current hash map and install an empty one every time you send all the messages. However, the problem is that whenever you call sendAll, you must iterate over the entry set of a map that is being discarded. A producer may still hold a reference to the discarded map while you are iterating over its weakly consistent entry set view. This means that entries can still be added to the old map after it has been discarded, and your iterator may not see those entries because it is weakly consistent.
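The interleaving behind this can be replayed on a single thread. The sketch below is only an illustration: it uses String keys and payloads instead of the question's Channel and Message, and the class and method names are made up. It performs the steps of add() and sendAll() in the unlucky order that two real threads could produce.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

final class StaleMapRaceSketch {
    // Returns true if the late message ends up in a map nobody will read again.
    static boolean messageIsStranded() {
        ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> first = new ConcurrentHashMap<>();
        AtomicReference<ConcurrentHashMap<String, ConcurrentLinkedQueue<String>>> ref =
                new AtomicReference<>(first);

        // Producer, step 1: add() reads the current map and is then paused.
        ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> seenByProducer = ref.get();

        // Consumer: sendAll() swaps in a fresh map and drains the old one.
        ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> old =
                ref.getAndSet(new ConcurrentHashMap<String, ConcurrentLinkedQueue<String>>());
        int consumed = 0;
        for (ConcurrentLinkedQueue<String> queue : old.values()) {
            while (queue.poll() != null) {
                consumed++;
            }
        }

        // Producer, step 2: add() resumes and writes into the map it read earlier.
        seenByProducer.putIfAbsent("EMAIL", new ConcurrentLinkedQueue<String>());
        seenByProducer.get("EMAIL").add("late message");

        // Nothing was consumed, the live map is empty, and the message sits in the old map.
        return consumed == 0
                && ref.get().isEmpty()
                && seenByProducer.get("EMAIL").size() == 1;
    }

    public static void main(String[] args) {
        System.out.println(messageIsStranded()); // prints true
    }
}
```

With real threads the window is small, but nothing in the original code closes it.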


Problem with #2

From what I understand, you are creating a lookup table (cache) that creates new entries if they do not already exist. This isn't necessarily a problem here, but I think that you have needlessly created a complicated tree of entries. It can be vastly simplified, and you can avoid creating a new linked queue that might be thrown away if the method loses the race.

Side note: I embarrassingly became confused due to the lack of computeIfAbsent before I realized that you needed to use Java 7.


How do you fix it

The solution is very simple: wrap both your Channel and the Message together in an immutable object and place it in a ConcurrentLinkedQueue. This will solve both of the aforementioned problems. The way I see it being implemented:

private final ConcurrentLinkedQueue<QueuedMessage> messages = new ConcurrentLinkedQueue<>();

public void sendAll() {
    QueuedMessage m;
    while ((m = messages.poll()) != null) {
        // ...
        send(m.getChannel(), m.getPayload());
        // ...
    }
}

public void add(Channel ch, Message payload) {
    messages.offer(new QueuedMessage(ch, payload));
}

class QueuedMessage {
    private final Channel channel;
    private final Message payload;
    public QueuedMessage(Channel ch, Message payload) {
        channel = ch;
        this.payload = payload;
    }

    // Getters
}

As you can see, much simpler, thread-safe, less esoteric and easier to read.
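If per-channel handling is still wanted on top of the single queue, one possible starting point is to drain the queue into per-channel batches first and then hand each batch to whatever sends it. This is only a sketch under assumptions: the payload is a String rather than the question's Message, and drainByChannel is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

final class GroupByChannelSketch {
    enum Channel { EMAIL, SMS } // hypothetical channels

    // Immutable pairing of a channel and its payload.
    static final class QueuedMessage {
        final Channel channel;
        final String payload;

        QueuedMessage(Channel channel, String payload) {
            this.channel = channel;
            this.payload = payload;
        }
    }

    private final ConcurrentLinkedQueue<QueuedMessage> messages =
            new ConcurrentLinkedQueue<QueuedMessage>();

    // Called by producer threads.
    void add(Channel channel, String payload) {
        messages.offer(new QueuedMessage(channel, payload));
    }

    // Called by the single consumer thread: empties the queue into per-channel batches.
    Map<Channel, List<String>> drainByChannel() {
        Map<Channel, List<String>> batches = new EnumMap<Channel, List<String>>(Channel.class);
        QueuedMessage m;
        while ((m = messages.poll()) != null) {
            List<String> batch = batches.get(m.channel);
            if (batch == null) {
                batch = new ArrayList<String>();
                batches.put(m.channel, batch);
            }
            batch.add(m.payload);
        }
        return batches;
    }

    public static void main(String[] args) {
        GroupByChannelSketch sketch = new GroupByChannelSketch();
        sketch.add(Channel.EMAIL, "a");
        sketch.add(Channel.SMS, "b");
        sketch.add(Channel.EMAIL, "c");
        System.out.println(sketch.drainByChannel());
    }
}
```

The batches are local to the consumer, so they can be plain ArrayLists; each one could then be submitted to an executor as its own task.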

\$\endgroup\$
  • \$\begingroup\$ This is cleaner, but it lacks one piece of functionality. OP provided code that allowed Channels to be emptied asynchronously. In your solution, channels are mixed up and sent all at once. \$\endgroup\$
    – MrBrushy
    Commented Feb 7, 2017 at 15:49
  • \$\begingroup\$ That's why I added the ..., I mean that OP can use what I provided as a starting point to add what functionality they need. \$\endgroup\$
    – user50505
    Commented Feb 7, 2017 at 16:00
\$\begingroup\$

If you don't need the consumer to process Messages in insertion order, or in any particular order, you can simplify your code significantly.

All you need is a blocking deque, i.e. a double-ended queue; Java offers LinkedBlockingDeque. LinkedBlockingDeque is indeed a thread-safe collection and is optimized for heavy concurrent usage.

Create one LinkedBlockingDeque and distribute a reference to it to all your producers and consumers. Producers are going to push messages on one end, and the consumer(s) are going to pull messages from the other end.

You don't need a list of Futures. Such a solution is very wasteful of memory and unnecessarily complex.

Generally, with any producer/consumer scenario, look for "blocking" variants of collections and synchronization objects. Consumers need to be efficiently blocked when the queue is empty.

Futures are synchronization objects, but using them in huge numbers through a huge list is not as efficient as using a blocking queue.

  while (!messages.isEmpty()) {
    Message message = messages.poll();

Yikes, this is bad, very bad. This form of "blocking" (waiting for a message to arrive) is horribly inefficient; it is a potential CPU killer. Use a blocking call on LinkedBlockingDeque instead, such as take() or the timed poll(timeout, unit); note that the no-argument poll() returns immediately. No need for busy loops with those.
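A small sketch of the difference between the non-blocking and blocking calls on LinkedBlockingDeque, using String payloads and a hypothetical handOff helper for illustration. The no-argument poll() returns null at once on an empty deque, the timed poll waits up to the timeout, and takeFirst() parks the caller until a producer delivers something.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

final class BlockingDequeSketch {
    // Starts a producer thread that offers one message, and blocks until it arrives.
    static String handOff(final String message) {
        final BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                deque.offerLast(message); // producers push on one end
            }
        });
        producer.start();
        try {
            return deque.takeFirst(); // the consumer waits on the other end, without spinning
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingDeque<String> empty = new LinkedBlockingDeque<String>();

        // Non-blocking: returns null immediately because nothing is queued.
        System.out.println(empty.poll());

        // Timed: waits up to 100 ms for an element, then gives up with null.
        System.out.println(empty.poll(100, TimeUnit.MILLISECONDS));

        // Blocking: returns only once the producer has delivered the message.
        System.out.println(handOff("hello"));
    }
}
```

For a job that wakes up every 30 seconds and just wants whatever is queued, the non-blocking poll() loop is still a reasonable fit; the blocking calls matter when a consumer thread should sit idle until work arrives.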

All objects shared between threads have to be thread safe. Objects passed from producer to consumer are exactly such objects, so they have to be thread safe. The easiest way is to use immutable objects.

\$\endgroup\$
