This lesson is still being designed and assembled (Pre-Alpha version)

Processes Part 2

Overview

Teaching: 30 min
Exercises: 10 min
Questions
  • How do I get data, files, and values, out of processes?

  • How do I handle grouped input and output?

  • How can I control when a process is executed?

  • How do I control resources, such as number of CPUs and memory, available to processes?

  • How do I save output/results from a process?

Objectives
  • Define outputs to a process.

  • Understand how to handle grouped input and output using the tuple qualifier.

  • Understand how to use conditionals to control process execution.

  • Use process directives to control execution of a process.

  • Use the publishDir directive to save result files to a directory.

Outputs

We have seen how to input data into a process; now we will see how to output files and values from a process.

The output declaration block allows us to define the channels used by the process to send out the files and values produced.

An output block is not required, but if it is present it can contain one or more output declarations.

The output block follows the syntax shown below:

output:
  <output qualifier> <output name>
  <output qualifier> <output name>
  ...

Output values

Like the input, the type of output data is defined using type qualifiers.

The val qualifier allows us to output a value defined in the script.

Because Nextflow processes can only communicate through channels, if we want to share a value output of one process as input to another process, we would need to define that value in the output declaration block as shown in the following example:

//process_output_value.nf
nextflow.enable.dsl=2

params.transcriptome="${projectDir}/data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz"

process COUNT_CHR_SEQS {
  input:
  val chr

  output:
  val chr

  script:
  """
  zgrep -c '^>Y'$chr $params.transcriptome
  """
}
// Both 'Channel' and 'channel' keywords work to generate channels.
// However, it is a good practice to be consistent through the whole pipeline development
chr_ch = channel.of('A'..'P')

workflow {
  COUNT_CHR_SEQS(chr_ch)
  // use the view operator to display contents of the channel
  COUNT_CHR_SEQS.out.view()
}
N E X T F L O W  ~  version 21.10.6
Launching `p1.nf` [jovial_lavoisier] - revision: a652ef75d4
executor >  local (16)
executor >  local (16)
[6a/d82669] process > COUNT_CHR_SEQS (16) [100%] 16 of 16 ✔
B
456

A
118

C
186

[..truncated..]

Output files

If we want to capture a file instead of a value as output we can use the path qualifier that can capture one or more files produced by the process, over the specified channel.

//process_output_file.nf
nextflow.enable.dsl=2

params.transcriptome="${projectDir}/data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz"

process COUNT_CHR_SEQS {
  input:
  val chr

  output:
  path "${chr}_seq_count.txt"

  script:
  """
  zgrep -c '^>Y'$chr $params.transcriptome > ${chr}_seq_count.txt
  """
}
// Both 'Channel' and 'channel' keywords work to generate channels.
// However, it is a good practice to be consistent through the whole pipeline development
chr_ch = channel.of('A'..'P')

workflow {
  COUNT_CHR_SEQS(chr_ch)
  // use the view operator to display contents of the channel
  COUNT_CHR_SEQS.out.view()
}
N E X T F L O W  ~  version 21.10.6
Launching `process_output_file.nf` [angry_lichterman] - revision: 6a46c69413
executor >  local (16)
[95/ec5d62] process > COUNT_CHR_SEQS (13) [100%] 16 of 16 ✔
/Users/ggrimes2/Documents/process_wf/work/f2/6d5c44985a15feb0555b7b71c37a9c/J_seq_count.txt
executor >  local (16)
[95/ec5d62] process > COUNT_CHR_SEQS (13) [100%] 16 of 16 ✔
work/f2/6d5c44985a15feb0555b7b71c37a9c/J_seq_count.txt
work/4f/f810942341d003acc80c2603671177/B_seq_count.txt
work/23/883ccf187b5357137a9a87d98717c0/I_seq_count.txt
[..truncated..]

In the above example the process COUNT_CHR_SEQS creates a file named <chr>_seq_count.txt in the work directory containing the number of transcripts within that chromosome.

Since a file parameter using the same name, <chr>_seq_count.txt, is declared in the output block, when the task is completed that file is sent over the output channel.

A downstream operator, such as .view or a process declaring the same channel as input will be able to receive it.

Multiple output files

When an output file name contains a * or ? metacharacter it is interpreted as a pattern match. This allows us to capture multiple files into a list and output them as a one item channel.

For example, here we will capture the files sequence_ids.txt and sequence.txt produced as results from SPLIT_FASTA in the output channel.

