Processing Big Data on the Linux Shell

It’s not the fastest, but it does work

Recently, I needed to generate and process multi-billion record synthetic datasets for some data structure benchmarking. Rather than write a bunch of custom code to do this processing, I decided to try using shell scripts and standard system utilities. As it turns out, standard Unix utilities are surprisingly useful for larger-than-memory data processing.

Let’s consider a simple workload: tagging with external shuffling. I have a large data file that contains a sequence of whitespace-delimited records, one to a line. I want to shuffle the records randomly, and then add a new field to each record containing a sequential number, indicating its position within the file.
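
To make this concrete: a four-record input file containing the lines 0, 1, 2, and 3 might, after shuffling and tagging, come out as follows (the ordering shown is just one possibility, since the shuffle is random; the sequence number is added as a tab-separated first field),

1	2
2	0
3	3
4	1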

For the purposes of this post, we’ll use the following script to generate our datasets. The dataset will simply consist of integers counting up from 0.

#!/bin/bash
# data_generator
if [[ $# -lt 1 ]]; then
    echo "data_generator <count>" > /dev/stderr
    exit 1
fi

cnt=$1
for (( i=0; i<cnt; i++ )); do
    echo "$i"
done

To generate a dataset containing 1000 records using this script, use the following command,

$ data_generator 1000 > records.dat

We’ll start by devising a simple one-liner that does the job as long as the data fits within system memory (which is called internal shuffling). Then we’ll consider how to complete the task when the data won’t all fit in memory at once and some of it must be stored on disk (called external shuffling). Finally, we’ll expand our solution to support parallel operation, which is likely to pay off when processing massive datasets.

Internal Shuffling

For small inputs, we can shuffle and tag our data easily using standard GNU utilities. If our dataset fits in system memory, we can use the shuf(1) utility to permute the order of lines in the file, cat(1) to add line numbers, and then sed(1) to clean up the leading whitespace that cat(1) leaves behind.

$ shuf records.dat | cat -n | sed 's/^ *//g'

However, this simple pipeline will quickly run into problems. The shuf(1) command is poorly suited to large inputs, as it must fully materialize its input in memory before producing any output. If we try to shuffle too large a dataset, the pipeline will fail when shuf(1) runs out of memory.

This need to fully materialize the input in memory is, luckily, fairly specific to shuf(1). As we’ll see, the other common utilities process their input as a stream, which means we can resolve the issue by using them to emulate the behavior of shuf(1).

External Shuffling

The sort(1) utility supports external merge sorting out of the box, meaning that it doesn’t run into the same memory limitations as shuf(1). So we can actually use sort(1) to shuffle our data, using the decorate-sort-undecorate idiom.

We can tag each record with a random number (decorate), do a sort of the records using that number (sort), and then drop the number (undecorate). The decorate and undecorate phases can be done record-at-a-time, so no materialization in memory is required, and the sorting can be done externally.
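
To illustrate the idiom on three hypothetical records (the random keys here are made up for the example), the data passes through three stages,

decorated (random key, then the original record):
0.61	0
0.32	1
0.07	2

sorted on the key:
0.07	2
0.32	1
0.61	0

undecorated (key dropped):
2
1
0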

We can decorate the records using awk(1), which supports floating-point random number generation, pipe the result into sort -n to do a numeric sort on that field, and then use cut(1) to keep only the original data, dropping the random number. Printing the random key with 31 digits after the decimal point preserves essentially all of the precision of awk(1)’s rand() (a double typically carries 15 to 17 significant digits), which should give us plenty of randomness for our shuffle.

$ awk 'BEGIN{srand();} {printf "%.31f\t%s\n", rand(), $1}' records.dat | \
  sort -n | cut -f2 - | cat -n | sed 's/^ *//g'

The resulting one-liner looks complicated, but it can process datasets that are too large to fit in system memory, and it works on systems that don’t ship GNU coreutils and therefore lack shuf(1) (such as OpenBSD). But, if we’re willing to increase the complexity a fair bit, we can develop a script that performs the same task much faster.

Parallel, External Shuffling

As we are ostensibly working with large datasets, it will behoove us to do things in parallel when we can. Multi-processing has a lot of overhead, but with large enough datasets that overhead is absolutely worth paying. In addition, sort(1) defaults to using /tmp for the temporary files it writes while performing an external sort. This directory is often mounted in memory using tmpfs or similar, and so we may still run into out-of-memory issues when using the external one-liner above.

Luckily, there’s a solution to this too: sort(1) has a -m flag for merging files that are already sorted. So we can implement part of the external merge-sort algorithm manually in the shell, which gives us better control over resource utilization and parallelism.
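
As a quick illustration of the -m flag (the file names here are just placeholders): given inputs that are each already numerically sorted, sort(1) merges them in a single streaming pass rather than re-sorting everything,

$ sort -n -m part_a.sorted part_b.sorted > all.sorted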

The split(1) command will allow us to split a file into many small pieces. We can split our decorated datafile into many pieces, sort each of these pieces individually (in parallel), and then merge the sorted files together using sort -m.

The file will be split based on a specified number of lines, so let’s first calculate how many lines per subfile we need in order to end up with a particular number of files (say, 10 for a small-scale example),

rcount=10000  # number of records in the data file
fcount=10     # number of subfiles to use
lines=$(( rcount / fcount ))

# Create a temporary directory for storing the subfiles 
tmp_dir="TMP"
mkdir "$tmp_dir"

tmp_prefix="${tmp_dir}/x"
awk 'BEGIN{srand();} {printf "%0.31f\t%s\n", rand(), $1;}' records.dat | \
  split -l "$lines" - "$tmp_prefix"

split(1) accepts a name prefix for its output files as an argument, so the prefix above places the subfiles within ./TMP and gives each file a sequentially generated, unique name beginning with x.
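
With GNU split(1)’s default two-letter alphabetic suffixes, the ten subfiles for our small example should end up named something like this,

$ ls TMP
xaa  xab  xac  xad  xae  xaf  xag  xah  xai  xaj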

Now that we have the file decorated and split into subfiles, we want to sort each of these subfiles. We can use a for loop to sort each file,

for f in ${tmp_prefix}*; do
    sort -n "$f" > "${f}.sorted"
done

Of course, this sorts the files one at a time, which solves the memory issue with /tmp but isn’t really any faster. We can exploit parallelism by launching the jobs in the background and adding a wait command after the loop,

for f in ${tmp_prefix}*; do
    sort -n "$f" > "${f}.sorted" &
done

wait

But this risks overwhelming the machine: running too many concurrent jobs can result in worse performance than doing them serially. So let’s add a limit on how many concurrent sorts we can have going at once. The general framework that I use for doing this is pretty simple,

concjobs=3 # number of concurrent jobs.
i=0 # counter of active jobs
for f in ${tmp_prefix}*; do
    (( i++ ))
    sort -n "$f" > "${f}.sorted" &

    if (( i >= concjobs )); then
        wait
        i=0
    fi
done

wait

This scheme launches up to $concjobs sort operations at once, and then waits for that whole batch to finish before launching any more.
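
This batch-wise approach is simple, but it always waits for the slowest job in a batch before starting the next one. If we can assume bash 4.3 or newer, a variation using wait -n launches a new sort as soon as any single background job finishes, keeping every slot busy (a sketch only; the final script below sticks with the simpler batch-wise form),

concjobs=3 # number of concurrent jobs
i=0        # counter of active jobs
for f in ${tmp_prefix}*; do
    sort -n "$f" > "${f}.sorted" &
    (( i++ ))

    if (( i >= concjobs )); then
        # wait -n (bash 4.3+) returns as soon as any one job finishes
        wait -n
        (( i-- ))
    fi
done

wait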

Once the subfiles are sorted, we can merge them all together again. Even with the -m flag, sort(1) will only merge a limited number of files at once, spilling intermediate results to temporary files in /tmp, so with the default settings we may still run into memory issues there. Thus, we should use the --batch-size option to ensure that sort(1) can open all of the files to be merged in a single pass, with a little headroom on top. This becomes increasingly important as fcount grows,

batch=$(( fcount + 10 ))
sort --batch-size="$batch" -n -m "$tmp_dir/"*.sorted > merged_data

And then, finally, we strip off the decoration and add line numbers to the merged file,

$ cut -f2 merged_data | cat -n | sed 's/^ *//g' > result.dat

The Final Script

Bringing this all together, replacing temporary files with pipes where possible, and automatically cleaning up the temporary files (both after sorting each subfile and once everything is merged), we have the following script,

#!/usr/bin/env bash
rcount=10000  # number of records in the data file
fcount=10     # number of subfiles to use
lines=$(( rcount / fcount ))

# Create a temporary directory for storing the subfiles 
tmp_dir="./TMP"
mkdir "$tmp_dir"

# Decorate the file and then split it into subfiles for sorting
tmp_prefix="${tmp_dir}/x"
awk 'BEGIN{srand();} {printf "%0.31f\t%s\n", rand(), $1;}' records.dat \
  | split -l "$lines" - "$tmp_prefix"

# sort the subfiles concurrently
concjobs=3 # number of concurrent jobs.
i=0        # counter of active jobs
for f in ${tmp_prefix}*; do
    (( i++ ))
    { sort -n "$f" > "${f}.sorted" && rm "$f"; } &

    if (( i >= concjobs )); then
        wait
        i=0
    fi
done

wait

# merge the sorted subfiles together, drop the sort field,
# and add a line number
batch=$(( fcount + 10 ))
sort --batch-size="$batch" -n -m "$tmp_dir"/*.sorted \
  | cut -f2 - | cat -n | sed 's/^ *//g' > result.dat

# clean up any remaining temp files
rm -r "$tmp_dir"

There are, of course, many variations on this script. For example, this script only performs the sort in parallel, but does the decoration and undecoration serially. It may in fact be worth it to move these steps into the concurrent part of the script instead. With a lot of extra effort, we could even try performing the line numbering step in parallel, though that would require another split/merge pass and almost certainly would not be worth it.
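
For instance, here is a rough, untested sketch of that first variation: split the raw file first, then decorate and sort each subfile inside the background jobs (variable names as in the script above). Note that each awk invocation needs its own seed, since srand() with no argument seeds from the clock in whole seconds, so concurrent jobs could otherwise generate identical key sequences,

# split the raw, undecorated file first
split -l "$lines" records.dat "$tmp_prefix"

i=0
for f in ${tmp_prefix}*; do
    (( i++ ))
    # decorate and sort this subfile in one background job;
    # $RANDOM supplies a per-job seed (a collision is unlikely, but possible)
    { awk -v seed="$RANDOM" \
          'BEGIN{srand(seed);} {printf "%0.31f\t%s\n", rand(), $1;}' "$f" \
        | sort -n > "${f}.sorted" && rm "$f"; } &

    if (( i >= concjobs )); then
        wait
        i=0
    fi
done

wait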

Conclusion

With a bit of effort, we’ve managed to arrive at a parallel, external script for shuffling and tagging data. However, it isn’t clear how much this far more complex approach actually buys us over the one-liners we started with for internal and external shuffling. In a future post on this topic, I’m going to benchmark these scripts, as well as some of the variations mentioned in the previous paragraph, on a variety of computers to determine just how well this script works, and which variations perform best. So, if you’re interested, stay tuned for that!