Bug Bulletin: The recent 3.2 release fixes many issues. If you run into a problem, please try the latest version before posting a bug report, as your problem may already have been solved.

Pipelining the GATK with Queue

Geraldine_VdAuweraGeraldine_VdAuwera Posts: 5,869Administrator, GATK Developer admin
edited February 3 in Queue

1. Introduction

As mentioned in the introductory materials, the core concept behind the GATK tools is the walker. The Queue scripting framework contains several mechanisms which make it easy to chain together GATK walkers.

2. Authoring walkers

As part of authoring your walker there are several Queue behaviors that you can specify for QScript authors using your particular walker.

Specifying how to partition

Queue can significantly speed up generating walker outputs by passing different instances of the GATK the same BAM or VCF data but specifying different regions of the data to analyze. After the different instances output their individual results Queue will gather the results back to the original output path requested by QScript.

Queue limits the level it will split genomic data by examining the @PartitionBy() annotation for your walker which specifies a PartitionType. This table lists the different partition types along with the default partition level for each of the different walker types.

PartitionType Default for Walker Type Description Example Intervals Example Splits
PartitionType.CONTIG Read walkers Data is grouped together so that all genomic data from the same contig is never presented to two different instances of the GATK. original: chr1:10-11, chr2:10-20, chr2:30-40, chr2:50-60, chr3:10-11 split 1: chr1:10-11, chr2:10-20, chr2:30-40, chr2:50-60; split 2:chr3:10-11
PartitionType.INTERVAL (none) Data is split down to the interval level but never divides up an explicitly specified interval. If no explicit intervals are specified in the QScript for the GATK then this is effectively the same as splitting by contig. original: chr1:10-11, chr2:10-20, chr2:30-40, chr2:50-60, chr3:10-11 split 1: chr1:10-11, chr2:10-20, chr2:30-40; split 2: chr2:50-60, chr3:10-11
PartitionType.LOCUS Locus walkers, ROD walkers Data is split down to the locus level possibly dividing up intervals. original: chr1:10-11, chr2:10-20, chr2:30-40, chr2:50-60, chr3:10-11 split 1: chr1:10-11, chr2:10-20, chr2:30-35; split 2: chr2:36-40, chr2:50-60, chr3:10-11
PartitionType.NONE Read pair walkers, Duplicate walkers The data cannot be split and Queue must run the single instance of the GATK as specified in the QScript. original: chr1:10-11, chr2:10-20, chr2:30-40, chr2:50-60, chr3:10-11 no split: chr1:10-11, chr2:10-20, chr2:30-40, chr2:50-60, chr3:10-11

If you walker is implemented in a way that Queue should not divide up your data you should explicitly set the @PartitionBy(PartitionType.NONE). If your walker can theoretically be run per genome location specify @PartitionBy(PartitionType.LOCUS).

@PartitionBy(PartitionType.LOCUS)
public class ExampleWalker extends LocusWalker<Integer, Integer> {
...

Specifying how to join outputs

Queue will join the standard walker outputs.

Output type Default gatherer implementation
SAMFileWriter The BAM files are joined together using Picard's MergeSamFiles.
VCFWriter The VCF files are joined together using the GATK CombineVariants.
PrintStream The first two files are scanned for a common header. The header is written once into the output, and then each file is appended to the output, skipping past with the header lines.

If your PrintStream is not a simple text file that can be concatenated together, you must implement a Gatherer. Extend your custom Gatherer from the abstract base class and implement the gather() method.

package org.broadinstitute.sting.commandline;

import java.io.File;
import java.util.List;

/**
 * Combines a list of files into a single output.
 */
public abstract class Gatherer {
    /**
     * Gathers a list of files into a single output.
     * @param inputs Files to combine.
     * @param output Path to output file.
     */
    public abstract void gather(List<File> inputs, File output);