//process_output_multiple.nf
nextflow.enable.dsl=2

params.transcriptome="${projectDir}/data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz"

process SPLIT_FASTA {
  input:
  path transcriptome

  output:
  path "*"

  script:
  """
  zgrep  '^>' $transcriptome > sequence_ids.txt
  zgrep -v '^>' $transcriptome > sequence.txt
  """
}
// Both 'Channel' and 'channel' keywords work to generate channels.
// However, it is a good practice to be consistent through the whole pipeline development
transcriptome_ch = channel.fromPath(params.transcriptome)

workflow {
  SPLIT_FASTA(transcriptome_ch)
  // use the view operator to display contents of the channel
  SPLIT_FASTA.out.view()
}
$ nextflow run process_output_multiple.nf
N E X T F L O W  ~  version 21.10.6
Launching `process_output_multiple.nf` [goofy_meitner] - revision: 53cbf7e5a4
executor >  local (1)
[21/01e6ba] process > SPLIT_FASTA (1) [100%] 1 of 1 ✔
[/work/21/01e6baac41d2f37531f86dc7a57034/sequence.txt, work/21/01e6baac41d2f37531f86dc7a57034/sequence_ids.txt]

Note: There are some caveats on glob pattern behaviour:

Output channels

Modify the nextflow script process_exercise_output.nf to include an output block that captures the different output file ${chr}_seqids.txt.

//process_exercise_output.nf
nextflow.enable.dsl=2

process EXTRACT_IDS {
  input:
  path transcriptome
  each chr

  //add output block here to capture the file "${chr}_seqids.txt"

  script:
  """
  zgrep '^>Y'$chr $transcriptome > ${chr}_seqids.txt
  """
}

transcriptome_ch = channel.fromPath('data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz')
chr_ch = channel.of('A'..'P')

workflow {
  EXTRACT_IDS(transcriptome_ch, chr_ch)
  EXTRACT_IDS.out.view()
}

Solution

//process_exercise_output_answer.nf
nextflow.enable.dsl=2

process EXTRACT_IDS {
  input:
  path transcriptome
  each chr

  //add output block here to capture the file "${chr}_seqids.txt"
  output:
  path "${chr}_seqids.txt"

  script:
  """
  zgrep '^>Y'$chr $transcriptome > ${chr}_seqids.txt
  """
}

transcriptome_ch = channel.fromPath('data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz')
chr_ch = channel.of('A'..'P')

workflow {
  EXTRACT_IDS(transcriptome_ch, chr_ch)
  EXTRACT_IDS.out.view()
}

Grouped inputs and outputs

So far we have seen how to declare multiple input and output channels, but each channel was handling only one value at time. However Nextflow can handle groups of values using the tuple qualifiers.

In tuples the first item is the grouping key and the second item is the list.

[group_key,[file1,file2,...]]

When using channel containing a tuple, such a one created with .filesFromPairs factory method, the corresponding input declaration must be declared with a tuple qualifier, followed by definition of each item in the tuple.

//process_tuple_input.nf
nextflow.enable.dsl=2

process TUPLEINPUT{
  input:
  tuple val(sample_id), path(reads)

  script:
  """
  echo $sample_id
  echo $reads
  """
}

reads_ch = Channel.fromFilePairs('data/yeast/reads/ref1_{1,2}.fq.gz')

workflow {
  TUPLEINPUT(reads_ch)
}

outputs

ref1
ref1_1.fq.gz ref1_2.fq.gz

In the same manner an output channel containing tuple of values can be declared using the tuple qualifier following by the definition of each tuple element in the tuple.

In the code snippet below the output channel would contain a tuple with the grouping key value as the Nextflow variable sample_id and a list containing the files matching the following pattern "${sample_id}.fq.gz".

output:
  tuple val(sample_id), path("${sample_id}.fq.gz")

An example can be seen in this script below.

//process_tuple_io.nf
nextflow.enable.dsl=2

process COMBINE_FQ {
  input:
  tuple val(sample_id), path(reads)

  output:
  tuple val(sample_id), path("${sample_id}.fq.gz")

  script:
  """
  cat $reads > ${sample_id}.fq.gz
  """
}

reads_ch = Channel.fromFilePairs('data/yeast/reads/ref1_{1,2}.fq.gz')

workflow {
  COMBINE_FQ(reads_ch)
  COMBINE_FQ.out.view()
}

