Update: July 26, 2019
This section of the forum is now closed; we are working on a new support model for WDL that we will share here shortly. For Cromwell-specific issues, see the Cromwell docs and post questions on Github.

Scatter-Gather Parallelism

KateN · Cambridge, MA · Member, Broadie, Moderator admin
edited June 2017 in Plumbing Options

Parallelism is a way to make a program finish faster by performing several operations in parallel, rather than sequentially (i.e. waiting for each operation to finish before starting the next one). For a more detailed introduction to parallelism, you can read about it in depth here.

[Image: diagram of a scatter-gather workflow, with stepA scattered over an array of inputs and stepB gathering the results]

To do this, we use WDL's scatter construct, which produces parallelizable jobs running the same task on each input in an array, and outputs the results as an array as well.

Array[File] inputFiles

  scatter (oneFile in inputFiles) {
    call stepA { input: in=oneFile }
  }
  call stepB { input: files=stepA.out }

The magic here is that the array of outputs is produced and passed along to the next task without you ever making an explicit declaration about it being an array. Even though the output of stepA looks like a single file based on its declaration, just referring to stepA.out in any other call statement is sufficient for WDL to know that you mean the array grouping the outputs of all the parallelized stepA jobs.

In other words, the scatter part of the process is explicit while the gather part is implicit.


Generic example script

To put this in context, here is what the code for the workflow illustrated above would look like in full:

workflow ScatterGather {

  Array[File] inputFiles

  scatter (oneFile in inputFiles) {
    call stepA { input: in=oneFile }
  }
  call stepB { input: files=stepA.out }
}

task stepA {

  File in

  command { programA I=${in} O=outputA.ext }
  output { File out = "outputA.ext" }
}

task stepB {

  Array[File] files

  command { programB I=${files} O=outputB.ext }
  output { File out = "outputB.ext" }
}

Concrete example

Let’s look at a concrete example of scatter-gather parallelism in a workflow designed to call variants individually per-sample (HaplotypeCaller), then perform joint genotyping on all the per-sample GVCFs together (GenotypeGVCFs).

[Image: diagram of the variant-calling workflow, with HaplotypeCallerERC scattered per sample and GenotypeGVCFs gathering the per-sample GVCFs]

The workflow involves two tasks:

  • HaplotypeCallerERC takes in a File bamFile and produces a File GVCF.
  • GenotypeGVCF takes in an Array[File] GVCFs and produces a File rawVCF.

Concrete example script

This is what the code for the workflow illustrated above would look like:

workflow ScatterGatherExample {

  Array[File] sampleBAMs

  scatter (sample in sampleBAMs) {
    call HaplotypeCallerERC { input: bamFile=sample }
  }
  call GenotypeGVCF { input: GVCFs=HaplotypeCallerERC.GVCF }
}

task HaplotypeCallerERC {

  File bamFile

  command {
    java -jar GenomeAnalysisTK.jar \
        -T HaplotypeCaller \
        -ERC GVCF \
        -R reference.fasta \
        -I ${bamFile} \
        -o rawLikelihoods.gvcf
  }
  output {
    File GVCF = "rawLikelihoods.gvcf"
  }
}

task GenotypeGVCF {

  Array[File] GVCFs
  
  command {
    java -jar GenomeAnalysisTK.jar \
        -T GenotypeGVCFs \
        -R reference.fasta \
        -V ${GVCFs} \
        -o rawVariants.vcf
  }
  output {
    File rawVCF = "rawVariants.vcf"
  }
}

Note that here for simplicity we omitted the handling of index files, which has to be done explicitly in WDL. For examples of how to do that, see the Tutorials and Real Workflows.
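
As a quick illustration (a minimal sketch, not the full tutorial version; the bamIndex input is an extra name introduced here for illustration), the per-sample task could declare the index as an additional input so that it gets localized alongside the BAM, and the workflow call would then pass the matching index file as well:

task HaplotypeCallerERC {

  File bamFile
  # Declaring the index as an input ensures it is staged next to the BAM
  File bamIndex

  command {
    java -jar GenomeAnalysisTK.jar \
        -T HaplotypeCaller \
        -ERC GVCF \
        -R reference.fasta \
        -I ${bamFile} \
        -o rawLikelihoods.gvcf
  }
  output {
    File GVCF = "rawLikelihoods.gvcf"
  }
}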

Additionally, please note that we did not expressly handle delimiters for Array data types in this example. To learn explicitly about how to specify the ${GVCFs} input, please see this tutorial.
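
For instance, the GenotypeGVCF command above could spell out the delimiter explicitly (a minimal sketch; here each array element gets its own -V flag, which is what GATK3's GenotypeGVCFs expects):

  command {
    java -jar GenomeAnalysisTK.jar \
        -T GenotypeGVCFs \
        -R reference.fasta \
        -V ${sep=' -V ' GVCFs} \
        -o rawVariants.vcf
  }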


Comments

  • rtemanni · Qatar · Member

    Hi,
    Is there any documentation on how to use schedulers (e.g. LSF) to run parallel jobs as part of the workflow?
    Thanks in advance for your help
    Best
    Ramzi

  • KateN · Cambridge, MA · Member, Broadie, Moderator admin

    Unfortunately, there is no documentation on how to use schedulers, as we do not currently support scheduler use in Cromwell. However, this feature is certainly on the radar, and plans to address on-prem compute are in the works.

  • dshih · Boston · Member, Broadie

    In the code below,

    task stepB {
    
      Array[File] files
    
      command { programB I=${file} O=outputB.ext }
      output { File out = "outputB.ext" }
    }
    

    Should the command line not use ${files} instead of ${file} ?

    Additionally, in the concrete example for scatter/gather,

    command {
      java -jar GenomeAnalysisTK.jar \
          -T GenotypeGVCFs \
          -R reference.fasta \
          -V ${GVCFs} \
          -o rawVariants.vcf
    }
    

    Is ${GVCFs} passed command line as an expanded string in which the elements of the array are delimited by space?

    For example, given that ${GVCFs} contain the array [file1, file2, file3], would the ${GVCFs} variable be substituted by the string file1 file2 file3?

  • KateN · Cambridge, MA · Member, Broadie, Moderator admin

    You are correct, it should be ${files}, rather than ${file}--that was a typo on my part. I have changed the document to reflect this.

    To input ${GVCFs} properly, you do need to specify a sep delimiter. For more information on exactly how that works, you can read the tutorial here. For the sake of simplicity, I did not include this sep specification in this overview-style document. However, I will add a note to clarify.

    Thank you for your feedback!

  • rtemanni · Qatar · Member

    Hi,
    I've seen some reference in the documentation showing support for schedulers but these are set as comments for now.
    https://github.com/broadinstitute/cromwell/blob/39976521780c226a4cd693efd073053296df64e4/core/src/main/resources/application.conf
    //LSF {
    //  actor-factory = "cromwell.backend.impl.sfs.config.ConfigBackendLifecycleActorFactory"
    //  config {
    //    submit = "bsub -J ${job_name} -cwd ${cwd} -o ${out} -e ${err} /bin/bash ${script}"
    //    kill = "bkill ${job_id}"
    //    check-alive = "bjobs ${job_id}"
    //    job-id-regex = "Job <(\\d+)>.*"
    //  }
    //}
    Is that working ? Any documentation how to use backend support ?
    Thanks in advance
    Best
    Ramzi

  • amitgsir · Busan, South Korea · Member

    Hi, is there any way to scatter by contig or region (-L interval list)?

    I want to use scatter in variant calling by specifying the interval region (with -L), but I cannot figure out how to do this in WDL.

    Amit

  • Geraldine_VdAuwera · Cambridge, MA · Member, Administrator, Broadie admin
    @rtemanni Sorry for the late response. As you will have found by now, we don't yet have documentation for local backend execution; that's on our todo list. In the meantime you may be able to get that information by opening an issue in the Cromwell repository.
  • Geraldine_VdAuwera · Cambridge, MA · Member, Administrator, Broadie admin
    @amitgsir Yes, this can be done. Have a look at the pipeline scripts in the WDL repository; there are several that use scatter by interval.
  • oskarv · Bergen · Member

    I'm trying to run HaplotypeCaller with 87 input files and it's consuming all of my RAM. If I limit the RAM given to the .jar with "-Xmx1G" it runs fine, but the longest estimation I saw for one file was over one week, while another file had an estimated analysis time of 12h. If I split the .bam file with bamtools split and limit the number of input files to only the 22 chromosomes, I get an estimated analysis time of 3h to 4 days depending on which file I look at. Running the original 130 GB input file without scatter gather, without splitting it with bamtools, and with -nct 8, results in a total runtime of 16 hours.

    Here's my .wdl script:
    workflow jointCallingGenotypes {

      File inputSamplesFile
      Array[Array[File]] inputSamples = read_tsv(inputSamplesFile)
      File gatk
      File refFasta
      File refIndex
      File refDict

      scatter (sample in inputSamples) {
        call HaplotypeCallerERC {
          input: GATK=gatk,
            RefFasta=refFasta,
            RefIndex=refIndex,
            RefDict=refDict,
            sampleName=sample[0],
            bamFile=sample[1],
            bamIndex=sample[2]
        }
      }
    }

    task HaplotypeCallerERC {

      File GATK
      File RefFasta
      File RefIndex
      File RefDict
      String sampleName
      File bamFile
      File bamIndex

      command {
        java -Xmx8G \
          -jar ${GATK} \
          -T HaplotypeCaller \
          -ERC GVCF \
          -R ${RefFasta} \
          -I ${bamFile} \
          -o ${sampleName}_rawLikelihoods.g.vcf
          # -L 10
      }
      # runtime {
      #   memory: "10GB"
      #   cpu: "1"
      # }
      output {
        File GVCF = "${sampleName}_rawLikelihoods.g.vcf"
      }
    }

    I'm using a modified version of the example script for Scatter Gather, and I've also tried changing the runtime parameters, but I think I read that they are limited to cloud computing setups?

    And as I wrote earlier, my input is a list of 87 input files that I created by running the original 133GB .bam file through "bamtools split -in input.bam -reference". Running all of them will eventually crash the server by consuming all of the available RAM, so I manually kill the process to save the server.

    Why does it take longer to run the analysis with this scatter gather script than it does when I straight up use the gatk.jar file and -nct 8? Either I'm overestimating how effective scatter gather is, or there's something wrong somewhere. Another example is when I run this script with a single input file that is 27MB, it takes approximately 48 minutes to complete the analysis, and if I split it with bamtools and only run the 22 chromosomes, the runtime is around 3 hours per file.

    How can this be?

  • KateN · Cambridge, MA · Member, Broadie, Moderator admin
    edited October 2016

    It sounds like you are trying to scatter by interval. You have your original 130 GB file, which you split into 87 parts, writing each to a new file. You then scatter by each file, using the scatter method outlined in this article.

    If that is correct, then I believe I have a more efficient method of scattering for you. WDL allows you to scatter by interval, using just one master BAM file. In fact, this is the method we use in our production pipeline, a public version of which can be found here. An in-depth document describing that pipeline can be found here. The relevant part to you would be this section in the workflow:

      # Call variants in parallel over WGS calling intervals
      scatter (subInterval in scattered_calling_intervals) {
    
        # Generate GVCF by interval
        call HaplotypeCaller {
          input:
            input_bam = GatherBamFiles.output_bam,
            input_bam_index = GatherBamFiles.output_bam_index,
            interval_list = subInterval,
            gvcf_basename = sample_name,
            ref_dict = ref_dict,
            ref_fasta = ref_fasta,
            ref_fasta_index = ref_fasta_index,
            disk_size = agg_small_disk,
            preemptible_tries = agg_preemptible_tries
         }
      }
    
      # Combine by-interval GVCFs into a single sample GVCF file
      call GatherVCFs {
        input:
          input_vcfs = HaplotypeCaller.output_gvcf,
          input_vcfs_indexes = HaplotypeCaller.output_gvcf_index,
          output_vcf_name = final_gvcf_name,
          disk_size = agg_small_disk,
          preemptible_tries = agg_preemptible_tries
      }
    

    Each interval (subInterval) from the list of intervals (scattered_calling_intervals) is run through HaplotypeCaller. (Though not named as such, this HaplotypeCaller runs in reference confidence mode, thus producing GVCFs.) The scatter loop generates a separate GVCF for each interval, which must then be combined by the GatherVCFs task.
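
    If you need a starting point for that gather step, here is a minimal sketch of what such a task could look like (this is not the production pipeline's exact task; it assumes Picard's MergeVcfs is available as picard.jar and uses the sep pattern for the input array):

      task GatherVCFs {

        Array[File] input_vcfs
        String output_vcf_name

        command {
          java -jar picard.jar MergeVcfs \
              INPUT=${sep=' INPUT=' input_vcfs} \
              OUTPUT=${output_vcf_name}
        }
        output {
          File output_vcf = "${output_vcf_name}"
        }
      }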

    I believe this method will improve your runtimes, as there is less overhead. Please let me know if this is not the case.

    With regards to your other question, yes: certain runtime parameters are limited to certain backends. We currently support three backends, and here is a handy chart that lists which backend acknowledges which runtime parameters: (accurate as of Cromwell version 0.21)

    Runtime Attribute      LOCAL   JES   SGE
    continueOnReturnCode     x      x     x
    cpu                             x     x
    disks                           x
    zones                           x
    docker                   x      x     x
    failOnStderr             x      x     x
    memory                          x     x
    preemptible                     x
    bootDiskSizeGb                  x
  • oskarv · Bergen · Member

    Hello again,
    I still haven't managed to get it to work properly, and I'm getting the same problem as before.
    I'm using the same 130GB input file, but I have managed to split up the interval list file into separate interval lists to use them for scatter gather parallelization. The json file below only has two interval files for readability, but the complete set of interval files is 86.

    I still have a problem with the analysis time as detailed further below. But let's start with a clarifying question first:
    When I run an analysis, my premise is that each interval list will tell cromwell (?) to only run on that specific contig and only on those specific intervals, and each interval list is given one thread, correct? What I'm seeing when I grep "Y:" in each of the stderr files, from a finished run, is different Y-coordinates in each file. Is this how it's supposed to be? Does it mean that the outputs would get sorted and combined in the next step?
    Whether this is correct or not, I am under the impression that the parallelization is based on each thread getting one interval list, e.g. thread 1 would get the interval list for chromosome 1, and once it's done, it gets more work if there's any available, and at the end, all of the results are combined into one vcf file? Maybe I'm completely wrong though.

    I'm running the script on a 32 thread, 256GB RAM machine.
    When I run the same 130GB bam file as last time, with -nct 8, the total run time is 16h, but if I try to run the scatter gather script below with the 86 interval lists, the total run time is roughly 40 hours. I say roughly because I'm using the estimated time from the stderr file. I've looked at various shards for contigs that range from 60MB to 6GB, and the ETA is roughly the same for all of them.

    I split up the intervals into eight lists so I could run the analysis on eight threads to see if it would change the ETA, but that still gave a 40h ETA.

    I figured that perhaps something was wrong with the input data, and to verify that I ran an analysis on a 2.5MB bam file to view the .vcf file. When I tried using all of the 86 intervals it resulted in an ETA of 17h. Using only four interval lists gave an ETA of 50 minutes.
    I didn't get a vcf file because my script doesn't support that yet, but I got the stderr files and saw what I described above, each shard is given a part of each interval list, as opposed to one interval list for each shard.

    I basically have the same problem as before, but this time I'm scattering based on lists rather than based on files like I did before.

    I've copied and adapted the script that you posted in your previous reply, perhaps there's something I'm missing that is causing all of this confusion?

    Here's the json file:
    { "scattergather.bamIndex": "/data/CoriellIndex-133GB.bai", "scattergather.gatk": "/tools/tools/GenomeAnalysisTK-3.6/GenomeAnalysisTK.jar", "scattergather.refDict": "/references/human_g1k_v37_decoy.dict", "scattergather.sampleName": "Sample_", "scattergather.gvcfBasename": "GVCF_", "scattergather.refFasta": "/references/human_g1k_v37_decoy.fasta", "scattergather.refIndex": "/references/human_g1k_v37_decoy.fasta.fai", "scattergather.inputBam": "/data/CoriellIndex-133GB.bam", "scattergather.intervalListFile": [ "/tools/scattergather-test/output/X", ... "/tools/scattergather-test/output/hs37d5" ] }

    And here's the wdl file:

    workflow scattergather {
    
      File inputBam
      Array[File] intervalListFile
      File bamIndex
      File gatk
      File refFasta
      File refIndex
      File refDict
    
      String sampleName
      String gvcfBasename
    
      scatter (sample in intervalListFile) {
        call HaplotypeCallerERC {
          input:
            GATK=gatk, 
            RefFasta=refFasta, 
            RefIndex=refIndex, 
            RefDict=refDict, 
            SampleName=sampleName,
            InputFile=inputBam,
            InputIndex=bamIndex,
            IntervalList=sample
        }
    }
    }
    task HaplotypeCallerERC {
    
      File GATK
      File RefFasta
      File RefIndex
      File RefDict
      File InputFile
      File IntervalList
      File InputIndex
    
      String SampleName
    
      command {
        java -Xmx8G \
            -jar ${GATK} \
            -T HaplotypeCaller \
            -ERC GVCF \
            -R ${RefFasta} \
            -I ${InputFile} \
            -o ${SampleName}_rawLikelihoods.g.vcf \
            -L ${IntervalList}
    }
    
      output {
        File GVCF = "${SampleName}_rawLikelihoods.g.vcf"
      }
    }
    
  • KateN · Cambridge, MA · Member, Broadie, Moderator admin

    I have a couple questions for you to clear up some points that may be contributing to your longer runtimes.

    1. When you say that you run the 130GB bam with -nct 8 and it runs for 16 hours, are you running that using a WDL script, or just typing the GATK command into your terminal on your local machine?
    2. When you say that you get the estimated runtimes from the stderr, how long do you let the program run before you kill the process to check the stderr? I know when initially spinning up a job, the estimates for run time can be inaccurate. For example, I have seen HaplotypeCaller initially estimate a 4-day runtime, then within 20 minutes or so, it has a more accurate estimate of a couple hours.

    To answer some of your questions: yes, your script is correct, given what I suggested in my previous post. You did not miss anything. However, multithreading (using -nct 8) is different from scattering (using the scatter block in WDL). Scattering 8 ways does not mean that each shard gets one thread to work with. In fact, you can use multithreading and scattering in conjunction by adding -nct 8 to your command section.
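
    For example, the scattered HaplotypeCallerERC command you posted could request data threads per shard like this (a sketch; the -Xmx and -nct values are illustrative and should be sized to your machine and concurrency limit):

      command {
        java -Xmx8G -jar ${GATK} \
            -T HaplotypeCaller \
            -nct 8 \
            -ERC GVCF \
            -R ${RefFasta} \
            -I ${InputFile} \
            -o ${SampleName}_rawLikelihoods.g.vcf \
            -L ${IntervalList}
      }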

    At the end of the scatter block, all results are not combined into one vcf file. Instead, you will get an array of vcf files, each containing the results for one interval. You can merge these vcfs into one using the GatherVcfs task I mentioned in my earlier post.

  • oskarv · Bergen · Member

    I'm not kidding you, I just came to work and started doing some initial tests, and it works now. I'm baffled, I have no idea what I did or didn't do to make it work. The only odd thing that happened when I was setting up was that when I started the analysis, it complained about the list files not being named .list, .bed or some other supported file name. It didn't complain about that yesterday. But I renamed the interval files to .list and the longest ETA is 5 hours, some of the jobs are already done after just a few minutes, and I'm using the 130GB input file. And it's only using ~10GB RAM with 8 interval lists, as opposed to over 200GB yesterday. I had to kill it so it wouldn't crash the server. And it even works with all of the 86 interval files, I think the max RAM usage I saw was around 80GBs. At first it went really slow, but most of the intervals are really short so they completed really quickly, and the remaining jobs have reasonable ETAs.

    But to answer your questions.
    1. Straight into the terminal.
    2. I let it run for up to 10 minutes, any longer would have crashed the server since it was eating up all of the RAM.

    One thing that I noticed is that the outputs in each stderr file show only one contig each, as opposed to a mix of all of the contigs like I saw yesterday. Perhaps that was the issue? Maybe the lack of correct file endings slipped through the cracks and garbled up the reading of the interval files so that each shard was reading a portion of all interval lists? And now with the correct file endings, each shard only has one interval list and that makes it behave properly? Either way it works now, that's all that matters for me.

    Thank you so much for your support, this makes it so much more worth it to use your tools!
    I'm building pipelines for national biobank projects in Norway, and I will definitely heavily consider your tools for upcoming projects!

  • oskarv · Bergen · Member

    By the way, I have a feature request. It would be nice to have a setting where the user could choose to automatically split an interval file into contigs, or into an arbitrary number of pieces, for the scatter gather process. I'm sure there are more optimal ways to split interval files; I think I saw in your example script on GitHub that you chose to split your interval list into 50 pieces due to your local server setup? Not every user is able to optimize an individual analysis, so having the option to automatically split the interval list into contigs is probably better than running it serially. We will probably set up a bash script for it, but having it built in could be useful for others too.
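
    In the meantime, a minimal sketch of a task that does this splitting could look like the one below. It assumes a plain GATK-style interval list (one contig or contig:start-stop entry per line, not a Picard-style .interval_list with a header), uses the heredoc command form so the awk braces stay out of the WDL parser's way, and relies on glob() being supported by your Cromwell version; the task and output names are made up for illustration:

      task SplitIntervalsByContig {

        File intervalList

        command <<<
          # write each interval line into a file named after its contig
          awk -F':' '{ out = $1 ".intervals"; print >> out }' ${intervalList}
        >>>
        output {
          Array[File] perContigIntervals = glob("*.intervals")
        }
      }

    The workflow could then scatter over SplitIntervalsByContig.perContigIntervals instead of a hand-maintained list of interval files.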

    By the way again, it only took 62 minutes to finish the analysis from my previous post using scatter gather, compared to over 16 hours when I used -nct 8 straight in the terminal.

  • KateN · Cambridge, MA · Member, Broadie, Moderator admin

    I am happy to hear that it works now! I will certainly pass the feature request on to our developers.

  • amitgsir · Busan, South Korea · Member
    edited February 2017

    Hi, attached is my wdl script. I am running BaseRecalibrator (and post base-recalibration), PrintReads and Mutect2 by scattering the BAM file using the genomic region bed file for each chromosome.

    After PrintReads I can find the bam and bai (index) file for each job (scattered process), and its output bam file is the input for Mutect2.
    MuTect2 can read the bam file from the PrintReads step, but it always throws an error that the index file is missing, even though the index file is located with the BAM file in the PrintReads output folder.

    Any suggestion what I am missing here?

    Below is the mutect2 error message:

    INFO 22:17:36,093 HelpFormatter - --------------------------------------------------------------------------------
    INFO 22:17:36,096 HelpFormatter - The Genome Analysis Toolkit (GATK) v3.6-0-g89b7209, Compiled 2016/06/01 22:27:29
    INFO 22:17:36,097 HelpFormatter - Copyright (c) 2010-2016 The Broad Institute
    INFO 22:17:36,097 HelpFormatter - For support and documentation go to https://www.broadinstitute.org/gatk
    INFO 22:17:36,097 HelpFormatter - [Thu Feb 16 22:17:36 PST 2017] Executing on Linux 4.4.0-57-generic amd64
    INFO 22:17:36,097 HelpFormatter - Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14 JdkDeflater
    INFO 22:17:36,104 HelpFormatter - Program Args: -T MuTect2 -R /data2/WDL/cromwell-executions/SimpleVariantSelection/9b134faf-ed54-4cb9-adec-665869060b13/call-Mutect2/shard-0/inputs/data1/BIO/data/ucsc.hg19.fasta -I:tumor /data2/WDL/cromwell-executions/SimpleVariantSelection/9b134faf-ed54-4cb9-adec-665869060b13/call-Mutect2/shard-0/inputs/data2/WDL/cromwell-executions/SimpleVariantSelection/9b134faf-ed54-4cb9-adec-665869060b13/call-PrintRead/shard-0/execution/TestMutect1.PreProcessedReads.bam --dbsnp /data2/WDL/cromwell-executions/SimpleVariantSelection/9b134faf-ed54-4cb9-adec-665869060b13/call-Mutect2/shard-0/inputs/data2/NuGen/Data/dbsnp147_All_20160408_hg19.vcf --cosmic /data2/WDL/cromwell-executions/SimpleVariantSelection/9b134faf-ed54-4cb9-adec-665869060b13/call-Mutect2/shard-0/inputs/data2/NuGen/Data/CosmicCodingMuts_hg19_sorted.vcf --normal_panel /data2/WDL/cromwell-executions/SimpleVariantSelection/9b134faf-ed54-4cb9-adec-665869060b13/call-Mutect2/shard-0/inputs/data2/NuGen/Data/ExAC.hg19.vcf -L chrM --disable_auto_index_creation_and_locking_when_reading_rods -o TestMutect1.raw.indels.snps.vcf
    INFO 22:17:36,119 HelpFormatter - Executing as [email protected] on Linux 4.4.0-57-generic amd64; Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14.
    INFO 22:17:36,119 HelpFormatter - Date/Time: 2017/02/16 22:17:36
    INFO 22:17:36,119 HelpFormatter - --------------------------------------------------------------------------------
    INFO 22:17:36,120 HelpFormatter - --------------------------------------------------------------------------------
    INFO 22:17:36,198 GenomeAnalysisEngine - Strictness is SILENT
    INFO 22:17:36,539 GenomeAnalysisEngine - Downsampling Settings: Method: BY_SAMPLE, Target Coverage: 1000
    INFO 22:17:36,551 SAMDataSource$SAMReaders - Initializing SAMRecords in serial
    INFO 22:17:36,600 SAMDataSource$SAMReaders - Done initializing BAM readers: total time 0.05
    INFO 22:52:34,621 IntervalUtils - Processing 16571 bp from intervals
    INFO 22:52:34,940 GenomeAnalysisEngine - Preparing for traversal over 1 BAM files

    ERROR ------------------------------------------------------------------------------------------
    ERROR A USER ERROR has occurred (version 3.6-0-g89b7209):
    ERROR
    ERROR This means that one or more arguments or inputs in your command are incorrect.
    ERROR The error message below tells you what is the problem.
    ERROR
    ERROR If the problem is an invalid argument, please check the online documentation guide
    ERROR (or rerun your command with --help) to view allowable command-line arguments for this tool.
    ERROR
    ERROR Visit our website and forum for extensive documentation and answers to
    ERROR commonly asked questions https://www.broadinstitute.org/gatk
    ERROR
    ERROR Please do NOT post this error to the GATK forum unless you have really tried to fix it yourself.
    ERROR
    ERROR MESSAGE: Invalid command line: Cannot process the provided BAM/CRAM file(s) because they were not indexed. The GATK does offer limited processing of unindexed BAM/CRAMs in --unsafe mode, but this feature is unsupported -- use it at your own risk!
    ERROR ------------------------------------------------------------------------------------------
  • amitgsir · Busan, South Korea · Member
    edited February 2017

    Sorry, the file was not uploaded due to a filetype issue. I have edited the above comment.

  • shlee · Cambridge · Member, Broadie ✭✭✭✭✭

    Hi @amitgsir,

    Next time, you can just add a .txt extension to your WDL script to attach it to a post.

    I've extracted your script and see that the PrintRead task's output section only shows the BAM as the output and is missing the BAI index.

      output
      {
        File PreProcessedBam = "${sampleName}.PreProcessedReads.bam"
      }
    

    You can change this to:

      output
      {
        File PreProcessedBam = "${sampleName}.PreProcessedReads.bam"
        File PreProcessedBam_index = "${sampleName}.PreProcessedReads.bai"
      }
    

    Then add File PreProcessedBam_index to the input declarations at the top of the MuTect2 task, and then provide this index file accordingly, e.g. PreProcessedBam_index=PrintRead.PreProcessedBam_index in the call Mutect2 section of the workflow. I think you get the idea.
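
    Concretely, the wiring might look something like this fragment (the existing BAM input is assumed to be named PreProcessedBam here, and the other task declarations and call inputs are omitted for brevity):

      # In the MuTect2 task, declare the index alongside the BAM input:
      File PreProcessedBam
      File PreProcessedBam_index

      # In the workflow's call Mutect2 section, pass it through from PrintRead:
      call Mutect2 {
        input:
          PreProcessedBam=PrintRead.PreProcessedBam,
          PreProcessedBam_index=PrintRead.PreProcessedBam_index
      }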

  • amitgsir · Busan, South Korea · Member

    Yeah, I missed the trick. It's working fine now. thanks @shlee~~

  • oskarv · Bergen · Member

    I'm trying to manage the server load during scatter gather execution and I've noticed some random and potentially useful behavior in cromwell in server mode. When I start cromwell fresh and run my test pipeline for scatter gather jobs, I'm using 51 interval lists and limiting the number of jobs to 13. The problem is that when the scatter gather process starts, the load average rises up to ~150, and I'd like to keep it around 30 on average with random spikes up to 60 or so. I've noticed that if I stop and start the script 2-3 times, the load average doesn't go up to 150 when I start it again; it doesn't go much higher than 30-40, which is what I'm aiming for. Is this due to some caching of the interval lists that happens in cromwell?

    Either way, I want to ask if there is some way to, for example, allow one job to start every 60 seconds? That would possibly allow each job to finish its start-up procedure before the next job starts, and thus keep the load average at a more sensible level.

    And I'm curious to know why you've built the scatter gather part of your best practices pipeline the way it's built. You gather the bam files from ApplyBQSR and use that single bam file as input for HaplotypeCaller, but why not use the shards as input? You can still gather the bam files if it's important, but do it in parallel with HaplotypeCaller. And then you gather the VCF files and use that single VCF file as input for GenotypeGVCF, but why not use the shards from HaplotypeCaller as input for GenotypeGVCF? And again, you can still gather the VCF shards, but do it in parallel with GenotypeGVCF. That way you utilize your CPU better; I was able to shave an hour off doing this.

    But maybe there's something I'm missing?

  • shlee · Cambridge · Member, Broadie ✭✭✭✭✭

    Hi @oskarv,

    Thanks for sharing your observation. Someone on the team should have an idea whether caching is involved and hopefully comment. Also, the ramping up sounds like a good feature to have.

    I have a personal set of scripts where I use the HaplotypeCaller shards as input to GenotypeGVCFs as you do. This is a legitimate approach! For my case, these had to be two separate scripts because at the time I wrote them, scatter of scatters (the transpose scatter across the shards of multiple samples) was not yet supported. So I had an intermediary Python script to help me organize my numerous inputs for the GenotypeGVCF step. The advantage of gathering a per-sample file between these steps is the simple file handling on the human front without any helper script to organize the scattered files.

    If efficient CPU utilization and saving money is important to your aims, then by all means you should use your scripts. You are spot on and not missing anything.

  • oskarv · Bergen · Member
    edited March 2017

    @shlee said:
    Hi @oskarv,

    Thanks for sharing your observation. Someone on the team should have an idea whether caching is involved and hopefully comment. Also, the ramping up sounds like a good feature to have.

    I have a personal set of scripts where I use the HaplotypeCaller shards as input to GenotypeGVCFs as you do. This is a legitimate approach! For my case, these had to be two separate scripts because at the time I wrote them, scatter of scatters (the transpose scatter across the shards of multiple samples) was not yet supported. So I had an intermediary Python script to help me organize my numerous inputs for the GenotypeGVCF step. The advantage of gathering a per-sample file between these steps is the simple file handling on the human front without any helper script to organize the scattered files.

    If efficient CPU utilization and saving money is important to your aims, then by all means you should use your scripts. You are spot on and not missing anything.

    I'm not sure if we are talking about the same thing now regarding my question about scatter gather, so I've made two professional (ahem...) visual representations of what I'm talking about, I've attached them to this post. I've also attached two screenshots from a scatter gather run where HaplotypeCaller had to wait for PrintReads to finish, illustrating the inefficiency that is caused by the necessity to gather the input files before starting the next tool. (yes I know it was actually waiting for PrintReadsOnUnmappedReads, but HaplotypeCaller in the best practices pipeline would look identical, except it would wait for GatherBamFiles to finish instead, so instead of rerunning the pipeline I'll just use this screenshot since the principle still applies)
    The second screenshot shows the efficiency of my optimized scatter gather implementation. In the screenshots, I ran the pipeline with four concurrent jobs, so when BaseRecalibrator had three jobs left, PrintReads could start its first job, instead of needing to wait for GatherBQSR to finish before it could start.

    I saved 2 hours and 30 minutes on an NA12878 benchmark doing it this way, and at the same time I lowered the load average thanks to using interval lists and limiting the number of concurrent jobs.

    So my question is whether this might reduce the quality of the analysis in some unforeseen way? Saving time is one thing, but it makes no sense to do that if the quality of the analysis is affected negatively.

  • Geraldine_VdAuwera · Cambridge, MA · Member, Administrator, Broadie admin
    These are very clear, no "ahem" necessary ;)

    Although the efficiency increase is attractive, the reason we don't do it that way is that it changes the BQSR step in a way that is undesirable. Specifically, you learn the pattern of covariation separately per set of intervals, and apply a different set of recalibration rules to each as well. We want to integrate covariation information from all intervals and apply the same recalibration to all. Does that make sense?
  • oskarv · Bergen · Member

    @Geraldine_VdAuwera said:
    These are very clear, no "ahem" necessary ;)

    Although the efficiency increase is attractive, the reason we don't do it that way is that it changes the BQSR step in a way that is undesirable. Specifically, you learn the pattern of covariation separately per set of intervals, and apply a different set of recalibration rules to each as well. We want to integrate covariation information from all intervals and apply the same recalibration to all. Does that make sense?

    Ok, two questions:
    1. Would I only need to revert the GatherBqsrFiles step or all of them?
    2. And is this the reason you use interval lists for HaplotypeCaller, but groups of contigs for BaseRecalibrator and PrintReads?

  • Geraldine_VdAuwera · Cambridge, MA · Member, Administrator, Broadie admin
    1. I'm not sure what you mean by "revert the GatherBqsrFiles step", can you please clarify?

    2. Not really, in principle you could use the same intervals lists for both; the reason we use different ones is that for BQSR we don't need to subset below the level of the contig (at least for WGS we don't -- for exome we subset to lists of exome targets), while for HC we do (to avoid "impossible to call" regions that inflate runtime for nothing).

  • oskarv · Bergen · Member

    @Geraldine_VdAuwera said:
    1. I'm not sure what you mean by "revert the GatherBqsrFiles step", can you please clarify?

    What I meant was to be more conservative and run BaseRecalibrator with scatter gather, then gather the recalibration reports and use the gathered report as input for each shard in PrintReads, as opposed to skipping the gather and instead using the recalibration report from shard n of BaseRecalibrator as input for shard n of PrintReads, as I did in the optimized version.

    I ran a test to see the classification difference myself, and it's insignificant. The first scenario was the optimized version and the second, more conservative version, used the gathered recalibration reports as input for each shard in PrintReads.
    I used hap.py (https://github.com/Illumina/hap.py/) to test the VCF files, and if we look at the recall and precision calculations, the SNP file from the optimized version has 96.5% recall and 99.8% precision, while the conservative version has 96.3% recall and 99.8% precision. The INDEL file from the optimized version has 87.5% recall and 98.6% precision, while the conservative version has 87.7% recall and 98.5% precision. The TiTv ratio and the het_hom ratio practically didn't change.

    But what's perhaps more interesting is that the execution times didn't really differ, so I'll go conservative and use the gathered recalibration reports as input for PrintReads.

  • Geraldine_VdAuwera · Cambridge, MA · Member, Administrator, Broadie admin
    Oh I see; that makes sense in general, but just realize that it's tricky to test implementation effects on BQSR because it usually makes little difference how you run it when your test data is healthy (i.e. mostly unbiased). It's only when something is wrong that you really see an effect.

    So I approve of the conservative choice :)