
A primer on parallelism with the GATK

Geraldine_VdAuwera | Posts: 6,089 | Administrator, GATK Developer
edited January 2013 in Methods and Workflows

This document explains the concepts involved and how they are applied within the GATK (and Queue where applicable). For specific configuration recommendations, see the companion document on parallelizing GATK tools.

1. Introducing the concept of parallelism

Parallelism is a way to make a program finish faster by performing several operations in parallel, rather than sequentially (i.e. waiting for each operation to finish before starting the next one).

Imagine you need to cook rice for sixty-four people, but your rice cooker can only make enough rice for four people at a time. If you have to cook all the batches of rice sequentially, it's going to take all night. But if you have eight rice cookers that you can use in parallel, you can finish up to eight times faster.

This is a very simple idea but it has a key requirement: you have to be able to break down the job into smaller tasks that can be done independently. It's easy enough to divide portions of rice because rice itself is a collection of discrete units. In contrast, let's look at a case where you can't make that kind of division: it takes one pregnant woman nine months to grow a baby, but you can't do it in one month by having nine women share the work.

The good news is that most GATK runs are more like rice than like babies. Because GATK tools are built to use the Map/Reduce method (see doc for details), most GATK runs essentially consist of a series of many small independent operations that can be parallelized.

A quick warning about tradeoffs

Parallelism is a great way to speed up processing on large amounts of data, but it has "overhead" costs. Without getting too technical at this point, let's just say that parallelized jobs need to be managed, you have to set aside memory for them, regulate file access, collect results and so on. So it's important to balance the costs against the benefits, and avoid dividing the overall work into too many small jobs.

Going back to the introductory example, you wouldn't want to use a million tiny rice cookers that each boil a single grain of rice. They would take way too much space on your countertop, and the time it would take to distribute each grain then collect it when it's cooked would negate any benefits from parallelizing in the first place.

Parallel computing in practice (sort of)

OK, parallelism sounds great (despite the tradeoffs caveat), but how do we get from cooking rice to executing programs? What actually happens in the computer?

Consider that when you run a program like the GATK, you're just telling the computer to execute a set of instructions.

Let's say we have a text file and we want to count the number of lines in it. The set of instructions to do this can be as simple as:

  • open the file, count the number of lines in the file, tell us the number, close the file

Note that "tell us the number" can mean writing it to the console, or storing it somewhere for use later on.

Now let's say we want to know the number of words on each line. The set of instructions would be:

  • open the file, read the first line, count the number of words, tell us the number, read the second line, count the number of words, tell us the number, read the third line, count the number of words, tell us the number

And so on until we've read all the lines, and finally we can close the file. It's pretty straightforward, but if our file has a lot of lines, it will take a long time, and it will probably not use all the computing power we have available.

So to parallelize this program and save time, we just cut up this set of instructions into separate subsets like this:

  • open the file, index the lines

  • read the first line, count the number of words, tell us the number
  • read the second line, count the number of words, tell us the number
  • read the third line, count the number of words, tell us the number
  • [repeat for all lines]

  • collect final results and close the file

Here, the "read the Nth line" steps can be performed in parallel, because they are all independent operations.

You'll notice that we added a step, "index the lines". That's a little bit of preliminary work that allows us to perform the "read the Nth line" steps in parallel (or in any order we want) because it tells us how many lines there are and where to find each one within the file. It makes the whole process much more efficient. As you may know, the GATK requires index files for the main data files (reference, BAMs and VCFs); the reason is essentially to have that indexing step already done.

Anyway, that's the general principle: you transform your linear set of instructions into several subsets of instructions. There's usually one subset that has to be run first and one that has to be run last, but all the subsets in the middle can be run at the same time (in parallel) or in whatever order you want.
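
To make this concrete, here is a minimal sketch of the two versions in Scala (chosen because it is also the language of the Queue scripts discussed further down). This is purely illustrative and is not GATK code; the parallel collection library does the scattering and gathering for us:

    import scala.io.Source

    object WordsPerLine {
      def main(args: Array[String]) {
        // "Open the file, index the lines": loading the lines into an
        // indexed collection lets us process them in any order.
        val lines = Source.fromFile(args(0)).getLines().toVector

        // Single-threaded version: one line after another, in order.
        val sequential = lines.map(line => line.split("\\s+").count(_.nonEmpty))

        // Multi-threaded version: .par scatters the same work across a
        // thread pool, then gathers the results back in the original order.
        val parallel = lines.par.map(line => line.split("\\s+").count(_.nonEmpty)).seq

        // Same answer either way; only the execution order differs.
        require(sequential == parallel)

        // "Tell us the number" for each line.
        parallel.zipWithIndex.foreach { case (count, i) =>
          println("line " + (i + 1) + ": " + count + " words")
        }
      }
    }

Note how little changes between the two versions: the decomposition into independent per-line tasks is what makes the parallel one possible.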

2. Parallelizing the GATK

There are three different modes of parallelism offered by the GATK, and to really understand the difference you first need to understand the different levels of computing that are involved.

A quick word about levels of computing

By levels of computing, we mean the computing units in terms of hardware: the core, the machine (or CPU) and the cluster.

  • Core: the level below the machine. On your laptop or desktop, the CPU (central processing unit, or processor) contains one or more cores. If you have a recent machine, your CPU probably has at least two cores, and is therefore called dual-core. If it has four, it's a quad-core, and so on. High-end consumer machines like the latest Mac Pro have up to twelve-core CPUs (which should be called dodeca-core if we follow the Greek terminology) but the CPUs on some professional-grade machines can have tens or hundreds of cores.

  • Machine: the middle of the scale. For most of us, the machine is the laptop or desktop computer. Really we should refer to the CPU specifically, since that's the relevant part that does the processing, but the most common usage is to say machine. Except if the machine is part of a cluster, in which case it's called a node.

  • Cluster: the level above the machine. This is a high-performance computing structure made of a bunch of machines (usually called nodes) networked together. If you have access to a cluster, chances are it either belongs to your institution, or your company is renting time on it. A cluster can also be called a server farm or a load-sharing facility.

Parallelism can be applied at all three of these levels, but in different ways of course, and under different names. Parallelism takes the name of multi-threading at the core and machine levels, and scatter-gather at the cluster level.

Multi-threading

In computing, a thread of execution is a set of instructions that the program issues to the processor to get work done. In single-threading mode, a program only sends a single thread at a time to the processor and waits for it to be finished before sending another one. In multi-threading mode, the program may send several threads to the processor at the same time.

[Image: multithread_sm.png]

Not making sense? Let's go back to our earlier example, in which we wanted to count the number of words in each line of our text document. Hopefully it is clear that the first version of our little program (one long set of sequential instructions) is what you would run in single-threaded mode. And the second version (several subsets of instructions) is what you would run in multi-threaded mode, with each subset forming a separate thread. You would send out the first thread, which performs the preliminary work; then once it's done you would send the "middle" threads, which can be run in parallel; then finally once they're all done you would send out the final thread to clean up and collect final results.

If you're still having a hard time visualizing what the different threads are like, just imagine that you're doing cross-stitching. If you're a regular human, you're working with just one hand. You're pulling a needle and thread (a single thread!) through the canvas, making one stitch after another, one row after another. Now try to imagine an octopus doing cross-stitching. He can make several rows of stitches at the same time using a different needle and thread for each. Multi-threading in computers is surprisingly similar to that.

Hey, if you have a better example, let us know in the forum and we'll use that instead.

Alright, now that you understand the idea of multithreading, let's get practical: how do we get the GATK to use multi-threading?

There are two options for multi-threading with the GATK, controlled by the arguments -nt and -nct, respectively. They can be combined, since they act at different levels of computing:

  • -nt / --num_threads controls the number of data threads sent to the processor (acting at the machine level)

  • -nct / --num_cpu_threads_per_data_thread controls the number of CPU threads allocated to each data thread (acting at the core level).
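
To illustrate, here is what a combined setting might look like on a tool that supports both options (UnifiedGenotyper does, as noted in the comments below); the file names here are just placeholders:

    java -jar GenomeAnalysisTK.jar -T UnifiedGenotyper -R reference.fasta -I input.bam -o output.vcf -nt 4 -nct 2

This requests 4 data threads, each running 2 CPU threads, so the job will keep up to 8 cores busy on the machine.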

Not all GATK tools can use these options due to the nature of the analyses that they perform and how they traverse the data. Even in the case of tools that are used sequentially to perform a multi-step process, the individual tools may not support the same options. For example, at time of writing (Dec. 2012), of the tools involved in local realignment around indels, RealignerTargetCreator supports -nt but not -nct, while IndelRealigner does not support either of these options.

In addition, there are some important technical details that affect how these options can be used with optimal results. Those are explained along with specific recommendations for the main GATK tools in a companion document on parallelizing the GATK.

Scatter-gather

If you Google it, you'll find that the term scatter-gather can refer to a lot of different things, including strategies to get the best price quotes from online vendors, methods to control memory allocation and… an indie-rock band. What all of those things have in common (except possibly the band) is that they involve breaking up a task into smaller, parallelized tasks (scattering) then collecting and integrating the results (gathering). That should sound really familiar to you by now, since it's the general principle of parallel computing.

So yes, "scatter-gather" is really just another way to say we're parallelizing things. OK, but how is it different from multithreading, and why do we need yet another name?

As you know by now, multithreading specifically refers to what happens internally when the program (in our case, the GATK) sends several sets of instructions to the processor to achieve the instructions that you originally gave it in a single command-line. In contrast, the scatter-gather strategy as used by the GATK involves a separate program, called Queue, which generates separate GATK jobs (each with its own command-line) to achieve the instructions given in a so-called Qscript (i.e. a script written for Queue in a programming language called Scala).

[Image: queue_sm.png]

At the simplest level, the Qscript can involve a single GATK tool*. In that case Queue will create separate GATK commands that will each run that tool on a portion of the input data (= the scatter step). The results of each run will be stored in temporary files. Then once all the runs are done, Queue will collate all the results into the final output files, as if the tool had been run as a single command (= the gather step).
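
As a rough sketch, a minimal Qscript of that kind might look like the following (modeled on the example Qscripts shipped with Queue at the time of writing; treat the class and field names as indicative and check the Queue documentation for the exact API):

    import org.broadinstitute.sting.queue.QScript
    import org.broadinstitute.sting.queue.extensions.gatk._

    class ScatterExample extends QScript {
      @Input(doc="Reference FASTA", shortName="R") var reference: File = _
      @Input(doc="Input BAM", shortName="I") var bam: File = _

      def script() {
        val genotyper = new UnifiedGenotyper
        genotyper.reference_sequence = reference
        genotyper.input_file :+= bam
        genotyper.out = swapExt(bam, "bam", "raw.vcf")
        // The scatter step: Queue splits this into 10 separate GATK jobs,
        // then gathers the partial outputs into the single VCF named above.
        genotyper.scatterCount = 10
        add(genotyper)
      }
    }

Running this through Queue produces ten GATK command lines, each covering a slice of the data, plus the gather step at the end.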

Note that Queue has additional capabilities, such as managing the use of multiple GATK tools in a dependency-aware manner to run complex pipelines, but that is outside the scope of this article. To learn more about pipelining the GATK with Queue, please see the Queue documentation.

Compare and combine

So you see, scatter-gather is a very different process from multi-threading because the parallelization happens outside of the program itself. The big advantage is that this opens up the upper level of computing: the cluster level. Remember, the GATK program is limited to dispatching threads to the processor of the machine on which it is run – it cannot by itself send threads to a different machine. But Queue can dispatch scattered GATK jobs to different machines in a computing cluster by interfacing with your cluster's job management software.

That being said, multithreading has the great advantage that all the threads share the memory of the machine they run on, with very high bandwidth. In contrast, the multiple machines on a network used for scatter-gather are fundamentally limited by network costs.

The good news is that you can combine scatter-gather and multithreading: use Queue to scatter GATK jobs to different nodes on your cluster, then use the GATK's internal multithreading capabilities to parallelize the jobs running on each node.

Going back to the rice-cooking example, it's as if instead of cooking the rice yourself, you hired a catering company to do it for you. The company assigns the work to several people, who each have their own cooking station with multiple rice cookers. Now you can feed a lot more people in the same amount of time! And you don't even have to clean the dishes.


Geraldine Van der Auwera, PhD

Comments

  • priesgo | Posts: 19 | Member

    Thanks Geraldine, this is really helpful!

    But let me ask: is it documented somewhere which commands accept core and/or machine multithreading?

  • Geraldine_VdAuwera | Posts: 6,089 | Administrator, GATK Developer

    Glad you found this useful!

    It's not documented yet but that is at the top of my to-do list for 2013... i.e. starting next week. Will try to get that out for you all as soon as possible.

    Geraldine Van der Auwera, PhD

  • priesgo | Posts: 19 | Member

    By the way I'll share what I found for some of the commands. This was obtained with with GATK Lite 2.3: BaseRecalibrator -> only nct PrintReads -> only nct RealignerTargetCreator -> only nt IndelRealigner -> no parallelism as stated above UnifiedGenotyper -> admits nt and nct VariantRecalibrator -> only nt ApplyRecalibration -> only nt VariantFiltration -> no parallelism VariantAnnotator -> only nt ReadBackedPhasing -> no parallelism

    Hope it helps someone!

  • priesgo | Posts: 19 | Member

    uuufff what a mess... let's try again:

    BaseRecalibrator -> only nct

    PrintReads -> only nct

    RealignerTargetCreator -> only nt

    IndelRealigner -> no parallelism as stated above

    UnifiedGenotyper -> admits nt and nct

    VariantRecalibrator -> only nt

    ApplyRecalibration -> only nt

    VariantFiltration -> no parallelism

    VariantAnnotator -> only nt

    ReadBackedPhasing -> no parallelism

  • Geraldine_VdAuwera | Posts: 6,089 | Administrator, GATK Developer

    Thanks for contributing this, @priesgo! And sorry about the formatting mess... Markdown is a bit finicky. FYI, adding two spaces at the end of a line will force a line break.

    Geraldine Van der Auwera, PhD

  • lzhtom | Posts: 12 | Member

    A great article! But I'm still a little confused about the "machine" level and "core" level multithreading. I understand GATK uses a hierarchical microscheduler to manage a thread tree, with data threads as the branches and CPU threads as the leaves (correct me if I'm wrong). But if you call them "machine" and "core" level threads, do you mean each data thread is assigned to one core which contains all the associated CPU threads? I thought only the OS can decide which threads go to a particular core? If I'm wrong, could you tell me which Java files within GATK contain the actual code of thread management? Thanks a lot!

  • Geraldine_VdAuwera | Posts: 6,089 | Administrator, GATK Developer

    Hi there,

    In this article we simplified the explanation to make it accessible to a wide audience. If you want more details and are comfortable reading through the code, I'd recommend looking at the MicroScheduler and NanoScheduler classes.

    Geraldine Van der Auwera, PhD

  • lucdh | Posts: 10 | Member
    edited August 2013

    Hi, we've implemented our own scatter-gather strategy for running GATK on a cluster, using Sun Grid Engine to schedule jobs. For our purposes it is essential that all jobs run single-threaded, i.e. use at most 100% of a CPU core, because we inform the scheduler that each job requires at most 100% CPU. The scheduler relies on that information to optimize cluster performance; if the information is false, cluster nodes get overloaded and eventually crash. However, our attempts to enforce "single-threadedness" with the parameter settings '-nt 1 -nct 1' have failed so far: we are observing peaks of 700% CPU for a supposedly single-threaded process. Below is an example of a process call. Any help or suggestions are most welcome. I would like to know in particular whether the '-nt 1 -nct 1' setting has been observed to work, i.e. enforce "single-threadedness", on a cluster.

    sample command:

    [clipped]/jdk/1.7.0/bin/java -Xmx8G -XX:-UsePerfData -Djava.io.tmpdir=[clipped]/tmp -jar [clipped]/gatk/2.4.9/GenomeAnalysisTK.jar -et NO_ET -K [clipped]key -nt 1 -nct 1 -L 5:120000001-140000000 -T UnifiedGenotyper -R [clipped]gatk/broad_bundle_b37_v2.2/human_g1k_v37.fasta -I [clipped]gatk.bam --dbsnp [clipped]gatk/broad_bundle_b37_v2.2/dbsnp_137.b37.vcf --out [clipped]result/tmp/120000001-140000000.gatk.UnifiedGenotyper.raw.vcf --standard_min_confidence_threshold_for_calling 50 --standard_min_confidence_threshold_for_emitting 0 -dcov 200 --annotation AlleleBalance --annotation AlleleBalanceBySample --annotation BaseCounts --annotation BaseQualityRankSumTest --annotation ChromosomeCounts --annotation ClippingRankSumTest --annotation Coverage --annotation DepthPerAlleleBySample --annotation FisherStrand --annotation HaplotypeScore --annotation HardyWeinberg --annotation HomopolymerRun --annotation InbreedingCoeff --annotation LowMQ --annotation MappingQualityRankSumTest --annotation MappingQualityZero --annotation MappingQualityZeroBySample --annotation NBaseCount --annotation QualByDepth --annotation RMSMappingQuality --annotation ReadPosRankSumTest --annotation SampleList --annotation SpanningDeletions --annotation TandemRepeatAnnotator --annotation VariantType --output_mode EMIT_ALL_SITES --genotype_likelihoods_model BOTH --min_indel_fraction_per_sample 0.1
    
  • Geraldine_VdAuwera | Posts: 6,089 | Administrator, GATK Developer

    Hi there,

    Using -nt 1 -nct 1 should not be necessary, as the GATK will never run multi-threaded without being explicitly instructed to do so. I'm not sure why you're seeing excessive CPU load, but you can rule out the idea of GATK going rogue and multithreading on its own initiative. Unless you have evidence that that's what's happening, which we would definitely want to see. But I suspect your scheduler is to blame...

    In addition, I should also caution you that -nt 1 -nct 1 is not something you should ever add systematically to all commands, because some of the GATK tools do not support either one or both of those options, and in those cases the command will fail with an error.

    Geraldine Van der Auwera, PhD

  • Mark_DePristo | Posts: 153 | Administrator, GATK Developer

    It's possible that you are seeing the GC using many threads. I believe Queue specifies the number of GC threads for the JVM, otherwise this value will default to the number of cores on the machine. See:

    http://www.oracle.com/technetwork/java/javase/gc-tuning-6-140523.html#par_gc

    -- Mark A. DePristo, Ph.D., Co-Director, Medical and Population Genetics, Broad Institute of MIT and Harvard

  • lucdh | Posts: 10 | Member

    Thanks Geraldine and Mark, that was spot-on. Simply adding java command line option "-XX:-UseParallelGC" solved the problem.

  • jofe | Posts: 2 | Member
    edited August 2013

    Hi! Very useful information! I'm not sure I understand the difference between -nt and -nct. I have 1 node with 4 processors, each of which has 16 cores, and I want to run the following GATK tools: RealignerTargetCreator, IndelRealigner, BaseRecalibrator and PrintReads. According to the information in this post (and its comments), would I only be able to run RealignerTargetCreator using multithreading? Is this correct?

  • Geraldine_VdAuwera | Posts: 6,089 | Administrator, GATK Developer

    Not quite. Have a look at the companion document, it gives you more details about which tools take which options:

    http://www.broadinstitute.org/gatk/guide/article?id=1975

    The ultimate resource to know whether a given tool accepts a certain type of parallelism is of course its Tech Doc article, which you can find here:

    http://www.broadinstitute.org/gatk/gatkdocs/

    Geraldine Van der Auwera, PhD

  • jofe | Posts: 2 | Member

    Thank you very much for your reply. I have read the information you suggested and it helped a lot. I just need to clarify a doubt: if I use, for example, -nt 8, does this mean that I am using 8 cores? And if I use -nct 8, does it mean the same but using a different parallelism technique?

  • Geraldine_VdAuwera | Posts: 6,089 | Administrator, GATK Developer

    Glad it helps!

    You are correct that -nt 8 means you use 8 cores.

    And if I use -nct 8 means the same but using a different parallelism technique?

    Not quite. -nct 8 is indeed a different mode of parallelism, but it means you use 8 CPU threads within each data thread. The two options are complementary and can be used together (for the tools that support both), so if you use -nt 8 -nct 8, each of the 8 data threads spawns 8 CPU threads, and you will have 64 threads running in total.

    Geraldine Van der Auwera, PhD

  • garyhowell | NC State University | Posts: 1 | Member

    Is there any way to use more than one node, i.e., to use distributed memory?

    So far, whenever I try to run a job with Queue using the bsub option, the bsub file (which I can back out at runtime from bjobs -l ..) contains a span[hosts=1] directive, which means the job is running with shared memory.

    The user who wants GATK hopes that "scatter-gather" will allow the use of many nodes (each of which has shared memory), thereby allowing a highly parallel job. But so far I don't see that happening. Is the Scala layer meant to provide a distributed-memory scatter-gather (without recourse to MPI calls embedded in Fortran or C)?

  • Geraldine_VdAuwera | Posts: 6,089 | Administrator, GATK Developer

    @garyhowell This depends on how your cluster is set up. Queue itself essentially generates shell scripts that are submitted by bsub to the job manager/scheduler. It's up to the job manager to ensure that the jobs are dispatched to different nodes.

    Geraldine Van der Auwera, PhD

  • pdexheimer | Posts: 335 | Member, GSA Collaborator ✭✭✭

    @garyhowell‌ - scatter/gather doesn't quite work that way. The idea is to break a single large task into a number of smaller tasks that are each independent of the other, and then gather the results of those subtasks into the final result. So it's really a distributed system rather than a parallel one, and shared memory is not used at all. In my use of Queue, I end up running a whole bunch of single-core jobs that each reserve 4GB of RAM. Some of those jobs will end up on the same multi-core node, but that's up to LSF to work out and the jobs themselves don't care.

    The nodes do need to all have access to a common storage, but shared memory is not necessary.

    If you're not seeing multiple jobs, I would guess that you're not setting the "scatterCount" (sp?) field of the QFunction. Also, keep in mind that certain tasks can only be subdivided so far, and others can't be divided at all (MarkDuplicates is the only one of the latter that I routinely use)
