0

I have this chunk of code:

dbs.foreach({
  var map = scala.collection.mutable.Map[String, mutable.MutableList[String]]()
  db =>
    val resultList = getTables(hive, db)
    map+=(db -> resultList)
})

This loops through a list of dbs, runs a "show tables in db" call for each db, then adds the db -> tables entry to a map. How can this be done concurrently, given that each Hive query takes about 5 seconds to return?

update code --

def getAllTablesConcurrent(hive: JdbcHive, dbs: mutable.MutableList[String]): Map[String, mutable.MutableList[String]] = {
  implicit val context:ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
  val futures = dbs.map { db =>
    Future(db, getTables(hive, db))
  }
  val map = Await.result( Future.sequence(futures), Duration(10, TimeUnit.SECONDS) ).toMap
  map
}
4
  • 3
    Shouldn't the var map = ... be outside of the loop? Commented Jul 27, 2018 at 19:30
  • Please don't update your question with answer code. If you've found an answer sufficiently different from one already submitted then post it as an answer. You are permitted to answer your own question, even if you accept a different answer.
    – jwvh
    Commented Jul 30, 2018 at 19:46
  • @jwvh It's not answer code; it's sample code that another user was replying to.
    – test acc
    Commented Jul 31, 2018 at 15:29
  • It's not answer code? That's even worse! A week/month/year from now someone looking for help with their concurrency problem will come across this question and see the code, labeled only "update", notice it is very much like the accepted answer code and wonder, "How come this worked for @testacc but not for me?"
    – jwvh
    Commented Jul 31, 2018 at 17:51

3 Answers 3

2

Don't use vars and mutable state, especially if you want concurrency.

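 import scala.concurrent.Future
 // Any implicit ExecutionContext works here; global is the simplest choice
 import scala.concurrent.ExecutionContext.Implicits.global

 val result: Future[Map[String, Seq[String]]] = Future
   .traverse(dbs) { name =>
       Future(name -> getTables(hive, name))
   }.map(_.toMap)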
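
For anyone who wants to try the traverse approach without a Hive connection, here is a minimal self-contained sketch. The getTables below is a hypothetical stand-in that only sleeps and returns made-up table names, and TraverseSketch and allTables are illustrative names, not part of the original code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TraverseSketch {
  // Hypothetical stand-in for the Hive call: sleeps briefly, then returns fake table names.
  def getTables(db: String): Seq[String] = {
    Thread.sleep(200)
    Seq(s"${db}_t1", s"${db}_t2")
  }

  // Starts one Future per database and collects the (db, tables) pairs into an immutable Map.
  def allTables(dbs: List[String]): Future[Map[String, Seq[String]]] =
    Future.traverse(dbs)(db => Future(db -> getTables(db))).map(_.toMap)

  def main(args: Array[String]): Unit = {
    // Blocking is only done here, at the very edge of the program.
    val result = Await.result(allTables(List("db1", "db2", "db3")), 10.seconds)
    println(result)
  }
}
```

No mutable state is shared between the futures; each one returns its own pair and the map is built only after all of them complete.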
1

If you want more control (how long to wait, how many threads to use, what happens when all your threads are busy, etc.), you can use a ThreadPoolExecutor and Future:

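  import java.util.concurrent.{Executors, TimeUnit}
  import scala.concurrent.{Await, ExecutionContext, Future}
  import scala.concurrent.duration.Duration

  // Fixed pool of 10 threads backing every Future created below
  implicit val context: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  val dbs = List("db1", "db2", "db3")

  // Start one Future per database; each yields a (name, tables) pair
  val futures = dbs.map {
    name => Future(name -> getTables(hive, name))
  }

  // TIMEOUT is a placeholder for your own maximum wait, in milliseconds
  val result = Await.result(Future.sequence(futures), Duration(TIMEOUT, TimeUnit.MILLISECONDS)).toMap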

Just remember not to create a new ExecutionContext every time you need one.
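
One possible way to follow that advice is to define the pool once in an object and import it wherever futures are created. The sketch below is only an illustration under assumptions: SharedPool, SharedPoolDemo, allTables and the sleeping getTables stand-in are hypothetical names, and the daemon thread factory is an optional extra so that idle pool threads do not keep the JVM alive.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration.Duration

object SharedPool {
  // Daemon threads do not prevent the JVM from exiting once main returns.
  private val daemonFactory: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r)
      t.setDaemon(true)
      t
    }
  }

  // Created once for the whole application and reused by every caller.
  implicit val context: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10, daemonFactory))
}

object SharedPoolDemo {
  import SharedPool.context

  // Hypothetical stand-in for the Hive call.
  def getTables(db: String): List[String] = {
    Thread.sleep(100)
    List(s"${db}_t1")
  }

  def allTables(dbs: List[String], timeoutMillis: Long): Map[String, List[String]] = {
    val futures = dbs.map(db => Future(db -> getTables(db)))
    Await.result(Future.sequence(futures), Duration(timeoutMillis, TimeUnit.MILLISECONDS)).toMap
  }

  def main(args: Array[String]): Unit = {
    try println(allTables(List("db1", "db2", "db3"), 5000L))
    finally SharedPool.context.shutdown() // explicit shutdown; not strictly needed with daemon threads
  }
}
```

Calling allTables repeatedly reuses the same ten threads instead of leaking a new pool per call.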

7
  • Best way to "remember not to create a new ExecutionContext every time you need it" is to just import scala.concurrent.ExecutionContext.Implicits.global instead of creating one ;)
    – Dima
    Commented Jul 29, 2018 at 12:59
  • I am having one issue with this code chunk. Whichever future finishes last returns an empty value. getTables(hive, name) returns a List of strings. At random, whichever future finishes last will have an empty list. Is there any fix for this? I edited my post with the exact code chunk I am using.
    – test acc
    Commented Jul 30, 2018 at 17:15
  • Does that happen randomly? How long does it take to complete all the futures? (I saw you use 10 seconds.) It isn't related, but remember not to create a new ExecutionContext every time you need one. Import scala.concurrent.ExecutionContext.Implicits.global or create that val outside your method. Commented Jul 30, 2018 at 17:27
  • @SebastianCelestino It happens randomly, but always. I made a very small test sample to use. It has 3 objects it returns futures for, and each takes ~2 seconds to complete (just a SQL call). Whichever object is last to return will just return empty no matter what. I tried 20 futures and the same thing happened: the final future returned empty. The program also does not end when all execution is complete; my very last line of code is a println("finished") and it will just hang there for 5 minutes, even if I set my timeout to 10 seconds. I tried with a longer and a shorter timeout, no change.
    – test acc
    Commented Jul 31, 2018 at 15:28
  • @testacc I couldn't reproduce the error; here is a gist with my code [gist.github.com/rkpost/3fddd93fafa2313c63c6d7a4b9cdea6b]. The program doesn't finish because there are non-daemon threads inside the thread pool. You can solve that in three different ways: 1) using scala.concurrent.ExecutionContext.Implicits.global, 2) setting a custom ThreadFactory in order to create daemon threads, or 3) shutting down the thread pool. Commented Jul 31, 2018 at 16:57
1

You can use .par on any Scala collection to perform the next transformation in parallel (using the default parallelism, which depends on the number of cores).

Also, it is easier and cleaner to map into an (immutable) map than to update a mutable one.

val result = dbs.par.map(db => db -> getTables(hive, db)).toMap

To have more control over the number of concurrent threads used, see https://docs.scala-lang.org/overviews/parallel-collections/configuration.html
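
As a rough sketch of what that configuration can look like, the example below assigns a custom task support to the parallel collection. It assumes Scala 2.12, where parallel collections ship with the standard library; on 2.13 and later they live in the separate scala-parallel-collections module and .par additionally needs import scala.collection.parallel.CollectionConverters._. ParSketch, allTables and the sleeping getTables stand-in are hypothetical names.

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

object ParSketch {
  // Hypothetical stand-in for the Hive call.
  def getTables(db: String): List[String] = {
    Thread.sleep(100)
    List(s"${db}_t1")
  }

  def allTables(dbs: List[String], parallelism: Int): Map[String, List[String]] = {
    // Dedicated pool so the level of parallelism is explicit rather than core-dependent.
    val pool = new ForkJoinPool(parallelism)
    try {
      val parDbs = dbs.par
      parDbs.tasksupport = new ForkJoinTaskSupport(pool)
      // .seq converts back to a sequential collection before building the Map.
      parDbs.map(db => db -> getTables(db)).seq.toMap
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit =
    println(allTables(List("db1", "db2", "db3"), 3))
}
```

Since the Hive calls mostly wait on I/O, a parallelism higher than the core count may be reasonable here, which the default configuration would not give you.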
