I have a Seq of items. I need to do something with each item and then perform a final step that takes no input. I want to use par to speed this up: split the Seq into partitions and, within each partition, act on each item and then perform the final step once for that partition. The final step should run in the thread that processed that partition. Is there a way to do that? aggregate() doesn't seem to do the right thing.

Here's some sample code:

// non-parallel case
val mySeq = Seq[Item]()
mySeq foreach { actOnItem(_) }
doFinalStep()

// ideal par case (foreachThenDoFinalStepAfterPartition is a wished-for method, not a real API)
val mySeq = Seq[Item]()
mySeq.par foreachThenDoFinalStepAfterPartition { actOnItem(_), doFinalStep }
  • Have you seen this stackoverflow.com/questions/1751953/… ? Commented Aug 26, 2011 at 18:45
  • Can you describe a bit more? Why do you have to do something special after each batch of elements has been processed?
    – axel22
    Commented Aug 26, 2011 at 18:55
  • @amir that link is not relevant for my use case. Commented Aug 26, 2011 at 19:03
  • @axel22 Processing each element accumulates some data, and I want to do something with that data at the end. I can easily restructure this so that it accumulates a return value instead, but I only want to process the accumulation once per partition. Commented Aug 26, 2011 at 19:03
  • That sounds close to what aggregate() does ... what part of what aggregate is doing doesn't do what you want?
    – brandon
    Commented Aug 26, 2011 at 19:04

1 Answer

I assume that actOnItem performs some kind of side effect that doFinalStep then uses. In the parallel case you have several groups of side effects, one per partition, and you want to run doFinalStep on each group, so you have to track the side effects of the different groups separately, for example in a concurrent data structure. Without knowing exactly what actOnItem and doFinalStep do, it's hard to convert this to a more functional style. You could do something like this:

class SF { /* whatever your sideeffect is */ }

val ac = new java.util.concurrent.atomic.AtomicInteger(0)
val sfmap = new java.util.concurrent.ConcurrentHashMap[Int, SF]()
def newSideeffectIndex() = {
  val i = ac.getAndIncrement()
  sfmap.put(i, new SF())
  i
}

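val mySeq = Seq[Item]()
// Combining two results means both partitions are complete, so finalize each.
// Return -1 so that an already-merged result is never finalized twice.
val last = mySeq.par.aggregate(-1)((u, x) => actOnItem(u, x), (u1, u2) => {
  doFinalStep(u1)
  doFinalStep(u2)
  -1
})
// If the collection was never split, the single partition is still pending.
doFinalStep(last)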

def actOnItem(u0: Int, x: Item): Int = {
  val u = if (u0 == -1) newSideeffectIndex() else u0

  // do whatever you need to do with `x`
  // ...

  val sf = sfmap.get(u)
  // do something with `sf` - update it somehow based on `x`

  u
}

def doFinalStep(u: Int): Unit = {
  val sf = sfmap.get(u)

  if (sf != null) {
    // do the final step here using `sf`
  }

  sfmap.remove(u)
}

Explanation: each new partition starts with the aggregation value -1. The aggregating part (the first closure) initializes the aggregation value for the current partition when it sees -1: it picks a unique integer and creates an SF object to hold that partition's side effect. It then processes the current element and updates the side effect. At the reduction step (the second closure) both partitions being combined have been fully processed, so you can run the final step for each of them and remove its entry from the concurrent map.
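If the per-partition data can be returned as a value instead of kept as a side effect, the same idea can be sketched without the concurrent map. This is only an illustration under assumptions: the items are plain Ints, `finalStep` is a hypothetical stand-in that sums a partition, and `Acc`, `finish` and `run` are names made up for this example. It relies on `.par`, which is built in up to Scala 2.12; on 2.13+ it needs the scala-parallel-collections module and `import scala.collection.parallel.CollectionConverters._`. Like the version above, it guarantees one final step per non-empty partition, but not which thread runs it.

```scala
// Hypothetical accumulator: `pending` holds data from the partition currently
// being folded, `done` holds the results of partitions already finalized.
final case class Acc(pending: Vector[Int], done: Vector[Int])

object PartitionFinalStep {
  // Hypothetical final step: collapse one partition's accumulated data.
  def finalStep(items: Vector[Int]): Int = items.sum

  // Finalize whatever is still pending; finalized results pass through as-is.
  def finish(acc: Acc): Vector[Int] =
    if (acc.pending.isEmpty) acc.done else acc.done :+ finalStep(acc.pending)

  def run(xs: Seq[Int]): Vector[Int] = {
    val zero = Acc(Vector.empty, Vector.empty)
    val result = xs.par.aggregate(zero)(
      // seqop: accumulate one more item into the current partition
      (acc, x) => acc.copy(pending = acc.pending :+ x),
      // combop: both sides are complete, so finalize each and merge the results
      (a, b) => Acc(Vector.empty, finish(a) ++ finish(b))
    )
    // Covers the case where the collection was never split.
    finish(result)
  }
}
```

Calling `PartitionFinalStep.run(1 to 1000)` returns one sum per partition; how many partitions there are depends on how the runtime splits the collection, but the sums always add up to the total.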
