Processing Big Data on the Linux Shell


Recently, I needed to generate and process multi-billion record synthetic datasets for some data structure benchmarking. Rather than write a bunch of custom code to do this processing, I decided to try using shell scripts and standard system utilities. As it turns out, standard Unix utilities are surprisingly useful for larger-than-memory data processing.

Let's consider a simple workload: tagging with external shuffling. I have a large data file that contains a sequence of whitespace-delimited records, one to a line. I want to shuffle the records randomly, and then add a new field to each record containing a sequential number, indicating its position within the file.

For the purposes of this article, we'll use the following script to generate our datasets. The dataset will simply consist of integers counting up from 0.

#!/bin/bash
# data_generator
if [[ $# -lt 1 ]]; then
    echo "usage: data_generator <count>" >&2
    exit 1
fi
 
cnt=$1
for (( i=0; i<cnt; i++ )); do
    echo "$i"
done

To generate a dataset containing 1000 records using this script, use the following command,

data_generator 1000 > records.dat
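
The bash loop above gets slow once the count reaches the billions, since every record costs a trip through the shell interpreter. As a minimal sketch of an alternative, assuming seq(1) is installed (it ships with GNU coreutils and the BSDs, but is not part of POSIX), the same dataset can be produced like this. The function name gen_records is mine, not part of any tool:

```shell
#!/usr/bin/env bash
# gen_records <count>: print the integers 0 .. count-1, one per line.
# Assumes seq(1) is installed; it is not specified by POSIX.
gen_records() {
    local cnt=$1
    # For a count of 0, "seq 0 -1" prints nothing with GNU seq but counts
    # downward with BSD seq, so that case is handled explicitly.
    if [ "$cnt" -gt 0 ]; then
        seq 0 $(( cnt - 1 ))
    fi
}

gen_records 1000 > records.dat
```

The output should be identical to that of data_generator, so the rest of the article works with either.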

We'll start by devising a simple one-liner that will do the job as long as the data fits within system memory (internal shuffling). Then we'll consider how to complete the task when the data won't all fit in memory at once and some of it must be stored on disk (external shuffling). Finally, we'll expand our solution to support parallel operation, which is likely to pay off when processing massive datasets.

Internal Shuffling


An earlier version of this section used cat -n to affix line numbers. This added whitespace at the start of each line, which then had to be stripped out with sed(1) before the data could be used. I've since "seen the light" and switched to using the more appropriate nl(1) utility for numbering lines.

For small data, we can shuffle and tag our data easily using standard GNU utilities. If our dataset fits in system memory, we can use the shuf(1) utility to permute the order of lines in the file and the nl(1) utility to number the lines. We'll pass the -nln format argument to nl(1) to left-justify the numbers, so that they don't have any leading whitespace.

shuf records.dat | nl -nln 
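
One detail worth knowing: nl(1) pads the number out to six columns by default, so with -nln the padding spaces end up between the number and the tab separator. By default it also skips empty lines when numbering. Here is a small sketch of a stricter variant, assuming GNU nl and shuf. The function name tag_small is hypothetical:

```shell
#!/usr/bin/env bash
# tag_small: shuffle stdin in memory and prefix each line with its position.
tag_small() {
    # -ba   number every line, including empty ones (the default skips them)
    # -nln  left-justify the number, without leading zeros
    # -w1   minimum field width of 1, so no padding follows the number
    shuf | nl -ba -nln -w1
}

printf '%s\n' a b c d | tag_small
```

Each output line is then the position, a single tab, and the record, which is easier to split again with cut(1) or awk(1).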

However, this simple pipeline will quickly run into problems. The shuf(1) command is not well suited to working with lots of data, as it must fully materialize its input in memory first. If we try to shuffle too large a dataset, this pipeline will fail when shuf(1) runs out of memory.

This constraint of needing to fully materialize the input in memory is, luckily, pretty specific to shuf(1). Many other common utilities do not require it, which means that we can resolve this issue by using those other utilities to emulate the behavior of shuf(1).

External Shuffling

The sort(1) utility supports external merge sorting out of the box, meaning that it doesn't run into the same memory limitations as shuf(1). So we can actually use sort(1) to shuffle our data, using the decorate-sort-undecorate idiom.

We can tag each record with a random number (decorate), do a sort of the records using that number (sort), and then drop the number (undecorate). The decorate and undecorate phases can be done record-at-a-time, so no materialization in memory is required, and the sorting can be done externally.

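We can decorate the records using awk(1), which supports floating point random number generation, pipe the result into sort -n to do a numeric sort on this float field, and then use cut(1) to keep only the original data, dropping the random number. We'll print the random number with 31 decimal places. That is more than rand() can actually deliver: it returns a double-precision value, so at most about 17 significant digits are meaningful, and the number of truly random bits depends on the awk implementation. This is still plenty for a shuffle, though on multi-billion record inputs some keys may collide, in which case sort(1) breaks the tie by comparing the lines as plain text. Note also that srand() with no argument seeds from the time of day, so two runs started at nearly the same moment may produce the same shuffle.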

awk 'BEGIN{srand();} {printf "%.31f\t%s\n", rand(), $0}' records.dat \
      | sort -n | cut -f2- | nl -nln
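
To make the three phases easier to see, and to get repeatable results while testing, the shuffle can be wrapped in a function that takes an optional seed. This is only a sketch: the function name shuffle_external and the seed argument are my additions, and LC_ALL=C is there on the assumption that we want sort(1) to read the decimal point the same way regardless of the user's locale.

```shell
#!/usr/bin/env bash
# shuffle_external [seed]: shuffle stdin using decorate-sort-undecorate.
# Without a seed, awk's srand() falls back to seeding from the time of day.
shuffle_external() {
    awk -v seed="${1:-}" '
        BEGIN { if (seed != "") srand(seed); else srand() }
        # decorate: random key, a tab, then the whole record
        { printf "%.31f\t%s\n", rand(), $0 }
    ' | LC_ALL=C sort -n \
      | cut -f2-          # undecorate: drop the key, keep everything after it
}

printf '%s\n' "1 alpha" "2 beta" "3 gamma" | shuffle_external 42 | nl -nln
```

Printing $0 and cutting with -f2- keeps multi-field records intact, even ones that contain tabs of their own.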

The resulting one-liner looks complicated, but it can process datasets that are too large to fit in system memory, and it also works on systems that don't use GNU coreutils and so don't have shuf(1) (like OpenBSD). But, if we're willing to increase the complexity a fair bit, we can develop a script which will perform the same task much faster.

Parallel, External Shuffling

As we are ostensibly working on large sets of data, it will behoove us to do things in parallel when we can. Multi-processing has a lot of overhead, but with large enough datasets, this overhead is absolutely worth it. In addition, sort(1) writes its temporary files to $TMPDIR, or to /tmp if that is unset, while performing an external sort. /tmp is often mounted in memory using tmpfs or similar, and so we may still run into out-of-memory issues when using the external one-liner above.
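
If the location of the temporary files is the only concern, sort(1) can simply be pointed at a directory on a real disk. Here is a minimal sketch, assuming GNU sort for the -T (temporary directory) and -S (memory buffer size) options. The function name sort_on_disk, the scratch directory name and the 64M buffer are arbitrary choices for illustration:

```shell
#!/usr/bin/env bash
# sort_on_disk <scratch_dir>: numeric sort of stdin, with sort's temporary
# files placed in <scratch_dir> instead of $TMPDIR or /tmp.
sort_on_disk() {
    local scratch=$1
    # -T sets the temporary directory, -S caps the in-memory buffer (GNU sort)
    sort -n -T "$scratch" -S 64M
}

scratch=$(mktemp -d ./sort_scratch.XXXXXX)   # hypothetical on-disk location
seq 1000 -1 1 | sort_on_disk "$scratch" | head -n 3
rmdir "$scratch"
```

That takes care of /tmp, but it does nothing for parallelism.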

Luckily, there's a solution to this too. sort(1) has a -m flag, which is used for merging sorted files. So, we can manually implement part of the external merge-sort algorithm in the shell, which will give better control over resource utilization and parallelism.

The split(1) command will allow us to split a file into many small pieces. We can split our decorated datafile into many pieces, sort each of these pieces individually (in parallel), and then merge the sorted files together using sort -m.

The file will be split based on a specified number of lines, so let's first calculate how many lines we need to get to a particular number of files (say, 10 for a small scale example),

rcount=10000 # number of records in the data file
fcount=10 # number of subfiles to use
lines=$(( rcount / fcount ))
 
# Create a temporary directory for storing the subfiles 
tmp_dir="TMP"
mkdir "$tmp_dir"
 
tmp_prefix="${tmp_dir}/x"
awk 'BEGIN{srand();} {printf "%0.31f\t%s\n", rand(), $0;}' records.dat \
  | split -l "$lines" - "$tmp_prefix"

split(1) accepts a name prefix for its output files as an argument, so our prefix above places the subfiles within ./TMP and gives each file a unique name beginning with x, with sequentially generated suffixes (xaa, xab, and so on).
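
One thing to watch for: if the record count isn't an exact multiple of the file count, plain integer division rounds down and split(1) produces one extra, short file. Here is a sketch of a rounding-up version. The helper name lines_per_file is hypothetical:

```shell
#!/usr/bin/env bash
# lines_per_file <rcount> <fcount>: lines per subfile, rounded up so that
# split(1) produces at most <fcount> files.
lines_per_file() {
    local rcount=$1 fcount=$2
    echo $(( (rcount + fcount - 1) / fcount ))
}

lines_per_file 10000 10   # prints 1000
lines_per_file 10003 10   # prints 1001; rounding down would give 1000 and an 11th file
```

In a real script, the record count itself could come from wc -l < records.dat rather than being hard-coded.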

Now that we have the file decorated and split into subfiles, we want to sort each of these subfiles. We can use a for loop to sort each file,

for f in "${tmp_prefix}"*; do
    sort -n "$f" > "${f}.sorted"
done

Of course, this will sort the files one at a time. As long as each subfile is small enough to be sorted in memory, that solves the memory issue with /tmp, but it isn't really any faster. We can exploit parallelism by launching the jobs in the background and adding a wait command after the loop,

for f in "${tmp_prefix}"*; do
    sort -n "$f" > "${f}.sorted" &
done
 
wait

But this then risks overwhelming our computer. Running too many concurrent jobs can result in worse performance than just doing it serially. So let's add a limit to how many concurrent sorts we can have going at once. The general framework that I use for doing this is pretty simple,

concjobs=3 # number of concurrent jobs.
i=0 # counter of active jobs
for f in "${tmp_prefix}"*; do
    (( i++ ))
    sort -n "$f" > "${f}.sorted" &
 
    if (( i >= concjobs )); then
        wait
        i=0
    fi
done
 
wait

This scheme will launch up to $concjobs sort operations at once, and then wait until those have finished before launching any more.
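
One drawback of this batch scheme is that a single slow sort holds up the whole batch. Here is a sketch of a variation that refills the pool as soon as any one job finishes, assuming bash 4.3 or later for wait -n. The function name sort_pool is hypothetical:

```shell
#!/usr/bin/env bash
# sort_pool <concjobs> <file>...: sort each file into <file>.sorted, keeping
# at most <concjobs> sorts running and starting the next one as soon as any
# running job exits. Requires bash >= 4.3 for `wait -n`.
sort_pool() {
    local concjobs=$1 running=0 f
    shift
    for f in "$@"; do
        if (( running >= concjobs )); then
            wait -n                       # returns when any one job has exited
            running=$(( running - 1 ))
        fi
        sort -n "$f" > "${f}.sorted" &
        running=$(( running + 1 ))
    done
    wait                                  # wait for the remaining jobs
}

# Usage with the variables from the text:
# sort_pool 3 "${tmp_prefix}"*
```

The counter never undercounts the jobs still running, so the limit is respected even if several jobs finish at about the same time.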

Once the subfiles are sorted, we can merge them all together again. By default, sort(1) will only merge a limited number of files at once (16 in GNU sort), even if the -m flag is specified. Any additional inputs are first merged into intermediate temporary files, so with the default settings we may still run into memory issues in /tmp. Thus, we should use GNU sort's --batch-size option to ensure that sort(1) can open all of the files to be merged, plus a few extra for doing the merge itself. This becomes increasingly important as fcount grows,

batch=$(( fcount + 10 ))
sort --batch-size="$batch" -n -m "$tmp_dir/"*.sorted > merged_data

And then, finally, we strip off the decorator and line number the merged file,

cut -f2- merged_data | nl -nln > result.dat
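
Before trusting the output on a multi-billion record run, it's worth checking on a small input that no records were lost or duplicated along the way. Here is a rough sketch of such a check. The function name verify_shuffle is hypothetical, and since it compares checksums of the sorted contents, it is a sanity check rather than a proof:

```shell
#!/usr/bin/env bash
# verify_shuffle <original> <result>: succeed if <result>, with its leading
# line-number field removed, holds the same records as <original>.
verify_shuffle() {
    local original=$1 result=$2 want got
    # Sorting both sides makes the comparison independent of record order.
    want=$(LC_ALL=C sort "$original" | cksum)
    got=$(cut -f2- "$result" | LC_ALL=C sort | cksum)
    [ "$want" = "$got" ]
}

# Usage:
# verify_shuffle records.dat result.dat && echo "same records"
```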

The Final Script

Bringing this all together, replacing temporary files with pipes where possible, and automatically cleaning up the temporary files both after sorting each subfile, and after sorting everything, we have the following script,

#!/usr/bin/env bash
rcount=10000 # number of records in the data file
fcount=10 # number of subfiles to use
lines=$(( rcount / fcount ))
 
# Create a temporary directory for storing the subfiles 
tmp_dir="./TMP"
mkdir "$tmp_dir"
 
# Decorate the file and then split it into subfiles for sorting
tmp_prefix="${tmp_dir}/x"
awk 'BEGIN{srand();} {printf "%0.31f\t%s\n", rand(), $0;}' records.dat \
  | split -l "$lines" - "$tmp_prefix"
 
# sort the subfiles concurrently
concjobs=3 # number of concurrent jobs.
i=0 # counter of active jobs
for f in "${tmp_prefix}"*; do
    (( i++ ))
    { sort -n "$f" > "${f}.sorted" && rm "$f"; } &
 
    if (( i >= concjobs )); then
        wait
        i=0
    fi
done
 
wait
 
# merge the sorted subfiles together, drop the sort field,
# and add a line number
batch=$(( fcount + 10 ))
sort --batch-size="$batch" -n -m "$tmp_dir"/*.sorted \
  | cut -f2- | nl -nln > result.dat
 
# clean up any remaining temp files
rm -r "$tmp_dir"

There are, of course, many variations on this script. For example, this script only performs the sort in parallel, but does the decoration and undecoration serially. It may in fact be worth it to move these steps into the concurrent part of the script instead. With a lot of extra effort, we could even try performing the line numbering step in parallel, though that would require another split/merge pass and almost certainly would not be worth it.
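
As a rough sketch of the first variation, the raw file can be split before it is decorated, so that each background job decorates and sorts its own chunk. There is one catch: srand() with no argument seeds from the time of day, so several awk processes started together could all generate the same sequence of keys. Each job therefore needs its own seed. The function name decorate_and_sort and the seed scheme below are assumptions for illustration, not something I have benchmarked:

```shell
#!/usr/bin/env bash
# decorate_and_sort <file> <seed>: tag each line of <file> with a random key
# and write the key-sorted result to <file>.sorted.
decorate_and_sort() {
    local f=$1 seed=$2
    awk -v seed="$seed" \
        'BEGIN { srand(seed) } { printf "%.31f\t%s\n", rand(), $0 }' "$f" \
      | sort -n > "${f}.sorted"
}

# Usage: split the undecorated file, then give every chunk a distinct seed.
# Consecutive integers are the simplest choice; a stronger source such as
# /dev/urandom may be preferable.
#
# split -l "$lines" records.dat "$tmp_prefix"
# base=$(date +%s); n=0
# for f in "${tmp_prefix}"*; do
#     decorate_and_sort "$f" "$(( base + n ))" &
#     n=$(( n + 1 ))
# done
# wait
```

The merge and undecorate steps stay the same as in the script above, since the .sorted files have the same format.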