How about applying `merge` as both `seqop` and `combop`?
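```scala
// On Scala 2.13+, .par comes from the separate scala-parallel-collections module
// and needs: import scala.collection.parallel.CollectionConverters._
val in = Array(
  Map("a" -> 1, "b" -> 2),
  Map("a" -> 11, "c" -> 4),
  Map("b" -> 7, "c" -> 10)
)

// Adds m2 into m1: values of keys present in both maps are summed.
def merge(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] =
  m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }

in.par.aggregate(Map[String, Int]())(merge, merge)
```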
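A quick way to check the expected result without parallel collections is to fold the same maps sequentially. This is a sketch using only the standard library: because `merge` adds values key by key, a `foldLeft` with it should yield the same map that `aggregate` returns, however the array happens to be partitioned.

```scala
// Same input and merge function as above.
val in = Array(
  Map("a" -> 1, "b" -> 2),
  Map("a" -> 11, "c" -> 4),
  Map("b" -> 7, "c" -> 10)
)

// Sums the values of keys present in both maps; keys found in only one map are kept as-is.
def merge(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] =
  m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }

// Sequential reference result: fold every map into an empty accumulator.
val expected = in.foldLeft(Map.empty[String, Int])(merge)
println(expected)
// expected holds a -> 12, b -> 9, c -> 14
```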
Update
You pass to `aggregate` an initial accumulator value (the empty map) and two closures, `seqop` and `combop`.
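The parallel sequence is split into several partitions, which are processed in parallel. Each partition is processed by successively applying `seqop` to the accumulator and the next array element. Every partition starts from the initial value, so that value should be a neutral element such as the empty map; otherwise it would be counted once per partition.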
```scala
def seqop(
    accumulator: Map[String, Int],
    element: Map[String, Int]): Map[String, Int] = merge(accumulator, element)
```
`seqop` takes the initial accumulator value and the first array element of the partition and merges them. Next it takes the previous result and the next array element, and so on, until the whole partition is merged into one map.
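To make the step-by-step merging concrete, here is a sketch that runs `seqop` over a single partition with `scanLeft`, which keeps every intermediate accumulator instead of only the last one. The partition used here (the first two input maps) is a hypothetical choice for illustration; the split the parallel collection actually picks may differ.

```scala
def merge(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] =
  m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }

def seqop(accumulator: Map[String, Int], element: Map[String, Int]): Map[String, Int] =
  merge(accumulator, element)

// Hypothetical partition: the first two maps of the input array.
val partition = Array(Map("a" -> 1, "b" -> 2), Map("a" -> 11, "c" -> 4))

// scanLeft yields the initial accumulator followed by the result of each seqop step.
val steps = partition.scanLeft(Map.empty[String, Int])(seqop)
steps.foreach(println)
// The last step is this partition's merged map: a -> 12, b -> 2, c -> 4
```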
Once every partition has been merged into its own map, these maps are combined by applying `combop`. Conceptually, `combop` takes the merged map from the first partition and the merged map from the second partition and merges them, then merges that result with the map from the third partition, and so on, until everything is merged into one map. This is the result of `aggregate`. The library may group these combinations differently (for example pairwise, in a tree), so `combop` should be associative.
```scala
def combop(
    m1: Map[String, Int],
    m2: Map[String, Int]): Map[String, Int] = merge(m1, m2)
```
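The two phases can be modelled sequentially to see why the partitioning does not change the result here. This is a sketch, not the library's implementation: the helper `simulateAggregate` is hypothetical, treats each chunk from `grouped` as one partition, folds it with `seqop`, and then combines the partial results left to right with `combop` (the second closure).

```scala
// Same summing merge as in the answer.
def merge(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] =
  m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }

// Hypothetical helper: a sequential model of aggregate, not the library's code.
// Every chunk of `partitionSize` elements plays the role of one partition.
def simulateAggregate[T, S](xs: Seq[T], partitionSize: Int)(z: S)(
    seqop: (S, T) => S, combop: (S, S) => S): S =
  xs.grouped(partitionSize)        // split the input into "partitions"
    .map(_.foldLeft(z)(seqop))     // each partition starts from z and applies seqop
    .foldLeft(z)(combop)           // combine partial results left to right;
                                   // starting from z again is harmless because z is neutral

val in = Seq(
  Map("a" -> 1, "b" -> 2),
  Map("a" -> 11, "c" -> 4),
  Map("b" -> 7, "c" -> 10)
)

// Partition sizes of one, two and three elements all give the same map, because
// merge is associative and the empty map is its neutral element.
val results = (1 to 3).map { n =>
  simulateAggregate(in, n)(Map.empty[String, Int])(merge, merge)
}
```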
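It is just a coincidence that `seqop` and `combop` are the same here. In general they differ in both logic and signature: `seqop` has type `(S, T) => S` (an accumulator and an element), while `combop` has type `(S, S) => S` (two accumulators). In this case the accumulator type `S` and the element type `T` are both `Map[String, Int]`.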
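To illustrate a case where the two closures do differ, here is a sketch that counts words: `seqop` (the hypothetical `countWord`) adds a `String` to a `Map[String, Int]`, while `combop` is the `merge` from above. It models `aggregate` sequentially with a hypothetical split into two-word partitions; with a parallel collection the corresponding call would be `words.par.aggregate(Map.empty[String, Int])(countWord, merge)`.

```scala
// merge from the answer, reused here as combop: (S, S) => S.
def merge(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] =
  m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }

// Hypothetical seqop: (S, T) => S with S = Map[String, Int] and T = String.
// It adds one occurrence of a single word to the running counts.
def countWord(acc: Map[String, Int], word: String): Map[String, Int] =
  acc.updated(word, acc.getOrElse(word, 0) + 1)

val words = Seq("a", "b", "a", "c", "b", "a")

// Sequential model of aggregate with a hypothetical split into partitions of two words.
val wordCounts = words
  .grouped(2)
  .map(_.foldLeft(Map.empty[String, Int])(countWord)) // seqop within each partition
  .foldLeft(Map.empty[String, Int])(merge)            // combop across partitions
// wordCounts holds a -> 3, b -> 2, c -> 1
```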