This post explores some basic techniques for writing high performance stream processing code using scalaz-stream. But before getting to that, I'm going to start by looking at something even simpler: summing up 10,000 doubles which are already sitting in memory.

Functional programming is all well and good, but all that nice compositionality and code reuse comes at a price, at least in current language runtimes--we frequently end up having to operate on even primitive data using boxed representations. All else being equal, we'd much prefer to operate on integers and doubles in their unboxed form. When performed on the unboxed representations, adding two numbers requires zero heap allocation. In boxed form, adding two numbers requires dereferencing both values on the heap to obtain the unboxed primitives inside, performing the operation on the unboxed values, and stuffing the result into a newly allocated heap value. Even with modern bump-a-counter generational garbage collectors, it's a substantial performance hit when your baseline for the same operation is a *single machine instruction*.

How does boxed arithmetic compare to unboxed arithmetic for this simple task of summing 10,000 doubles? It's about an order of magnitude slower, in my simple tests (full code). Aside: for all the numbers in this post, I'm not doing serious profiling, just trying to get a rough ballpark using a simple timing function, see the code for details.
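For the curious, a harness along these lines is all I mean by a "simple timing function" (this `time` helper is my sketch, not the exact function from the linked code; it returns the average seconds per run and relies on the repetitions themselves for JIT warmup):

```scala
object Timing {
  // Run `a` `reps` times and return the average wall-clock seconds per run.
  // A rough ballpark only -- for serious numbers, use a real harness like JMH.
  def time[A](reps: Int)(a: => A): Double = {
    var i = 0
    val start = System.nanoTime
    while (i < reps) { a; i += 1 }
    ((System.nanoTime - start) / 1e9) / reps
  }
}
```

Something like `Timing.time(100)(sum(nums, 0, N))` then yields the per-run averages reported below.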

```
val nums: Array[Double] =
  Array.tabulate(N)(_.toDouble)
val numsVec: Vector[Double] =
  Vector.tabulate(N)(_.toDouble)

def sum(ds: Array[Double], start: Int, stop: Int): Double = {
  var total = 0.0
  var i: Int = start
  while (i < stop) { total += ds(i); i += 1 }
  total
}
```

Summing these N elements via `sum(nums, 0, N)`, which runs fully unboxed, is more than 10x faster than doing boxed arithmetic via `numsVec.sum`:

```
N: 10000
unboxed-in-memory: 2.3754119873046876E-5
boxed-in-memory: 4.5867919921875E-4
```

Yes, computers are fast. That is, my 1.86 GHz CPU can sum up about 10,000 / 2.37e-5, or 421 million doubles per second, consistent with each iteration taking just a few machine instructions. Another way of stating this is that my CPU can process over 3GB worth of data per second (421 million * 8 bytes per double), at least for this simple calculation. (As an aside, this amazing throughput is one reason why so many programs end up being I/O bound--a very large percentage of useful software consists of simple computations not all that much more complicated than summing some numbers up. Once the data for these computations is in memory and in the right form, performing the computation can be done very quickly. Since it's rare to have that much data just sitting around ready to be processed, we'll generally need to be pulling this data from external systems whose throughput is much more limited than what the CPU could theoretically handle.)

So the boxed summation takes about 10x longer. What can be done? We could attempt to *statically* inline or `@specialized` away all polymorphism. This works in some limited use cases, but it's pretty brittle: to reap the benefits, your entire call chain must be specialized in this way. Want to use `Option` somewhere in your code? If you do, you pay for boxing and unboxing when entering and leaving `Option`. You could introduce `SpecializedOption` and essentially replicate the API of `Option`, but this obviously gets ugly. And of course the compiler provides little help if you go down this route--it will gleefully insert boxing and unboxing at boundaries between specialized/monomorphic and unspecialized code, without warning you.
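To make that boundary problem concrete, here's a small illustrative sketch (my example, not code from this post): a `@specialized` class can avoid boxing on the direct, monomorphic call path, but the moment values flow through an unspecialized generic type like `Option`, boxing reappears at the boundary, with no warning from the compiler.

```scala
// Specialized for Double: `apply` can run on unboxed doubles when the
// call site is itself monomorphic/specialized.
class Squarer[@specialized(Double) A](f: A => A) {
  def apply(a: A): A = f(a)
}

val sq = new Squarer[Double](x => x * x)

sq(3.0)                  // specialized path: no boxing needed
Some(3.0).map(sq.apply)  // Option is unspecialized: the Double is boxed on entry and exit
```

Both calls compute the same result; the difference is invisible in the source and shows up only in allocation behavior.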

A little thought reveals that we can do a bit better than `numsVec.sum`, which is parameterized on a `Numeric` that can only operate on values *two at a time*, forcing reboxing *after each single number* is added to the total:

`def sum[B >: A](implicit num: Numeric[B]): B`

The critical function called by `sum` is `Numeric.plus`, which has the signature `def plus(x: A, y: A): A`. Since this is the only function the polymorphic `sum` has access to, it has no choice but to send boxed values to the `Numeric` instance. Due to polymorphism, `plus` must also return its result boxed. What we can do instead is avoid boxing any values until the end of the computation, by processing the collection in one or more chunks. Here's the code for processing a chunk:

```
def sumOf(ds: TraversableOnce[Double]): Double = {
  var total = 0.0
  ds.foreach { d => total += d }
  total
}
```

Let's compare `sumOf(numsVec)` to the other implementations. Think about what will happen--`total` is an unboxed `Double`. For each iteration through `ds`, we only have to *unbox* the current value and add this unboxed value to the total, which requires no allocation. Only when we return `total` in a polymorphic context will we be forced to rebox the result. We've gone from `N` allocations, with allocation happening in our main loop, to (at most) *one* allocation. Unfortunately, this only shaves off between 10-40%:

```
N: 10000
unboxed-in-memory: 2.37884521484375E-5
boxed-in-memory: 4.681396484375E-4
chunked-boxed-in-memory: 3.057861328125E-4
```

Are we stuck? No. We can extend this chunking idea one step further back in the call chain. Rather than working with a `Vector[Double]`, in which every single element is individually boxed, we can work instead with a `Vector` of chunks. (This is a similar idea to *ropes*, often represented as a finger tree of strings.) We can make chunks as big as needed to get the performance on par with the fully unboxed code:

```
val ranges: Vector[(Int,Int)] =
  Vector() ++ (0 to N by 2000).sliding(2).map { case Seq(a,b) => (a,b) }
val chunkedNumsVec: Vector[(Array[Double], Int, Int)] =
  ranges.map { case (start, stop) => (nums, start, stop) }
```

Factoring `chunkedNumsVec` a bit, we can package up `(Array[Double], Int, Int)` as a data type, `Slice`, and use Scala's `@specialized` annotation to specialize for different array types. We'll also add a version of `sum` that operates on chunks, but *specializes to unboxed arrays at runtime*, falling back to working with boxed values only if needed:

```
case class Slice[@specialized A](
  private val overall: Array[A],
  start: Int,
  stop: Int) extends IndexedSeq[A] { ... }

def sumChunk(vs: IndexedSeq[Double]): Double = vs match {
  case Slice(vs: Array[Double], i, j) => sum(vs, i, j)
  case _ => sumOf(vs)
}
```
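For reference, here's one way the elided `{ ... }` body of `Slice` might be filled in (my sketch, not the exact code from the post): `IndexedSeq` only requires `length` and `apply`, both of which are trivial for an array slice.

```scala
// A view of the half-open range [start, stop) of a backing array,
// exposed as an IndexedSeq without copying any elements.
case class Slice[@specialized A](
  private val overall: Array[A],
  start: Int,
  stop: Int) extends IndexedSeq[A] {

  def length: Int = stop - start
  def apply(i: Int): A = overall(start + i)
}
```

Because `Slice` never copies, building `chunkedNumsVec` out of slices costs only a few small allocations per chunk, regardless of chunk size.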

With this in place, we can now write a higher-order function, `chunkedReduce`, which reduces a `Vector[IndexedSeq[A]]` to a single `A`. Note that `chunkedReduce` does not need to be specialized; it's just an ordinary higher-order function:

```
def chunkedReduce[A](v: Vector[IndexedSeq[A]])(f: IndexedSeq[A] => A): A =
  f(v map f)
```
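Note the small trick here: `v map f` collapses each chunk to a single `A`, and the resulting `Vector[A]` is itself an `IndexedSeq[A]`, so `f` can be applied one more time to combine the per-chunk results. A tiny self-contained demonstration (using a plain summation function as a stand-in for `sumChunk`):

```scala
def chunkedReduce[A](v: Vector[IndexedSeq[A]])(f: IndexedSeq[A] => A): A =
  f(v map f)

// Stand-in for sumChunk: boxed summation over any IndexedSeq[Double].
def sumOf(ds: IndexedSeq[Double]): Double = {
  var total = 0.0
  ds.foreach { d => total += d }
  total
}

val chunks: Vector[IndexedSeq[Double]] =
  Vector(IndexedSeq(1.0, 2.0), IndexedSeq(3.0, 4.0))

chunkedReduce(chunks)(sumOf)  // per-chunk sums 3.0 and 7.0, combined to 10.0
```

This only makes sense when `f` combines values the same way at both levels, as summation does.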

Now, `chunkedReduce(chunkedNumsVec)(sumChunk)` will delegate the leaf-level chunks to an unboxed summation function, and performance is within a factor of two of the original, totally unboxed implementation:

```
unboxed-in-memory: 2.478790283203125E-5
boxed-in-memory: 4.6490478515625E-4
chunked-boxed-in-memory: 3.12255859375E-4
deep-chunked-boxed-in-memory: 5.8258056640625E-5
```

So to summarize:

- We can eliminate repeated boxing by processing values in chunks and only reboxing at the end of processing each chunk. This gives a modest performance improvement.
- Generally speaking, chunking a computation moves the dominant cost to the "leaf-level" processing of individual chunks. In this regime we can get away with performing specialization *at runtime* and *only at the leaves*. Because the dominant cost is the processing of individual chunks, we aren't forced into trying to specialize the entire call chain statically, as would be necessary if the computational cost were more evenly distributed throughout the tree.

There is definitely some annoyance in having to deal with explicit chunking, but FP saves the day here by letting us factor out and generalize any patterns that start cropping up. For instance, `chunkedReduce` is one very simple combinator we've discovered so far. We can imagine a whole suite of combinators for creating and working with chunked data in an efficient way.

### Scalaz-stream

This stuff isn't exactly brain surgery, but that's not a bad thing, is it? Let's use the simple ideas we just developed to discover techniques for writing high-performance scalaz-stream code.

Let's start by measuring the *overhead* of interpreting a scalaz-stream program. The interpreter of `Process`, the function `Process.collect`, works by repeatedly pattern matching on the `Process` it is evaluating and building up a monadic expression (here, the monad used is `Task` from `scalaz.concurrent`) which must then be run. Even if the `Process` isn't doing anything, there's some overhead which will be roughly constant per *step*, and which we can measure with a simple program like `Process.fill(N)(1).run.run`. This produces a lazy stream of `N` elements, all `1`, and traverses that stream, discarding the results. (Note: the first `run` just traverses the `Process`, ignoring its output, to get a `Task[Unit]`; the second `run` actually forces that `Task`.)

```
N: 10000
naive-streaming-overhead: 0.008705078125
// recall
unboxed-in-memory: 2.3754119873046876E-5
boxed-in-memory: 4.5867919921875E-4
```

So, in the *best case scenario*, we can do about `10000 / .009`, or around one million scalaz-stream steps per second.

There are various ways to think about how to make the overhead of scalaz-stream negligible. One is that we simply need to do enough work at each step to dwarf the microsecond overhead added per step by scalaz-stream itself. This can be stated more generally: any time we compose two functions `f` and `g`, the throughput of `f(g(x))` can be no better than the minimum of `f` and `g`'s individual throughputs. Furthermore, if `g` (the actual work done per step) is the bottleneck and has a cost much greater than `f` (the overhead of scalaz-stream), there is no gain in optimizing `f` because the cost of `g` will continue to dominate the overall computation. Similarly, a scalaz-stream pipeline like `src |> f |> g` will only run as fast as its slowest stage. And if that slowest stage is by far `src`, there is no point in going through contortions to optimize `f` and `g`. This will be the case in a large number of realistic scenarios, where a pipeline's leftmost `Process` will be some low-throughput, I/O-bound computation whose cost dwarfs the negligible overhead added by `f`, `g`, and scalaz-stream itself.

Now that we've said all that, what if scalaz-stream legitimately is the bottleneck in a program, say if we're trying to write high-performance streaming numeric code? Applying what we've learned, we just need to batch the input into chunks big enough that the time required to actually process each chunk dwarfs the per-step microsecond cost added by scalaz-stream itself.

Let's return to our summation example. How many numbers can we sum, unboxed, in a microsecond? Earlier, I noted we can process about 421 million unboxed doubles per second, or 421 unboxed doubles per microsecond. So we need to make our chunk size significantly bigger than 421 (at a chunk size of 421, scalaz-stream would at least be doubling the running time). Let's go with 20k, and we'll bump N up to 100k for this run:
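That back-of-the-envelope choice can be written out explicitly (a sketch using the figures measured above; on different hardware you'd plug in your own measurements):

```scala
val stepOverheadSecs = 1e-6   // ~1 microsecond of interpreter overhead per stream step
val doublesPerSec    = 421e6  // ~421 million unboxed doubles summed per second

// Chunk size at which the per-step overhead equals the useful work:
// chunks of ~421 doubles would double the total running time.
val breakEven = (doublesPerSec * stepOverheadSecs).round.toInt

// Make chunks ~50x the break-even size so the overhead is only ~2% of the total.
val comfortable = breakEven * 50
```

Which lands in the same ballpark as the 20k chunk size used below.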

```
Process.ranges(0, N, 20000).
  map { case (i,j) => Slice(nums, i, j) }.
  map(sumChunk).
  pipe(process1.sum[Double]).
  run.run
```

`Process.ranges(0, N, 20000)` generates `(0,20000), (20000,40000), ...` up to `N`. Notice, unfortunately, that after applying `sumChunk`, we have no choice but to resort to boxed summation, using `process1.sum[Double]`, which uses the `Numeric[Double]` instance. I believe this accounts for the discrepancy in performance between the two:

```
N: 100000
unboxed-in-memory: 1.9091796875E-4
chunked-streaming: 5.5682373046875E-4
```

Making the chunk size smaller means more numbers must be added in boxed form, and the performance degrades further. What this suggests to me is that it could be beneficial for performance in scalaz-stream if *downstream* processes could request multiple values at once. Since scalaz-stream is parameterized on the request protocol, this is very doable without requiring major surgery to the library. It would mean creating more complicated versions of `pipe`, `tee`, and `wye`, but there could be significant performance wins, especially if we can play the same sort of tricks with runtime specialization to keep data unboxed even as it moves through multiple stages of a pipeline. This is an exciting thought--the ability to write high-performance streaming numeric code, in a compositional style.

Gist for the code discussed in this post
