Multithreading Queue with SGE

prichmond · University of British Columbia · Member

Hello,
I'm trying to run GATK-Queue in my SGE environment to parallelize my pipeline, but I'm having trouble making use of multiple CPU threads (-nct). I can run the test Scala qscripts (ExampleCountReads.scala, Hello_World.scala), but those don't exercise multithreading.

I am able to process using the scatter approach, but unable to parallelize these scatters.

Instead of trying to write my own Scala script from scratch, I modified one that I found here:
https://github.com/CuppenResearch/GATK-QScripts/blob/master/HaplotypeCaller.scala

When I run with scatter only, I use this command:
java -Djava.io.tmpdir=tmp -jar /opt/tools/Queue/Queue.jar -S HaplotypeCaller.scala -R hg19.fa -I BWAmem_dupremoved_realigned.sorted.bam -nct 1 -nsc 5 -mem 4 -stand_emit_conf 10 -stand_call_conf 30 -O BWAmem_queue_haplo.vcf -jobRunner GridEngine -run -l DEBUG

Looking into the script, the -nct is set as:

@Argument(doc="Number of cpu threads per data thread", shortName="nct", required=true)
var numCPUThreads: Int = _

haplotypeCaller.num_cpu_threads_per_data_thread = numCPUThreads

The -nsc is set as:
@Argument(doc="Number of scatters", shortName="nsc", required=true)
var numScatters: Int = _
haplotypeCaller.scatterCount = numScatters

This command runs in my SGE environment just fine.

However, when I try to run with -nct > 1:

java -Djava.io.tmpdir=tmp -jar /opt/tools/Queue/Queue.jar -S HaplotypeCaller.scala -R hg19.fa -I BWAmem_dupremoved_realigned.sorted.bam -nct 6 -nsc 1 -mem 24 -stand_emit_conf 10 -stand_call_conf 30 -O BWAmem_queue_haplo.vcf -jobRunner GridEngine -run -l DEBUG

INFO 14:46:00,276 QScriptManager - Compiling 1 QScript
DEBUG 14:46:00,277 QScriptManager - Compilation directory: /opt/tools/Queue-3.5-0-g36282e4/resources/tmp/Q-Classes-304214474060816321
INFO 14:46:02,609 QScriptManager - Compilation complete
INFO 14:46:02,739 HelpFormatter - ----------------------------------------------------------------------
INFO 14:46:02,739 HelpFormatter - Queue v3.5-0-g36282e4, Compiled 2015/11/25 04:03:40
INFO 14:46:02,739 HelpFormatter - Copyright (c) 2012 The Broad Institute
INFO 14:46:02,739 HelpFormatter - For support and documentation go to http://www.broadinstitute.org/gatk
DEBUG 14:46:02,739 HelpFormatter - Current directory: /opt/tools/Queue-3.5-0-g36282e4/resources
INFO 14:46:02,740 HelpFormatter - Program Args: -S HaplotypeCaller.scala -R /mnt/data/GENOMES/hg19/FASTA/hg19.fa -I /mnt/data/WholeGenomes/Family14/TX14-2_BWAmem_dupremoved_realigned.sorted.bam -nct 6 -nsc 1 -mem 24 -stand_emit_conf 10 -stand_call_conf 30 -O /mnt/data/WholeGenomes/Family14/TX14-2_BWAmem_queue_haplo.vcf -jobRunner GridEngine -run -l DEBUG
INFO 14:46:02,740 HelpFormatter - Executing as [email protected] on Linux 2.6.32-573.12.1.el6.x86_64 amd64; Java HotSpot(TM) 64-Bit Server VM 1.7.0_79-b15.
INFO 14:46:02,740 HelpFormatter - Date/Time: 2016/06/07 14:46:02
INFO 14:46:02,740 HelpFormatter - ----------------------------------------------------------------------
INFO 14:46:02,740 HelpFormatter - ----------------------------------------------------------------------
INFO 14:46:02,746 QCommandLine - Scripting VariantCaller
DEBUG 14:46:02,829 QGraph - adding QNode: 0
INFO 14:46:02,844 QCommandLine - Added 1 functions
INFO 14:46:02,845 QGraph - Generating graph.
INFO 14:46:02,875 QGraph - Running jobs.
DEBUG 14:46:03,042 FunctionEdge - Starting: /opt/tools/Queue-3.5-0-g36282e4/resources > 'java' '-Xmx24576m' '-XX:+UseParallelOldGC' '-XX:ParallelGCThreads=4' '-XX:GCTimeLimit=50' '-XX:GCHeapFreeLimit=10' '-Djava.io.tmpdir=/opt/tools/Queue-3.5-0-g36282e4/resources/tmp' '-cp' '/opt/tools/Queue/Queue.jar' 'org.broadinstitute.gatk.engine.CommandLineGATK' '-T' 'HaplotypeCaller' '-I' '/mnt/data/WholeGenomes/Family14/TX14-2_BWAmem_dupremoved_realigned.sorted.bam' '-R' '/mnt/data/GENOMES/hg19/FASTA/hg19.fa' '-nct' '6' '-o' '/mnt/data/WholeGenomes/Family14/TX14-2_BWAmem_queue_haplo.vcf.raw_variants.vcf' '-stand_call_conf' '30.0' '-stand_emit_conf' '10.0'
INFO 14:46:03,043 FunctionEdge - Output written to /mnt/data/WholeGenomes/Family14/TX14-2_BWAmem_queue_haplo.vcf.raw_variants.vcf.out
INFO 14:46:03,075 GridEngineJobRunner - Native spec is: -V -l h_rss=29492M -pe smp_pe 6
ERROR 14:46:03,085 Retry - Caught error during attempt 1 of 4.
org.broadinstitute.gatk.queue.QException: Unable to submit job: job rejected: the requested parallel environment "smp_pe" does not exist
at org.broadinstitute.gatk.queue.engine.drmaa.DrmaaJobRunner$$anonfun$start$1.apply$mcV$sp(DrmaaJobRunner.scala:89)
at org.broadinstitute.gatk.queue.engine.drmaa.DrmaaJobRunner$$anonfun$start$1.apply(DrmaaJobRunner.scala:85)
at org.broadinstitute.gatk.queue.engine.drmaa.DrmaaJobRunner$$anonfun$start$1.apply(DrmaaJobRunner.scala:85)
at org.broadinstitute.gatk.queue.util.Retry$.attempt(Retry.scala:49)
at org.broadinstitute.gatk.queue.engine.drmaa.DrmaaJobRunner.start(DrmaaJobRunner.scala:85)
at org.broadinstitute.gatk.queue.engine.FunctionEdge.start(FunctionEdge.scala:84)
at org.broadinstitute.gatk.queue.engine.QGraph.runJobs(QGraph.scala:453)
at org.broadinstitute.gatk.queue.engine.QGraph.run(QGraph.scala:156)
at org.broadinstitute.gatk.queue.QCommandLine.execute(QCommandLine.scala:170)
at org.broadinstitute.gatk.utils.commandline.CommandLineProgram.start(CommandLineProgram.java:248)
at org.broadinstitute.gatk.utils.commandline.CommandLineProgram.start(CommandLineProgram.java:155)
at org.broadinstitute.gatk.queue.QCommandLine$.main(QCommandLine.scala:61)
at org.broadinstitute.gatk.queue.QCommandLine.main(QCommandLine.scala)

I believe the error lies somewhere in how smp_pe is linked to numCPUThreads.

I have attached my Scala script (I had to rename it to .txt in order to attach it).

I'm happy to try out a different HaplotypeCaller Scala script as well.

Issue · GitHub (created by Sheila): #975, state: closed, closed by vdauwera

Answers

  • Geraldine_VdAuwera · Cambridge, MA · Member, Administrator, Broadie admin

    Hi @prichmond

    I'm afraid that's not something we can help you with. We don't work with GridEngine ourselves, so we have no way to test job native arguments. @Johan_Dahlberg is one of the forum's resident experts on the DRMAA front; perhaps he will chime in.

  • Johan_Dahlberg · Member ✭✭✭

    Hi!

    Sorry for the late answer. I have no experience running Queue with GridEngine, so I can't really comment much on that part. Looking at the stack trace, I agree with you that the problem lies somewhere in the native spec being wrong: -V -l h_rss=29492M -pe smp_pe 6

    Is smp_pe a valid parallel environment on your cluster? Perhaps you could use the --job_environment_name parameter to change it to an environment that is valid in your setup?
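
    A quick sanity check (just a sketch, assuming the standard SGE admin tools are on your PATH) is to ask the cluster which parallel environments actually exist, and to inspect the one you intend to use:

    $ qconf -spl          # list the names of all parallel environments defined on the cluster
    $ qconf -sp <pe_name> # show the configuration (slots, allocation rule, ...) of one PE

    If smp_pe is not in that list, SGE will reject any job submitted with -pe smp_pe, which matches the error above.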

    I can't seem to see the script attachment, so it's a bit difficult to say if there are any problems there.

    Hope this helps at least a little bit in tracking down the problem.

  • prichmond · University of British Columbia · Member

    So when you say a job environment name, how would I include that in the Scala script? Just as another argument?

    This is the script:

    package org.broadinstitute.gatk.queue.qscripts

    import org.broadinstitute.gatk.queue.QSettings
    import org.broadinstitute.gatk.queue.QScript
    import org.broadinstitute.gatk.queue.extensions.gatk._

    class VariantCaller extends QScript {
    // Create an alias 'qscript' to be able to access variables in the VariantCaller.
    // 'qscript' is now the same as 'VariantCaller.this'
    qscript =>

    // Required arguments. All initialized to empty values.
    @Input(doc="The reference file for the bam files.", shortName="R", required=true)
    var referenceFile: File = _
    
    @Input(doc="One or more bam files.", shortName="I")
    var bamFiles: List[File] = Nil
    
    @Input(doc="Output core filename.", shortName="O", required=true)
    var out: File = _
    
    @Argument(doc="Maxmem.", shortName="mem", required=true)
    var maxMem: Int = _
    
    @Argument(doc="Number of cpu threads per data thread", shortName="nct", required=true)
    var numCPUThreads: Int = _
    
    @Argument(doc="Number of scatters", shortName="nsc", required=true)
    var numScatters: Int = _
    
    @Argument(doc="Minimum phred-scaled confidence to call variants", shortName="stand_call_conf", required=true)
    var standCallConf: Int = _ //30 //default: best-practices value
    
    @Argument(doc="Minimum phred-scaled confidence to emit variants", shortName="stand_emit_conf", required=true)
    var standEmitConf: Int = _ //10 //default: best-practices value
    
    // The following arguments are all optional.
    @Input(doc="An optional file with known SNP sites.", shortName="D", required=false)
    var dbsnpFile: File = _
    
    @Input(doc="An optional file with targets intervals.", shortName="L", required=false)
    var targetFile: File = _
    
    @Argument(doc="Amount of padding (in bp) to add to each interval", shortName="ip", required=false)
    var intervalPadding: Int = 0
    
    def script() {
    val haplotypeCaller = new HaplotypeCaller
    
    // All required input
    haplotypeCaller.input_file = bamFiles
    haplotypeCaller.reference_sequence = referenceFile
    haplotypeCaller.out = qscript.out + ".raw_variants.vcf"
    
    haplotypeCaller.scatterCount = numScatters
    haplotypeCaller.memoryLimit = maxMem
    haplotypeCaller.num_cpu_threads_per_data_thread = numCPUThreads
    
    haplotypeCaller.stand_emit_conf = standEmitConf
    haplotypeCaller.stand_call_conf = standCallConf
    
    // Optional input
    if (dbsnpFile != null) {
        haplotypeCaller.D = dbsnpFile
    }
    if (targetFile != null) {
        haplotypeCaller.L :+= targetFile
        haplotypeCaller.ip = intervalPadding
    }
    
    //add function to queue
    add(haplotypeCaller)
    }
    

    }

    So I would add another line:
    @Input(doc="Job Environment Name Change", shortName = "jenc", required=true)
    var ???????

    What would I put for the var then? And do I need to call it when I define the script in the bottom portion?

    Thanks for your help! @Johan_Dahlberg

  • prichmond · University of British Columbia · Member

    Additionally, if I'm processing my whole pipeline through the SGE scheduler, do I just put the call to the Scala script inside the job script that I submit to the cluster? I'm a bit confused about how I'm supposed to use Queue within the larger framework of the analysis. I think some better example Queue scripts, perhaps with a full pipeline, would really help me out.

    Thanks,
    Phil.

    @Johan_Dahlberg

  • Johan_Dahlberg · Member ✭✭✭

    Hi Phil!

    Both -jobEnv and -jobParaEnv are built-in options of Queue, so you should be able to use them without adding anything to your script. Have a look below:

    $ java -jar gatk-protected/target/Queue.jar --help
    ----------------------------------------------------------------------
    Queue v3.3-0-geee94ec, Compiled 2016/03/30 11:32:01
    Copyright (c) 2012 The Broad Institute
    For support and documentation go to http://www.broadinstitute.org/gatk
    ----------------------------------------------------------------------
    ----------------------------------------------------------------------
    usage: java -jar gatk-queue-package-distribution-3.3.jar -S <script> [-args <arg_file>] [-run] [-jobRunner <job_runner>] 
           [-bsub] [-qsub] [-status] [-retry <retry_failed>] [-startFromScratch] [-keepIntermediates] [-statusTo <status_email_to>] 
           [-statusFrom <status_email_from>] [-gv <graphviz>] [-gvsg <graphviz_scatter_gather>] [-jobReport <jobReport>] 
           [-disabpleJobReport] [-emailHost <emailSmtpHost>] [-emailPort <emailSmtpPort>] [-emailTLS] [-emailSSL] [-emailUser 
           <emailUsername>] [-emailPassFile <emailPasswordFile>] [-emailPass <emailPassword>] [-runName <run_name>] [-jobProject 
           <job_project>] [-jobQueue <job_queue>] [-jobPriority <job_priority>] [-jobNative <job_native_arg>] [-jobResReq 
           <job_resource_request>] [-jobEnv <job_environment_name>] [-memLimit <memory_limit>] [-memLimitThresh 
           <memory_limit_threshold>] [-resMemLimit <resident_memory_limit>] [-resMemReq <resident_memory_request>] [-resMemReqParam 
           <resident_memory_request_parameter>] [-wallTime <job_walltime>] [-jobParaEnv <job_parallel_env>] [-multiCoreJerk] 
           [-noGCOpt] [-runDir <run_directory>] [-tempDir <temp_directory>] [-jobSGDir <job_scatter_gather_directory>] [-logDir 
           <log_directory>] [-l <logging_level>] [-log <log_to_file>] [-h] [-version]
    
    [...]
     -jobEnv,--job_environment_name <job_environment_name>                                    Environment names for the job 
                                                                                              runner.
    [...]
     -jobParaEnv,--job_parallel_env <job_parallel_env>                                        An SGE style parallel 
                                                                                              environment to use for jobs 
                                                                                              requesting more than 1 core. 
                                                                                               Equivalent to submitting jobs 
                                                                                              with -pe ARG nt for jobs with 
                                                                                              nt > 1
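
    To make that concrete (a sketch only, not tested on your cluster: smp here is a hypothetical parallel environment name, so substitute whatever qconf -spl reports), your failing submission could be rerun with the parallel environment overridden on the Queue command line:

    $ java -Djava.io.tmpdir=tmp -jar /opt/tools/Queue/Queue.jar -S HaplotypeCaller.scala -R hg19.fa -I BWAmem_dupremoved_realigned.sorted.bam -nct 6 -nsc 1 -mem 24 -stand_emit_conf 10 -stand_call_conf 30 -O BWAmem_queue_haplo.vcf -jobRunner GridEngine -jobParaEnv smp -run -l DEBUG

    Since you never set -jobParaEnv yourself, the smp_pe in your log looks like Queue's built-in default for that option; with the override, the native spec should come out as something like -V -l h_rss=29492M -pe smp 6 instead.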
    

    When it comes to using Queue in a wider context, you can have a look at Piper, the system that we use to analyze whole human genomes here at the Swedish National Genomics Infrastructure. It's built to work with our systems, so it's probably not that portable, but it might still give you something to take inspiration from: https://github.com/johandahlberg/piper. To see one of the actual Queue scripts, have a look at https://github.com/johandahlberg/piper/blob/master/piper/src/main/scala/molmed/qscripts/DNABestPracticeVariantCalling.scala
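
    As for how the Queue call fits into the rest of the analysis: the Queue process itself is a lightweight driver that stays alive for the duration of the run and submits the actual GATK jobs to SGE through DRMAA, so you normally start it once per pipeline rather than wrapping each GATK step in its own qsub script. A minimal, hypothetical wrapper job (assuming your cluster allows DRMAA submission from wherever the driver runs; if not, start it on a login node under nohup or screen) might look roughly like this:

    #!/bin/bash
    #$ -N queue_driver
    #$ -cwd
    #$ -V
    # Hypothetical driver job: Queue itself needs little CPU and memory; it submits
    # the heavy HaplotypeCaller jobs to SGE via DRMAA (-jobRunner GridEngine).
    java -Djava.io.tmpdir=tmp -jar /opt/tools/Queue/Queue.jar \
        -S HaplotypeCaller.scala -R hg19.fa -I BWAmem_dupremoved_realigned.sorted.bam \
        -nsc 5 -nct 1 -mem 4 -stand_emit_conf 10 -stand_call_conf 30 \
        -O BWAmem_queue_haplo.vcf -jobRunner GridEngine -run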
