Biowulf High Performance Computing at the NIH
nextflow on Biowulf

Nextflow is a domain-specific language modeled on UNIX pipes that simplifies writing parallel and scalable pipelines. The version installed on our systems can run jobs locally (on the same machine) and by submitting to Slurm.

The code executed at each pipeline stage can be written in a number of different languages (shell, Python, R, ...).

Intermediate results of a workflow are stored in the $PWD/work directory, which allows pipeline execution to be resumed.
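
For example, a failed or interrupted run can later be restarted from where it left off by adding the -resume flag to the run command (pipeline.nf here stands for whatever script is being run):

nextflow run pipeline.nf -resume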

The language used to write pipeline scripts is an extension of Groovy.

Nextflow is a complex workflow management tool. Please read the manual carefully and make sure to place appropriate limits on your pipeline to avoid submitting too many jobs or running too many local processes.
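
One way to limit concurrent job submission is the executor scope in nextflow.config. The following is a minimal sketch, not a recommendation - the queueSize of 50 is an arbitrary example value:

executor {
    name = 'slurm'
    queueSize = 50    // submit at most 50 jobs to Slurm at any one time
}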

When running many tasks, Nextflow creates many temporary files in the ./work directory. Please make sure that your pipeline does not inadvertently create millions of small files, which would degrade file system performance.
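
Once a pipeline has finished and its results have been saved, the work files should be removed. This can be done with the clean subcommand or by deleting the work directory outright; a quick sketch:

node$ nextflow clean -f    # remove the work files of the most recent run
node$ rm -rf work          # or simply remove the whole work directory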

Documentation
Nextflow documentation: https://www.nextflow.io/docs/latest/

Important Notes
Module Name: nextflow

Interactive job
Interactive jobs should be used for debugging, graphics, or applications that cannot be run as batch jobs.

First, let's do some basic local execution. For this we will allocate an interactive session:

biowulf$ sinteractive
[...snip...]
node$ module load nextflow

For the traditional hello world example, we will parallelize the uppercasing of greetings in different languages:

# create file of greetings
node$ cat > greetings.txt <<EOF
Hello world!
Hallo world!
Ciao world!
Salut world!
Buongiorno world!
Servus world!
Gruess Gott world!
Na was los world!
Gruetzi world!
Hello world!
Come va world!
Ca va world!
Hi world!
Good bye world!
EOF

We then create a file called hello.nf that describes the workflow to be executed:

// vim: set ft=groovy:

params.file = file('greetings.txt').toAbsolutePath()

process splitLetters {

    output:
    file 'chunk_*' into letters mode flatten

    """
    pwd
    split -l1 '${params.file}' chunk_
    """
}


process convertToUpper {

    input:
    file x from letters

    output:
    stdout result

    """
    cat $x | tr '[a-z]' '[A-Z]'
    """
}

result.subscribe {
    println it.trim()
}

The workflow is executed with:

node$ nextflow run hello.nf
N E X T F L O W  ~  version 0.25.5
Launching `hello.nf` [pedantic_stallman] - revision: f195027c60
[warm up] executor > local
[6c/63582f] Submitted process > splitLetters
[fc/c4fe5b] Submitted process > convertToUpper (2)
[...snip...]
SALUT WORLD!
CIAO WORLD!
HELLO WORLD!
COME VA WORLD!
HI WORLD!
SERVUS WORLD!
[...snip...]

Note that the results are out of order: the parallel tasks write to the result channel in whatever order they finish.
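
If a deterministic (here: alphabetical) order were wanted, a channel operator such as toSortedList could gather and sort the results first. A minimal sketch, which would replace the result.subscribe block above since a channel can only be consumed once:

result
    .toSortedList()
    .subscribe { lines ->
        lines.each { println it.trim() }
    }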

The same workflow can be used to run each process as a Slurm job by creating a nextflow.config file, which nextflow automatically reads from the current working directory:

node$ cat > nextflow.config <<EOF
process {
  executor = 'slurm'
  queue = 'quick'
  time = '10m'
  memory = '250MB'
  clusterOptions = '--job-name=nxf_test'
}
EOF 
node$ nextflow run hello.nf
N E X T F L O W  ~  version 0.25.5
Launching `hello.nf` [sleepy_carson] - revision: d9bdd10441
[warm up] executor > local
[warm up] executor > slurm
[b0/c74ce3] Submitted process > splitLetters
[0d/b90b8b] Submitted process > convertToUpper (1)
[df/ceed0d] Submitted process > convertToUpper (3)
[d9/d32a97] Submitted process > convertToUpper (2)
[d0/9f85db] Submitted process > convertToUpper (4)
HELLO WORLD!
[...snip...]

The master process that submits the jobs should be run either as a batch job or on an interactive node - not on the Biowulf login node.

Properties can be added to the processes to control how many of them run at a time (maxForks), where they run (executor), and whether to use local scratch space (scratch). For example, here is the same workflow as above, but running the first process locally and at most 4 uppercase processes in parallel:

// vim: set ft=groovy:

params.file = file('greetings.txt').toAbsolutePath()

process splitLetters {
    executor 'local'

    output:
    file 'chunk_*' into letters mode flatten

    """
    pwd
    split -l1 '${params.file}' chunk_
    """
}


process convertToUpper {
    maxForks 4
    clusterOptions '--gres=lscratch:1'
    scratch '/lscratch/$SLURM_JOB_ID'

    input:
    file x from letters

    output:
    stdout result

    """
    cat $x | tr '[a-z]' '[A-Z]'
    """
}

result.subscribe {
    println it.trim()
}

This mode of parallelization is suited to pipelines in which a reasonable number of jobs each run for minutes to hours. For pipelines running very large numbers of short tasks (seconds to a few minutes each), there is another submission mode that uses OpenMPI to start a nextflow worker on each exclusively allocated node and then distributes work across those workers. This reduces the load on the Slurm scheduler. Below is a wrapper script that would submit a nextflow pipeline in that way:

#!/bin/bash
#SBATCH --ntasks=2
#SBATCH --cpus-per-task=32
#SBATCH --nodes=2
#SBATCH --partition=multinode
#SBATCH --exclusive
#SBATCH --gres=lscratch:10

module load openmpi/1.10.0/gcc-4.4.7
module load nextflow/0.25.5
mpirun --pernode nextflow run nuc.nf -with-mpi

where nuc.nf is:

// vim: set ft=groovy :

params.in_fa = file('./gencode.vM9.transcripts.fa')
params.out = './nuc.csv'
params.chunk_size = 1000

// split the input fasta file into chunks of params.chunk_size sequences each
Channel
    .fromPath(params.in_fa)
    .splitFasta(by: params.chunk_size)
    .set { fasta }


// quick and dirty python code to determine the
// nucleotide content of each sequence
process nuc_content {
    scratch '/lscratch/$SLURM_JOB_ID'

    input:
    file fa from fasta

    output:
    stdout result

    """
    #!/usr/bin/env python
    import collections
    from Bio import SeqIO
    for record in SeqIO.parse("${fa}", "fasta"):
        seq = record.seq
        name = record.id.split('|')[0]
        n = collections.OrderedDict([('A', 0),
                                     ('C', 0),
                                     ('G', 0),
                                     ('T', 0),
                                     ('N', 0)])
        for nt in seq:
            try:
                n[nt] += 1
            except KeyError:
                pass
        print "%s,%s" % (name,
                         ",".join(str(a) for a in n.values()))
    """
}


/*
 * save the contents of the result channel to a file
 */
result
    .collectFile(name: params.out, newLine: false)
    .println {file -> "results in output file: ${file}\n" }
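
Assuming the wrapper script shown earlier was saved as nuc_mpi.sh (an illustrative name), the whole pipeline would then be submitted from the login node with

biowulf$ sbatch nuc_mpi.sh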

Batch job
Most jobs should be run as batch jobs.

The main job of a nextflow pipeline - the nextflow process that orchestrates the individual tasks - should be submitted as a batch job.
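
As a minimal sketch, the hello example from above could be submitted with a script like the following (the file name nextflow_main.sh and the resource requests are illustrative):

#!/bin/bash
#SBATCH --cpus-per-task=2
#SBATCH --mem=4g
#SBATCH --time=4:00:00

module load nextflow
nextflow run hello.nf

biowulf$ sbatch nextflow_main.sh

With the nextflow.config from above in place, the nextflow master process runs on the allocated node and submits each task as a separate Slurm job.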