
Using higher-order methods in Scala, I can perform an element-wise operation on given collections as below

def fun1(l1 :List[Double], l2 :List[Double]) :List[Double] = (l1,l2).zipped.map((x,y) => x + y)

and using the imperative way, I can do the same operation much faster than fun1

def fun2(a1: Array[Double], a2: Array[Double]): Array[Double] = {
  val res = new Array[Double](a1.length)
  var i = 0
  while (i < a1.length) {
    res(i) = a1(i) + a2(i)
    i += 1
  }
  res
}

I want to write a parallel function that performs the same operation. Using Scala, can I parallelize either of the above functions? If not, how can I write a parallel function that achieves purely functional parallelism for element-wise operations on collections?

  • For the first one you may use the parallel collections; for the second one you may just start many futures, each one in charge of some subset of the array. Commented Jan 10, 2020 at 16:37
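
A minimal sketch of the comment's two suggestions, assuming Scala 2.13 with the scala-parallel-collections module on the classpath (parAdd and futureAdd are illustrative names, not from the answers below):

import scala.collection.parallel.CollectionConverters._ // enables .par on Scala 2.13+
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Parallel collections: .par gives a parallel view; zip/map run on a thread pool
def parAdd(l1: List[Double], l2: List[Double]): List[Double] =
  l1.par.zip(l2.par).map { case (x, y) => x + y }.toList

// Futures: split the index range into chunks and fill each chunk concurrently
def futureAdd(a1: Array[Double], a2: Array[Double], chunks: Int): Array[Double] = {
  val res = new Array[Double](a1.length)
  val fs = (0 until a1.length)
    .grouped(math.max(1, a1.length / chunks))
    .map(range => Future { range.foreach(i => res(i) = a1(i) + a2(i)) })
    .toList
  Await.ready(Future.sequence(fs), Duration.Inf)
  res
}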

2 Answers

Parallel collections make this straightforward. Here is a REPL session comparing fun1 with a ParSeq-based parFun1:
sergey$ SBT_OPTS="-Xmx2G -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=2G -Xss2M  -Duser.timezone=GMT" sbt console
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=2G; support was removed in 8.0
[info] Loading global plugins from /Users/rsergey/.sbt/1.0/plugins
[info] Loading project definition from /Users/rsergey/project
[info] Set current project to rsergey (in build file:/Users/rsergey/)
[info] Starting scala interpreter...
Welcome to Scala 2.12.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231).
Type in expressions for evaluation. Or try :help.

scala> def fun1(l1 :List[Double], l2 :List[Double]) :List[Double] = (l1,l2).zipped.map((x,y) => x + y)
fun1: (l1: List[Double], l2: List[Double])List[Double]

scala> import scala.collection.parallel.immutable.ParSeq
import scala.collection.parallel.immutable.ParSeq

scala> def parFun1(l1: ParSeq[Double], l2: ParSeq[Double]) = l1.zip(l2).map{case (x,y)=>(x+y)}
parFun1: (l1: scala.collection.parallel.immutable.ParSeq[Double], l2: scala.collection.parallel.immutable.ParSeq[Double])scala.collection.parallel.immutable.ParSeq[Double]


scala> val l1 = Range(0,5000000).map(_.toDouble).toList
l1: List[Double] = List(0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0, 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0, 40.0, 41.0, 42.0, 43.0, 44.0, 45.0, 46.0, 47.0, 48.0, 49.0, 50.0, 51.0, 52.0, 53.0, 54.0, 55.0, 56.0, 57.0, 58.0, 59.0, 60.0, 61.0, 62.0, 63.0, 64.0, 65.0, 66.0, 67.0, 68.0, 69.0, 70.0, 71.0, 72.0, 73.0, 74.0, 75.0, 76.0, 77.0, 78.0, 79.0, 80.0, 81.0, 82.0, 83.0, 84.0, 85.0, 86.0, 87.0, 88.0, 89.0, 90.0, 91.0, 92.0, 93.0, 94.0, 95.0, 96.0, 97.0, 98.0, 99.0, 100.0, 101.0, 102.0, 103.0, 104.0, 105.0, 106.0, 107.0, 108.0, 109.0, 110.0, 111.0, 112.0, 113.0, 114.0, 115.0, 116.0, 117.0, 118.0, 119.0, 120.0, 121.0, 122...

scala> val l2 = Range(-5000000, 0).map(_.toDouble).toList
l2: List[Double] = List(-5000000.0, -4999999.0, -4999998.0, -4999997.0, -4999996.0, -4999995.0, -4999994.0, -4999993.0, -4999992.0, -4999991.0, -4999990.0, -4999989.0, -4999988.0, -4999987.0, -4999986.0, -4999985.0, -4999984.0, -4999983.0, -4999982.0, -4999981.0, -4999980.0, -4999979.0, -4999978.0, -4999977.0, -4999976.0, -4999975.0, -4999974.0, -4999973.0, -4999972.0, -4999971.0, -4999970.0, -4999969.0, -4999968.0, -4999967.0, -4999966.0, -4999965.0, -4999964.0, -4999963.0, -4999962.0, -4999961.0, -4999960.0, -4999959.0, -4999958.0, -4999957.0, -4999956.0, -4999955.0, -4999954.0, -4999953.0, -4999952.0, -4999951.0, -4999950.0, -4999949.0, -4999948.0, -4999947.0, -4999946.0, -4999945.0, -4999944.0, -4999943.0, -4999942.0, -4999941.0, -4999940.0, -4999939.0, -49...

scala> val l1par = l1.par
l1par: scala.collection.parallel.immutable.ParSeq[Double] = ParVector(0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0, 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0, 40.0, 41.0, 42.0, 43.0, 44.0, 45.0, 46.0, 47.0, 48.0, 49.0, 50.0, 51.0, 52.0, 53.0, 54.0, 55.0, 56.0, 57.0, 58.0, 59.0, 60.0, 61.0, 62.0, 63.0, 64.0, 65.0, 66.0, 67.0, 68.0, 69.0, 70.0, 71.0, 72.0, 73.0, 74.0, 75.0, 76.0, 77.0, 78.0, 79.0, 80.0, 81.0, 82.0, 83.0, 84.0, 85.0, 86.0, 87.0, 88.0, 89.0, 90.0, 91.0, 92.0, 93.0, 94.0, 95.0, 96.0, 97.0, 98.0, 99.0, 100.0, 101.0, 102.0, 103.0, 104.0, 105.0, 106.0, 107.0, 108.0, 109.0, 110.0, 111.0, 112.0, 113.0, 114.0, 115.0,...

scala> val l2par = l2.par
l2par: scala.collection.parallel.immutable.ParSeq[Double] = ParVector(-5000000.0, -4999999.0, -4999998.0, -4999997.0, -4999996.0, -4999995.0, -4999994.0, -4999993.0, -4999992.0, -4999991.0, -4999990.0, -4999989.0, -4999988.0, -4999987.0, -4999986.0, -4999985.0, -4999984.0, -4999983.0, -4999982.0, -4999981.0, -4999980.0, -4999979.0, -4999978.0, -4999977.0, -4999976.0, -4999975.0, -4999974.0, -4999973.0, -4999972.0, -4999971.0, -4999970.0, -4999969.0, -4999968.0, -4999967.0, -4999966.0, -4999965.0, -4999964.0, -4999963.0, -4999962.0, -4999961.0, -4999960.0, -4999959.0, -4999958.0, -4999957.0, -4999956.0, -4999955.0, -4999954.0, -4999953.0, -4999952.0, -4999951.0, -4999950.0, -4999949.0, -4999948.0, -4999947.0, -4999946.0, -4999945.0, -4999944.0, -4999943.0, -4999...

scala> def time[R](block: => R): R = {val t0 = System.nanoTime(); val result = block; val t1 = System.nanoTime(); println("Elapsed time: " + (t1 - t0) + "ns"); result }
time: [R](block: => R)R

scala> time { fun1(l1, l2) }
Elapsed time: 3928108671ns
res2: List[Double] = List(-5000000.0, -4999998.0, -4999996.0, -4999994.0, -4999992.0, -4999990.0, -4999988.0, -4999986.0, -4999984.0, -4999982.0, -4999980.0, -4999978.0, -4999976.0, -4999974.0, -4999972.0, -4999970.0, -4999968.0, -4999966.0, -4999964.0, -4999962.0, -4999960.0, -4999958.0, -4999956.0, -4999954.0, -4999952.0, -4999950.0, -4999948.0, -4999946.0, -4999944.0, -4999942.0, -4999940.0, -4999938.0, -4999936.0, -4999934.0, -4999932.0, -4999930.0, -4999928.0, -4999926.0, -4999924.0, -4999922.0, -4999920.0, -4999918.0, -4999916.0, -4999914.0, -4999912.0, -4999910.0, -4999908.0, -4999906.0, -4999904.0, -4999902.0, -4999900.0, -4999898.0, -4999896.0, -4999894.0, -4999892.0, -4999890.0, -4999888.0, -4999886.0, -4999884.0, -4999882.0, -4999880.0, -4999878.0, -...


scala> time { parFun1(l1par, l2par) }
Elapsed time: 292256058ns
res5: scala.collection.parallel.immutable.ParSeq[Double] = ParVector(-5000000.0, -4999998.0, -4999996.0, -4999994.0, -4999992.0, -4999990.0, -4999988.0, -4999986.0, -4999984.0, -4999982.0, -4999980.0, -4999978.0, -4999976.0, -4999974.0, -4999972.0, -4999970.0, -4999968.0, -4999966.0, -4999964.0, -4999962.0, -4999960.0, -4999958.0, -4999956.0, -4999954.0, -4999952.0, -4999950.0, -4999948.0, -4999946.0, -4999944.0, -4999942.0, -4999940.0, -4999938.0, -4999936.0, -4999934.0, -4999932.0, -4999930.0, -4999928.0, -4999926.0, -4999924.0, -4999922.0, -4999920.0, -4999918.0, -4999916.0, -4999914.0, -4999912.0, -4999910.0, -4999908.0, -4999906.0, -4999904.0, -4999902.0, -4999900.0, -4999898.0, -4999896.0, -4999894.0, -4999892.0, -4999890.0, -4999888.0, -4999886.0, -49998...
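
On this run the parallel version came out roughly 13× faster (≈3.93 s vs ≈0.29 s for 5,000,000 elements), though a single ad-hoc REPL timing like this is only indicative.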
  • Why did you use zip instead of zipped? zipped is faster than zip.
    – Asif
    Commented Jan 11, 2020 at 6:51
  • @user12140540 I didn't know ParVector tuples have zipped available. Could you share a code snippet demonstrating that? Commented Jan 13, 2020 at 19:58
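
(As far as I can tell, .zipped is not available on a pair of parallel sequences in Scala 2.12: the Tuple2Zipped enrichment requires implicit views to sequential TraversableLike collections, which ParSeq does not provide, so zip is the option that compiles here.)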

For the price of parallelism to pay off, the per-element workload likely needs to be sufficiently heavyweight. Below are benchmarks of a few alternatives based on sequential collections, parallel collections, and Futures. We compare both the case where the per-element operation is lightweight (adding two numbers) and the case where a heavy operation is simulated with Thread.sleep(1):

  • sequentialArray: sequentially work on an Array (based on Travis)
  • futureArray: logically split an Array into chunks and have a separate Future work on each chunk
  • parallelArray: use parallel collections to work on an Array (based on Luis, axel22)
  • parallelListZip: use parallel collections to zip and work on List (based on axel22)

Implementation:

import org.openjdk.jmh.annotations._
import scala.collection.parallel.CollectionConverters._ // Scala 2.13+: enables .par; built in on 2.12
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
class So59685582 {
  val simulateHeavyWorkload: Boolean = ??? // set per test below
  val length: Int = ???                    // set per test below
  val as = Array.fill(length)(math.random)
  val bs = Array.fill(length)(math.random)

  def sequentialArray(as: Array[Double], bs: Array[Double]): Array[Double] = {
    val length = as.length
    val out = new Array[Double](length)
    var i = 0
    while (i < length) {
      if (simulateHeavyWorkload) Thread.sleep(1)
      out(i) = as(i) + bs(i)
      i += 1
    }
    out
  }

  def futureArray(as: Array[Double], bs: Array[Double], numThreads: Int): Array[Double] = {
    val length = as.length
    val out = new Array[Double](length)
    val chunkSize = length / numThreads
    val fs =
      (0 until numThreads).map { t =>
        var i = t * chunkSize
        // the last chunk also covers the remainder when length is not divisible by numThreads
        val to = if (t == numThreads - 1) length else (t + 1) * chunkSize
        Future {
          while (i < to) {
            if (simulateHeavyWorkload) Thread.sleep(1)
            out(i) = as(i) + bs(i)
            i += 1
          }
        }
      }
    Await.ready(Future.sequence(fs), Duration.Inf)
    out
  }

  def parallelArray(as: Array[Double], bs: Array[Double]): Array[Double] = {
    val length = as.length
    val out = new Array[Double](length)
    (0 until length).par.foreach { i =>
      if (simulateHeavyWorkload) Thread.sleep(1)
      out(i) = as(i) + bs(i)
    }
    out
  }

  def parallelListZip(as: List[Double], bs: List[Double]): List[Double] = {
    as.par.zip(bs.par).map { case (a, b) =>
      if (simulateHeavyWorkload) Thread.sleep(1)
      a + b
    }.to(List)
  }

  @Benchmark def _sequentialArray: Array[Double] = sequentialArray(as, bs)
  @Benchmark def _futureArray: Array[Double] = futureArray(as, bs, numThreads = 12)
  @Benchmark def _parallelArray: Array[Double] = parallelArray(as, bs)
  @Benchmark def _parallelListZip: List[Double] = parallelListZip(as.toList, bs.toList)
}
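
Note that running this as written assumes the sbt-jmh plugin (for the jmh:run command) and, on Scala 2.13+, the org.scala-lang.modules scala-parallel-collections dependency for .par.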

Results of sbt "jmh:run -i 10 -wi 10 -f 2 -t 1 bench.So59685582":

Test 1

val simulateHeavyWorkload = true
val length = 1000

[info] Benchmark                     Mode  Cnt  Score   Error  Units
[info] So59685582._futureArray      thrpt   20  9.251 ± 0.034  ops/s
[info] So59685582._parallelArray    thrpt   20  6.493 ± 0.175  ops/s
[info] So59685582._parallelListZip  thrpt   20  6.379 ± 0.117  ops/s
[info] So59685582._sequentialArray  thrpt   20  0.790 ± 0.007  ops/s

Test 2

val simulateHeavyWorkload = false
val length = 1000

[info] Benchmark                     Mode  Cnt        Score       Error  Units
[info] So59685582._futureArray      thrpt   20    27097.347 ±   369.995  ops/s
[info] So59685582._parallelArray    thrpt   20    17864.004 ±   163.846  ops/s
[info] So59685582._parallelListZip  thrpt   20     2942.416 ±   108.180  ops/s
[info] So59685582._sequentialArray  thrpt   20  1773303.066 ± 55856.225  ops/s

Test 3

val simulateHeavyWorkload = false
val length = 10000000

[info] Benchmark                     Mode  Cnt   Score   Error  Units
[info] So59685582._futureArray      thrpt   20  50.271 ± 1.444  ops/s
[info] So59685582._parallelArray    thrpt   20  53.998 ± 1.397  ops/s
[info] So59685582._parallelListZip  thrpt   20   0.167 ± 0.040  ops/s
[info] So59685582._sequentialArray  thrpt   20  55.183 ± 1.025  ops/s

Findings

  • When the operation was lightweight, sequential processing performed better than or approximately equal to parallel processing, even at a large size of 10,000,000 elements.
  • When the size was small (1,000), parallel processing was orders of magnitude slower than sequential on a lightweight operation.
  • When the operation was heavyweight, parallel processing performed better than sequential by roughly an order of magnitude.
  • futureArray performed best when the number of threads was 12, which is the number of cores on my machine as per availableProcessors; more threads than that degraded performance (see the sketch after this list).
  • parallelListZip, which uses List, zip, and map, had performance similar to parallelArray, which uses mutability and a while loop, when the operation was heavyweight and the size was not too big (1,000).
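
As referenced above, here is a minimal sketch of checking the core count and sizing a parallel collection's thread pool explicitly (illustrative values, not part of the benchmark):

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.CollectionConverters._ // Scala 2.13+
import scala.collection.parallel.ForkJoinTaskSupport

val cores = Runtime.getRuntime.availableProcessors // 12 on the machine above

// Give this parallel collection a dedicated pool sized to the core count
val xs = (1 to 1000000).map(_.toDouble).par
xs.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(cores))
val doubled = xs.map(_ * 2)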
  • I tried different ways and different collections, including parallel ones, and I also noticed that for smaller collections the sequential version was faster than the parallelized one. And you showed this with benchmarks too.
    – Asif
    Commented Jan 11, 2020 at 18:56
  • In my scenario I have two arrays and I have to do an element-wise sum on these two for 1 million iterations, and I found that the parallel version is not suitable for this problem.
    – Asif
    Commented Jan 11, 2020 at 18:58
  • @user12140540 It is likely due to an element-wise sum being too lightweight of an operation. Commented Jan 11, 2020 at 19:00