nextflow run process_tuple_io.nf

The output is now a tuple containing the sample id and the combined fastq files.

[ref1, work/2d/a073d34b5b3231b1f57872599bd308/ref1.fq]

Composite inputs and outputs

Fill in the blank ___ input and output qualifiers for process_exercise_tuple.nf. Note: the output for the COMBINE_REPS process.

//process_exercise_tuple.nf
nextflow.enable.dsl=2

process COMBINE_REPS {
  input:
  tuple ___(sample_id), ___(reads)

  output:
  tuple ___(sample_id), ___("*.fq.gz")

  script:
  """
  cat *_1.fq.gz > ${sample_id}_R1.fq.gz
  cat *_2.fq.gz > ${sample_id}_R2.fq.gz
  """
}

reads_ch = Channel.fromFilePairs('data/yeast/reads/ref{1,2,3}*.fq.gz',size:-1)

workflow{
  COMBINE_REPS(reads_ch)
  COMBINE_REPS.out.view()
}

Solution

//process_exercise_tuple_answer.nf
nextflow.enable.dsl=2

process COMBINE_REPS {
  input:
  tuple val(sample_id), path(reads)

  output:
  tuple val(sample_id), path("*.fq.gz")

  script:
  """
  cat *_1.fq.gz > ${sample_id}_R1.fq.gz
  cat *_2.fq.gz > ${sample_id}_R2.fq.gz
  """
}

reads_ch = Channel.fromFilePairs('data/yeast/reads/ref*_{1,2,3}.fq.gz',size:-1)

workflow{
  COMBINE_REPS(reads_ch)
  COMBINE_REPS.out.view()
}
N E X T F L O W  ~  version 21.04.0
Launching `process_exercise_tuple.nf` [spontaneous_coulomb] - revision: 06ff22f1a9
executor >  local (3)
[75/f4a44d] process > COMBINE_REPS (3) [100%] 3 of 3 ✔
[ref3, work/99/a7d9176e332fdc0988973dbb89df63/ref3_R1.fq.gz, work/99/a7d9176e332fdc0988973dbb89df63/ref3_R2.fq.gz]
[ref2, /work/53/e3cbd39afa9f0f84a3d9cd060d991a/ref2_R1.fq.gz, /work/53/e3cbd39afa9f0f84a3d9cd060d991a/ref2_R2.fq.gz]
[ref1, work/75/f4a44d0bc761fa4774c2f23a465766/ref1_R1.fq.gz, work/75/f4a44d0bc761fa4774c2f23a465766/ref1_R2.fq.gz]

Conditional execution of a process

The when declaration allows you to define a condition that must be verified in order to execute the process. This can be any expression that evaluates a boolean value; true or false.

It is useful to enable/disable the process execution depending on the state of various inputs and parameters.

In the example below the process CONDITIONAL will only execute when the value of the chr variable is less than or equal to 5:

//process_when.nf
nextflow.enable.dsl=2

process CONDITIONAL {
  input:
  val chr

  when:
  chr <= 5

  script:
  """
  echo $chr
  """
}

chr_ch = channel.of(1..22)

workflow {
  CONDITIONAL(chr_ch)
}
4

5

2

3

1

Directives

Directive declarations allow the definition of optional settings, like the number of cpus and amount of memory, that affect the execution of the current process without affecting the task itself.

They must be entered at the top of the process body, before any other declaration blocks (i.e. input, output, etc).

Note: You do not use = when assigning a value to a directive.

Directives are commonly used to define the amount of computing resources to be used or extra information for configuration or logging purpose.

For example:

//process_directive.nf
nextflow.enable.dsl=2

process PRINTCHR {
  tag "tagging with chr$chr"
  cpus 1
  echo true

  input:
  val chr

  script:
  """
  echo processing chromosome: $chr
  echo number of cpus $task.cpus
  """
}

chr_ch = channel.of(1..22, 'X', 'Y')

workflow {
  PRINTCHR(chr_ch)
}
processing chromosome: 1
number of cpus 1

processing chromosome: 2
number of cpus 1

processing chromosome: 6
number of cpus 1
[..truncated..]

The above process uses the three directives, tag, cpus and echo.

The tag directive to allow you to give a custom tag to each process execution. This tag makes it easier to identify a particular task (executed instance of a process) in a log file or in the execution report.

The second directive cpus allows you to define the number of CPUs required for each task.

