I assume that actOnItem performs some kind of side effect that doFinalStep later uses. When the collection is processed in parallel, there are several groups of side effects (one per partition), and each group now has to be finished with its own doFinalStep call. That means you have to track the side effects of each group separately in, say, a concurrent data structure. Without knowing exactly what actOnItem and doFinalStep do, it's hard to convert this to a more functional style. You could do something like this:
class SF { /* whatever your side effect is */ }
val ac = new java.util.concurrent.atomic.AtomicInteger(0)
val sfmap = new java.util.concurrent.ConcurrentHashMap[Int, SF]()
// claim a fresh slot: a unique index mapped to a new SF
def newSideeffectIndex(): Int = {
  val i = ac.getAndIncrement()
  sfmap.put(i, new SF())
  i
}
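val mySeq = Seq[Item]()
// `.par` is what makes aggregate run in parallel; on Scala 2.13+ it comes from the
// scala-parallel-collections module and needs
// import scala.collection.parallel.CollectionConverters._
val result = mySeq.par.aggregate(-1)((u, x) => actOnItem(u, x), (u1, u2) => {
  doFinalStep(u1)
  doFinalStep(u2)
  -1 // both partitions are finished; combop must still return an Int
})
// if everything ran in a single partition, combop was never called
doFinalStep(result)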
def actOnItem(u0: Int, x: Item): Int = {
  // -1 means this partition has not claimed a side-effect slot yet
  val u = if (u0 == -1) newSideeffectIndex() else u0
  // do whatever you need to do with `x`
  // ...
  val sf = sfmap.get(u)
  // do something with `sf` - update it somehow based on `x`
  u
}
def doFinalStep(u: Int): Unit = {
  val sf = sfmap.get(u)
  if (sf != null) {
    // do the final step here using `sf`
  }
  sfmap.remove(u)
}
Explanation: each new partition starts with the aggregation value -1. When the aggregating function (the first closure) sees -1, it initializes the aggregation value for the current partition: it picks a unique integer and creates an SF object that holds the side effect. After that, it processes the current element and updates the side effect. The combining function (the second closure) only receives the results of partitions that have been fully processed, so at that point you can do the final step for each of them and remove their entries from the concurrent map.
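The mechanism described above can be tried out without the parallel collections module. The following is a minimal, self-contained sketch under several assumptions: Item is replaced by Int, SF is a hypothetical running sum, finished is a hypothetical sink for what doFinalStep produces, and Pipeline.aggregate is a hand-rolled stand-in for ParSeq.aggregate. It folds fixed-size chunks on Futures and combines the results left to right, whereas the real implementation picks its own splits and combines as a tree. The invariant is the same: every slot created by newSideeffectIndex is finalized exactly once.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical side effect: a mutable running sum owned by one partition.
final class SF { var sum: Long = 0L }

// State lives in an instance (not a top-level object) so worker threads never
// touch an object that is still being initialized.
final class Pipeline {
  val ac = new AtomicInteger(0)
  val sfmap = new ConcurrentHashMap[Int, SF]()
  // Hypothetical sink: doFinalStep publishes each finished partition's sum here.
  val finished = new ConcurrentLinkedQueue[Long]()

  def newSideeffectIndex(): Int = {
    val i = ac.getAndIncrement()
    sfmap.put(i, new SF())
    i
  }

  // seqop: claim a slot on the first element of a partition, then update it.
  def actOnItem(u0: Int, x: Int): Int = {
    val u = if (u0 == -1) newSideeffectIndex() else u0
    val sf = sfmap.get(u)
    sf.sum += x
    u
  }

  // Finish one partition; -1 (no slot) is a no-op because remove returns null.
  def doFinalStep(u: Int): Unit = {
    val sf = sfmap.remove(u)
    if (sf != null) finished.add(sf.sum)
  }

  // combop: both inputs are finished partitions, so finalize them and return -1.
  def combine(u1: Int, u2: Int): Int = {
    doFinalStep(u1)
    doFinalStep(u2)
    -1
  }

  // Stand-in for ParSeq.aggregate: fold each chunk in its own Future, then
  // combine the per-chunk results left to right.
  def aggregate(items: Seq[Int], chunkSize: Int): Int = {
    val partials = items.grouped(chunkSize).toList.map { chunk =>
      Future(chunk.foldLeft(-1)((u, x) => actOnItem(u, x)))
    }
    val results = Await.result(Future.sequence(partials), 10.seconds)
    val result = results.reduceOption((a, b) => combine(a, b)).getOrElse(-1)
    doFinalStep(result) // only does work when there was a single partition
    result
  }
}

val pipeline = new Pipeline
val result = pipeline.aggregate(1 to 10, chunkSize = 3)
```

With 1 to 10 split into chunks of 3 there are four partitions, so finished ends up holding the sums 6, 15, 24 and 10, and sfmap is empty again. With a single partition, combine is never called, and only the explicit doFinalStep(result) at the end finishes that partition.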