
Parallelism with the GATK - RETIRED

Geraldine_VdAuwera (Administrator, GATK Developer)
edited January 2013 in Developer Zone

This article is out of date and has been replaced by updated documents:

- Primer on parallelism

- Specific usage recommendations

The old document, for archival purposes:

1. Overview

The MapReduce architecture of the GATK allows most GATK walkers to run in a parallel-processing mode. The GATK supports two basic parallel-processing models, known as shared memory and scatter/gather.

  • Shared memory parallelism

    Parallelism within a single multi-threaded process with access to a large, shared RAM. Shared memory parallelism is stable and is supported by many tools that access pileups of bases.

  • Scatter/gather (SG) parallelism

    In SG parallelism, the target genomic regions are divided among N independent GATK instances that are run separately on a single machine or across a computing cluster. The outputs of the independent walkers are merged together once all of them have completed. SG works very efficiently in the GATK, provided the output of a walker is independent per site (e.g. Unified Genotyper) or per chromosome (e.g. Indel Realigner). SG parallelism is a completely stable approach in the GATK and is used routinely by the GATK team to process large data sets. It is also natively supported by GATK-Queue, which automatically scatters and gathers GATK processes given the desired number N of processes to execute simultaneously.
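The Java sketch below is a toy illustration of the scatter/gather contract. To keep it self-contained, a thread pool stands in for the independent GATK processes. The per-site computation (countSites, which counts positions divisible by 7) is made up for the example. Each site's result is independent of its neighbors, so the gathered total matches a serial run. The merged result is only returned once every scattered part has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy model of scatter/gather: each worker processes its own region with no
// shared state. Threads stand in for the independent GATK processes of real S/G.
class ScatterGatherSketch {

    // Hypothetical per-site computation (counts positions divisible by 7).
    // Its result depends only on each individual site.
    static long countSites(long start, long end) {
        long n = 0;
        for (long pos = start; pos <= end; pos++) {
            if (pos % 7 == 0) n++;
        }
        return n;
    }

    static long scatterGather(long length, int jobs) {
        ExecutorService pool = Executors.newFixedThreadPool(jobs);
        try {
            // Scatter: split 1..length into at most `jobs` contiguous regions.
            List<Future<Long>> parts = new ArrayList<>();
            long chunk = (length + jobs - 1) / jobs; // ceiling division
            for (int i = 0; i < jobs; i++) {
                final long start = i * chunk + 1;
                final long end = Math.min(length, (i + 1) * chunk);
                if (start > end) break;
                parts.add(pool.submit(() -> countSites(start, end)));
            }
            // Gather: get() blocks, so the total is only returned after every part is done.
            long total = 0;
            for (Future<Long> part : parts) {
                try {
                    total += part.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException("a scattered part failed", e);
                }
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(countSites(1, 1_000_000) + " " + scatterGather(1_000_000, 4));
    }
}
```

This equivalence relies on per-site independence. It would not hold for records, such as reads, that span a split point.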

Note that parallel processing will significantly speed up data processing but may produce statistically insignificant differences in the results. This non-determinism is not ideal. In practice, however, the minute differences have been mathematically meaningless, and parallelism produces consistent results in a reasonable amount of time for whole-genome and whole-exome data. If absolute determinism is more important than speed, we recommend that you do not use parallelism with the GATK.

2. Comparison of GATK parallelism options

There are costs and benefits to each type of parallelism in the GATK, as outlined in the following table.

Comparison of standard parallelism approaches in the GATK

  • Stability

    Shared memory: Stable.
    Scatter/gather: Stable.

  • Applicable walker types

    Shared memory: By locus and by ROD only. ReadWalkers are not supported.
    Scatter/gather: All walker types. In general, ReadWalkers can only be split safely by chromosome.

  • Example walkers

    Shared memory: UnifiedGenotyper, BaseRecalibrator, VariantEval.
    Scatter/gather: All walkers, including ReadWalkers like IndelRealigner.

  • Scalability

    Shared memory: Fewer than 32 cores. Each thread operates completely independently, so N threads require N times more memory than 1 thread alone. Best scaling at 8 or fewer threads.
    Scatter/gather: Hundreds of processes, in general limited by the capabilities of the underlying storage system. Isilon-class storage can run thousands of jobs effectively.

  • How to enable

    Shared memory: Use the -nt engine argument on any walker that supports shared memory parallelism (the engine will tell you if it does not).
    Scatter/gather: (1) Provide -L interval lists to the GATK, a different one for each of the N independent GATK processes. For example, use -L chr1 for the first process and -L chr2 for the second. When all processes have finished, merge the outputs as appropriate (e.g. use MergeSam.jar for BAMs, and cat or CombineVariants for VCFs). (2) Use GATK-Queue to divide up your GATK jobs automatically. For example, setting this.scatterCount = 10 results in 10 independent processes.

  • Pros

    Shared memory: Easy to enable. Produces a single output file, merged internally by the GATK engine. Efficiently uses multi-core processors sharing a single memory space.
    Scatter/gather: Works for all walker types, including ReadWalkers. Scales to hundreds of independent jobs. Easy to enable with a single -L argument. Directly supported and managed by GATK-Queue. Processing is totally independent per interval, so failed parts can easily be resumed without repeating regions that were already processed successfully.

  • Cons

    Shared memory: Limited to fewer than 32 processors without significant overhead. Limited to the cores physically present on the machine, so it cannot take advantage of computing cluster resources. Does not work for ReadWalkers (Table Recalibrator, Indel Realigner).
    Scatter/gather: Requires manual preparation of sliced genomic intervals for processing (if you aren't using GATK-Queue). ReadWalkers and other tools that can only be processed by chromosome suffer from load-balancing problems (chr1 is much bigger than chr22). Sensitive to variation in data density over the genome. For example, dividing chr20 processing into 63 chunks of 1 Mb leads to 10x differences in completion times, due to data pileups around the centromere. Must wait until all parts of the scatter have completed before gathering, which makes the process sensitive to farm scheduling problems: if a job runs for M minutes but waits Z minutes to start on the farm, the entire S/G process will not complete for at least M + Z minutes.

3. Which parallelism option is right for me?

Almost certainly, either shared memory or scatter/gather parallelism is the right choice for your NGS pipeline. Our go-to option for parallelism here at the Broad Institute is S/G, since S/G allows you to cut up your jobs into hundreds of pieces to run on our standard computing farm using GATK-Queue. When I have a small job that needs to run quickly, am testing out program options, or need a quick VariantEval result, I use shared memory parallelism with ~10 threads on a single large computer with a lot (>64 GB) of memory.

Basically, if I have N processors, and I want to choose between shared memory or S/G, here's how I would choose:

  • If all N processors are on a single computer, and my walker supports it, use shared memory parallelism

  • If not, use S/G

4. Shared Memory Parallelism (Stable)

The GATK currently supports a hierarchical version of parallelism. In this form of parallelism, data is divided into shards, each shard is map/reduced independently, and the results are combined with a 'tree reduce' step. While the framework handles much of the heavy lifting of data division required for parallelism, each walker must individually be reviewed to ensure that it isn't tracking state internally in a non-threadsafe way. Many tools support shared memory parallelism, including critical tools such as:

  • UnifiedGenotyper
  • CountCovariates
  • VariantEval

To find out whether your walker is parallelizable, review its source, or try running it with -nt 2: if the engine doesn't complain, the walker is parallelized.

In shared memory parallelism, each thread operates on a 16 kbp shard of reference sequence, in a memory space completely independent from all other threads. Up to 24 threads can run efficiently in this design. Greater parallelism is limited by resource starvation of the single reduce thread and/or by the memory inefficiency of keeping each thread's data totally independent. See the gatkParallelism performance 082112 plots for an analysis of the scalability of several key GATK tools as a function of nt.
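The sketch below makes the 16 kbp figure concrete by cutting one contig into consecutive shards of the kind each thread would receive. It assumes "16 kbp" means 16,000 bp; the engine's real shard size and sharding code may differ. Coordinates are 1-based and inclusive. The chr1 length (249,250,621) is borrowed from the command-line example in section 5.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sharding of one contig into fixed-size pieces (not GATK code).
class ShardSketch {
    // Assumption: "16 kbp" read as 16,000 bp.
    static final long SHARD_SIZE = 16_000L;

    // Returns 1-based, inclusive {start, end} pairs covering 1..contigLength.
    static List<long[]> shards(long contigLength, long shardSize) {
        List<long[]> out = new ArrayList<>();
        for (long start = 1; start <= contigLength; start += shardSize) {
            long end = Math.min(contigLength, start + shardSize - 1); // last shard may be short
            out.add(new long[] {start, end});
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> s = shards(249_250_621L, SHARD_SIZE);
        long[] last = s.get(s.size() - 1);
        System.out.println(s.size() + " shards; last = " + last[0] + "-" + last[1]);
    }
}
```

Under these assumptions, chr1 yields 15579 shards, the last one covering 249248001-249250621.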

Enabling n-way parallelism from the command-line

Run the GATK, using the -nt command-line argument to specify the number of threads that the GATK should attempt to use.

[Figure: Shared memory parallelism architecture (HierarchicalParallelism.png)]

Helpful hints: Implementing a walker with parallelism in mind

First, be aware that some walkers may, by design, require a rewrite for complete parallelization.

  • When implementing a standard (non-parallelized) walker, one must implement the reduce method. It combines an individual datum returned by the map function with the aggregate of the prior map calls. When implementing a parallelizable walker, one must also implement the org.broadinstitute.sting.gatk.walkers.TreeReducible interface and its treeReduce() function. The treeReduce() function tells the GATK how to combine two adjacent reduce results, rather than one map result and one reduce result.

  • The GATK supports writing to output files from either the map or the reduce when running in parallel. However, only unbuffered writers are currently supported. Please use PrintStream rather than PrintWriter at this time.
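This minimal, self-contained Java sketch illustrates the difference between reduce() and treeReduce(). SimpleWalker, SimpleTreeReducible and CountingWalker are hypothetical stand-ins written for this example. They only mimic the contract described above and are not the GATK's actual TreeReducible interface or walker base classes. Each shard is map/reduced serially, and adjacent shard results are then combined pairwise with treeReduce().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the walker contract (NOT the GATK API).
interface SimpleWalker<M, T> {
    T reduceInit();
    M map(int locus);
    T reduce(M value, T sum);   // one map result + the running aggregate
}

interface SimpleTreeReducible<T> {
    T treeReduce(T lhs, T rhs); // two adjacent reduce results
}

class CountingWalker implements SimpleWalker<Integer, Long>, SimpleTreeReducible<Long> {
    public Long reduceInit() { return 0L; }
    public Integer map(int locus) { return locus % 3 == 0 ? 1 : 0; } // toy per-locus datum
    public Long reduce(Integer value, Long sum) { return sum + value; }
    public Long treeReduce(Long lhs, Long rhs) { return lhs + rhs; }
}

class TreeReduceSketch {
    // Map/reduce one shard [start, end) serially.
    static <M, T> T runShard(SimpleWalker<M, T> w, int start, int end) {
        T sum = w.reduceInit();
        for (int locus = start; locus < end; locus++) {
            sum = w.reduce(w.map(locus), sum);
        }
        return sum;
    }

    // Combine adjacent shard results pairwise, level by level, until one remains.
    static <T> T treeCombine(SimpleTreeReducible<T> w, List<T> parts) {
        List<T> level = new ArrayList<>(parts);
        while (level.size() > 1) {
            List<T> next = new ArrayList<>();
            for (int i = 0; i + 1 < level.size(); i += 2) {
                next.add(w.treeReduce(level.get(i), level.get(i + 1)));
            }
            if (level.size() % 2 == 1) next.add(level.get(level.size() - 1)); // odd one carries over
            level = next;
        }
        return level.get(0);
    }

    public static void main(String[] args) {
        CountingWalker w = new CountingWalker();
        List<Long> shardResults = new ArrayList<>();
        for (int start = 0; start < 100; start += 16) {
            shardResults.add(runShard(w, start, Math.min(100, start + 16)));
        }
        System.out.println(runShard(w, 0, 100) + " " + treeCombine(w, shardResults));
    }
}
```

treeReduce() only ever combines adjacent results in order. The tree result therefore matches the serial one as long as treeReduce() is associative and agrees with reduce().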

Limitations

The GATK's support for parallelism is currently limited. The following classes of walkers are not supported by our parallelization framework:

  • Read walkers
  • Reduce-by-interval walkers

Note that each thread operates completely independently in the current GATK implementation of shared memory parallelism. So if you provide 1 GB to the GATK with -nt 1, you should provide 4 GB to run with -nt 4. Otherwise, the GATK can be starved of memory and run much more slowly.

The performance of the multi-threaded GATK depends heavily on whether you are I/O- or CPU-limited, and on the relative overhead of parallelism on your computer. Additionally, -nt can start to incur very high overheads when nt > 20, due to our implementation and to memory contention issues.

The best value for nt is at most the number of available cores, provided there is enough memory to run every thread (nt × the amount provided for one thread). It is further capped by the available I/O bandwidth, so that the individual threads don't starve each other.
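The sizing rule above can be written as a small helper. This is only a sketch. It assumes memory needs grow linearly with nt, as stated above. The ioCap parameter is a hypothetical limit you would have to estimate yourself from your storage's I/O bandwidth; the GATK does not compute it for you.

```java
// Sketch of the -nt sizing rule (not a GATK API): at most the core count,
// at most what memory allows when every thread needs the single-thread amount,
// and at most a caller-supplied I/O cap.
class ThreadCountSketch {

    // Memory to request for a given nt, assuming linear scaling (nt x one thread).
    static long requiredMemMb(int nt, long perThreadMemMb) {
        return nt * perThreadMemMb;
    }

    // ioCap: hypothetical, user-estimated number of concurrent threads the
    // storage system can feed without the threads starving each other.
    static int chooseNt(int cores, long totalMemMb, long perThreadMemMb, int ioCap) {
        if (cores < 1 || perThreadMemMb <= 0 || ioCap < 1) {
            throw new IllegalArgumentException("cores, perThreadMemMb and ioCap must be positive");
        }
        long byMemory = totalMemMb / perThreadMemMb;
        long nt = Math.min(cores, Math.min(byMemory, ioCap));
        // Never go below 1; if even one thread does not fit, the memory budget is too small.
        return (int) Math.max(1, nt);
    }

    public static void main(String[] args) {
        System.out.println(requiredMemMb(4, 1024));           // 1 GB per thread at -nt 4
        System.out.println(chooseNt(16, 8 * 1024, 1024, 12)); // memory-bound
    }
}
```

With 1 GB per thread, -nt 4 needs 4096 MB. On a 16-core machine with 8 GB and an I/O cap of 12, the helper picks 8 threads, because memory is the binding limit.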

5. Scatter / gather parallelism

Scatter/gather is a simple approach for process-independent parallelism with the GATK. First you scatter multiple independent GATK instances out over a network of computing nodes, each operating on different genomic intervals. When they all complete, their outputs are gathered into a merged output dataset. In the GATK, S/G is extremely easy to use: all of the GATK tools support the -L option to operate only over specific genomic intervals, and many tools emit files that can be merged together across multiple regions. Unified Genotyper, for example, can operate over the whole genome in one process, or on each chromosome independently. The output of the latter approach, after merging, should be the same as the whole-genome results, minus any slight differences due to random number effects. The simplicity and efficiency of S/G parallelism make it a key option for getting things done quickly with the GATK.
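When you are not using GATK-Queue, preparing the sliced intervals by hand amounts to cutting each contig into non-overlapping pieces. The hypothetical helper below (not part of the GATK) produces N equal-sized, 1-based, inclusive chr:start-end strings to pass to -L. It ignores data density, so it does nothing about the load-balancing issue noted in the comparison above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the GATK): split contig 1..length into n
// contiguous, non-overlapping, 1-based inclusive intervals formatted for -L.
class IntervalSplitSketch {

    static List<String> split(String contig, long length, int n) {
        if (n < 1 || length < n) throw new IllegalArgumentException("need 1 <= n <= length");
        List<String> intervals = new ArrayList<>();
        long base = length / n;
        long extra = length % n; // the first `extra` intervals get one more base
        long start = 1;
        for (int i = 0; i < n; i++) {
            long size = base + (i < extra ? 1 : 0);
            long end = start + size - 1;
            intervals.add(contig + ":" + start + "-" + end);
            start = end + 1;
        }
        return intervals;
    }

    public static void main(String[] args) {
        // chr1 length taken from the command-line example in this section
        for (String interval : split("chr1", 249_250_621L, 2)) {
            System.out.println("-L " + interval);
        }
    }
}
```

For chr1 with N = 2, it prints -L chr1:1-124625311 and -L chr1:124625312-249250621. Any gap-free, non-overlapping split, such as a round-number cut, covers the same sites.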

Using S/G parallelism is extremely easy, either by hand or using the automated Scatter/Gather in GATK-Queue. Suppose I have the following command line:

java -jar GenomeAnalysisTK.jar -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1

This runs a single GATK process over chr1, and let's say it takes an hour. To run it in S/G parallelism mode, just partition chr1 into two independent parts:

Then run one GATK process on each part. For example, at the command line:

gsa1> java -jar GenomeAnalysisTK.jar -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1:1-125,000,000 -o my.1.vcf &
gsa1> java -jar GenomeAnalysisTK.jar -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1:125,000,001-249,250,621 -o my.2.vcf &

When these two jobs finish, I just merge the two VCFs together and I've got a complete data set in half the time.
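For VCFs, the simplest gather is the cat approach mentioned in the comparison above, except that only the first file's header should be kept. This Java sketch assumes uncompressed VCFs with identical headers (same samples), produced from non-overlapping intervals and passed in genomic order. It is an illustration, not a replacement for CombineVariants.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Gather step for scattered VCFs: header lines ('#') from the first part only,
// then the records of every part in the order the parts are given.
class VcfGatherSketch {

    static List<String> gather(List<List<String>> parts) {
        List<String> merged = new ArrayList<>();
        for (int i = 0; i < parts.size(); i++) {
            for (String line : parts.get(i)) {
                boolean isHeader = line.startsWith("#");
                if (!isHeader || i == 0) merged.add(line);
            }
        }
        return merged;
    }

    // Usage: java VcfGatherSketch my.1.vcf my.2.vcf > my.vcf  (parts in genomic order)
    public static void main(String[] args) throws IOException {
        List<List<String>> parts = new ArrayList<>();
        for (String path : args) parts.add(Files.readAllLines(Paths.get(path)));
        for (String line : gather(parts)) System.out.println(line);
    }
}
```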


Geraldine Van der Auwera, PhD

Comments

  • Clarra (Member)

    Hello,

    I have a question regarding the subject of this page: what is the default behavior regarding parallelism if neither the this.scatterCount parameter nor the -nt command-line argument is specified?

    Is execution of the program serial by default? In other words, will each walker's map() function be called sequentially (within a single process) for each piece of data?

    Thanks! Clarra

  • Geraldine_VdAuwera (Administrator, GATK Developer)

    Yes, that's correct, Clarra. By default execution is serial.


  • Clarra (Member)

    Thank you for the answer. Regarding the parallelism I would have another question:

    When using a JobRunner, are different walkers also run in parallel, provided that their executions are independent of each other? Or is the parallelism limited to the map/reduce operations within one walker?

    Thanks! Clarra

  • Carneiro (Administrator, GATK Developer)

    -nt is limited to the walker and uses shared memory (it runs on the same machine). Scatter/gather is done per walker and uses independent memory, and each run will be on a different machine. Furthermore, it is independent of -nt (you can use both).

    To answer your first question: the default value of -nt is 0, but scatterCount is defined in each QScript, so each script can have its own default. If you're writing your own script and you never specify a scatter count, it defaults to 0. But if you're using someone else's script, you'll have to look inside.

  • Geraldine_VdAuwera (Administrator, GATK Developer)

    @Clarra, if the walkers' execution is independent, they will be scatter-gathered in parallel.


  • Clarra (Member)

    Thanks for the answers, guys. 1) @Geraldine: if so, suppose I set scatterCount = 3 in the QScript, and I have a pipeline (graph) of walkers like A->B plus another component C (not connected yet; it may be used later on). Then B and C are independent, so according to what you said before, B and C will also be executed in parallel with each other, each of them on 3 nodes. That means that 1 node will get tasks from both walker B and walker C, right?

    Further questions: 2) scatterCount looks like a global parameter, given in the QScript, so all the walkers will scatter/gather on the same number of nodes. Can I somehow specify that walker A must run serially and walker B in parallel (scatter/gather) mode?

    3) On the page above, you talk about the risk of inconsistent results when running a program with scatter/gather. What kind of inconsistencies, more precisely? Is this related to how the output from different nodes is gathered and put together? Or how exactly do these inconsistencies materialize?

    Thanks again!

  • Carneiro (Administrator, GATK Developer)

    Answering your questions:

    2) scatterCount is a walker variable: each walker has its own scatterCount. Scripts may have a global parameter that sets every walker's variable to the same value, because most of the time this is the desired behavior. It doesn't have to be the same, though.

    3) Walkers are responsible for telling Queue how to scatter/gather their run. Some walkers have no easy way to split the data, or simply can't split data by locus; this is the case for most (if not all) read walkers. If you scatter intervals arbitrarily by locus, reads that span two consecutive intervals will be present in both independent runs of the walker. Your result will then contain duplicates and hence be inconsistent. If the walker tells Queue that it can only be split by contig, by interval or by locus, scatter/gather will do the right thing. By default, all read walkers are scattered by contig and all locus walkers by locus. All the walkers released by our team have the correct @ScatterBy annotation, so you don't need to worry.
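    A toy Java sketch of the duplication problem described above: each scattered run sees every read that overlaps its interval, so a read crossing the split point is processed by both runs. The reads, coordinates and 10,000 bp contig size are made up for illustration. Overlap-based selection is a simplifying assumption about how a run picks up reads.

```java
import java.util.Arrays;
import java.util.List;

// Toy model: why read walkers are scattered by contig rather than by arbitrary locus.
class ReadScatterSketch {
    // Hypothetical minimal read: contig plus 1-based inclusive alignment span.
    static final class Read {
        final String contig;
        final int start;
        final int end;
        Read(String contig, int start, int end) {
            this.contig = contig;
            this.start = start;
            this.end = end;
        }
    }

    // Assumption: a run over contig:from-to processes every read overlapping it.
    static int readsSeen(List<Read> reads, String contig, int from, int to) {
        int n = 0;
        for (Read r : reads) {
            if (r.contig.equals(contig) && r.start <= to && r.end >= from) n++;
        }
        return n;
    }

    static List<Read> demoReads() {
        return Arrays.asList(
                new Read("chr1", 100, 200),
                new Read("chr1", 950, 1050), // crosses the 1000/1001 split point
                new Read("chr1", 5000, 5100),
                new Read("chr2", 300, 400));
    }

    // Scatter by locus: chr1 cut at position 1000, plus one run for chr2.
    static int processedByLocus(List<Read> reads) {
        return readsSeen(reads, "chr1", 1, 1000)
                + readsSeen(reads, "chr1", 1001, 10_000)
                + readsSeen(reads, "chr2", 1, 10_000);
    }

    // Scatter by contig: one run per chromosome, so no read can cross a boundary.
    static int processedByContig(List<Read> reads) {
        return readsSeen(reads, "chr1", 1, 10_000) + readsSeen(reads, "chr2", 1, 10_000);
    }

    public static void main(String[] args) {
        List<Read> reads = demoReads();
        System.out.println(reads.size() + " reads; by locus: " + processedByLocus(reads)
                + "; by contig: " + processedByContig(reads));
    }
}
```

    Running it prints "4 reads; by locus: 5; by contig: 4": the locus split processes the spanning read twice.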

  • Clarra (Member)

    Hello,

    I am sorry, I need to come back with another set of questions for the team regarding parallelism with Queue.

    1) When using scatter/gather, is there a designated master node that scatters the input data and gathers the results, or is this taken care of internally by Queue on the node where Queue is run?

    2) How does communication between the master node and the "slave" nodes take place? Is it through messages passed over the network, or ...?

    3) How does the master know when a slave has finished executing, or when all the slaves are done?

    Thank you again in advance! Clarra

  • Lee_CIDR (Member)

    Hello - will "soon" come soon, please? I'm just wondering when this obsolete doc will be replaced with the "updated version" promised back in August. We really REALLY need to know how to properly make use of the new Nanoscheduler in GATK 2.2 to speed up WGS runs: they're taking days and we need to get down to hours. Thanks for any updates; some is better than none!

  • Geraldine_VdAuwera (Administrator, GATK Developer)

    Working on it at this very moment... one more round of internal review to fact check and it will be out. ETA Monday morning with the new release.


  • Geraldine_VdAuwera (Administrator, GATK Developer)
    edited December 2012

    The updated parallelism docs are now available:


