Skip to main content
import com.google.common.collect.Maps;MapMaker;
import com.google.common.collect.Sets;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

/**
 * A general purpose Multimap implemenationimplementation for delayed processing and concurrent insertion/deletes.
 *
 * @param <K> A comparable Key
 * @param <V> A comparable Value
 */
public class ConcurrentMultiMap<K extends Comparable, V extends Comparable>
{
  private final int size;
  private final ConcurrentMap<K, Set<V>> cache=Maps.newConcurrentMapcache;
  private final ConcurrentMap<K, Object> locks;

  public ConcurrentMultiMap();
  {
  public void putthis(final32, K2);
 key, }

  public ConcurrentMultiMap(final Vint valueconcurrencyLevel)
  {
    Set<V> set=cache.getthis(keyconcurrencyLevel, 2);
  }

  ifpublic ConcurrentMultiMap(setfinal ==int nullconcurrencyLevel, final int factor)
  {
    size=concurrencyLevel * set=Setsfactor;
    cache=new MapMaker().newSetFromMapconcurrencyLevel(MapsconcurrencyLevel).<V,initialCapacity(concurrencyLevel).makeMap();
 Boolean>newConcurrentMap   locks=new MapMaker().concurrencyLevel(concurrencyLevel).initialCapacity(concurrencyLevel).weakKeys().weakValues().makeMap();
  }

  private Object getLock(final Set<V>K previousSet=cachekey){
    final Object object=new Object();
    Object lock=locks.putIfAbsent(key, setobject);
      if(previousSetlock !=== null){
        set=previousSet;lock=object;
      }
    }
   return set.add(value);lock;
  }

  public void putAllput(final K key, final Collection<V>V valuesvalue)
  {
    synchronized(getLock(key)){
      Set<V> set=cache.get(key);
      if(set == null){
        set=Sets.newSetFromMapnewHashSetWithExpectedSize(Mapssize);
        cache.<Vput(key, Boolean>newConcurrentMap(set);
      }
      set.add(value);
    }
  }

  public void putAll(final K key, final Collection<V> values)
  {
    synchronized(getLock(key)){
      Set<V> previousSet=cacheset=cache.putIfAbsentget(key, set);
      if(previousSetset !=== null){
        set=previousSet;set=Sets.newHashSetWithExpectedSize(size);
      }  cache.put(key, set);
      }
      set.addAll(values);
    }
  }

  public Set<V> remove(final K key)
  {
    synchronized(getLock(key)){
      return cache.remove(key);
    }
  }

  public Set<K> getKeySet()
  {
    return cache.keySet();
  }

  public int size()
  {
    return cache.size();
  }

}
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

/**
 * A general purpose Multimap implemenation for delayed processing and concurrent insertion/deletes.
 *
 * @param <K> A comparable Key
 * @param <V> A comparable Value
 */
public class ConcurrentMultiMap<K extends Comparable, V extends Comparable>
{
  private final ConcurrentMap<K, Set<V>> cache=Maps.newConcurrentMap();

  public void put(final K key, final V value)
  {
    Set<V> set=cache.get(key);
    if(set == null){
      set=Sets.newSetFromMap(Maps.<V, Boolean>newConcurrentMap());
      final Set<V> previousSet=cache.putIfAbsent(key, set);
      if(previousSet != null){
        set=previousSet;
      }
    }
    set.add(value);
  }

  public void putAll(final K key, final Collection<V> values)
  {
    Set<V> set=cache.get(key);
    if(set == null){
      set=Sets.newSetFromMap(Maps.<V, Boolean>newConcurrentMap());
      final Set<V> previousSet=cache.putIfAbsent(key, set);
      if(previousSet != null){
        set=previousSet;
      }
    }
    set.addAll(values);
  }

  public Set<V> remove(final K key)
  {
    return cache.remove(key);
  }

  public Set<K> getKeySet()
  {
    return cache.keySet();
  }

  public int size()
  {
    return cache.size();
  }

}
import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

/**
 * A general purpose Multimap implementation for delayed processing and concurrent insertion/deletes.
 *
 * @param <K> A comparable Key
 * @param <V> A comparable Value
 */
public class ConcurrentMultiMap<K extends Comparable, V extends Comparable>
{
  private final int size;
  private final ConcurrentMap<K, Set<V>> cache;
  private final ConcurrentMap<K, Object> locks;

