Processing Big Data on the Linux Shell
Recently, I needed to generate and process multi-billion record synthetic datasets for some data structure benchmarking. Rather than write a bunch of custom code to do this processing, I decided to try using shell scripts and standard system utilities. As it turns out, standard Unix utilities are surprisingly useful for larger-than-memory data processing.
Let's consider a simple workload: tagging with external shuffling. I have a large data file that contains a sequence of whitespace-delimited records, one to a line. I want to shuffle the records randomly, and then add a new field to each record containing a sequential number, indicating its position within the file.
For the purposes of this article, we'll use the following script to generate our datasets. The dataset will simply consist of integers counting up from 0.
#!/bin/bash
# data_generator
if [[ $# -lt 1 ]]; then
    echo "data_generator <count>" > /dev/stderr
    exit 1
fi
cnt=$1
for (( i=0; i<cnt; i++ )); do
    echo "$i"
done
To generate a dataset containing 1000 records using this script, use the following command,
data_generator 1000 > records.dat
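Since this particular dataset is just a run of integers, the same file can probably be produced more quickly with seq(1), assuming it is installed (it ships with GNU coreutils and the BSDs). A minimal sketch follows; the function name gen_records is my own invention for illustration.

```shell
#!/usr/bin/env bash
# gen_records <count>: print the integers 0 .. count-1, one per line.
# (gen_records is a hypothetical helper name used only in this sketch.)
gen_records() {
    local cnt=$1
    # Guard the empty case explicitly rather than rely on how a
    # particular seq implementation treats a last value below the first.
    (( cnt > 0 )) || return 0
    seq 0 $(( cnt - 1 ))
}

gen_records 5
```

Redirecting the output, as in gen_records 1000 > records.dat, gives the same kind of file as the loop-based generator.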
We'll start by devising a simple one-liner that will do the job, as long as the data fits within system memory (which is called internal shuffling). Then we will consider how to complete this task when the data won't all fit in memory at once, and some of it must be stored on disk (called external shuffling). Finally, we'll expand our solution to support parallel operation--which is likely to pay off when processing massive datasets.
Internal Shuffling
Note: An earlier version of this section used cat -n to affix line numbers. This added whitespace at the start of each line, which then had to be stripped out with sed(1) before the data could be used. I've since "seen the light" and switched to the more appropriate nl(1) utility for numbering lines.
For small data, we can shuffle and tag our data easily using standard GNU utilities. If our dataset fits in system memory, we can use the shuf(1) utility to permute the order of lines in the file and the nl(1) utility to number the lines. We'll use the -nln format argument to nl(1) to ensure that the numbers don't have any leading whitespace.
shuf records.dat | nl -nln
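To see what the tagged output looks like, and to check that it really is a permutation of the input, here is a small sketch on ten records. The wrapper name shuffle_and_tag is hypothetical, and the check assumes bash (for process substitution) and GNU shuf.

```shell
#!/usr/bin/env bash
# shuffle_and_tag: read records on stdin, write "<position><TAB><record>".
# (Hypothetical wrapper name; the pipeline inside is the one from the text.)
shuffle_and_tag() {
    # nl -nln left-justifies the number, pads it to the default width
    # with trailing spaces, and separates it from the record with a tab.
    shuf | nl -nln
}

# Show the first few tagged records; the order differs on every run.
seq 0 9 | shuffle_and_tag | head -n 3

# Sanity check: dropping the tag and re-sorting should give back the input.
if seq 0 9 | shuffle_and_tag | cut -f2- | sort -n | cmp -s - <(seq 0 9); then
    echo "permutation OK"
fi
```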
However, this simple pipeline will quickly run into problems. The
shuf(1) command is not very good for working with lots of
data, as it must fully materialize its input in memory first. If we try
to shuffle too large of a data set, this pipeline will fail when
shuf(1) runs out of memory.
This constraint of needing to fully materialize the input in memory
is, luckily, pretty specific to shuf(1). Many
other common utilities do not require full materialization of their
inputs in memory, which means that we can resolve this issue by
using these other utilities to emulate the behavior of
shuf(1).
External Shuffling
The sort(1) utility supports external merge sorting out
of the box, meaning that it doesn't run into the same memory limitations
as shuf(1). So we can actually use sort(1) to
shuffle our data, using the decorate-sort-undecorate
idiom.
We can tag each record with a random number (decorate), do a sort of the records using that number (sort), and then drop the number (undecorate). The decorate and undecorate phases can be done record-at-a-time, so no materialization in memory is required, and the sorting can be done externally.
We can decorate the records using awk(1), which supports
floating point random number generation, pipe the result into
sort -n to do a numeric sort on this float field, and then
use cut(1) to keep only the original data, dropping the
random number. We'll print each random number with 31 decimal
places. Because rand() returns a double-precision value, at most
the first 15 to 17 significant digits carry information, but that
should still give us plenty of randomness for our shuffle.
awk 'BEGIN{srand();} {printf "%.31f\t%s\n", rand(), $0}' records.dat \
| sort -n | cut -f2- | nl -nln
The resulting one-liner is complicated looking, but it will allow
processing data sets that are too large to fit in system memory, or on
systems that don't use GNU coreutils, and so don't have
shuf(1) (like OpenBSD). But, if we're willing to increase
the complexity a fair bit, we can develop a script which will perform
the same task much faster.
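One detail worth knowing about the decorate step: a bare srand() seeds awk from the time of day, so two runs started within the same second will typically produce the same shuffle. Below is a minimal sketch of the same decorate-sort-undecorate pipeline wrapped in a function that takes an optional explicit seed. The name external_shuffle and the seed-mixing expression are my own assumptions, and LC_ALL=C is added so that sort reads "." as the decimal point regardless of locale.

```shell
#!/usr/bin/env bash
# external_shuffle [seed]: shuffle the lines on stdin.
# (Hypothetical helper; the seed defaults to a value built from $RANDOM.)
external_shuffle() {
    local seed=${1:-$(( RANDOM * 32768 + RANDOM ))}
    # Decorate each whole record ($0) with a random key ...
    awk -v seed="$seed" 'BEGIN { srand(seed) }
        { printf "%.31f\t%s\n", rand(), $0 }' \
    | LC_ALL=C sort -n \
    | cut -f2-        # ... sort on the key, then strip it again.
}

# The same seed gives the same order, which is handy for repeatable runs.
seq 0 9 | external_shuffle 42 | nl -nln
```

Omitting the argument gives a different order on each call, which is usually what a benchmark wants.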
Parallel, External Shuffling
As we are ostensibly working on large sets of data, it will behoove
us to do things in parallel when we can. Multi-processing has a lot of
overhead, but with large enough datasets, this overhead is absolutely
worth it. In addition, sort(1) defaults to using
/tmp for writing temporary files while performing an
external sort. This directory is often mounted in memory using tmpfs or
similar, and so we may still run into out-of-memory issues when using
the external one-liner above.
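For the /tmp problem on its own, GNU sort can be pointed at a different scratch directory with -T and given a cap on its in-memory buffer with -S. The sketch below assumes GNU sort; the helper name sort_on_disk is hypothetical, and the mktemp directory merely stands in for a disk-backed location on your system.

```shell
#!/usr/bin/env bash
# sort_on_disk <scratch_dir>: numeric sort of stdin, spilling to <scratch_dir>.
# (Hypothetical helper name; -T and -S are documented GNU sort options.)
sort_on_disk() {
    local scratch=$1
    # -T sets where temporary files are written,
    # -S caps the size of the in-memory sort buffer (64 MiB here).
    LC_ALL=C sort -n -T "$scratch" -S 64M
}

# Placeholder scratch directory; in practice choose one on a real disk.
scratch=$(mktemp -d)
seq 100000 -1 1 | sort_on_disk "$scratch" | head -n 3
rm -r "$scratch"
```

This keeps the temporary files out of memory-backed storage, but it does nothing for parallelism.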
Luckily, there's a solution to this too. sort(1) has a
-m flag, which is used for merging sorted files. So, we can
manually implement part of the external merge-sort algorithm in the
shell, which will give better control over resource utilization and
parallelism.
The split(1) command will allow us to split a file into
many small pieces. We can split our decorated datafile into many pieces,
sort each of these pieces individually (in parallel), and then merge the
sorted files together using sort -m.
The file will be split based on a specified number of lines, so let's first calculate how many lines we need to get to a particular number of files (say, 10 for a small scale example),
rcount=10000 # number of records in the data file
fcount=10 # number of subfiles to use
lines=$(( rcount / fcount ))
# Create a temporary directory for storing the subfiles
tmp_dir="TMP"
mkdir "$tmp_dir"
tmp_prefix="${tmp_dir}/x"
awk 'BEGIN{srand();} {printf "%0.31f\t%s\n", rand(), $0;}' records.dat \
| split -l "$lines" - "$tmp_prefix"
split(1) accepts a name prefix for its output files
as an argument, so our prefix above places the subfiles within
./TMP and gives each file a sequentially generated,
unique name beginning with x.
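One caveat with the integer division: when the record count is not a multiple of the file count, rounding down produces one extra, smaller subfile (10 records over 3 files gives 3 lines per file, and therefore 4 files). A small sketch using ceiling division instead, together with a look at the names split(1) generates. The helper name lines_per_chunk is hypothetical.

```shell
#!/usr/bin/env bash
# lines_per_chunk <records> <files>: ceiling division, so that split
# produces at most <files> pieces even when the counts don't divide evenly.
# (Hypothetical helper name.)
lines_per_chunk() {
    local rcount=$1 fcount=$2
    echo $(( (rcount + fcount - 1) / fcount ))
}

work=$(mktemp -d)
lines=$(lines_per_chunk 10 3)            # 4 lines per piece
seq 0 9 | split -l "$lines" - "$work/x"
ls "$work"                               # xaa, xab, xac
wc -l "$work"/x*                         # 4, 4 and 2 lines
rm -r "$work"
```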
Now that we have the file decorated and split into subfiles, we want to sort each of these subfiles. We can use a for loop to sort each file,
for f in "${tmp_prefix}"*; do
    sort -n "$f" > "${f}.sorted"
done
Of course, this will sort the files one at a time, which solves the memory
issue with /tmp but isn't really any faster. We can
exploit parallelism by launching the jobs in the background and adding a
wait command after the loop,
for f in "${tmp_prefix}"*; do
    sort -n "$f" > "${f}.sorted" &
done
wait
But this then risks overwhelming our computer. Running too many concurrent jobs can result in worse performance than just doing it serially. So let's add a limit to how many concurrent sorts we can have going at once. The general framework that I use for doing this is pretty simple,
concjobs=3 # number of concurrent jobs.
i=0 # counter of active jobs
for f in "${tmp_prefix}"*; do
    (( i++ ))
    sort -n "$f" > "${f}.sorted" &
    if (( i >= concjobs )); then
        wait
        i=0
    fi
done
wait
This scheme will launch up to $concjobs sort operations
at once, and then wait until those have finished before launching any
more.
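Because each batch has to finish completely before the next one starts, a single slow sort can leave the other job slots idle. One possible alternative, sketched here, hands the file list to xargs and lets it keep a fixed number of workers busy. This assumes an xargs that supports -0 and -P (GNU and BSD versions do, though neither flag is in POSIX); the function name sort_chunks is hypothetical.

```shell
#!/usr/bin/env bash
# sort_chunks <prefix> <jobs>: sort every file matching <prefix>* into
# <file>.sorted, running at most <jobs> sorts at any one time.
# (Hypothetical helper name.)
sort_chunks() {
    local prefix=$1 jobs=$2
    # NUL-delimit the names so unusual characters survive the trip
    # through xargs; each sh invocation receives one file name as $1.
    printf '%s\0' "$prefix"* \
    | xargs -0 -n 1 -P "$jobs" sh -c 'sort -n "$1" > "$1.sorted"' _
}

work=$(mktemp -d)
seq 20 -1 1 | split -l 5 - "$work/x"     # four unsorted chunks
sort_chunks "$work/x" 3
head -n 1 "$work/xaa.sorted"             # smallest value in the first chunk
rm -r "$work"
```

xargs starts a new sort as soon as any running one exits, so the pool stays full until the file list is exhausted.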
Once the subfiles are sorted, we can merge them all together again.
sort(1) will only open a small number of files at once,
even if the -m flag is specified, so if we run it with the
default settings, we may still run into memory issues in
/tmp. Thus, we should use the --batch-size
option to ensure that sort(1) opens all of the files to be
merged, plus a few extra for doing the merge itself. This becomes
increasingly important as fcount grows,
batch=$(( fcount + 10 ))
sort --batch-size="$batch" -n -m "$tmp_dir/"*.sorted > merged_data
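To make the merge step concrete, here is a small sketch on three hand-made sorted files. It sizes the batch from the files actually present rather than from a separate counter, which is my own variation, and the helper name merge_chunks is hypothetical. The --batch-size option is specific to GNU sort.

```shell
#!/usr/bin/env bash
# merge_chunks <dir>: merge every <dir>/*.sorted file into one sorted stream.
# (Hypothetical helper name.)
merge_chunks() {
    local dir=$1
    local files=( "$dir"/*.sorted )
    # Allow for every input file plus some headroom, so that sort
    # does not need to stage intermediate merges in temporary files.
    local batch=$(( ${#files[@]} + 10 ))
    LC_ALL=C sort --batch-size="$batch" -n -m "${files[@]}"
}

work=$(mktemp -d)
printf '1\n4\n7\n' > "$work/a.sorted"
printf '2\n5\n8\n' > "$work/b.sorted"
printf '3\n6\n9\n' > "$work/c.sorted"
merge_chunks "$work" | tr '\n' ' '; echo     # the values 1 through 9, in order
# sort -c exits non-zero on out-of-order input: a cheap check on the merge.
merge_chunks "$work" | LC_ALL=C sort -n -c && echo "merged output is sorted"
rm -r "$work"
```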
And then, finally, we strip off the decorator and line number the merged file,
cut -f2- merged_data | nl -nln > result.dat
The Final Script
Bringing this all together, replacing temporary files with pipes where possible, and automatically cleaning up the temporary files both after sorting each subfile, and after sorting everything, we have the following script,
#!/usr/bin/env bash
rcount=10000 # number of records in the data file
fcount=10 # number of subfiles to use
lines=$(( rcount / fcount ))

# Create a temporary directory for storing the subfiles
tmp_dir="./TMP"
mkdir "$tmp_dir"

# Decorate the file and then split it into subfiles for sorting
tmp_prefix="${tmp_dir}/x"
awk 'BEGIN{srand();} {printf "%0.31f\t%s\n", rand(), $0;}' records.dat \
| split -l "$lines" - "$tmp_prefix"

# sort the subfiles concurrently
concjobs=3 # number of concurrent jobs.
i=0 # counter of active jobs
for f in "${tmp_prefix}"*; do
    (( i++ ))
    { sort -n "$f" > "${f}.sorted" && rm "$f"; } &
    if (( i >= concjobs )); then
        wait
        i=0
    fi
done
wait

# merge the sorted subfiles together, drop the sort field,
# and add a line number
# (the glob must stay outside the quotes so the shell expands it)
batch=$(( fcount + 10 ))
sort --batch-size="$batch" -n -m "$tmp_dir"/*.sorted \
| cut -f2- | nl -nln > result.dat

# clean up any remaining temp files
rm -r "$tmp_dir"
There are, of course, many variations on this script. For example, this script only performs the sort in parallel, but does the decoration and undecoration serially. It may in fact be worth it to move these steps into the concurrent part of the script instead. With a lot of extra effort, we could even try performing the line numbering step in parallel, though that would require another split/merge pass and almost certainly would not be worth it.
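As a rough illustration of the first variation, the sketch below splits the raw data first and then decorates and sorts each chunk inside the background job. The function name decorate_and_sort and the seed-mixing expression are my own assumptions. The explicit per-chunk seed matters: a bare srand() seeds from the clock, so chunks started within the same second would otherwise receive the same sequence of keys.

```shell
#!/usr/bin/env bash
# decorate_and_sort <chunk> <seed>: decorate one chunk with random keys and
# sort it, writing <chunk>.sorted and removing the undecorated input.
# (Hypothetical helper name.)
decorate_and_sort() {
    local chunk=$1 seed=$2
    awk -v seed="$seed" 'BEGIN { srand(seed) }
        { printf "%.31f\t%s\n", rand(), $0 }' "$chunk" \
    | LC_ALL=C sort -n > "${chunk}.sorted" && rm "$chunk"
}

work=$(mktemp -d)
seq 0 19 | split -l 5 - "$work/x"        # split the raw, undecorated data

concjobs=3   # number of concurrent jobs
i=0          # jobs launched in the current batch
for f in "$work"/x*; do
    (( i++ ))
    # Draw the seed in the parent shell so every chunk gets its own value.
    seed=$(( RANDOM * 32768 + RANDOM ))
    decorate_and_sort "$f" "$seed" &
    if (( i >= concjobs )); then
        wait
        i=0
    fi
done
wait

# Merge on the random keys, then undecorate and number as before.
LC_ALL=C sort -n -m "$work"/*.sorted | cut -f2- | nl -nln
rm -r "$work"
```

Whether this pays off depends on how much of the total runtime the single awk pass accounts for, so it is worth measuring before committing to it.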