Why does message queue size matter when programming with Actors?
Recently, I had to wrangle data from a 50GB file containing close to 34 million lines. This wrangling involved selecting lines based on some pattern and then pulling data from the selected lines.
After some digging around, I decided to use Actor model support in GPars. The basic structure of the solution was as follows.
- Create a collating actor that collates data from processing actors.
- Create N processing actors that will process a given line — search for the pattern, pull the data from the line, and send the data to the collating actor.
- Read each line from the file and asynchronously send it to one of the processing actors (done on a dedicated file-reading thread).
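As a rough illustration of that three-part structure, here is a minimal Python analogue, with plain threads and queues standing in for actors and their mailboxes. The pattern, worker count, and sample lines are made up; the original script used GPars/Groovy.

```python
import queue
import re
import threading

# Hypothetical pattern and worker count; the original script's details differ.
PATTERN = re.compile(r"word=(\w+)")
N_WORKERS = 4

collect_q = queue.Queue()                             # collating actor's mailbox
work_qs = [queue.Queue() for _ in range(N_WORKERS)]   # processing actors' mailboxes
results = set()                                       # collated data, no duplicates

def collator():
    done = 0
    while done < N_WORKERS:
        msg = collect_q.get()
        if msg is None:                 # a processing actor finished
            done += 1
        else:
            results.add(msg)

def processor(mailbox):
    while True:
        line = mailbox.get()
        if line is None:                # end-of-input sentinel
            collect_q.put(None)
            return
        m = PATTERN.search(line)        # search for the pattern
        if m:
            collect_q.put(m.group(1))   # send the pulled data to the collator

threads = [threading.Thread(target=collator)]
threads += [threading.Thread(target=processor, args=(q,)) for q in work_qs]
for t in threads:
    t.start()

# The file-reading thread, with an in-memory stand-in for the file:
lines = ["word=alpha x", "noise", "word=beta y", "word=alpha z"]
for i, line in enumerate(lines):
    work_qs[i % N_WORKERS].put(line)    # asynchronous send: put() returns immediately
for q in work_qs:
    q.put(None)
for t in threads:
    t.join()

print(sorted(results))                  # → ['alpha', 'beta']
```

Note that `put()` on an unbounded queue never blocks, which is exactly the property that bites later in the story.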
Skimming through the GPars documentation, I got to a working script in about an hour. It ran pretty well on a small slice (~3 million lines) of the original file.
So, I ran the script on the original file. Interestingly (or as with most software efforts), the execution consumed all of the 8GB heap space and ran for a while before completing. While it did the job, I was intrigued by the amount of memory consumed: the solution processed the lines without storing them in memory, and the data pulled from the lines amounted to a few thousand words stored in a set container (no duplicates). So, as a good software developer, I went down the rabbit hole :)
After spending a few hours digging into GPars documentation, profiling the script with VisualVM, whipping up a parallel solution in Python (which helped me understand an important aspect of Futures in Python), and making noise about a “bug” in GPars, I figured out the issue (err, my bad).
I had not considered the size of the communication channel (queue) between the consumers (processing actors) and the producer (file reading thread).
In GPars, there is no limit on the size of the message queues of Actors. Further, the file reading thread was adding lines to the queues of processing actors faster than the processing actors could consume the lines from the queues.
So, the queues were acting like buffers and the program was storing the lines in memory!! Arghhh!!!
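The culprit in miniature: Python's `queue.Queue` is also unbounded by default, so a producer that outruns its consumer (here, with no consumer at all) simply accumulates every message on the heap.

```python
import queue

q = queue.Queue()            # maxsize=0 → unbounded, like a GPars actor mailbox
for i in range(100_000):     # fast producer, no consumer keeping up
    q.put(f"line {i}")       # put() never blocks: every line now lives on the heap
print(q.qsize())             # → 100000
```

Scale the loop up to 34 million lines of a 50GB file and the "queue" is effectively the whole file in memory.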
The fix was to limit the size of the queues. However, GPars offered no support for bounding such queues. Instead, I tweaked the file-reading thread: for every N bytes read from the file, the next line is sent synchronously to a processing actor. This roughly caps the combined backlog of all queues at N bytes, assuming all processing actors work equally fast (which is mostly true here, since the lines are comparable and the processing is identical). And, in reality, the solution with this fix is working :)
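In GPars terms, a synchronous send (`sendAndWait`) blocks the sender until the actor replies, and since a mailbox is FIFO, getting that reply implies all earlier messages have been handled too. A Python sketch of that periodic synchronous hand-off, modeled with `queue.Queue.join()`; the threshold and the tiny in-memory "file" are illustrative:

```python
import queue
import threading

CHECKPOINT_BYTES = 64          # hypothetical N; the real value would be much larger
mailbox = queue.Queue()
processed = []

def processor():
    while True:
        line = mailbox.get()
        if line is not None:
            processed.append(line)   # stand-in for pattern search + send to collator
        mailbox.task_done()
        if line is None:
            return

threading.Thread(target=processor).start()

# Stand-in for reading the 50GB file: an iterable of lines.
lines = [f"line {i}\n" for i in range(100)]
bytes_since_sync = 0
for line in lines:
    mailbox.put(line)                 # asynchronous send, as before
    bytes_since_sync += len(line)
    if bytes_since_sync >= CHECKPOINT_BYTES:
        mailbox.join()                # "synchronous send": wait until the mailbox
        bytes_since_sync = 0          # drains, so the backlog never exceeds ~N bytes
mailbox.put(None)
mailbox.join()
```

An alternative, where the library supports it, is simply a bounded mailbox (in Python, `queue.Queue(maxsize=N)`), which makes `put()` itself block and gives the same backpressure for free.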
Looking back, every programming model has some “tricky” aspect. If synchronization via locks and semaphores is such an aspect of thread-based programming, then getting the queue size right is such an aspect of actor-based programming. That said, programming with Actors is way easier than programming with Threads. Everyone should try it :)
For them curious cats, the final solution is available as a gist.