
I have an iterable vals: Iterable[T] and a long-running function f: (T => Unit) without any relevant side effects. Right now it is applied to vals in the obvious way:

vals.foreach(f)

I would like the calls to f to be done concurrently (within reasonable limits). Is there an obvious function somewhere in the Scala base library? Something like:

Concurrent.foreach(8 /* Number of threads. */)(vals, f)

While f is reasonably long-running, it is short enough that I don't want the overhead of starting a new thread for each call, so I am looking for something based on a thread pool.
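For reference, here is a minimal sketch of the Concurrent.foreach described above, built only on java.util.concurrent. Concurrent is a hypothetical helper object, not a library API. It submits one task per element to a fixed pool of nThreads threads and then blocks on each task in submission order. If a call to f throws, the first failure reached in that order is rethrown wrapped in an ExecutionException.

```scala
import java.util.concurrent.{Callable, Executors, Future => JFuture}

// Hypothetical helper matching the signature sketched in the question.
object Concurrent {
  def foreach[T](nThreads: Int)(vals: Iterable[T], f: T => Unit): Unit = {
    // At most nThreads calls to f run at once; the remaining tasks wait in the pool's queue.
    val pool = Executors.newFixedThreadPool(nThreads)
    try {
      // toList forces the whole traversal, so every task is submitted before any waiting starts.
      val pending: List[JFuture[Unit]] =
        vals.toList.map(v => pool.submit(new Callable[Unit] { def call(): Unit = f(v) }))
      // get() blocks until that task completes and rethrows its failure as an ExecutionException.
      pending.foreach(_.get())
    } finally {
      pool.shutdown() // accept no new tasks and let the worker threads exit once idle
    }
  }
}
```

Usage would be Concurrent.foreach(8)(vals, f). The pool is created per call for simplicity; if you call this often, you would probably want to keep one pool around instead.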

7 Answers


Many of the answers from 2009 use the old scala.actors.Futures._, which is no longer part of newer Scala versions. While Akka is the preferred way, a much more readable option is to use parallel (.par) collections:

vals.foreach { v => f(v) }

becomes

vals.par.foreach { v => f(v) }

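Since Scala 2.13, parallel collections are no longer bundled with the standard library: add the scala-parallel-collections module and import scala.collection.parallel.CollectionConverters._ to get .par. Alternatively, Scalaz's parMap can be more succinct, with the caveat that you need to remember the usual Scalaz imports. As usual, there's more than one way to do the same thing in Scala!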


Scalaz has parMap. You would use it as follows:

import scalaz.Scalaz._
import scalaz.concurrent.Strategy.Naive

This will equip every functor (including Iterable) with a parMap method, so you can just do:

vals.parMap(f)

You also get parFlatMap, parZipWith, etc.
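For comparison, a rough standard-library-only equivalent of parMap can be built on scala.concurrent (assuming Scala 2.10 or later). ParMap.parMap is a hypothetical helper, not a standard method, and unlike the Scalaz version it blocks and returns the results directly. It runs f on a fixed pool of nThreads threads and returns the results in the input order.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParMap {
  // Hypothetical helper: apply f to every element on a bounded pool and collect the results in order.
  def parMap[A, B](nThreads: Int)(xs: Seq[A])(f: A => B): Seq[B] = {
    val pool = Executors.newFixedThreadPool(nThreads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Future.sequence keeps the input order, whatever order the calls finish in.
      val all: Future[Seq[B]] = Future.sequence(xs.map(x => Future(f(x))))
      // Blocks until every call is done; if any call failed, its exception is rethrown here.
      Await.result(all, Duration.Inf)
    } finally {
      pool.shutdown()
    }
  }
}
```

With an Iterable you would call it as ParMap.parMap(8)(vals.toSeq)(f).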


I like the Futures answer. However, while it will execute concurrently, it will also return asynchronously (foreach returns before the calls to f have finished), which is probably not what you want. Instead, create all the futures first and then wait for each one:

import scala.actors.Futures._

vals map { x => future { f(x) } } foreach { _() }
  • Be careful that vals is a strict collection. If it's lazy (and in Scala 2.7 this includes the Range class), the futures won't be created until foreach needs each one, and nothing will happen in parallel.
    – Ken Bloom
    Commented Nov 18, 2009 at 5:34
  • I suppose we could solve that problem by injecting another foreach call between the map and the current foreach. Thus: vals map { x => future { f(x) } } foreach { x => x } foreach { _() } Commented Nov 18, 2009 at 15:43
  • That would be a map we have to inject, not another foreach? And it is not clear to me that the map of a lazy collection is strict. The safest way may be to call toArray. Commented Nov 20, 2009 at 22:50
  • You're right, foreach was (obviously) the wrong thing to inject since it returns Unit. My bad! :-) The map function on lazy collections is almost always non-strict, so we can either call toList (or toArray), or we can project and then force: (vals map { x => future { f(x) } } projection).force foreach { _() }. I don't know whether that's better than simply toList, but it is certainly different. Commented Nov 22, 2009 at 2:01
  • What does it mean when you say it "returns asynchronously"? Does it imply that it is non-blocking? (and why would that be a problem?)
    – Jus12
    Commented Aug 5, 2014 at 14:24
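The scala.actors library used above has been deprecated since Scala 2.10 in favor of Akka. A sketch of the same create-then-wait pattern with scala.concurrent follows, assuming Scala 2.10 or later and the global ExecutionContext; RunAll.runAll is a hypothetical name. The explicit toList addresses the strictness issue from the comments: it starts every future before the first wait, even when vals is lazy. Note that the global pool is sized to the number of cores, so it does not give you a configurable "8 threads" limit.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object RunAll {
  // Hypothetical helper: run f on every element concurrently and return only when all calls are done.
  def runAll[T](vals: Iterable[T])(f: T => Unit): Unit = {
    // The iterator's map is lazy; toList forces it, so every Future is created (and started) here.
    val futures: List[Future[Unit]] = vals.iterator.map(x => Future(f(x))).toList
    // Wait for each one in turn; Await.result rethrows the exception of a failed call.
    futures.foreach(fut => Await.result(fut, Duration.Inf))
  }
}
```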

I had some issues using scala.actors.Futures in Scala 2.8 (it was buggy when I checked). Using Java libraries directly worked for me, though:

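object Parallel {
  val cpus = java.lang.Runtime.getRuntime().availableProcessors
  import java.util.{Timer, TimerTask}
  // Runs op once on a new Timer thread after ms milliseconds.
  def afterDelay(ms: Long)(op: => Unit): Unit =
    new Timer().schedule(new TimerTask { override def run(): Unit = op }, ms)
  // Runs f(0) ... f(n - 1) on a thread pool and blocks until all of them have finished.
  def repeat(n: Int, f: Int => Unit): Unit = {
    import java.util.concurrent._
    val e = Executors.newCachedThreadPool() // or newFixedThreadPool(cpus + 1) to cap the thread count
    (0 until n).foreach(i => e.execute(new Runnable { def run(): Unit = f(i) }))
    e.shutdown() // accept no new tasks; already-submitted ones still run
    e.awaitTermination(Long.MaxValue, TimeUnit.SECONDS) // wait for them to finish
  }
}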

I'd use scala.actors.Futures:

vals.foreach(t => scala.actors.Futures.future(f(t)))

The latest release of Functional Java has some higher-order concurrency features that you can use.

import fjs.F._
import fj.control.parallel.Strategy._
import fj.control.parallel.ParModule._
import java.util.concurrent.Executors._

val pool = newCachedThreadPool
val par = parModule(executorStrategy[Unit](pool))

And then...

par.parMap(vals, f)

Remember to shut down the pool (pool.shutdown()) when you are done.


You can use the parallel collections, which were part of the Scala standard library from 2.9 through 2.12 (since 2.13 they live in the separate scala-parallel-collections module). They're just like ordinary collections, but their operations run in parallel. You just need to put a par call before you invoke a collection operation.

import scala.collection._

val array = new Array[String](10000)
for (i <- (0 until 10000).par) array(i) = i.toString