  public ConcurrentMultiMap()
  {
    this(32, 2);
  }

  public ConcurrentMultiMap(final int concurrencyLevel)
  {
    this(concurrencyLevel, 2);
  }

  public ConcurrentMultiMap(final int concurrencyLevel, final int factor)
  {
    size=concurrencyLevel * factor;
    cache=new MapMaker().concurrencyLevel(concurrencyLevel).initialCapacity(concurrencyLevel).makeMap();
    locks=new MapMaker().concurrencyLevel(concurrencyLevel).initialCapacity(concurrencyLevel).weakKeys().weakValues().makeMap();
  }

  private Object getLock(final K key){
    final Object object=new Object();
    Object lock=locks.putIfAbsent(key, object);
    if(lock == null){
      lock=object;
    }
    return lock;
  }

  public void put(final K key, final V value)
  {
    synchronized(getLock(key)){
      Set<V> set=cache.get(key);
      if(set == null){
        set=Sets.newHashSetWithExpectedSize(size);
        cache.put(key, set);
      }
      set.add(value);
    }
  }

  public void putAll(final K key, final Collection<V> values)
  {
    synchronized(getLock(key)){
      Set<V> set=cache.get(key);
      if(set == null){
        set=Sets.newHashSetWithExpectedSize(size);
        cache.put(key, set);
      }
      set.addAll(values);
    }
  }

  public Set<V> remove(final K key)
  {
    synchronized(getLock(key)){
      return cache.remove(key);
    }
  }

  public Set<K> getKeySet()
  {
    return cache.keySet();
  }

  public int size()
  {
    return cache.size();
  }

}
Typo.
Source Link

I had a requirement where I had to have a Map<Comparable, Set<Comparable>> where insertion on the Map be concurrent and also on the corresponding Set, but once a Key was consumed from the Map, it had to be deleted, think if as a Job running every two seconds which is consuming the whole Set<Comparable> from an specific Key but insertion be totally concurrent so that most values be buffered when the Job kicks in, here is my implementation:

Note: I use Guava's helper class Maps to create the concurrent Maps, also, this solution emulates Java concurrency in Practice Listing 5.19:

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

/**
 * A general purpose Multimap implemenation for delayed processing and concurrent insertion/deletes.
 *
 * @param <K> A comparable Key
 * @param <V> A comparable Value
 */
public class ConcurrentMultiMap<K extends Comparable, V extends Comparable>
{
  private final ConcurrentMap<K, Set<V>> cache=Maps.newConcurrentMap();

  public void put(final K key, final V value)
  {
    Set<V> set=cache.get(key);
    if(set == null){
      set=Sets.newSetFromMap(Maps.<V, Boolean>newConcurrentMap());
      final Set<V> previousSet=cache.putIfAbsent(key, set);
      if(previousSet != null){
        set=previousSet;
      }
    }
    set.add(value);
  }

  public void putAll(final K key, final Collection<V> values)
  {
    Set<V> set=cache.get(key);
    if(set == null){
      set=Sets.newSetFromMap(Maps.<V, Boolean>newConcurrentMap());
      final Set<V> previousSet=cache.putIfAbsent(key, set);
      if(previousSet != null){
        set=previousSet;
      }
    }
    set.addAll(values);
  }

  public Set<V> remove(final K key)
  {
    return cache.remove(key);
  }

  public Set<K> getKeySet()
  {
    return cache.keySet();
  }

  public int size()
  {
    return cache.size();
  }

}

I had a requirement where I had to have a Map<Comparable, Set<Comparable>> where insertion on the Map be concurrent and also on the corresponding Set, but once a Key was consumed from the Map, it had to be deleted, think if as a Job running every two seconds which is consuming the whole Set<Comparable> from an specific Key but insertion be totally concurrent so that most values be buffered when the Job kicks in, here is my implementation:

Note: I use Guava's helper class Maps to create the concurrent Maps:

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

/**
 * A general purpose Multimap implemenation for delayed processing and concurrent insertion/deletes.
 *
 * @param <K> A comparable Key
 * @param <V> A comparable Value
 */
public class ConcurrentMultiMap<K extends Comparable, V extends Comparable>
{
  private final ConcurrentMap<K, Set<V>> cache=Maps.newConcurrentMap();

