Processing Big Data on the Linux Shell
It’s not the fastest, but it does work
Recently, I needed to generate and process multi-billion record synthetic datasets for some data structure benchmarking. Rather than write a bunch of custom code to do this processing, I decided to try using shell scripts and standard system utilities. As it turns out, standard Unix utilities are surprisingly useful for larger-than-memory data processing.
Let’s consider a simple workload: tagging with external shuffling. I have a large data file that contains a sequence of whitespace-delimited records, one to a line. I want to shuffle the records randomly, and then add a new field to each record containing a sequential number, indicating its position within the file.
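As a concrete example, suppose the input file contains the five records 0 through 4. After shuffling and tagging, the output might look like the following (the order of the original records will differ from run to run; the first, tab-separated field is the new sequence number),
1	3
2	0
3	4
4	1
5	2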
For the purposes of this post, we’ll use the following script to generate our datasets. The dataset will simply consist of integers counting up from 0.
#!/bin/bash
# data_generator
if [[ $# -lt 1 ]]; then
    echo "data_generator <count>" >&2
    exit 1
fi
cnt=$1
for (( i=0; i<cnt; i++ )); do
    echo "$i"
done
To generate a dataset containing 1000 records using this script, use the following command,
$ data_generator 1000 > records.dat
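As an aside, the bash loop above is convenient but slow once the count gets large; if GNU coreutils is available, seq(1) produces the same output (integers counting up from 0, one per line) and is typically much faster,
$ seq 0 999 > records.dat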
We’ll start by devising a simple one-liner that will do the job, as long as the data fits within system memory (which is called internal shuffling). Then we will consider how to complete this task when the data won’t all fit in memory at once, and some of it must be stored on disk (called external shuffling). Finally, we’ll expand our solution to support parallel operation, which is likely to pay off when processing massive datasets.
Internal Shuffling
For small data, we can shuffle and tag our dataset easily using standard GNU utilities. If the dataset fits in system memory, we can use the shuf(1) utility to permute the order of lines in the file, cat(1) to add line numbers, and then sed(1) to strip the leading whitespace that cat(1) leaves behind.
$ shuf records.dat | cat -n | sed 's/^ *//g'
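Running this on a small, five-record file for illustration, the output looks something like the following (the record order is random, so your run will differ), which matches the sample output sketched above,
$ data_generator 5 > small.dat
$ shuf small.dat | cat -n | sed 's/^ *//g'
1	2
2	4
3	0
4	3
5	1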
However, this simple pipeline will quickly run into problems. The shuf(1) command is not well suited to large amounts of data, as it must fully materialize its input in memory first. If we try to shuffle too large a dataset, the pipeline will fail when shuf(1) runs out of memory.
Luckily, this constraint of needing to fully materialize the input in memory is pretty specific to shuf(1). As we’ll see, the other common utilities do not require full materialization of their inputs, which means we can work around the problem by using those utilities to emulate the behavior of shuf(1).
External Shuffling
The sort(1) utility supports external merge sorting out of the box, meaning that it doesn’t run into the same memory limitations as shuf(1). So we can actually use sort(1) to shuffle our data, using the decorate-sort-undecorate idiom.
We tag each record with a random number (decorate), sort the records by that number (sort), and then drop the number (undecorate). The decorate and undecorate phases can be done record-at-a-time, so they require no in-memory materialization, and the sort itself can be done externally.
We can decorate the records using awk(1), which supports floating-point random number generation, pipe the result into sort -n to do a numeric sort on the float field, and then use cut(1) to keep only the original data, dropping the random number. awk(1) will let us generate floats with up to 31 digits of precision, which should give us plenty of randomness for our shuffle.
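Before assembling the full pipeline, it may help to see the decorate step on its own. Running it over a single record (here, the record 42) prints the random key, a tab, and then the original record; the key itself will of course differ on every run,
$ echo "42" | awk 'BEGIN{srand();} {printf "%.31f\t%s\n", rand(), $1}'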
$ awk 'BEGIN{srand();} {printf "%.31f\t%s\n", rand(), $1}' records.dat | \
sort -n | cut -f2 - | cat -n | sed 's/^ *//g'
The resulting one-liner is complicated-looking, but it allows us to process datasets that are too large to fit in system memory, and it also works on systems that don’t ship GNU coreutils and so don’t have shuf(1) (like OpenBSD). But, if we’re willing to increase the complexity a fair bit, we can develop a script which will perform the same task much faster.
Parallel, External Shuffling
As we are ostensibly working on large sets of data, it will behoove us to do things in parallel when we can. Multi-processing has a lot of overhead, but with large enough datasets, that overhead is absolutely worth it. In addition, sort(1) defaults to using /tmp for writing temporary files while performing an external sort. This directory is often mounted in memory using tmpfs or similar, and so we may still run into out-of-memory issues when using the external one-liner above.
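Whether this applies to your machine is easy to check; df(1) will report the filesystem type backing /tmp, and a type of tmpfs means it lives in memory,
$ df -hT /tmp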
Luckily, there’s a solution to this too. sort(1) has a -m flag, which is used for merging sorted files. So we can manually implement part of the external merge-sort algorithm in the shell, which gives us better control over resource utilization and parallelism.
The split(1) command will allow us to split a file into many small pieces. We can split our decorated data file into many pieces, sort each of these pieces individually (in parallel), and then merge the sorted files back together using sort -m.
The file will be split based on a specified number of lines, so let’s first calculate how many lines per piece we need to end up with a particular number of files (say, 10 for a small-scale example),
rcount=10000 # number of records in the data file
fcount=10 # number of subfiles to use
lines=$(( rcount / fcount ))
# Create a temporary directory for storing the subfiles
tmp_dir="TMP"
mkdir "$tmp_dir"
tmp_prefix="${tmp_dir}/x"
awk 'BEGIN{srand();} {printf "%0.31f\t%s\n", rand(), $1;}' records.dat | \
split -l "$lines" - "$tmp_prefix"
split(1) accepts a name prefix for its output files as an argument, so the prefix above places the subfiles within ./TMP and gives each file a sequentially generated, unique name beginning with x.
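With fcount set to 10, this produces exactly ten subfiles, named with split(1)’s default two-letter suffixes,
$ ls "$tmp_dir"
xaa  xab  xac  xad  xae  xaf  xag  xah  xai  xaj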
Now that we have the file decorated and split into subfiles, we want to sort each of these subfiles. We can use a for loop to sort each file,
for f in ${tmp_prefix}*; do
    sort -n "$f" > "${f}.sorted"
done
Of course, this sorts each file serially, which solves the memory issue with /tmp but isn’t really any faster. We can exploit parallelism by launching the jobs in the background and adding a wait command after the loop,
for f in ${tmp_prefix}*; do
    sort -n "$f" > "${f}.sorted" &
done
wait
But this risks overwhelming our computer: running too many concurrent jobs can result in worse performance than just doing everything serially. So let’s add a limit on how many concurrent sorts we have going at once. The general framework that I use for this is pretty simple,
concjobs=3   # number of concurrent jobs
i=0          # counter of active jobs
for f in ${tmp_prefix}*; do
    (( i++ ))
    sort -n "$f" > "${f}.sorted" &
    if (( i >= concjobs )); then
        wait
        i=0
    fi
done
wait
This scheme will launch up to $concjobs sort operations at once, and then wait until that whole batch has finished before launching any more.
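Because the bare wait blocks until every job in the batch has exited, one slow subfile can leave the other slots idle for a while. If your bash is 4.3 or newer, wait -n (which returns as soon as any single background job finishes) allows finer-grained throttling; a rough sketch of that variant, reusing the same variable names as above, would be,
running=0
for f in ${tmp_prefix}*; do
    if (( running >= concjobs )); then
        wait -n          # block until any one background sort exits
        (( running-- ))
    fi
    sort -n "$f" > "${f}.sorted" &
    (( running++ ))
done
wait                     # wait for the stragglers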
Once the subfiles are sorted, we can merge them all back together. sort(1) will only open a limited number of input files at once, even when the -m flag is specified; with more inputs than that, it merges them in multiple passes, writing intermediate results to /tmp, and so we may still run into memory issues there. Thus, we should use the --batch-size option to ensure that sort(1) opens all of the files to be merged at once, plus a few extra for doing the merge itself. This becomes increasingly important as fcount grows,
batch=$(( fcount + 10 ))
sort --batch-size="$batch" -n -m "$tmp_dir/"*.sorted > merged_data
And then, finally, we strip off the decoration and line-number the merged file,
$ cut -f2 merged_data | cat -n | sed 's/^ *//g' > result.dat
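At this point a quick, optional sanity check is worthwhile: the result should have the same number of lines as the input, and stripping the tags back off should recover exactly the original set of records. Assuming the file names used above, something like the following will confirm both,
$ wc -l records.dat result.dat
$ cut -f2 result.dat | sort -n | diff - <(sort -n records.dat) && echo "records match"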
The Final Script
Bringing this all together, replacing temporary files with pipes where possible, and automatically cleaning up the temporary files (both after sorting each subfile and once everything has been merged), we have the following script,
#!/usr/bin/env bash
rcount=10000 # number of records in the data file
fcount=10 # number of subfiles to use
lines=$(( rcount / fcount ))
# Create a temporary directory for storing the subfiles
tmp_dir="./TMP"
mkdir "$tmp_dir"
# Decorate the file and then split it into subfiles for sorting
tmp_prefix="${tmp_dir}/x"
awk 'BEGIN{srand();} {printf "%0.31f\t%s\n", rand(), $1;}' records.dat \
| split -l "$lines" - "$tmp_prefix"
# sort the subfiles concurrently
concjobs=3   # number of concurrent jobs
i=0          # counter of active jobs
for f in ${tmp_prefix}*; do
    (( i++ ))
    { sort -n "$f" > "${f}.sorted" && rm "$f"; } &
    if (( i >= concjobs )); then
        wait
        i=0
    fi
done
wait
# merge the sorted subfiles together, drop the sort field,
# and add a line number
batch=$(( fcount + 10 ))
sort --batch-size="$batch" -n -m "$tmp_dir"/*.sorted \
| cut -f2 - | cat -n | sed 's/^ *//g' > result.dat
# clean up any remaining temp files
rm -r "$tmp_dir"
There are, of course, many variations on this script. For example, this script only performs the sort in parallel, but does the decoration and undecoration serially. It may in fact be worth it to move these steps into the concurrent part of the script instead. With a lot of extra effort, we could even try performing the line numbering step in parallel, though that would require another split/merge pass and almost certainly would not be worth it.
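As a rough, untested sketch of that first variation: split the raw records first, and let each background job decorate and sort its own subfile. The one caveat I’m assuming here is that srand() with no argument seeds from the clock, so concurrent awk processes should be seeded explicitly to keep them from producing identical key sequences,
# Variation sketch: decorate inside the concurrent jobs instead of up front.
split -l "$lines" records.dat "$tmp_prefix"

i=0
for f in ${tmp_prefix}*; do
    (( i++ ))
    { awk -v seed="$RANDOM$i" \
          'BEGIN{srand(seed);} {printf "%0.31f\t%s\n", rand(), $1;}' "$f" \
        | sort -n > "${f}.sorted" && rm "$f"; } &
    if (( i >= concjobs )); then
        wait
        i=0
    fi
done
wait
The merge, cut, and line-numbering steps remain exactly as before.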
Conclusion
With a bit of effort, we’ve managed to arrive at a parallel, external script for shuffling and tagging data. However, it isn’t clear how much better this far more complex system is compared to the one-liners that we started with for internal and external shuffling. In a future post on this topic, I’m going to benchmark these scripts, as well as some of the variations mentioned above, on a variety of computers to determine just how well this approach works, and which variations perform best. So, if you’re interested, stay tuned for that!