Three Approaches to Multithreaded Job Submission at OSC

There are a few approaches for maximizing your throughput at OSC if you happen to have a multithreaded workflow.  In this demonstration, I assume you have a script with Python using some form of multithreading based on a shared memory model, like OpenMP or using the pprocess package in Python.  If you do not have a code, there is code for you to test the strategies below.

To present these job submission approaches, I provide an example using Python with LAPACK (driven by Intel MKL - this is automatically done for you if you load Python on OSC systems).  This approach was tested with Ruby, but should easily adapt to Oakley and Owens clusters.  Approach 1 is based on Bash, approach 2 is based on using the Moab/Torque scheduler, and approach 3 uses a combination of mpiexec (hydra) and parallel-command-processor but does not depend on the scheduler.  All approaches use the same concept of Bash arrays, so you are not bound to simple sequences, you can have arbitrary lists or even read a file with all your fancy variations for parameter studies.

First, here is the Python example to solve A*x = b

  1. A random matrix is generated for square matrix A, and also for solution vector b
  2. Solve for x

The code is presented below in 100 lines of example_lapack.py which you can toggle below.

example_lapack.py

"""
global imports for example_lapack.py
Copyright 2016 Ohio Supercomputer Center

Attributions:
This example was inspired by
    "Numerical Computing with Modern Fortran" (SIAM, 2013) by Hanson & Hopkins

"""
from __future__ import print_function
import argparse
import sys
import time
import logging
import numpy as np

try:
    import mkl
except ImportError:
    raise ImportError('This version of Python does not have "mkl", load with ' +
                      '"module load python/2.7.latest"')

try:
    from scipy.linalg.lapack import dgetrf, dgetrs
    from scipy.linalg.blas import dnrm2, dgemv
except ImportError:
    raise ImportError('This version of Python does not have access to a' +
                      'lower-level lapack/blas routine.')

def main():
    """
    This example computes the following;
        1. Random number generation to fill a matrix 'a' of dimension nxn and
        also for a matrix 'y' of dimension n
        2. Pre-solve a*y = b so that we
        have 'b'.  This uses dgemv.
        3. Perform LU factorization (dgetrf) on dense matrix 'a' and store
        matrix and pivots in 'lu' and 'piv'
        4. Solve for x given 'lu' and 'piv' arrays (dgetrs)
        5. Compute L2 norm of the difference between solution and known vectors
        divided by L2 normed to the known y.  This is to provide a single point
        measure of the relative error.

    Inputs: dimension of n

    Error checks: NONE currently
    """

    # log and send it to stderr.
    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument("dimension", type=int, default=5, nargs='?',
                        help="The dimension of square matrix A")
    parser.add_argument("threads", type=int, default=20, nargs='?',
                        help="The number of threads")
    # grab the options here from the command line
    args = parser.parse_args()
    n = args.dimension
    mkl.set_num_threads(args.threads)

    # begin timing random number matrix generation
    time_1 = time.time()

    logging.debug('Dimension of square n by n matrix is:' + str(n) + '\n')
    a = np.random.rand(n, n)
    y = np.random.rand(n)

    logging.debug('a:' + np.array_str(a) + '\n')
    logging.debug('y:' + np.array_str(y) + '\n')

    # begin timing LAPACK
    time_2 = time.time()

    try:
        b = dgemv(1, a, y)
    except AttributeError:
        # catch when python scipy blas does not have dgemv
        print('This version of Python does not have access to lower-level dgemv' +
              'routine.')
        sys.exit(1)

    logging.debug('b:' + np.array_str(b) + '\n')
    lu, piv, _ = dgetrf(a)  # lu factorization
    x, _ = dgetrs(lu, piv, b)  # solve for x

    logging.debug('x:' + np.array_str(x) + '\n')
    relerr = dnrm2(x-y) / dnrm2(y)

    # end timing LAPACK
    time_3 = time.time()

    print("Solved a matrix of size:", n, "using", mkl.get_max_threads(), "threads.")
    print('Relative Error:', relerr)
    print("--- Random Matrix Generation Time: %s seconds ---" % (time_2 - time_1))
    print("---                 Solution Time: %s seconds ---" % (time_3 - time_2))

# main script begin
if __name__ == "__main__":
    main()

Now, we test it:

python example_lapack.py --help
python example_lapack.py # solve using 5x5 matrix and max threads on node

If everything is good, you should see some output like:

Solved a matrix of size: 5 using 16 threads.
Relative Error: 9.34065631647e-17
--- Random Matrix Generation Time: 0.0035982131958 seconds ---
---                 Solution Time: 0.00995397567749 seconds ---

Now, that this example is functioning, we can grow this into a case study with varying dimension - 1K, 2K, all the way up to 10K.  Let's consider three approaches:

Approach 1: Using a Serial Loop

This one is easy, examine the script by toggling bash_loop.sh.  But it's clear that the weakness is that we live on a single node, and do not task 'parallelize' further.

Submit the job with qsub bash_loop.sh

The Python script should reside in the same present working directory.  You will see an output file of the form bash_loop.sh.o${PBS_JOBID} where $PBS_JOBID is a positive integer based on what the scheduler assigned.

bash_loop.sh

#!/bin/bash
#PBS -l walltime=10:00
#PBS -l nodes=1:ppn=16
# Append stderr to stdout
#PBS -j oe
#PBS -q debug

cd $PBS_O_WORKDIR
module load python/2.7.latest