The third directive echo true prints the stdout to the terminal.

We use the Nextflow task.cpus variable to capture the number of cpus assigned to a task. This is frequently used to specify the number of threads in a multi-threaded command in the script block.

Another commonly used directive is memory specification: memory.

A complete list of directives is available at this link.

Adding directives

Modify the Nextflow script process_exercise_directives.nf

  1. Add a tag directive logging the sample_id in the execution output.
  2. Add a cpus directive to specify the number of cpus as 2.
  3. Change the fastqc -t option value to $task.cpus in the script directive.
//process_exercise_directives.nf
nextflow.enable.dsl=2

process FASTQC {
  //add tag directive
  //add cpu directive
 
  input:
  tuple val(sample_id), path(reads)
  
  output:
  tuple val(sample_id), path("fastqc_out")
  
  script:
  """
  mkdir fastqc_out
  fastqc $reads -o fastqc_out -t 1
  """
}

read_pairs_ch = Channel.fromFilePairs('data/yeast/reads/ref*_{1,2}.fq.gz')

workflow {
  FASTQC(read_pairs_ch)
  FASTQC.out.view()
}

solution

//process_directives_answer.nf
nextflow.enable.dsl=2

process FASTQC {
  tag "$sample_id"
  cpus 2
  
  input:
  tuple val(sample_id), path(reads)
  
  output:
  tuple val(sample_id), path("fastqc_out")
 
  script:
  """
  mkdir fastqc_out
  fastqc $reads -o fastqc_out -t $task.cpus
  """
}

read_pairs_ch = Channel.fromFilePairs('data/yeast/reads/ref*_{1,2}.fq.gz')

workflow {
  FASTQC(read_pairs_ch)
  FASTQC.out.view()
}
N E X T F L O W  ~  version 21.04.0
Launching `process_exercise_directives.nf` [sad_rosalind] - revision: 2ccbfa4937
executor >  local (3)
[90/de1125] process > FASTQC (ref1) [100%] 3 of 3 ✔
[ref2, work/ea/9e6a341b88caf8879e8d18b77049c8/fastqc_out]
[ref3, work/94/d059b816a9ec3d868f2924c26813e7/fastqc_out]
[ref1, work/90/de11251d362f494d6650789d9f8c1d/fastqc_out]

Organising outputs

PublishDir directive

Nextflow manages intermediate results from the pipeline’s expected outputs independently.

Files created by a process are stored in a task specific working directory which is considered as temporary. Normally this is under the work directory, which can be deleted upon completion.

The files you want the workflow to return as results need to be defined in the process output block and then the output directory specified using the directive publishDir. More information here.

Note: A common mistake is to specify an output directory in the publishDir directive while forgetting to specify the files you want to include in the output block.

publishDir <directory>, parameter: value, parameter2: value ...

For example if we want to capture the results of the COMBINE_READS process in a results/merged_reads output directory we need to define the files in the output and specify the location of the results directory in the publishDir directive:

//process_publishDir.nf
nextflow.enable.dsl=2

process COMBINE_READS {
  publishDir "results/merged_reads"

  input:
  tuple val(sample_id), path(reads)

  output:
  path("${sample_id}.merged.fq.gz")

  script:
  """
  cat ${reads} > ${sample_id}.merged.fq.gz
  """
}

reads_ch = Channel.fromFilePairs('data/yeast/reads/ref1_{1,2}.fq.gz')


workflow {
  COMBINE_READS(reads_ch)
}

$ nextflow run process_publishDir.nf
N E X T F L O W  ~  version 21.04.0
Launching `process_publishDir.nf` [friendly_pauling] - revision: 9b5c315893
executor >  local (1)

[d9/909ed9] process > COMBINE_READS (1) [100%] 1 of 1 ✔

We can use the UNIX command tree to examine the contents of the results directory.

tree results
results/
└── merged_reads
    └── ref1.merged.fq.gz -> work/48/f97234d7185cbfbd86e2f11c1afab5/ref1.merged.fq.gz

In the above example, the publishDir "results/merged_reads", creates a symbolic link -> to the output files specified by the process merged_reads to the directory path results/merged_reads.

publishDir

The publishDir output is relative to the path the pipeline run has been launched. Hence, it is a good practice to use implicit variables like projectDir to specify publishDir value.

publishDir parameters

