Parallel External Sorting
This is the first exercise in a series planned to explore distributed computation on a Rasp Pi cluster. Since this exercise is more about getting the ball rolling, it explores parallel computation on a desktop and a single Rasp Pi.
The problem: sort N integers that do not all fit in the memory of a single node, where each node has multiple compute units (processors/cores).
From the description, the solution is to use external sorting:
- Split the collection into k chunks of p integers each such that each chunk fits into the memory of a single node, i.e., N = k * p.
- Sort each chunk.
- Merge the sorted chunks into a sorted collection.
Assuming the three steps happen in sequence, the complexity of this solution is N + k * p * log(p) + N * f(k), where
- N units to scan and split the collection into chunks (step 1),
- k * p * log(p) units to sort k chunks of p integers each (step 2), and
- N * f(k) units to merge sorted chunks into the sorted collection (step 3).
While merging in step 3, the least integers in the remaining parts of the chunks are compared to determine the next least integer in the entire collection; this comparison costs f(k). It can be accomplished using a priority queue in which the least integer has the highest priority. Specifically, the queue is initialized with the least integer from each chunk. The least integer (highest-priority element) is removed from the queue and added to the sorted (output) collection. To set up the next comparison, the next least integer from the chunk c to which the removed integer belonged is added to the queue, and the process repeats. Consequently, f(k) = log(k) and the cost of step 3 is N * log(k).
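The merge just described can be sketched in Scala roughly as follows. This is a minimal sketch rather than the actual implementation; the name `mergeSorted` and the `(value, chunkIndex)` tuple encoding are mine.

```scala
import scala.collection.mutable

// Minimal sketch of the k-way merge: each chunk is a sorted iterator, and a
// min-priority queue holds the current least integer of each chunk.
def mergeSorted(chunks: IndexedSeq[Iterator[Long]]): Iterator[Long] = {
  // PriorityQueue is a max-heap by default; reverse the ordering on the
  // value so that the least integer has the highest priority.
  val queue = mutable.PriorityQueue.empty[(Long, Int)](
    Ordering.by[(Long, Int), Long](_._1).reverse)
  for ((c, i) <- chunks.zipWithIndex if c.hasNext) queue.enqueue((c.next(), i))
  new Iterator[Long] {
    def hasNext: Boolean = queue.nonEmpty
    def next(): Long = {
      val (least, i) = queue.dequeue()     // least remaining integer: O(log k)
      if (chunks(i).hasNext)               // refill from the chunk it came from
        queue.enqueue((chunks(i).next(), i))
      least
    }
  }
}
```

For example, `mergeSorted(Vector(Iterator(1L, 4L), Iterator(2L, 3L))).toList` yields `List(1, 2, 3, 4)`.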
Since the nodes have multiple compute units, step 2 can be parallelized. If a node has l compute units, then the cost of step 2 reduces to k * p/l * log(p/l). Also, the l chunks sorted in parallel can be merged and written into a single file as opposed to being written into l files. With this change, the complexity of step 2 changes to k * (p/l * log(p/l) + p * log(l)) and the complexity of step 3 changes to N * log(k/l) as f(k) = log(k/l).
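The parallelized step 2 can be sketched as follows. This is my own simplification using Scala futures, not the actual implementation; `sortChunk` is a hypothetical name, and the pairwise merge at the end stands in for the priority-queue merge that gives the p * log(l) bound above.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Sketch of step 2 with l sorters: split a chunk of p integers into l
// sub-chunks, sort them concurrently, and merge the sorted runs so the
// chunk can be written out as a single sorted file instead of l files.
def sortChunk(chunk: Vector[Long], l: Int): Vector[Long] = {
  val size      = math.max(1, math.ceil(chunk.length.toDouble / l).toInt)
  val subChunks = chunk.grouped(size).toVector
  val runs      = Await.result(
    Future.sequence(subChunks.map(s => Future(s.sorted))), Duration.Inf)
  // Merge the sorted runs pairwise; a priority-queue merge would achieve
  // the p * log(l) cost described above.
  runs.reduce { (a, b) =>
    val out = Vector.newBuilder[Long]
    var i = 0; var j = 0
    while (i < a.length && j < b.length) {
      if (a(i) <= b(j)) { out += a(i); i += 1 } else { out += b(j); j += 1 }
    }
    out ++= a.drop(i); out ++= b.drop(j)
    out.result()
  }
}
```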
I implemented the above solution in Scala and tested it using ScalaCheck. While this was my first outing with Scala, it was rather easy. Scala library docs were pretty good. IntelliJ’s Scala plugin was helpful. And, I guess experience with functional programming, F#, and property-based testing helped :)
To evaluate the implementation, I created 4 sets of numbers containing 5M, 10M, 50M, and 100M numbers in the range -1e18 to 1e18. These sets took up 93MB, 185MB, 925MB, and 1.9GB of space on the disk, respectively.
The target machines were
- Rasp Pi 3B with 1.2GHz 4-core Broadcom BCM2837 CPU, 1GB of RAM, and Class 10 SD card
- Desktop with 2.8GHz 8-core Intel i7 CPU, 16GB of RAM, and SSD drive
Given the hardware configuration of the machines, to force the implementation to process the integers in chunks, the max heap size of the JVM was constrained to 200MB, 400MB, and 800MB.
To observe how the implementation scales with the number of compute units, the number of sorters was constrained to 1, 2, 4, and 8 (8 only on the desktop) along with respective size factors of 104, 64, 48, and 40, which determine the number of integers per sorter (= heap size / number of sorters / size factor).
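As a quick sanity check of that formula (assuming MB here means 10^6 bytes, which is my assumption):

```scala
// Integers per sorter = max heap size / number of sorters / size factor
def intsPerSorter(heapBytes: Long, sorters: Int, sizeFactor: Int): Long =
  heapBytes / sorters / sizeFactor

// With a 200MB heap, the per-sorter chunk sizes come out to roughly:
val oneSorter   = intsPerSorter(200000000L, 1, 104)  // ~1.92M integers
val twoSorters  = intsPerSorter(200000000L, 2, 64)   // ~1.56M integers
val fourSorters = intsPerSorter(200000000L, 4, 48)   // ~1.04M integers
```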
The script to execute the implementation under all of the above configurations is available here.
In terms of compute, a single Rasp Pi is no match for a desktop
Following are the (wall clock) run times on the desktop and Rasp Pi for the various combinations of different number of integers, number of sorters, and max heap size.
Since the shapes of both graphs are very similar, the lengths of the y axes immediately imply that parallelized external sorting on a Rasp Pi 3B is at least 20x slower than on a present-day desktop. This is due to at least two factors:
- The processor in the Rasp Pi is more than 2x slower than the processor in the Desktop — 1.2GHz vs 2.8GHz.
- The storage in the Rasp Pi is more than 10x slower than the storage in the Desktop — 20 MBps vs 400 MBps.
So, it is clear that a single Rasp Pi cannot beat a present day desktop.
That said, the above graphs show similar performance-improvement trends on both the desktop and the Rasp Pi as the max heap size and the number of sorters increase. While performance on the desktop can be improved further by using larger heap sizes, this is not possible on a Rasp Pi 3B, whose memory is limited to 1GB. However, a cluster of Rasp Pis could be used to increase the number of sorters and, possibly, improve sorting performance (provided network throughput is not a bottleneck). Something to explore.
In Scala, Iterators and Streams have one stark difference
Coming to Scala after dabbling with streams in Java, I initially used Scala streams instead of iterators. However, the implementation kept running out of memory pretty fast. Upon some digging, I figured out that, while Scala streams are lazy like Java streams, Scala streams memoize previously computed values. Hence, values computed by operations on streams (e.g., map) were unnecessarily hogging memory even though they were used only once in this implementation. So, I switched to iterators, and memory usage was much better: more integers could be loaded into memory.
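A small illustration of the difference (a sketch; note that in Scala 2.13+ `Stream` is deprecated in favor of `LazyList`, which memoizes the same way):

```scala
// A Stream memoizes: once forced, elements stay reachable through the head.
val stream = Stream.from(0).map(_ * 2)
stream.take(3).foreach(_ => ())  // forces and memoizes 0, 2, 4
assert(stream(2) == 4)           // re-reading is cheap, but the values linger

// An Iterator is single-pass: consumed elements can be garbage collected.
val iter = Iterator.from(0).map(_ * 2)
assert(iter.take(3).sum == 6)    // 0 + 2 + 4; these values are now gone
// iter cannot be rewound, which is exactly why it does not hog memory
```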
All that are named alike may not behave alike.
The number of longs that can be loaded into memory at a time is way lower than what should be possible
With a 200MB heap size, I estimated that around 10 million longs would fit into memory, assuming each long value required 20 bytes of storage (generously including memory for metadata). However, this was not the case. Based on the number of long values assigned to the only sorter in the single-sorter case, each long value seemed to take up 104 bytes. To confirm this observation, I ran the following Scala script with a max heap size of 200MB and waited for it to run out of memory.
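The script itself is embedded in the original post; a reconstruction along these lines (my sketch, not the original) keeps appending Long values to a growable buffer until the heap is exhausted:

```scala
import scala.collection.mutable.ArrayBuffer

// Append Long values until the JVM throws OutOfMemoryError (or a cap is hit),
// reporting how many fit. ArrayBuffer is backed by an Array[AnyRef], so each
// value is stored as a boxed java.lang.Long.
def fillUntilOom(cap: Long = Long.MaxValue): Long = {
  val buffer = ArrayBuffer.empty[Long]
  var count  = 0L
  try {
    while (count < cap) {
      buffer += count
      count += 1
      if (count % 1000000 == 0) println(s"added ${count / 1000000}M longs")
    }
  } catch {
    case _: OutOfMemoryError => println(s"OOMed after $count longs")
  }
  count
}
```

Launched with a constrained heap (e.g., `scala -J-Xmx200m`), the uncapped call `fillUntilOom()` eventually reports how many longs fit.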
The script OOMed after 4.19M longs were added to the array. This suggests a long value takes up ~50 bytes.
So, it seems my implementation of parallel external sorting can be further optimized to process 2x larger chunks.
Beyond that, I am still wondering about
Why do long values take up ~50 bytes in Scala/JVM when a long value inherently stores 8 bytes of information?
More numbers can be loaded into memory when more sorters are used.
Observe that the number of integers per sorter was calculated by dividing the max heap size by the number of sorters and the size factor. Ideally, the size factor would approximate the number of bytes required to store an integer, and it should not change as the number of sorters increases (while the max heap size remains constant). However, this was not the case.
At a max heap size of 200MB, each sorter processed ~2M integers when one sorter was used, ~1.6M integers when two sorters were used, and ~1M integers when four sorters were used. A similar phenomenon occurred at the 400MB and 800MB max heap sizes.
If you are thinking “wait, what??”, then so am I :) Without further exploration, I conjecture this is due to some combination of GC behavior, my sorting implementation not making optimal use of memory (see the previous observation), and parallelism. That said, I am curious about other explanations for this phenomenon. Please leave a comment if you have one :)
So, what next?
Clearly, one Rasp Pi is no match for a present-day desktop. No surprises there. But can a cluster of Rasp Pis match or better a present-day desktop?
The implementation used in this experiment can be found on GitHub.