Bug Bulletin: The GenomeLocPArser error in SplitNCigarReads has been fixed; if you encounter it, use the latest nightly build.

Output management

Geraldine_VdAuweraGeraldine_VdAuwera Posts: 6,412Administrator, GATK Developer admin
edited October 2012 in Developer Zone

1. Introduction

When running either single-threaded or in shared-memory parallelism mode, the GATK guarantees that output written to an output stream created via the @Argument mechanism will ultimately be assembled in genomic order. In order to assemble the final output file, the GATK will write the output generated from each thread into a temporary output file, ultimately assembling the data via a central coordinating thread. There are three major elements in the GATK that facilitate this functionality:

  • Stub

    The front-end interface to the output management system. Stubs will be injected into the walker by the command-line argument system and relay information from the walker to the output management system. There will be one stub per invocation of the GATK.

  • Storage

    The back end interface, responsible for creating, writing and deleting temporary output files as well as merging their contents back into the primary output file. One Storage object will exist per shard processed in the GATK.

  • OutputTracker

    The dispatcher; ultimately connects the stub object's output creation request back to the most appropriate storage object to satisfy that request. One OutputTracker will exist per GATK invocation.

2. Basic Mechanism

Stubs are directly injected into the walker through the GATK's command-line argument parser as a go-between from walker to output management system. When a walker calls into the stub it's first responsibility is to call into the output tracker to retrieve an appropriate storage object. The behavior of the OutputTracker from this point forward depends mainly on the parallelization mode of this traversal of the GATK.

If the traversal is single-threaded:

  • the OutputTracker (implemented as DirectOutputTracker) will create the storage object if necessary and return it to the stub.

  • The stub will forward the request to the provided storage object.

  • At the end of the traversal, the microscheduler will request that the OutputTracker finalize and close the file.

If the traversal is multi-threaded using shared-memory parallelism:

  • The OutputTracker (implemented as ThreadLocalOutputTracker) will look for a storage object associated with this thread via a ThreadLocal.

  • If no such storage object exists, it will be created pointing to a temporary file.

  • At the end of each shard processed, that file will be closed and an OutputMergeTask will be created so that the shared-memory parallelism code can merge the output at its leisure.

  • The shared-memory parallelism code will merge when a fixed number of temporary files appear in the input queue. The constant used to determine this frequency is fixed at compile time (see HierarchicalMicroScheduler.MAX_OUTSTANDING_OUTPUT_MERGES).

3. Using output management

To use the output management system, declare a field in your walker of one of the existing core output types, coupled with either an @Argument or @Output annotation.

@Output(doc="Write output to this BAM filename instead of STDOUT")
SAMFileWriter out;

Currently supported output types are SAM/BAM (declare SAMFileWriter), VCF (declare VCFWriter), and any non-buffering stream extending from OutputStream.

4. Implementing a new output type

To create a new output type, three types must be implemented: Stub, Storage, and ArgumentTypeDescriptor.

To implement Stub

Create a new Stub class, extending/inheriting the core output type's interface and implementing the Stub interface.

OutputStreamStub extends OutputStream implements Stub<OutputStream> {

Implement a register function so that the engine can provide the stub with the session's OutputTracker.

public void register( OutputTracker outputTracker ) {
    this.outputTracker = outputTracker;
}

Add as fields any parameters necessary for the storage object to create temporary storage.

private final File targetFile;
public File getOutputFile() { return targetFile; }

Implement/override every method in the core output type's interface to pass along calls to the appropriate storage object via the OutputTracker.

public void write( byte[] b, int off, int len ) throws IOException {
    outputTracker.getStorage(this).write(b, off, len);
}

To implement Storage

Create a Storage class, again extending inheriting the core output type's interface and implementing the Storage interface.

public class OutputStreamStorage extends OutputStream implements Storage<OutputStream> {

Implement constructors that will accept just the Stub or Stub + alternate file path and create a repository for data, and a close function that will close that repository.

public OutputStreamStorage( OutputStreamStub stub ) { ... }
public OutputStreamStorage( OutputStreamStub stub, File file ) { ... }
public void close() { ... }

Implement a mergeInto function capable of reconstituting the file created by the constructor, dumping it back into the core output type's interface, and removing the source file.

public void mergeInto( OutputStream targetStream ) { ... }

Add a block to StorageFactory.createStorage() capable of creating the new storage object. TODO: use reflection to generate the storage classes.

    if(stub instanceof OutputStreamStub) {
        if( file != null )
            storage = new OutputStreamStorage((OutputStreamStub)stub,file);
        else
            storage = new OutputStreamStorage((OutputStreamStub)stub);
    }

To implement ArgumentTypeDescriptor

Create a new object inheriting from type ArgumentTypeDescriptor. Note that the ArgumentTypeDescriptor does NOT need to support the core output type's interface.

public class OutputStreamArgumentTypeDescriptor extends ArgumentTypeDescriptor {

Implement a truth function indicating which types this ArgumentTypeDescriptor can service.

 @Override
 public boolean supports( Class type ) {
     return SAMFileWriter.class.equals(type) || StingSAMFileWriter.class.equals(type);
 }

Implement a parse function that constructs the new Stub object. The function should register this type as an output by caling engine.addOutput(stub).

 public Object parse( ParsingEngine parsingEngine, ArgumentSource source, Type type, ArgumentMatches matches )  {
     ...
     OutputStreamStub stub = new OutputStreamStub(new File(fileName));
     ...
     engine.addOutput(stub);
     ....
     return stub;
}

Add a creator for this new ArgumentTypeDescriptor in CommandLineExecutable.getArgumentTypeDescriptors().

 protected Collection<ArgumentTypeDescriptor> getArgumentTypeDescriptors() {
     return Arrays.asList( new VCFWriterArgumentTypeDescriptor(engine,System.out,argumentSources),
                           new SAMFileWriterArgumentTypeDescriptor(engine,System.out),
                           new OutputStreamArgumentTypeDescriptor(engine,System.out) );
 }

After creating these three objects, the new output type should be ready for usage as described above.

5. Outstanding issues

  • Only non-buffering iterators are currently supported by the GATK. Of particular note, PrintWriter will appear to drop records if created by the command-line argument system; use PrintStream instead.

  • For efficiency, the GATK does not reduce output files together following the tree pattern used by shared-memory parallelism; output merges happen via an independent queue. Because of this, output merges happening during a treeReduce may not behave correctly.

Post edited by Geraldine_VdAuwera on

Geraldine Van der Auwera, PhD

Sign In or Register to comment.