
Fork-join framework

Programming with plain threads is tedious and error-prone. Furthermore, creating a new thread is a relatively expensive operation. Therefore, in practice we often want to use higher-level abstractions that hide these low-level aspects. Parallel for-loops are one such abstraction: they offer a convenient way to parallelize embarrassingly parallel problems, among others. However, some applications need a bit more control over how the parallelization is done.

In the following, we use the Java fork/join framework. It manages a thread pool: instead of creating a new thread for every piece of work, it reuses a pool of worker threads. We can simply submit parallel tasks to the framework, and it schedules them onto the threads as threads become idle. Unlike parallel for-loops, the framework also supports nested parallelism, meaning that tasks can submit new parallel tasks. The main thing we have to take care of is that the tasks are independent, that is, no task writes data that another concurrently running task reads or writes.
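To make the fork/join mechanics concrete, here is a minimal sketch that uses the plain java.util.concurrent API without any wrapper. The class FibTask and its cutoff Threshold are hypothetical names chosen for illustration. A task computing the n-th Fibonacci number forks a subtask for one part of the work, computes the other part itself, and then joins the forked subtask, so tasks create new tasks (nested parallelism).

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveTask}

// Hypothetical example task: the n-th Fibonacci number, computed by
// forking a subtask for fib(n - 1) and computing fib(n - 2) directly
class FibTask(n: Int) extends RecursiveTask[Long]:
  // Hypothetical cutoff: below it, a new task would cost more than it saves
  private val Threshold = 20

  private def seqFib(k: Int): Long =
    if k < 2 then k else seqFib(k - 1) + seqFib(k - 2)

  override def compute(): Long =
    if n <= Threshold then seqFib(n)
    else
      val left = FibTask(n - 1)
      left.fork()                           // schedule the subtask in the pool
      val right = FibTask(n - 2).compute()  // do the other half in this thread
      left.join() + right                   // wait for the forked subtask

val pool = ForkJoinPool()
val fib30 = pool.invoke(FibTask(30))  // submit the root task and wait for it
println(fib30) // 832040
```

Here pool.invoke submits the root task and blocks until its result is available. The sequential cutoff is a common design choice: without it, the tiny tasks near the leaves of the recursion would be dominated by scheduling overhead.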

Example

We can write our earlier “parallel median and average example” (recall the section Threads) with the Java fork/join framework as follows:

import java.util.concurrent._

// 'a' is the Array[Int] of the earlier example
// Create the fork-join pool of threads
val forkJoinPool = new ForkJoinPool()
// Construct the tasks
val medianTask = new RecursiveTask[Int] {
  def compute = { a.sorted.apply(a.length / 2) }
}
val avgTask = new RecursiveTask[Int] {
  def compute = { a.sum / a.length }
}
// Start the tasks
forkJoinPool.execute(medianTask)
forkJoinPool.execute(avgTask)
// Wait until they have finished and get the results
val (median, avg) = (medianTask.join(), avgTask.join())

To write more concise, Scala-style code, we can write a simple wrapper object, called par, that launches tasks in a fork-join pool. It provides two methods:

  • task(code) creates and returns a new task that runs in parallel and executes the function code. To get the return value of the function and/or to make sure that its execution has finished, call the join method of the returned task. Calling join blocks the caller until the execution of code has finished.

  • parallel(codeA, codeB) executes the functions codeA and codeB in parallel and returns their return values as a pair once both have finished their execution.

Example

Our running example implemented with the task method:

val medianTask = par.task {a.sorted.apply(a.length / 2)}
val avgTask = par.task {a.sum / a.length}
val (median,avg) = (medianTask.join(), avgTask.join())

As before, the tasks here are independent because they only read the array and never write to it.
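The independence requirement also suggests what to do when a task needs to modify data. Below is a hedged sketch, with a hypothetical sample array standing in for a: the median task sorts a private copy made with clone() instead of sorting a in place, so the average task still reads an array that no task writes to. (Likewise, a.sorted in the examples above returns a new array and leaves a untouched.)

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveTask}

// Hypothetical sample data standing in for the array 'a' of the example
val a = Array(4, 8, 1, 9, 3)
val pool = ForkJoinPool()

// Sorting 'a' in place would write to the array the average task reads,
// so the median task sorts its own private copy instead
val medianTask = new RecursiveTask[Int] {
  def compute = {
    val b = a.clone()          // private copy, invisible to other tasks
    java.util.Arrays.sort(b)   // in-place sort of the copy only
    b(b.length / 2)
  }
}
// Only reads 'a'
val avgTask = new RecursiveTask[Int] {
  def compute = a.sum / a.length
}

pool.execute(medianTask)
pool.execute(avgTask)
val (median, avg) = (medianTask.join(), avgTask.join())
println(s"median=$median avg=$avg") // median=4 avg=5
```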

Example: Nested parallelism

With the par.parallel construct, we can compute the median and average in parallel as follows:

val (median, avg) = par.parallel(
  {a.sorted.apply(a.length / 2)},
  {a.sum / a.length}
)

In nested parallelism, tasks can spawn new tasks. par.parallel supports this, so we can compute the median, the average, the Euclidean norm, and the maximum absolute difference between consecutive elements of an array in parallel as follows:

val ((median, avg), (norm, maxDiff)) = par.parallel(
  par.parallel({a.sorted.apply(a.length / 2)},
               {a.sum / a.length}),
  par.parallel({math.sqrt(a.map(x => x.toDouble*x).sum)},
               {a.sliding(2).map(p => math.abs(p(1) - p(0))).max})
)

The source code for our par wrapper object is as follows. Notice the call-by-name parameters of the form code: => T in the method signatures. Such an argument is not evaluated before the method is called; instead, it is passed in unevaluated, much like a parameterless function, and evaluated only where the method body uses it.
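A small sketch of what call-by-name means in practice; twice, twiceByValue and next are hypothetical helpers. A by-name argument is evaluated every time the parameter is used inside the method, whereas a by-value argument is evaluated exactly once, before the call. In par.task, the only use of code is inside compute, which is why the code runs in whichever thread executes the task.

```scala
// Hypothetical helper taking its argument by name ('=> Int'):
// the argument expression is re-evaluated at each use of 'code'
def twice(code: => Int): Int = code + code

// The same helper with an ordinary by-value parameter:
// the argument is evaluated once, before the method body runs
def twiceByValue(v: Int): Int = v + v

// A side-effecting counter that makes evaluations visible
var calls = 0
def next(): Int = { calls += 1; calls }

val byName = twice(next())          // next() runs twice: 1 + 2
val byValue = twiceByValue(next())  // next() runs once: 3 + 3
println((byName, byValue, calls))   // (3,6,3)
```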

object par:
  import java.util.concurrent._
  val forkJoinPool = ForkJoinPool()

  /** Get the target parallelism level (number of worker threads) of the pool. */
  def getParallelism: Int = forkJoinPool.getParallelism()

  /** Execute 'code' in parallel in a fork-join pool task.
   * For the result, call the 'join' method of the returned task. */
  def task[T](code: => T): ForkJoinTask[T] =
    val task = new RecursiveTask[T] { def compute = code }
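    // Put the new task in the fork-join pool: inside a worker thread,
    // fork() pushes it onto that worker's own task queue; from any
    // other thread, execute() submits it to the pool
    Thread.currentThread match
      case _: ForkJoinWorkerThread => task.fork()
      case _ => forkJoinPool.execute(task)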
    task

  /** Execute the functions codeA and codeB in parallel and
   * return their return values. */
  def parallel[A, B](codeA: => A, codeB: => B): (A, B) =
    val taskB = task(codeB) // Start executing codeB in parallel
    val resultA = codeA     // Execute codeA in this thread now
    (resultA, taskB.join()) // Return when codeB has finished
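Nested parallelism pays off especially in recursive divide-and-conquer algorithms. The following sketch sums an array by splitting the index range in half and summing the halves with parallel, so every recursive call can spawn new tasks. To keep the block self-contained it repeats a condensed copy of the par object above; parSum and its cutoff parameter are hypothetical names.

```scala
import java.util.concurrent.{ForkJoinPool, ForkJoinTask, ForkJoinWorkerThread, RecursiveTask}

// Condensed copy of the 'par' object above, so this block runs on its own
object par:
  val forkJoinPool = ForkJoinPool()
  def task[T](code: => T): ForkJoinTask[T] =
    val t = new RecursiveTask[T] { def compute = code }
    Thread.currentThread match
      case _: ForkJoinWorkerThread => t.fork()
      case _ => forkJoinPool.execute(t)
    t
  def parallel[A, B](codeA: => A, codeB: => B): (A, B) =
    val taskB = task(codeB)
    val resultA = codeA
    (resultA, taskB.join())

// Sum of a(lo until hi). The two halves are summed in parallel, recursively.
// 'cutoff' is a hypothetical tuning value: ranges of at most this many
// elements are summed with a sequential loop instead of new tasks.
def parSum(a: Array[Int], lo: Int, hi: Int, cutoff: Int = 1000): Long =
  if hi - lo <= cutoff then
    var s = 0L
    var i = lo
    while i < hi do
      s += a(i)
      i += 1
    s
  else
    val mid = lo + (hi - lo) / 2
    val (left, right) =
      par.parallel(parSum(a, lo, mid, cutoff), parSum(a, mid, hi, cutoff))
    left + right

val data = Array.tabulate(100000)(i => i + 1)  // 1, 2, ..., 100000
println(parSum(data, 0, data.length)) // 5000050000
```

The tasks are independent because each recursive call only reads its own disjoint index range of the array.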

Note: Warning!

There is a caveat in the interface! Because the argument functions are not evaluated immediately but only later, inside a task, any var passed as an argument or otherwise referenced in the code may change its value before the code is actually evaluated. For instance, the following

def fact(i: Int): Int = if(i == 1) 1 else i*fact(i-1)
val tasks = collection.mutable.ArrayBuffer[ForkJoinTask[Int]]()
var i = 1
while i <= 10 do
  tasks += par.task(fact(i))
  i += 1
println(tasks.map(_.join()))

does not correctly compute the first ten factorials in parallel but may output

ArrayBuffer(24, 24, 24, 24, 5040, 5040, 5040, 362880, 3628800, 39916800)

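or something similar. This is because the code fact(i) refers to the variable i itself, not to its value at the time par.task was called, so i may already have been incremented (possibly up to 11, which explains the last value 39916800 = 11!) by the time fact(i) is executed in a task.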
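One way to repair the while loop is to copy the current value of i into a fresh val inside the loop body; each by-name argument then refers to its own immutable n, so later increments of i no longer matter. The sketch below assumes a minimal stand-in, the hypothetical helper task, with the same by-name signature as par.task.

```scala
import java.util.concurrent.{ForkJoinPool, ForkJoinTask, RecursiveTask}
import scala.collection.mutable.ArrayBuffer

// Hypothetical minimal stand-in for par.task (always submits via execute)
val pool = ForkJoinPool()
def task[T](code: => T): ForkJoinTask[T] =
  val t = new RecursiveTask[T] { def compute = code }
  pool.execute(t)
  t

def fact(i: Int): Int = if i == 1 then 1 else i * fact(i - 1)

val tasks = ArrayBuffer[ForkJoinTask[Int]]()
var i = 1
while i <= 10 do
  val n = i               // snapshot of the current value of the var 'i'
  tasks += task(fact(n))  // the by-name argument refers to 'n', not to 'i'
  i += 1
println(tasks.map(_.join()))
// ArrayBuffer(1, 2, 6, 24, 120, 720, 5040, 40320, 362880, 3628800)
```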

The following works correctly because i is now the parameter of the function passed to map, and thus an immutable val with its own value in each call:

def fact(i: Int): Int = if(i == 1) 1 else i*fact(i-1)
val tasks = (1 to 10).map(i => par.task(fact(i)) )
println(tasks.map(_.join()))

References

The interested reader is referred to the following articles, which introduced work-stealing scheduling and the Java fork/join framework and explain how they can be implemented.

Robert D. Blumofe and Charles E. Leiserson: Scheduling Multithreaded Computations by Work Stealing. In Proc. FOCS 1994, IEEE, 1994. Pages 356–368.

Matteo Frigo, Charles E. Leiserson, and Keith H. Randall: The implementation of the Cilk-5 multithreaded language. In Proc. PLDI 1998, ACM, 1998. Pages 212–223.

Robert D. Blumofe and Charles E. Leiserson: Scheduling Multithreaded Computations by Work Stealing. Journal of the ACM 46(5):720–748, 1999.

Doug Lea: A Java fork/join framework. In Proc. ACM Java Grande 2000, ACM, 2000. Pages 36–43.