Fork-join framework
Programming with plain threads is tedious and error-prone. Furthermore, creating a new thread is not a cheap operation. Therefore, in practice we often want to use higher-level abstractions that hide these low-level aspects. Parallel for-loops offer a convenient approach for parallelizing embarrassingly parallel problems and some others. However, in some applications we need a bit more control over how the parallelization is done.
In the following, we use the Java fork/join framework. It manages a thread pool: a set of threads is created at the start and then reused, instead of new threads being created all the time. We can then simply submit parallel tasks to the framework, and it schedules them on the threads as the threads become idle. Unlike parallel for-loops, the framework also supports nested parallelism, meaning that tasks can submit new parallel tasks. The main thing we have to take care of is making sure that the tasks are independent.
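To make the idea of tasks submitting new tasks concrete, here is a minimal sketch that sums an array with the plain fork/join API. Each task either sums a small index range sequentially or splits the range in two, forks one half as a new task and computes the other half itself. The class name SumTask and the cutoff of 1000 elements are illustrative assumptions, not part of the framework.

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveTask}

// Hypothetical divide-and-conquer task that sums a(from until to).
class SumTask(a: Array[Int], from: Int, to: Int) extends RecursiveTask[Long]:
  // Assumed cutoff: below this size, summing sequentially beats creating tasks.
  private val cutoff = 1000

  def compute(): Long =
    if to - from <= cutoff then
      var s = 0L
      var i = from
      while i < to do
        s += a(i)
        i += 1
      s
    else
      val mid = (from + to) / 2
      val left = SumTask(a, from, mid)
      left.fork()                                  // spawn a nested task for the left half
      val rightSum = SumTask(a, mid, to).compute() // compute the right half in this thread
      rightSum + left.join()                       // wait for the left half, then combine

val pool = new ForkJoinPool()
val data = Array.tabulate(1000000)(i => i % 100)
val total = pool.invoke(SumTask(data, 0, data.length)) // submits the task and waits for its result
println(total) // prints 49500000
```

Forking one half and computing the other in the current thread keeps every worker busy instead of leaving the parent task idle while it waits.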
Example
We can write our earlier “parallel median and average example” (recall the section Threads) with the Java fork/join framework as follows:
import java.util.concurrent._
// 'a' is the Array[Int] from the earlier example in the section Threads
// Create the fork-join pool of threads
val forkJoinPool = new ForkJoinPool()
// Construct the tasks
val medianTask = new RecursiveTask[Int] {
def compute = { a.sorted.apply(a.length / 2) }
}
val avgTask = new RecursiveTask[Int] {
def compute = { a.sum / a.length }
}
// Start the tasks
forkJoinPool.execute(medianTask)
forkJoinPool.execute(avgTask)
// Wait until they have finished and get the results
val (median, avg) = (medianTask.join(), avgTask.join())
To write more concise, Scala-style code, we can write a simple wrapper object, call it par, that launches tasks in the fork-join pool.
It provides two methods:
task(code) creates and returns a new task that executes the function code in parallel. To get the return value of the function and/or to make sure that its execution has finished, call the join method of the returned task. Calling join blocks the caller until the execution of code has finished.
parallel(code1, code2) executes the functions code1 and code2 in parallel and returns their return values once both have finished.
Example
Our running example implemented with the task method:
val medianTask = par.task {a.sorted.apply(a.length / 2)}
val avgTask = par.task {a.sum / a.length}
val (median,avg) = (medianTask.join(), avgTask.join())
As before, the tasks here are independent because they only read the array but don’t write to it.
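Reading shared data is one way for tasks to be independent. Another common one, sketched below as our own illustration, is to let tasks write to the same array but only to disjoint index ranges. The sketch also uses RecursiveAction, the variant of RecursiveTask for tasks that return no result; the helper squareRange is hypothetical.

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveAction}

val pool = new ForkJoinPool()
val b = Array.tabulate(10)(i => i + 1) // 1, 2, ..., 10

// Hypothetical helper: a task that squares b(i) for i in [from, to) and touches nothing else.
def squareRange(from: Int, to: Int): RecursiveAction = new RecursiveAction {
  def compute(): Unit =
    for i <- from until to do b(i) = b(i) * b(i)
}

val lower = squareRange(0, b.length / 2)
val upper = squareRange(b.length / 2, b.length)
pool.execute(lower)
pool.execute(upper)
lower.join() // join() also waits for tasks that return no value
upper.join()
println(b.mkString(", ")) // prints 1, 4, 9, 16, 25, 36, 49, 64, 81, 100
```

If the two ranges overlapped, the result would depend on the order in which the tasks happened to run, and the tasks would no longer be independent.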
Example: Nested parallelism
With the par.parallel construction, we can compute the median and average in parallel as follows:
val (median, avg) = par.parallel(
{a.sorted.apply(a.length / 2)},
{a.sum / a.length}
)
In nested parallelism, tasks can spawn new tasks. This is supported by par.parallel, so we can compute the median, average, Euclidean norm and maximum difference between consecutive elements of an array in parallel as follows:
val ((median, avg), (norm, maxDiff)) = par.parallel(
par.parallel({a.sorted.apply(a.length / 2)},
{a.sum / a.length}),
par.parallel({math.sqrt(a.map(x => x.toDouble*x).sum)},
{a.sliding(2).map(p => math.abs(p(1) - p(0))).max})
)
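Nesting also works recursively: a task started by parallel may itself call parallel. The following sketch uses the classic Fibonacci example with a depth limit, below which it switches to sequential code. So that the block runs on its own, it defines a condensed stand-in for par.parallel that is assumed to behave like the par object in this section; the depth limit 4 is an arbitrary choice.

```scala
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread, RecursiveTask}

val pool = new ForkJoinPool()

// Condensed stand-in for par.parallel: start codeB as a fork-join task,
// run codeA in the current thread, then wait for codeB.
def parallel[A, B](codeA: => A, codeB: => B): (A, B) =
  val taskB = new RecursiveTask[B] { def compute(): B = codeB }
  Thread.currentThread match
    case _: ForkJoinWorkerThread => taskB.fork() // nested call: push to this worker's queue
    case _                       => pool.execute(taskB)
  val resultA = codeA
  (resultA, taskB.join())

// Plain sequential Fibonacci, used below the depth limit.
def fib(n: Int): Long = if n < 2 then n else fib(n - 1) + fib(n - 2)

// Each level runs its two recursive calls in parallel until depth reaches 0.
def parFib(n: Int, depth: Int): Long =
  if depth == 0 || n < 2 then fib(n)
  else
    val (x, y) = parallel(parFib(n - 1, depth - 1), parFib(n - 2, depth - 1))
    x + y

println(parFib(30, 4)) // prints 832040
```

The depth limit keeps the number of tasks modest (at most 2^4 leaves here), so task creation does not dominate the useful work.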
The source code for our par wrapper object is as follows. Notice the call-by-name parameters of the form code: => T in the method signatures: the argument expression is not evaluated before the method is called but is passed in unevaluated, as a function, and evaluated only when the parameter is used inside the method.
object par:
  import java.util.concurrent._
  val forkJoinPool = ForkJoinPool()

  /** Get the amount of parallelism (number of threads) in use. */
  def getParallelism: Int = forkJoinPool.getParallelism()

  /** Execute 'code' in parallel in a fork-join pool task.
   * For the result, call the 'join' method of the returned task. */
  def task[T](code: => T): ForkJoinTask[T] =
    val task = new RecursiveTask[T] { def compute = code }
    // Put the new task in the fork-join pool: from inside a pool worker thread,
    // fork() pushes it to that worker's own queue; from any other thread
    // (e.g. the main thread), submit it to our pool with execute().
    Thread.currentThread match
      case _: ForkJoinWorkerThread => task.fork()
      case _ => forkJoinPool.execute(task)
    task

  /** Execute the functions codeA and codeB in parallel and
   * return their return values. */
  def parallel[A, B](codeA: => A, codeB: => B): (A, B) =
    val taskB = task(codeB) // Start executing codeB in parallel
    val resultA = codeA // Execute codeA in this thread now
    (resultA, taskB.join()) // Return when codeB has finished
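To see the call-by-name semantics used by task and parallel in isolation, here is a minimal sketch with hypothetical helpers byValue, byName and arg that record when they run. A by-value argument is evaluated once, before the method body; a by-name argument is evaluated inside the body, and again at each use.

```scala
import scala.collection.mutable.ArrayBuffer

// Records the order in which things happen.
val events = ArrayBuffer[String]()

def arg(): Int =
  events += "arg evaluated"
  21

// By-value: the argument is evaluated once, before the body runs.
def byValue(x: Int): Int =
  events += "byValue body"
  x + x

// By-name: the argument expression is evaluated inside the body, each time x is used.
def byName(x: => Int): Int =
  events += "byName body"
  x + x

val r1 = byValue(arg())
val r2 = byName(arg())
println(events.mkString(", "))
// arg evaluated, byValue body, byName body, arg evaluated, arg evaluated
```

Because a by-name parameter is re-evaluated at every use, task mentions code exactly once, inside compute, so the code runs once, in the task.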
Warning
There is a caveat in this interface. Because the argument functions are evaluated later rather than immediately, any var passed as an argument or otherwise referenced in the code may change its value before the function is actually evaluated in the task.
For instance, the following
import java.util.concurrent.ForkJoinTask
def fact(i: Int): Int = if(i == 1) 1 else i*fact(i-1)
val tasks = collection.mutable.ArrayBuffer[ForkJoinTask[Int]]()
var i = 1
while i <= 10 do
  tasks += par.task(fact(i))
  i += 1
println(tasks.map(_.join()))
does not correctly compute the first ten factorials in parallel but may output
ArrayBuffer(24, 24, 24, 24, 5040, 5040, 5040, 362880, 3628800, 39916800)
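or something similar. This is because the by-name argument fact(i) refers to the variable i itself rather than to its value at the time par.task is called, and i may already have changed (even to 11, after the loop has ended) before fact(i) is executed in a task.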
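The effect can be reproduced deterministically without any threads, since a by-name argument behaves like a closure that captures the variable i, not its current value. In the following sketch, which is our own illustration, par.task is replaced by storing a closure () => fact(i) that is evaluated only after the loop; it also shows the usual fix of copying the var into a fresh val in each iteration.

```scala
import scala.collection.mutable.ArrayBuffer

def fact(i: Int): Int = if i == 1 then 1 else i * fact(i - 1)

// Stand-in for par.task: store the unevaluated computation instead of running it.
val deferred = ArrayBuffer[() => Int]()
var i = 1
while i <= 10 do
  deferred += (() => fact(i)) // captures the variable i, not its current value
  i += 1
val wrong = deferred.map(f => f()) // all evaluated after the loop, when i == 11

val fixed = ArrayBuffer[() => Int]()
var k = 1
while k <= 10 do
  val j = k // a fresh val per iteration holds the current value
  fixed += (() => fact(j))
  k += 1
val right = fixed.map(f => f())

println(wrong) // ten copies of 39916800, i.e. fact(11)
println(right) // ArrayBuffer(1, 2, 6, 24, 120, 720, 5040, 40320, 362880, 3628800)
```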
The following works correctly because inside the function passed to the map method, i is an immutable parameter (like a val) that is bound afresh for each element:
def fact(i: Int): Int = if(i == 1) 1 else i*fact(i-1)
val tasks = (1 to 10).map(i => par.task(fact(i)) )
println(tasks.map(_.join()))
References
The interested reader is referred to the following articles that introduced the fork/join framework and explain how it can be implemented.
Robert D. Blumofe and Charles E. Leiserson: Scheduling Multithreaded Computations by Work Stealing. In Proc. FOCS 1994, IEEE, 1994. Pages 356–368.
Matteo Frigo, Charles E. Leiserson, and Keith H. Randall: The implementation of the Cilk-5 multithreaded language. In Proc. PLDI 1998, ACM, 1998. Pages 212–223.
Robert D. Blumofe and Charles E. Leiserson: Scheduling Multithreaded Computations by Work Stealing. Journal of the ACM 46(5):720–748, 1999.
Doug Lea: A Java fork/join framework. In Proc. Java 2000, ACM, 2000. Pages 36–43.