    /**
     * Returns true if the caller should wait for the input files to propagate over NFS before running gather().
     */
    public boolean waitForInputs() { return true; }
}

Specify your gatherer using the @Gather() annotation by your @Output.

@Output
@Gather(MyGatherer.class)
public PrintStream out;

Queue will run your custom gatherer to join the intermediate outputs together.

3. Using GATK walkers in Queue

Queue GATK Extensions

Running 'ant queue' builds a set of Queue extensions for the GATK-Engine. Every GATK walker and command line program in the compiled GenomeAnalysisTK.jar a Queue compatible wrapper is generated.

The extensions can be imported via import org.broadinstitute.sting.queue.extensions.gatk._

import org.broadinstitute.sting.queue.QScript
import org.broadinstitute.sting.queue.extensions.gatk._

class MyQscript extends QScript {
...

Note that the generated GATK extensions will automatically handle shell-escaping of all values assigned to the various Walker parameters, so you can rest assured that all of your values will be taken literally by the shell. Do not attempt to escape values yourself -- ie.,

Do this:

filterSNPs.filterExpression = List("QD<2.0", "MQ<40.0", "HaplotypeScore>13.0")

NOT this:

filterSNPs.filterExpression = List("\"QD<2.0\"", "\"MQ<40.0\"", "\"HaplotypeScore>13.0\"")

Listing variables

In addition to the GATK documentation on this wiki you can also find the full list of arguments for each walker extension in a variety of ways.

The source code for the extensions is generated during ant queue and placed in this directory:

build/queue-extensions/src

When properly configured an IDE can provide command completion of the walker extensions. See Queue with IntelliJ IDEA for our recommended settings.

If you do not have access to an IDE you can still find the names of the generated variables using the command line. The generated variable names on each extension are based off of the fullName of the Walker argument. To see the built in documentation for each Walker, run the GATK with:

java -jar GenomeAnalysisTK.jar -T <walker name> -help

Once the import statement is specified you can add() instances of gatk extensions in your QScript's script() method.

Setting variables

If the GATK walker input allows more than one of a value you should specify the values as a List().

  def script() {
    val snps = new UnifiedGenotyper
    snps.reference_file = new File("testdata/exampleFASTA.fasta")
    snps.input_file = List(new File("testdata/exampleBAM.bam"))
    snps.out = new File("snps.vcf")
    add(snps)
  }

Although it may be harder for others trying to read your QScript, for each of the long name arguments the extensions contain aliases to their short names as well.

  def script() {
    val snps = new UnifiedGenotyper
    snps.R = new File("testdata/exampleFASTA.fasta")
    snps.I = List(new File("testdata/exampleBAM.bam"))
    snps.out = new File("snps.vcf")
    add(snps)
  }

Here are a few more examples using various list assignment operators.

  def script() {
    val countCovariates = new CountCovariates

    // Append to list using item appender :+
    countCovariates.rodBind :+= RodBind("dbsnp", "VCF", dbSNP)

    // Append to list using collection appender ++
    countCovariates.covariate ++= List("ReadGroupCovariate", "QualityScoreCovariate", "CycleCovariate", "DinucCovariate")

    // Assign list using plain old object assignment
    countCovariates.input_file = List(inBam)

    // The following is not a list, so just assigning one file to another
    countCovariates.recal_file = outRecalFile

    add(countCovariates)
  }

Specifying an alternate GATK jar

By default Queue runs the GATK from the current classpath. This works best since the extensions are generated and compiled at time same time the GATK is compiled via ant queue.

If you need to swap in a different version of the GATK you may not be able to use the generated extensions. The alternate GATK jar must have the same command line arguments as the GATK compiled with Queue. Otherwise the arguments will not match and you will get an error when Queue attempts to run the alternate GATK jar. In this case you will have to create your own custom CommandLineFunction for your analysis.

  def script {
    val snps = new UnifiedGenotyper
    snps.jarFile = new File("myPatchedGATK.jar")
    snps.reference_file = new File("testdata/exampleFASTA.fasta")
    snps.input_file = List(new File("testdata/exampleBAM.bam"))
    snps.out = new File("snps.vcf")
    add(snps)
  }

GATK scatter/gather

Queue currently allows QScript authors to explicitly invoke scatter/gather on GATK walkers by setting the scatter count on a function.

  def script {
    val snps = new UnifiedGenotyper
    snps.reference_file = new File("testdata/exampleFASTA.fasta")
    snps.input_file = List(new File("testdata/exampleBAM.bam"))
    snps.out = new File("snps.vcf")
    snps.scatterCount = 20
    add(snps)
  }

This will run the UnifiedGenotyper up to 20 ways parallel and then will merge the partial VCFs back into the single snps.vcf.

Additional caveat

Some walkers are still being updated to support Queue fully. For example they may not have defined the @Input and @Output and thus Queue is unable to correctly track their dependencies, or a custom Gatherer may not be implemented yet.

Post edited by Geraldine_VdAuwera on

Geraldine Van der Auwera, PhD

Comments

Sign In or Register to comment.