The publishDir directive can take optional parameters, for example the mode parameter can take the value "copy" to specify that you wish to copy the file to output directory rather than just a symbolic link to the files in the working directory. Since the working directory is generally deleted on completion of a pipeline, it is safest to use mode: "copy" for results files. The default mode (symlink) is helpful for checking intermediate files which are not needed in the long term.

publishDir "results/merged_reads", mode: "copy"

Full list here.

Manage semantic sub-directories

You can use more than one publishDir to keep different outputs in separate directories. To specify which files to put in which output directory use the parameter pattern with the a glob pattern that selects which files to publish from the overall set of output files.

In the example below we will create an output folder structure in the directory results, which contains a separate sub-directory for sequence id file, pattern:"*_ids.txt" , and a sequence directory, results/sequence" for the sequence.txt file. Remember, we need to specify the files we want to copy as outputs.

//process_publishDir_semantic.nf
nextflow.enable.dsl=2

params.transcriptome="${projectDir}/data/yeast/transcriptome/Saccharomyces_cerevisiae.R64-1-1.cdna.all.fa.gz"

process SPLIT_FASTA {
  publishDir "results/ids", pattern: "*_ids.txt", mode: "copy"
  publishDir "results/sequence", pattern: "sequence.txt", mode: "copy"


  input:
  path transcriptome

  output:
  path "*"

  script:
  """
  zgrep  '^>' $transcriptome > sequence_ids.txt
  zgrep -v '^>' $transcriptome > sequence.txt
  """
}
// Both 'Channel' and 'channel' keywords work to generate channels.
// However, it is a good practice to be consistent through the whole pipeline development
transcriptome_ch = channel.fromPath(params.transcriptome)

workflow {
  SPLIT_FASTA(transcriptome_ch)
  // use the view operator to display contents of the channel
  SPLIT_FASTA.out.view()
}
$ nextflow run process_publishDir_semantic.nf
N E X T F L O W  ~  version 21.04.0
Launching `process_publishDir_semantic.nf` [golden_poisson] - revision: 421a604840

executor >  local (1)
[be/950786] process > SPLIT_FASTA (1) [100%] 1 of 1 ✔

We can now use the tree command to examine the results directory.

$ tree results
results/
├── sequence
│   └── sequence.txt
└── ids
    └── sequence_ids.txt

Publishing results

Add a publishDir directive to the nextflow script process_exercise_publishDir.nf that copies the merged reads to the results folder merged_reps.

//process_exercise_publishDir.nf
nextflow.enable.dsl=2

params.reads= "data/yeast/reads/ref{1,2,3}*{1,2}.fq.gz"

process MERGE_REPS {
  
  input:
  tuple val(sample_id), path(reads)
  
  output:
  path("*fq.gz")

  script:
  """
  cat *1.fq.gz > ${sample_id}.merged.R1.fq.gz
  cat *2.fq.gz > ${sample_id}.merged.R2.fq.gz
  """
}
reads_ch = Channel.fromFilePairs(params.reads,checkIfExists:true,size:6)

workflow {
  MERGE_REPS(reads_ch)
}

Solution

//process_exercise_publishDir_answer.nf
nextflow.enable.dsl=2

params.reads= "data/yeast/reads/ref{1,2,3}*{1,2}.fq.gz"

process MERGE_REPS {
  publishDir "results/merged_reps"
  input:
  tuple val(sample_id), path(reads)
  output:
  path("*fq.gz")

  script:
  """
  cat *1.fq.gz > ${sample_id}.merged.R1.fq.gz
  cat *2.fq.gz > ${sample_id}.merged.R2.fq.gz
  """
}

reads_ch = Channel.fromFilePairs(params.reads,checkIfExists:true,size:6)

workflow {
  MERGE_REPS(reads_ch)
}
$ nextflow run process_exercise_publishDir.nf
N E X T F L O W  ~  version 21.04.0
Launching `process_exercise_publishDir.nf` [infallible_becquerel] - revision: 4d865241a8

executor >  local (1)
[22/88aa22] process > MERGE_REPS (1) [100%] 1 of 1 ✔

Nextflow Patterns

If you want to find out common structures of Nextflow processes, the Nextflow Patterns page collects some recurrent implementation patterns used in Nextflow applications.

Key Points

  • Outputs to a process are defined using the output blocks.

  • You can group input and output data from a process using the tuple qualifier.

  • The execution of a process can be controlled using the when declaration and conditional statements.

  • Files produced within a process and defined as output can be saved to a directory using the publishDir directive.