
Example: Implementing parallel count

As a simple example of using the fork-join framework, let’s implement a parallel version of the count method found in Scala collections. The method applies the argument predicate pred to all the elements of an array a and returns the number of elements that satisfy the predicate. That is, we want to build a parallel algorithm that computes the same result as the standard Scala library method call

a.count(pred).
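
For example, with the standard library method, counting the even values in a small array gives:

val xs = Array(1, 2, 3, 4, 5, 6)
xs.count(x => x % 2 == 0)  // evaluates to 3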

Observe that the scala.collection.parallel.mutable library of Scala 2.12 already provides this kind of parallel version: one can simply use a.par.count(pred). But let’s build our own with the fork-join framework and see how it compares to the standard library implementation. Our sequential reference version used in the benchmarks, called the “while-loop” version in the sequel, is as follows:

def count[A](a: Array[A], pred: A => Boolean): Int = {
  var count = 0
  var i = 0
  while(i < a.length) {
    if(pred(a(i))) count += 1
    i += 1
  }
  count
}

Assuming that each predicate evaluation takes constant time, the running time of the sequential version is \( \Theta(n) \).

A non-recursive parallel version

In our first parallel version, we simply

  1. divide the array of \( n \) elements into \( k \) roughly equal-sized chunks,

  2. compute, in parallel, the number of elements satisfying the predicate in each chunk, and

  3. return the sum of these sub-results.

_images/par-count-lin-8.png

Let’s study the work and span of this parallel version.

  • If the chunk size is roughly \( d \) for some constant \( d \), then there are \( k \approx \frac{n}{d} \) chunks. Creating the tasks and computing the final sum thus take linear time \( \Theta(n/d) = \Theta(n) \), while evaluating the predicate on the elements of a single chunk takes constant time \( \Theta(d) \). Thus the work is \( \Theta(n/d + d \cdot n/d + n/d) = \Theta(n) \) and the span is \( \Theta(n/d + d + n/d) = \Theta(n) \).

    In summary: if one divides the array into too many chunks, the time needed to launch the tasks and collect the results may start to play a significant role in the total running time.

  • ​ On the other hand, if the number of chunks \( k \) is a constant, then evaluating the predicate in each chunk takes time \( \Theta(\frac{n}{k}) \). The work is now \( \Theta(k + k \frac{n}{k} + k) = \Theta(n) \) while the span is \( \Theta(k + \frac{n}{k} + k) = \Theta(n) \).
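
One way to quantify this (a standard work/span argument, not spelled out in the original text): the best achievable speedup is bounded by the ratio of work to span. For the constant-\( k \) variant above this gives $$\frac{\text{work}}{\text{span}} = \frac{\Theta(n)}{\Theta(n/k)} = \Theta(k),$$ so the speedup of this simple scheme is limited by the number of chunks no matter how many cores are available (for the constant chunk size \( d \), the corresponding bound is only \( \Theta(d) \)).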

Thus, in theory, the algorithm is not very attractive as the span is linear in both cases. However, this simple version may currently work well in practice because the number of available cores is small compared to the sizes of the typically large arrays we wish to handle. Suppose that we have \( P \) computing cores in use. What is a good value for \( k \), i.e., how many chunks should we take?

  • If we take \( k = n \), then we get the maximum parallelism available from our \( P \) threads, but launching the tasks and scheduling the jobs probably becomes too costly when \( n \) is large.

  • If we take \( k = P \) and the computing time of the predicate is not always the same, then there is a risk that some tasks take much longer than others, and we do not get maximum parallelism because some cores sit idle while others are still working.

  • ​ Maybe a good compromise is to take \( k = 16 P \) (or \( n \) if \( n < 16 P \)) so that there are enough but not too many tasks.
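
As a concrete, hypothetical configuration: with \( P = 4 \) cores and an array of \( n = 10^6 \) elements, this rule gives $$k = 16 \cdot 4 = 64 \quad\text{chunks of}\quad \frac{10^6}{64} \approx 15600 \text{ elements each.}$$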

The algorithm is easy to implement with our par.task construction.

def countPar[A](a: Array[A], pred: A => Boolean): Int = {
  val N = a.length
  val P: Int = par.getParallelism
  val nofTasks = 1 max (N min (P * 16))  // at most 16*P tasks, at least one (handles empty arrays)
  val starts = (0 to nofTasks).map(i => ((N.toLong * i) / nofTasks).toInt)  // chunk boundaries; Long avoids overflow
  val intervals = (starts zip starts.tail)
  // Start the parallel tasks
  val countTasks = intervals.map({
    case (start, end) => par.task {
      var i = start
      var count = 0  // A private variable for the sub-result
      while(i < end) {
	if(pred(a(i)))
	  count += 1
	i += 1
      }
      count  // Return the sub-result
    }
  })
  // Wait until all tasks have finished, get the sub-results
  val counts = countTasks.map(_.join())
  // Return the sum of the sub-results
  counts.sum
}
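
A quick usage sketch (hypothetical input; assumes the course’s par helper object is in scope, as in the code above):

val data = Array.tabulate(1 << 20)(i => i)           // the integers 0, 1, ..., 2^20 - 1
val evens = countPar(data, (x: Int) => x % 2 == 0)   // evens == 1 << 19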

Incorrect: data races

To illustrate a typical mistake made in implementing concurrent programs, consider the incorrect Scala variant below. As the result variable is shared between the concurrent tasks, the program contains a data race and will most likely compute an incorrect result.

def countParBad[A](a: Array[A], pred: A => Boolean): Int = {
  val N = a.length
  val P: Int = par.getParallelism
  val nofTasks = N min P * 16
  val starts = (0 to nofTasks).map(i => (N * i) / nofTasks)
  val intervals = (starts zip starts.tail)
  var result = 0  // BAD: a non-atomic variable shared by tasks
  val countTasks = intervals.map({
    case (start, end) => par.task {
      var i = start
      while(i < end) {
	if(pred(a(i)))
	  result += 1  // BAD: a non-atomic update
	i += 1
      }
    }
  })
  // Wait until all tasks have finished
  countTasks.foreach(_.join())
  result // Return the most-likely-INCORRECT result
}
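
The lost updates are not specific to the fork-join framework. The following self-contained sketch (plain Java threads, an illustrative example not from the original material) exhibits the same problem: several threads perform non-atomic read-modify-write increments on a shared counter, and some updates are lost.

var counter = 0
val threads = (1 to 4).map(_ => new Thread(() => {
  var i = 0
  while(i < 100000) {
    counter += 1  // BAD: non-atomic read-modify-write on a shared variable
    i += 1
  }
}))
threads.foreach(_.start())
threads.foreach(_.join())
println(counter)  // very likely less than the expected 400000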

Potentially inefficient: atomic updates

In this example, the data race problem discussed above could be fixed by using atomic variable updates (see, for instance, Atomic Variables in the Java Tutorials or the std::atomic class template in the C++ standard library).

def countParAtomic[A](a: Array[A], pred: A => Boolean): Int = {
  val N = a.length
  val P: Int = par.getParallelism
  val nofTasks = N min P * 16
  val starts = (0 to nofTasks).map(i => (N * i) / nofTasks)
  val intervals = (starts zip starts.tail)
  val result = new java.util.concurrent.atomic.AtomicInteger()
  val countTasks = intervals.map({
    case (start, end) => par.task {
      var i = start
      while(i < end) {
	if(pred(a(i))) result.addAndGet(1) // Atomically increment result by one
	i += 1
      }
    }
  })
  // Wait until all tasks have finished
  countTasks.foreach(_.join())
  result.get()
}

However, the atomic updates require some synchronization and can thus incur a considerable performance penalty when they are performed very often. As an example, the plot below compares the performance of this atomic-update version to that of the sequential and parallel versions above on inputs that cause very frequent atomic updates. The difference would not be as radical on inputs where the predicate evaluation takes more time and fewer of the input elements actually satisfy the predicate.

_images/count-even-2019-atomic.jpg

Potentially inefficient: false sharing

To illustrate a second potential source of inefficiency, called false sharing, consider the Scala version below. The code does not have data races because the sub-sums are computed in different elements of the results array. However, parallel tasks may write to neighbouring indices of the results array, which can lead to suboptimal performance for the following reason. When a core reads a value from the main memory, the value is brought into a cache memory that is faster and closer to the core than the main memory. The cache entries, called cache lines, do not cache individual bytes or words of the main memory but larger chunks; for instance, in current Intel processors, a cache line is 64 bytes, which equals 16 32-bit integers. If different cores write to the same cache line, a cache coherence protocol has to maintain consistency, and this causes extra communication between the cores. In our example, different tasks may write to the same cache line when they write to neighbouring indices of the results array.

def countParFalseSharing[A](a: Array[A], pred: A => Boolean): Int = {
  val N = a.length
  val P: Int = par.getParallelism
  val nofTasks = N min P * 16
  val starts = (0 to nofTasks).map(i => (N * i) / nofTasks)
  val intervals = (starts zip starts.tail)
  val results = Array.ofDim[Int](nofTasks)  // Results of the tasks
  val countTasks = intervals.zipWithIndex.map({
    case ((start, end), index) => par.task {
      var i = start
      while(i < end) {
	if(pred(a(i)))
	  results(index) += 1 // No data race but may write to the same cache line
	i += 1
      }
    }
  })
  // Wait until all tasks have finished
  countTasks.foreach(_.join())
  // Return the sum of the subresults
  results.sum
}

The plot below shows that false sharing can cause a significant performance penalty if writes to the same cache line are very frequent. Again, the penalty would not be as large on inputs that cause less frequent writes to the results array (the predicate evaluation takes more time or fewer elements satisfy the predicate).

_images/count-even-2019-false-sharing.jpg

Our first parallel version does not suffer from this false sharing problem because each sub-sum is computed in a private variable count, which is stored in a register or in the stack memory of the executing thread.

A recursive version

We can also make a recursive version. It follows the divide-and-conquer paradigm:

  • ​ If the array is small enough, compute the result sequentially.

  • Otherwise, split the array into two (roughly) equally-sized sub-arrays.

    Then, recursively and in parallel, compute the results for the two sub-arrays.

    Finally, return the sum of the two sub-array results.

_images/par-count-rec-8.png

In this recursive version, the creation of new tasks as well as the final sum computation are also parallelized. If we split the array until only one element is left, the worst-case span can be obtained from the recurrence $$T(1) = c \quad\text{and}\quad T(n) = d + \max\{T(\lfloor n/2\rfloor), T(\lceil n/2\rceil)\},$$ where \( c \) is the time taken by one predicate evaluation and \( d \) is some constant. Substituting \( n = 2^k \) for a non-negative integer \( k \), we get \( T(2^k) = d + T(2^{k-1}) = \dots = kd + c \). Thus the worst-case span is \( T(n) = c + d\log_2 n = \Theta(c + \log n) \). If the predicate can be evaluated in constant time, it is thus possible to count the number of elements satisfying the predicate in logarithmic time when \( \Theta(n) \) computing resources are available.

The work, in turn, is \( \Theta(n + cn) = \Theta(cn) \) because the recursive divisions and the final sum computations together take \( \Theta(n) \) steps.
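
For completeness, the corresponding work recurrence (under the same assumptions as the span recurrence above) is $$W(1) = c \quad\text{and}\quad W(n) = d + W(\lfloor n/2\rfloor) + W(\lceil n/2\rceil),$$ whose recursion tree has \( n \) leaves costing \( c \) each and \( n-1 \) internal nodes costing \( d \) each, giving \( W(n) = (n-1)d + nc = \Theta(n + cn) \).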

We can implement the recursive version with the par.parallel method instead of par.task. In practice, we do not recurse all the way down to single elements but stop when the sub-arrays are “small enough”. Thus we have to decide how large a “small enough” sub-array is:

  • It cannot be \( 1 \) on large arrays, as then launching the parallel forks takes too much time

  • Nor should it be \( n / P \) if the predicate evaluation times can vary

  • ​ Let’s use \( n / (16 P) \) as a compromise
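
With the same hypothetical configuration as before (\( P = 4 \) cores, \( n = 10^6 \) elements), the threshold is $$\frac{n}{16P} = \frac{10^6}{64} = 15625,$$ so the recursion bottoms out in roughly \( 16P = 64 \) sequential leaf tasks after about \( \log_2(16P) = 6 \) levels of splitting.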

def countParRec[A](a: Array[A], pred: A => Boolean): Int = {
  val P: Int = par.getParallelism
  val smallArrayThreshold: Int = 1 max a.length / (P * 16)
  def inner(start: Int, end: Int): Int = {
    if(end - start <= smallArrayThreshold) {
      // Small sub-arrays are handled sequentially
      var count = 0
      var i = start
      while(i <= end) {
	if(pred(a(i))) count += 1
	i += 1
      }
      count
    } else {
      // Longer sub-arrays are split and handled (recursively) in parallel
      val mid = start + (end - start) / 2
      val (countLeft, countRight) = par.parallel(
	inner(start, mid),
	inner(mid+1, end)
      )
      // Return the sum of the sub-array results
      countLeft + countRight
    }
  }
  inner(0, a.length-1)
}
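
A small sanity-check sketch (hypothetical input; assumes the par helper and the functions defined above are in scope): all versions should agree with the sequential library method.

val a = Array.fill(100000)(scala.util.Random.nextInt())
val pred = (x: Int) => x % 2 == 0
assert(countPar(a, pred) == a.count(pred))
assert(countParRec(a, pred) == a.count(pred))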

Experimental evaluation

Let’s benchmark the Scala implementations given above against each other and against the methods Array.count(pred) and Array.par.count(pred) in the Scala library. Again, we use a Linux Ubuntu machine with an Intel Xeon E31230 quad-core CPU running at 3.2GHz and report wall-clock times.
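
The benchmark harness itself is not shown here; a minimal sketch of the kind of wall-clock measurement involved (with warm-up rounds so that the JVM has JIT-compiled the code before timing) could look as follows:

def timeMillis[A](body: => A): Double = {
  val start = System.nanoTime()
  body
  (System.nanoTime() - start) / 1e6
}

val data = Array.fill(10000000)(scala.util.Random.nextInt())
val isEven = (x: Int) => x % 2 == 0
(1 to 5).foreach(_ => countPar(data, isEven))   // warm-up rounds
println(timeMillis(countPar(data, isEven)))     // measured run, in milliseconds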

In the first benchmark family, we count the number of even integers in random integer arrays. As can be seen in the plot below, we obtain a speedup of \( \approx 1.7 \) compared to the sequential “while-loop” version on large arrays. In this case, the predicate evaluation is very fast, so the task creation, scheduling, and joining times partly explain why the speedup is not closer to four. Our non-recursive and recursive parallel versions perform very similarly, as expected: in the end, they split the array into roughly the same number of equally-sized chunks, and parallelizing the task creation phase does not help much when the number of available cores (and thus the number of chunks) is relatively small. On this kind of benchmark, the count method of Scala collections performs slightly worse, probably because its code is built from generic templates to avoid code duplication.

_images/count-even-2018.jpg

As a second benchmark family, let’s see what happens when we count the number of primes in an array of non-negative random numbers. The primality test applied is the straightforward trial-division one: check whether some integer \( i \in [2 .. \lceil\sqrt{x}\rceil] \) divides \( x \). The results below show a speedup of almost 4 with all the parallel versions. The explanation is that here the time is spent mostly in the primality tests, so the task creation times etc. do not play a significant role.
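
The exact predicate used in the benchmark is not shown above; a sketch of such a trial-division test (using a Long product to avoid overflow of \( i^2 \)) could be:

def isPrime(x: Int): Boolean = {
  if(x < 2) false
  else {
    var i = 2
    var composite = false
    while(i.toLong * i <= x && !composite) {
      if(x % i == 0) composite = true  // found a divisor, so x is not prime
      i += 1
    }
    !composite
  }
}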

_images/count-primes-2018.jpg