I have the class below, in which the add method is called by multiple threads to populate the messageByChannelReference concurrent hash map in a thread-safe way.
For each Channel, I will have a list of Message objects, and I want to process all of these channels in parallel. That is why I am using the thread-safe data structures ConcurrentHashMap and ConcurrentLinkedQueue. Here, Channel is an enum in my code.
In the same class, I have a background thread that runs every 30 seconds and calls the sendAll method, which processes multiple channels in parallel. The basic idea is to send, every 30 seconds, whatever is in the map at that moment.
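import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.collect.Queues;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class Processor {
    // single background thread that triggers sendAll() on a fixed schedule
    private final ScheduledExecutorService executorService =
            Executors.newSingleThreadScheduledExecutor();
    // creating a ListeningExecutorService (Guava) by wrapping a normal ExecutorService (Java)
    private final ListeningExecutorService executor =
            MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    private final AtomicReference<ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>> messageByChannelReference =
            new AtomicReference<>(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>());

    private Processor() {
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                sendAll();
            }
        }, 0, 30, TimeUnit.SECONDS);
    }
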
    // called only by single background thread
    private void sendAll() {
        // atomically swap in a fresh map and work on the old one
        ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> messageByChannels =
                messageByChannelReference.getAndSet(
                        new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>());
        List<ListenableFuture<Void>> list = new ArrayList<>();
        for (Entry<Channel, ConcurrentLinkedQueue<Message>> entry : messageByChannels.entrySet()) {
            final Channel channel = entry.getKey();
            final ConcurrentLinkedQueue<Message> messages = entry.getValue();
            ListenableFuture<Void> future = executor.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    send(channel, messages);
                    return null;
                }
            });
            list.add(future);
        }
        ListenableFuture<List<Void>> combinedFutures = Futures.allAsList(list);
        try {
            // wait until every channel has been processed
            List<Void> allChannels = combinedFutures.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException ex) {
            // log error
        }
    }

    private void send(final Channel channel, final ConcurrentLinkedQueue<Message> messages) {
        while (!messages.isEmpty()) {
            Message message = messages.poll();
            // ....
            // process this and send to database
        }
    }

    // called by multiple threads to populate the map
    public void add(final Channel channel, final Message message) {
        ConcurrentMap<Channel, ConcurrentLinkedQueue<Message>> messageHolderByChannel =
                messageByChannelReference.get();
        ConcurrentLinkedQueue<Message> messageHolder = messageHolderByChannel.get(channel);
        if (messageHolder == null) {
            messageHolder = Queues.newConcurrentLinkedQueue();
            ConcurrentLinkedQueue<Message> currentMessageHolder =
                    messageHolderByChannel.putIfAbsent(channel, messageHolder);
            if (currentMessageHolder != null) {
                messageHolder = currentMessageHolder;
            }
        }
        messageHolder.add(message);
    }
}
Question:
My problem is a multiple-producer, single-consumer one: multiple producer threads call the add method to populate my ConcurrentHashMap, and a single background thread, my consumer, reads the data from this map and works on multiple channels in parallel.
Is this the right way to process multiple channels in parallel? Please let me know if I have missed any thread-safety issues or race conditions in my code.
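One interleaving that may be worth checking is sketched below. It replays, on a single thread so the result is deterministic, a producer that reads the current map in add just before sendAll swaps it, and that only adds its message after send has already drained the old queue. The class name SwapRaceSketch, the key CHANNEL_A, and the use of String in place of Channel and Message are assumptions for illustration, not part of the class above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-threaded replay of one producer/consumer interleaving.
// String stands in for both Channel and Message (assumption for illustration).
final class SwapRaceSketch {

    /** Returns {messages left in the retired map, messages visible in the live map}. */
    static int[] replay() {
        AtomicReference<ConcurrentHashMap<String, ConcurrentLinkedQueue<String>>> ref =
                new AtomicReference<ConcurrentHashMap<String, ConcurrentLinkedQueue<String>>>(
                        new ConcurrentHashMap<String, ConcurrentLinkedQueue<String>>());

        // Producer, first half of add(): read the current map and register a queue.
        ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> seenByProducer = ref.get();
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
        seenByProducer.putIfAbsent("CHANNEL_A", queue);

        // Consumer, sendAll() + send(): swap in a fresh map, then drain the old one.
        ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> retired =
                ref.getAndSet(new ConcurrentHashMap<String, ConcurrentLinkedQueue<String>>());
        for (ConcurrentLinkedQueue<String> q : retired.values()) {
            while (q.poll() != null) {
                // nothing has been queued yet, so this body never runs
            }
        }

        // Producer, second half of add(): the message lands in the stale queue.
        queue.add("late message");

        return new int[] {count(retired), count(ref.get())};
    }

    // Sums the sizes of all queues in the given map.
    private static int count(ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> map) {
        int total = 0;
        for (ConcurrentLinkedQueue<String> q : map.values()) {
            total += q.size();
        }
        return total;
    }

    public static void main(String[] args) {
        int[] result = replay();
        // prints "stranded=1 live=0"
        System.out.println("stranded=" + result[0] + " live=" + result[1]);
    }
}
```

If this ordering can occur between real threads, the late message sits in a map that no later sendAll call will read.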
I don't need to block in my case: if a queue is empty, I will return and start again after 30 seconds. Right now the Callable returns Void, but I will change that to Boolean in the future.
I am working with Java 7.
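For the non-blocking drain and the planned switch from Void to Boolean, a minimal Java 7 sketch might look like the following. The class name DrainTask and the String stand-in for Message are assumptions for illustration. The Boolean result here simply reports whether any message was taken from the queue, which is only one possible meaning and not something decided above.

```java
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical task: drains a queue without blocking and reports whether
// anything was taken. String stands in for Message (assumption).
final class DrainTask implements Callable<Boolean> {
    private final Queue<String> messages;

    DrainTask(Queue<String> messages) {
        this.messages = messages;
    }

    @Override
    public Boolean call() {
        boolean sentAny = false;
        String message;
        // poll() never blocks; it returns null once the queue is empty,
        // so no separate isEmpty() check is needed.
        while ((message = messages.poll()) != null) {
            // placeholder for "process this and send to database"
            sentAny = true;
        }
        return sentAny;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<String>();
        queue.add("m1");
        queue.add("m2");
        DrainTask task = new DrainTask(queue);
        System.out.println(task.call()); // true: two messages were drained
        System.out.println(task.call()); // false: the queue was already empty
    }
}
```

A task like this could be passed to executor.submit in place of the anonymous Callable<Void>, which would give one ListenableFuture<Boolean> per channel.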