8

As in my own answer to my own question below, I have a situation where I am processing a large number of events which arrive on a queue. Each event is handled in exactly the same manner, and each event can be handled independently of all the other events.

My program takes advantage of the Scala concurrency framework, and many of the processes involved are modelled as Actors. Because Actors process their messages sequentially, they are not well-suited to this particular problem (even though my other actors are performing actions which are sequential). As I want Scala to "control" all thread creation (which I assume is the point of it having a concurrency system in the first place), it seems I have two choices:

  1. Send the events to a pool of event processors, which I control; or
  2. Get my Actor to process them concurrently by some other mechanism.

I would have thought that #1 negates the point of using the actor subsystem: one obvious question being how many processor actors I should create. These things are supposedly hidden from me and solved by the subsystem.

My answer was to do the following:

import scala.actors.Actor._

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

Is there a better approach? Is this incorrect?

edit: A possibly better approach is:

import scala.actors.Scheduler

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(x))
    }
  }
}
1
  • Although actors don't seem to support a pool of workers directly, this question was helpful in exposing that gap. None of the docs available to me mention it explicitly.
    – ePharaoh
    Commented Jul 2, 2009 at 15:08

5 Answers

8

This seems like a duplicate of another question, so I'll duplicate my answer.

Actors process one message at a time. The classic pattern for processing multiple messages concurrently is to have one coordinator actor front a pool of consumer actors. If you use react, then the consumer pool can be large but will still use only a small number of JVM threads. Here's an example where I create a pool of 10 consumers and one coordinator to front them.

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes a while
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => '-'; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Ready(_) => // ignore stale readiness replies left over from earlier rounds
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

This code tests to see which consumer is available and sends a request to that consumer. Alternatives are to just randomly assign to consumers or to use a round robin scheduler.
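For example, a round-robin coordinator could drop the Ready handshake entirely and simply cycle through the pool. This is only a sketch that reuses the Request, Stop and consumers definitions above; the counter-based dispatch is my addition, not part of the code above:

// round-robin alternative: no availability handshake, just cycle through the pool
val roundRobinCoordinator = actor {
  var next = 0
  loop {
    react {
      case msg : Request =>
        consumers(next) ! msg
        next = (next + 1) % consumers.size
      case Stop =>
        consumers foreach {_ ! Stop}
        exit
    }
  }
}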

Depending on what you are doing, you might be better served with Scala's Futures. For instance, if you don't really need actors then all of the above machinery could be written as

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => '-'; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))
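
The calls to future return immediately, and each Future in scala.actors is also a zero-argument function that blocks until its value is ready, so you can force the results when you actually need them (scala.actors.Futures also has an awaitAll helper if you want a timeout). A small usage sketch, with names of my own choosing:

// applying a future blocks until its result is available
val transformed = results map (_())
println("transformed " + transformed.size + " payloads")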
3
  • Thanks - I wasn't aware that you could invoke tasks on the same scheduler that the actor framework uses. I think the best approach is therefore to use Scheduler.execute(process(x)) Commented Jun 18, 2009 at 7:04
  • Also - yes; it's a very similar question (which I link to) but not quite the same. The first question was "are actors sequential?" whereas the second question was "As actors are sequential, how do I do X" Commented Jun 18, 2009 at 7:35
  • Incidentally: 0 to 10 contains 11 elements, not 10. Commented Oct 13, 2010 at 5:22
3

If the events can all be handled independently, why are they on a queue? Knowing nothing else about your design, this seems like an unnecessary step. If you could compose the process function with whatever is firing those events, you could potentially obviate the queue.

An actor is essentially a concurrent effect equipped with a queue. If you want to process multiple messages simultaneously, you don't really want an actor. You just want a function (Any => Unit) to be scheduled for execution at some convenient time.

Having said that, your approach is reasonable if you want to stay within the actors library and if the event queue is not within your control.

Scalaz makes a distinction between Actors and concurrent Effects. While its Actor is very light-weight, scalaz.concurrent.Effect is lighter still. Here's your code roughly translated to the Scalaz library:

val eventProcessor = effect { x => process(x) }

This is with the latest trunk head, not yet released.

3
  • Thanks! They are on a "queue" purely because I'm sending them to an actor and an actor has a queue, which it processes sequentially. As the actors library is how I'm supposed to handle concurrency (*) in Scala, I'm trying to use it. Otherwise I'd just use ExecutorService.invokeAll. Commented Jun 17, 2009 at 16:16
  • See also my comment to jschen above. I've been writing concurrent code in Java for a long time and trying to find the correct boundary between using actors and, um, not using actors in a scala program which is expected to be concurrent. Commented Jun 17, 2009 at 16:18
  • 1
    Actors are not a panacea, and there's nothing that says you have to use actors if you want concurrency in Scala. It's just a library and, in my opinion, an overly complicated one.
    – Apocalisp
    Commented Jun 17, 2009 at 16:42
1

This sounds like a simple consumer/producer problem. I'd use a queue with a pool of consumers. You could probably write this with a few lines of code using java.util.concurrent.
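
For instance, something along these lines would do it from Scala (a sketch only: process and MyEvent are the names from the question, and sizing the pool to the processor count is just one obvious choice):

import java.util.concurrent.Executors

// size the pool to the machine rather than hardcoding a thread count
val pool = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors)

def submit(e: MyEvent) = pool.execute(new Runnable {
  def run() { val MyEvent(x) = e; process(x) }
})

// call pool.shutdown() once no more events will arrive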

4
  • The whole point of using the scala actors library is that it can better map your code (written using actors) onto the concurrency available in the current operating environment. So if Scala thinks it's got 4 processors, perhaps it will create a backing thread pool for its actors with 4 workers. I gain nothing by creating my own separate thread pool to execute this work on - all I'll end up with is a load of unnecessary context switching. I'm perfectly aware of how to solve this in Java - I'm asking about how to solve it using the Scala actors library, hence the tags. Commented Jun 17, 2009 at 16:13
  • Sorry, I didn't realize this was an academic exercise with actors. I thought you wanted a good solution to the problem. "So if Scala thinks it's got 4 processors, perhaps it will create a backing thread pool for its actors with 4 workers." This is maybe two lines of code using java.util.concurrent which you can easily use from scala. I use it from jruby all the time.
    – jshen
    Commented Jun 17, 2009 at 16:54
  • Yes, great. But then when I deploy my program onto a 50-core beast, I've gone and hardcoded 2 threads into it. Or at the very least, it renders the way the actor subsystem works entirely pointless because it now doesn't know how many threads it can create. It's not an academic exercise, it's about understanding how to best solve a real world problem with a technology which I'm new to. Commented Jun 17, 2009 at 17:59
  • It is academic if there is a very good solution that you aren't interested in because you want to experiment with some new idea. This is the definition of academic! I never suggested you hardcode the number of threads. Here's a simplified line from a jruby program I am currently working on. JavaConcurrent::Executors.newFixedThreadPool(JavaLang::Runtime.getRuntime.availableProcessors)
    – jshen
    Commented Jun 30, 2009 at 17:03
1

The purpose of an actor (well, one of them) is to ensure that the state within the actor can only be accessed by a single thread at a time. If the processing of a message doesn't depend on any mutable state within the actor, then it would probably be more appropriate to just submit a task to a scheduler or a thread pool to process. The extra abstraction that the actor provides is actually getting in your way.

There are convenient methods in scala.actors.Scheduler for this, or you could use an Executor from java.util.concurrent.
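
For the Scheduler route, a minimal sketch (x stands for the event payload from the question; Scheduler here is scala.actors.Scheduler):

import scala.actors.Scheduler

// hand the work to the same scheduler the actor library uses
Scheduler.execute { process(x) }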

1

Actors are much more lightweight than threads, so another option is to use actor objects the way you would use the Runnable objects you are used to submitting to a thread pool. The main difference is that you do not need to worry about the thread pool: it is managed for you by the actor framework and is mostly a configuration concern.

def submit(e: MyEvent) = actor {
  // no loop - the actor exits immediately after processing the first message
  react {
    case MyEvent(x) =>
      process(x)
  }
} ! e // immediately send the new actor a message

Then to submit a message, say this:

submit(new MyEvent(x))

which corresponds to

eventProcessor ! new MyEvent(x)

from your question.

I tested this pattern successfully with 1 million messages sent and received in about 10 seconds on a quad-core i7 laptop.

Hope this helps.