  public void put(final K key, final V value)
  {
    Set<V> set=cache.get(key);
    if(set == null){
      set=Sets.newSetFromMap(Maps.<V, Boolean>newConcurrentMap());
      final Set<V> previousSet=cache.putIfAbsent(key, set);
      if(previousSet != null){
        set=previousSet;
      }
    }
    set.add(value);
  }

  public void putAll(final K key, final Collection<V> values)
  {
    Set<V> set=cache.get(key);
    if(set == null){
      set=Sets.newSetFromMap(Maps.<V, Boolean>newConcurrentMap());
      final Set<V> previousSet=cache.putIfAbsent(key, set);
      if(previousSet != null){
        set=previousSet;
      }
    }
    set.addAll(values);
  }

  public Set<V> remove(final K key)
  {
    return cache.remove(key);
  }

  public Set<K> getKeySet()
  {
    return cache.keySet();
  }

  public int size()
  {
    return cache.size();
  }

}

I had a requirement where I had to have a Map<Comparable, Set<Comparable>> where insertion on the Map be concurrent and also on the corresponding Set, but once a Key was consumed from the Map, it had to be deleted, think if as a Job running every two seconds which is consuming the whole Set<Comparable> from an specific Key but insertion be totally concurrent so that most values be buffered when the Job kicks in, here is my implementation:

Note: I use Guava's helper class Maps to create the concurrent Maps, also, this solution emulates Java concurrency in Practice Listing 5.19:

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

/**
 * A general purpose Multimap implemenation for delayed processing and concurrent insertion/deletes.
 *
 * @param <K> A comparable Key
 * @param <V> A comparable Value
 */
public class ConcurrentMultiMap<K extends Comparable, V extends Comparable>
{
  private final ConcurrentMap<K, Set<V>> cache=Maps.newConcurrentMap();

  public void put(final K key, final V value)
  {
    Set<V> set=cache.get(key);
    if(set == null){
      set=Sets.newSetFromMap(Maps.<V, Boolean>newConcurrentMap());
      final Set<V> previousSet=cache.putIfAbsent(key, set);
      if(previousSet != null){
        set=previousSet;
      }
    }
    set.add(value);
  }

  public void putAll(final K key, final Collection<V> values)
  {
    Set<V> set=cache.get(key);
    if(set == null){
      set=Sets.newSetFromMap(Maps.<V, Boolean>newConcurrentMap());
      final Set<V> previousSet=cache.putIfAbsent(key, set);
      if(previousSet != null){
        set=previousSet;
      }
    }
    set.addAll(values);
  }

  public Set<V> remove(final K key)
  {
    return cache.remove(key);
  }

  public Set<K> getKeySet()
  {
    return cache.keySet();
  }

  public int size()
  {
    return cache.size();
  }

}
Source Link

I had a requirement where I had to have a Map<Comparable, Set<Comparable>> where insertion on the Map be concurrent and also on the corresponding Set, but once a Key was consumed from the Map, it had to be deleted, think if as a Job running every two seconds which is consuming the whole Set<Comparable> from an specific Key but insertion be totally concurrent so that most values be buffered when the Job kicks in, here is my implementation:

Note: I use Guava's helper class Maps to create the concurrent Maps:

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

/**
 * A general purpose Multimap implemenation for delayed processing and concurrent insertion/deletes.
 *
 * @param <K> A comparable Key
 * @param <V> A comparable Value
 */
public class ConcurrentMultiMap<K extends Comparable, V extends Comparable>
{
  private final ConcurrentMap<K, Set<V>> cache=Maps.newConcurrentMap();

  public void put(final K key, final V value)
  {
    Set<V> set=cache.get(key);
    if(set == null){
      set=Sets.newSetFromMap(Maps.<V, Boolean>newConcurrentMap());
      final Set<V> previousSet=cache.putIfAbsent(key, set);
      if(previousSet != null){
        set=previousSet;
      }
    }
    set.add(value);
  }

  public void putAll(final K key, final Collection<V> values)
  {
    Set<V> set=cache.get(key);
    if(set == null){
      set=Sets.newSetFromMap(Maps.<V, Boolean>newConcurrentMap());
      final Set<V> previousSet=cache.putIfAbsent(key, set);
      if(previousSet != null){
        set=previousSet;
      }
    }
    set.addAll(values);
  }

  public Set<V> remove(final K key)
  {
    return cache.remove(key);
  }

  public Set<K> getKeySet()
  {
    return cache.keySet();
  }

  public int size()
  {
    return cache.size();
  }

}