array=$(seq 1000 1000 10000)
for case in $array; do
    echo
    echo python example_lapack.py $case
    python example_lapack.py $case
done

 

Approach 2: Using Job Task Arrays

This approach takes more effort, but the benefit is to use the scheduler to submit more jobs and leverage the array - the benefit is if you are okay with asynchronous results, whatever case it can fit it will submit  Note: there are advanced directives that can hold jobs based on others completing, but we will withhold discussing that option.  Approach 1 may be more suitable if you want clean serialized outputs.

Submit the job with qsub -t 0-9 bash_loop.sh.  We start at 0 because array indices in Bash start at 0.

The Python script should reside in the same present working directory.  You will see an output file of the form task_array.sh.o${PBS_JOBID}-${PBS_ARRAYID} where $PBS_JOBID is a positive integer based on what the scheduler assigned, and $PBS_ARRAYID is based on the task array values you set with the -t option. 

I dislike how this output is formed in this example, so we can just write another script to provide comparable output as seen in Approach 1 as a post-processing step. If you want to post-process the task array outputs into one result task_array.sh.o${PBS_JOBID} then you can run bash task_array_combine_results.sh.  Because this involves deleting some output files, you should test with for dry-run to see what the operation would do.

task_array.sh

#!/bin/bash
#PBS -l walltime=10:00
#PBS -l nodes=1:ppn=16
# Append stderr to stdout
#PBS -j oe

module load python/2.7.latest

cd $PBS_O_WORKDIR
#ALTERNATIVELY: array=(1 3 4 50000)
array=($(seq 1000 1000 10000))

if [ -z ${PBS_ARRAYID+x} ] ; then # read http://stackoverflow.com/a/13864829/992834
    echo PBS_ARRAYID is not set
    echo submit this script using "qsub -t 1-10 $0"
else
    echo PBS_JOBID: $PBS_JOBID
    echo python example_lapack.py ${array[$PBS_ARRAYID]}
    python example_lapack.py ${array[$PBS_ARRAYID]}
fi

task_array_combine_results.sh

#!/bin/bash

jobscript="task_array.sh"
echo "Name of job script" $jobscript

jobids=$(ls ${jobscript}.o* -1 | grep -o -P '(?<=.o).*(?=-)' | sort | uniq)

function enter_choice {
    read -p "Combine (y/n/d) d=dry-run? " choice
    case "$choice" in
      y|Y ) echo "yes";;
      n|N ) echo "no";;
      d|D ) echo "dry";;
      * ) echo "invalid";;
    esac
}

for jobid in $jobids; do

    echo
    echo "Files to combine:"
    # sort by the task array ID after the '-'
    files=$(ls ${jobscript}.o${jobid}-* | sort -t'-' -k2 -n)
    echo $files

    choice=$(enter_choice)
    combined=${jobscript}.o${jobid}
    if [[ $choice == "yes" ]] ; then
        echo cat $files ">" $combined
        echo rm $files
        # almost the same as 'cat' above but we put an extra row in between
        for f in $files; do (cat "${f}" | sed '/^Resources requested:$/q' | \
            head -n -2; echo) >> $combined; done
        rm $files
    elif [[ $choice == "dry" ]] ; then
        echo cat $files ">" ${jobscript}.o$jobid
        echo rm $files
    fi

done
 

Approach 3: Using parallel-command-processor and mpiexec

This approach may appear to be difficult, but just take another deep dive and see how you can leverage mpiexec using parallel command processor. The idea is that each core runs an independent task on a per line basis using either a Bash Here Document or in our case, using a pipe with Bash.  In order to isolate a task to each node, one can check for the same remainder to ensure we are on a unique compute node.  For example, on Ruby debug nodes, we have nodes with either 16 or 20 cores, and if we request two (that is the maximum nodes you can request on the debug queue), we are designing the process to solve 2 cases at a time on 2 nodes simultaneously.  So in our example, we solve 5 batches in sets of 2 cases, where parallel-command-processor will take care of that logic. You can change the nodes to 10 to maximize parallelization, but be sure remove the line (#PBS -q debug) for requesting the debug queue, as your job will be rejected for having too many nodes requested in the debug queue.

Furthermore, on OSC systems, you optimize serial throughput by creating a a true parallel workload via mpiexec.  You can read more about parallel command processor on OSC systems by typing man parallel-command-processor.

Submit the job with qsub pcp_bash_loop.sh.  

The Python script should reside in the same present working directory.  You will see an output file of the form pcp_bash_loop.sh.o${PBS_JOBID} where $PBS_JOBID is a positive integer based on what the scheduler assigned.

pcp_bash_loop.sh

#!/bin/bash
#PBS -l walltime=10:00
#PBS -l nodes=2:ppn=16
#PBS -q debug
# Append stderr to stdout
#PBS -j oe

cd $PBS_O_WORKDIR
source /usr/local/lmod/lmod/init/sh
module load python/2.7.latest

procs=32
ppn=16
max_nodes=$((procs/ppn))
count=0
base=0
array=($(seq 1000 1000 10000))
while [ $base -lt ${#array[@]} ]; do
    for task in $(seq 1 $procs); do
        case=${array[$((task / ppn + base))]}
        command="python example_lapack.py $case"
        echo "if [ $((task%$ppn)) -eq 1 ] ; then $command; fi"
    done | mpiexec parallel-command-processor
    base=$((base+max_nodes))
done