
How to run the entire pipeline (even using Spark tools) from Java?

Vzzarr Member
edited October 2017 in Ask the GATK team

I am trying to write a Java pipeline that follows the GATK Best Practices, in particular using more than one input sample.
As a first step, I am trying to use FastqToSam (not mandatory for the Best Practices, but required when starting from FASTQ files), BwaAndMarkDuplicatesPipelineSpark and BQSRPipelineSpark.

For example, with FastqToSam I am using this simple approach, in which I manage to "sparkify" the command across multiple samples and even obtain some speedup:

JavaRDD<String> rdd_fastq_r1_r2 = sc.parallelize(fastq_r1_r2);

createBashScript(gatkCommand);

JavaRDD<String> bashExec = rdd_fastq_r1_r2.pipe("/path/script.sh");

where fastq_r1_r2 is a list of Strings representing the paths of the samples to use.
In short, I execute a bash command for each pair of paired-end read files (in particular, the bash command explained here) inside the pipe method provided by Spark:

java -Xmx8G -jar picard.jar FastqToSam [...]
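To make the idea concrete, here is a minimal, self-contained sketch of this pipe-based approach; the sample paths and the assumption that script.sh reads one sample description per line from stdin are illustrative, not my exact code:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class FastqToSamPipeExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("FastqToSamPipe");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // One element per paired-end sample: "R1 path, R2 path, sample name".
            List<String> fastq_r1_r2 = Arrays.asList(
                    "/data/sampleA_R1.fastq /data/sampleA_R2.fastq sampleA",
                    "/data/sampleB_R1.fastq /data/sampleB_R2.fastq sampleB");

            // One partition per sample, so each sample is handled by its own task.
            JavaRDD<String> rdd_fastq_r1_r2 = sc.parallelize(fastq_r1_r2, fastq_r1_r2.size());

            // script.sh reads each line from stdin and runs the FastqToSam
            // command shown above for that sample.
            JavaRDD<String> bashExec = rdd_fastq_r1_r2.pipe("/path/script.sh");

            // Force execution and print whatever the script wrote to stdout.
            bashExec.collect().forEach(System.out::println);
        }
    }
}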

But this approach does not work with the GATK Spark tools, such as BwaAndMarkDuplicatesPipelineSpark and BQSRPipelineSpark.

So, is there any other way to execute these Spark tools from Java code? For example, in this post from 4.5 years ago it was suggested to use org.broadinstitute.sting.gatk.CommandLineGATK, but that class is not available anymore.
Moreover, is any kind of Java API available (and, if so, any tutorial), so that I can use your tools (similarly to how I use the Spark API) without resorting to bash commands?

Thanks for your time; I hope my questions are clear.
Nicholas

Post edited by Vzzarr on

Answers

  • EADG Kiel Member ✭✭✭

    Hi @Vzzarr,

    A short question: why reinvent the wheel? Could you use WDL + Cromwell, or even the old Queue, to achieve the things you plan to implement?

    Greets EADG

  • Sheila Broad Institute Member, Broadie, Moderator admin

    @Vzzarr
    Hi Nicholas,

    We are planning to release public WDLs that will help you run the Best Practices in GATK4. They are not fully available yet, but they are coming :smiley:

    -Sheila

  • Thank you both for suggesting WDL, but let me explain better why I'd like to use Java:
    for example, given the two available Spark pipelines BwaAndMarkDuplicatesPipelineSpark and BQSRPipelineSpark, I would like to create a pipeline following this approach, one that reduces writes to disk as much as possible (ideally it would be interesting to run the pipeline from BWA to HaplotypeCaller writing only the final result to disk, avoiding intermediate writes).

    This is what I think happens inside BwaAndMarkDuplicatesPipelineSpark and BQSRPipelineSpark, but as a first step I tried to implement a pipeline similar to these two, in particular by combining them in the trivial way summarized here:
    https://pastebin.com/XEqvpKmG

    try (final BwaSparkEngine engine = new BwaSparkEngine(ctx, referenceArguments.getReferenceFileName().replaceAll("2bit", "fasta"), indexImageFile, getHeaderForReads(), getReferenceSequenceDictionary())) {
        final JavaRDD<GATKRead> alignedReads = engine.alignPaired(getReads());
        final JavaRDD<GATKRead> markedReadsWithOD = MarkDuplicatesSpark.mark(alignedReads, engine.getHeader(), duplicatesScoringStrategy, new OpticalDuplicateFinder(), getRecommendedNumReducers());
        final JavaRDD<GATKRead> markedReads = MarkDuplicatesSpark.cleanupTemporaryAttributes(markedReadsWithOD);

        if (joinStrategy == JoinStrategy.BROADCAST && ! getReference().isCompatibleWithSparkBroadcast())
            throw new UserException.Require2BitReferenceForBroadcast();

        // The initial reads have already had the WellformedReadFilter applied to them, which
        // is all the filtering that ApplyBQSR wants. BQSR itself wants additional filtering
        // performed, so we do that here.
        //NOTE: this filter doesn't honor enabled/disabled commandline filters
        final ReadFilter bqsrReadFilter = ReadFilter.fromList(BaseRecalibrator.getBQSRSpecificReadFilterList(), getHeaderForReads());
        final JavaRDD<GATKRead> filteredReadsForBQSR = markedReads.filter(read -> bqsrReadFilter.test(read));

        final VariantsSparkSource variantsSparkSource = new VariantsSparkSource(ctx);
        final JavaRDD<GATKVariant> bqsrKnownVariants = variantsSparkSource.getParallelVariants(baseRecalibrationKnownVariantPaths, getIntervals());

        final JavaPairRDD<GATKRead, ReadContextData> rddReadContext = AddContextDataToReadSpark.add(ctx, filteredReadsForBQSR, getReference(), bqsrKnownVariants, baseRecalibrationKnownVariantPaths, joinStrategy, getHeaderForReads().getSequenceDictionary(), readShardSize, readShardPadding);
        //note: we use the reference dictionary from the reads themselves.
        final RecalibrationReport bqsrReport = BaseRecalibratorSparkFn.apply(rddReadContext, getHeaderForReads(), getHeaderForReads().getSequenceDictionary(), bqsrArgs);

        final Broadcast<RecalibrationReport> reportBroadcast = ctx.broadcast(bqsrReport);
        final JavaRDD<GATKRead> finalReads = ApplyBQSRSparkFn.apply(markedReads, reportBroadcast, getHeaderForReads(), applyBqsrArgs.toApplyBQSRArgumentCollection(bqsrArgs.PRESERVE_QSCORES_LESS_THAN));

        writeReads(ctx, output, finalReads);
    }

    In short, I took the JavaRDDs of both pipelines and combined them in a single class, in order to obtain a single pipeline that executes the steps from BWA through BQSR with only one final write.
    I tried this class and it seemed to work through BWA and MarkDuplicates (the usual output was printed to the terminal), but when the BQSR step started, in particular while loading the KnownSitesCache, I got a "GC overhead limit exceeded" error.
    So my question is: why doesn't a simple pipeline class like this exist yet? Is it because, as I experienced, avoiding intermediate writes requires much more memory?

    I'm curious to hear your answer, both to better understand how much memory the different steps use and to understand how to reach my goal,
    Nicholas

    P.S. I used a single VM with 28 GB of memory, but I will try a larger VM to see whether the execution completes.

  • Sheila Broad Institute Member, Broadie, Moderator admin

    @Vzzarr
    Hi,

    I will check with the team and get back to you.

    -Sheila

  • SkyWarrior Turkey Member ✭✭✭

    With this kind of code your heap size may never be enough. You need to find a way to stream the reads out instead of collecting them in memory.

    Try using System.out to print reads as a way of streaming your pipeline, or use a stream-writer object to flush the reads out of memory to non-volatile storage and reduce your heap requirements.
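    For illustration, here is a rough Java/Spark sketch of that idea, assuming a JavaRDD<GATKRead> like the finalReads variable in your code above; the per-partition output path and the plain-text output are illustrative, not GATK's own sink:

    import java.io.BufferedWriter;
    import java.io.FileWriter;

    import org.apache.spark.TaskContext;
    import org.apache.spark.api.java.JavaRDD;
    import org.broadinstitute.hellbender.utils.read.GATKRead;

    public final class StreamOutSketch {
        // Collecting the RDD on the driver keeps every read on the heap at once:
        //   List<GATKRead> all = finalReads.collect();   // can exhaust the heap
        // Writing per partition keeps only one iterator's worth of reads in flight.
        static void writeReadsPerPartition(JavaRDD<GATKRead> finalReads) {
            finalReads.foreachPartition(readIterator -> {
                // One output file per partition on the executor's local storage
                // (illustrative; a real pipeline would use ReadsSparkSink or
                // saveAsTextFile on shared storage instead).
                String path = "/tmp/reads-part-" + TaskContext.getPartitionId() + ".txt";
                try (BufferedWriter out = new BufferedWriter(new FileWriter(path))) {
                    while (readIterator.hasNext()) {
                        out.write(readIterator.next().getName()); // or a full SAM line
                        out.newLine();
                    }
                }
            });
        }
    }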

  • Vzzarr Member
    edited November 2017

    Hi @Sheila,
    I tried, as you suggested, to increase --executor-memory, so now I am working on an 8-core, 55 GB RAM VM and set the Spark parameters in the spark-defaults.conf file in this way:

    spark.driver.maxResultSize      55g
    spark.driver.memory             20g
    spark.executor.memory           8g
    spark.executor.cores            2
    spark.executor.instances        4
    

    managing to overcome the GC issue. I executed the Java code from my previous comment (just like any other GATK Spark tool) and obtained this stack trace with this particular issue:

    17/11/03 13:02:14 ERROR Utils: Aborting task
    java.lang.IllegalArgumentException: Reference index for 'chr11' not found in sequence dictionary.
        at htsjdk.samtools.SAMRecord.resolveIndexFromName(SAMRecord.java:536)
        at htsjdk.samtools.SAMRecord.setHeaderStrict(SAMRecord.java:1650)
        at org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSink.lambda$pairReadsWithSAMRecordWritables$19b8fd39$1(ReadsSparkSink.java:245)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1031)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1031)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1117)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1116)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1116)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1348)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1124)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1095)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    

    It seems that the exception is raised here, in the method resolveIndexFromName().
    I tried to understand the problem, but I am not very familiar with genomics or SAMtools; I even tried to google the error, but it is not well documented.

    So, do you know what the reason for this exception could be? Is some synchronization point required before the write in order to execute BQSR? I noticed that the exception is raised at writeReads(ctx, output, finalReads);, the last line of the Java code in my previous comment.

    (I previously executed BwaAndMarkDuplicatesPipelineSpark, BQSRPipelineSpark and HaplotypeCallerSpark sequentially without any problem, so I think there is some conceptual problem in what I am doing.)
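    For reference, the failing check inside htsjdk amounts to looking the read's contig up in the sequence dictionary of the header handed to the writer, roughly as in this sketch (class and variable names are illustrative), which would mean the header used at write time does not list 'chr11':

    import htsjdk.samtools.SAMFileHeader;
    import htsjdk.samtools.SAMSequenceDictionary;

    public final class DictionaryCheckSketch {
        static void requireContigInHeader(SAMFileHeader header, String contig) {
            SAMSequenceDictionary dict = header.getSequenceDictionary();
            // getSequenceIndex returns -1 when the contig is absent; htsjdk's
            // SAMRecord.resolveIndexFromName throws IllegalArgumentException in that case.
            if (dict == null || dict.getSequenceIndex(contig) < 0) {
                throw new IllegalArgumentException(
                        "Reference index for '" + contig + "' not found in sequence dictionary.");
            }
        }
    }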

    Post edited by Vzzarr on

    Issue · GitHub (opened by Sheila): #2650, closed, closed by vdauwera
  • Sheila Broad Institute Member, Broadie, Moderator admin

    @Vzzarr
    Hi,

    I will have to check with the team and get back to you.

    -Sheila

  • shlee Cambridge Member, Broadie ✭✭✭✭✭

    Hi @Vzzarr,

    Sheila is traveling for a workshop, so I will follow up. In the meantime, we do have a tutorial on running a GATK Spark tool that I wrote back in July:
    https://gatkforums.broadinstitute.org/gatk/discussion/10060/how-to-run-flagstatspark-on-a-cloud-spark-cluster. In this tutorial, I give a helpful link to a Cloudera blog post on setting executor options.

    One thing to remember is that the GATK4 launch script, currently invoked with gatk-launch, sets parameters optimally, I believe even for Spark runs.

    Our developers are getting ready for the big release of GATK4 in January. I'm certain they are keen on your question but it may be a while before we have some feedback.

  • Vzzarr Member
    edited November 2017

    Hi @shlee,
    Thanks for your tutorial, it is interesting.
    Regarding my previous issue, I think the problem is that I do not have the index file that is normally generated after BWA (in my approach everything is computed in memory, so I do not have this index); I believe it is probably the .bam.splitting-bai file.
    Resolving this problem looks quite difficult from my point of view, because I think this index would have to be kept in memory (or even written to the file system, it is a small file) and then used by the following steps that require it (it is hard for me to understand where this index is used).

    So I am patiently waiting for the developers' answer in order to resolve this issue (if I can help in any way, just tell me),
    Nicholas

  • shlee Cambridge Member, Broadie ✭✭✭✭✭

    Hi Nicholas (@Vzzarr),

    I'm not a developer. However, from what I'm reading, I think the developers would need more clarification on what exactly you need our help with. Can you list your questions carefully please?

    This file index you speak of, which file do you refer to? Are you having trouble indexing a BAM file or generating the per-reference alignment index files? Remember that BAMs need to be coordinate-sorted before you can index them, and the output of BWA alignment is not coordinate-sorted; rather, it is query-name-grouped. You will have to sort the BAM (with Picard SortSam or samtools) before you can index it.
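    For example, here is a minimal htsjdk sketch of that sort step (the file names are placeholders; Picard SortSam or samtools sort do the same from the command line):

    import java.io.File;

    import htsjdk.samtools.SAMFileHeader;
    import htsjdk.samtools.SAMFileWriter;
    import htsjdk.samtools.SAMFileWriterFactory;
    import htsjdk.samtools.SAMRecord;
    import htsjdk.samtools.SamReader;
    import htsjdk.samtools.SamReaderFactory;

    public final class SortBamSketch {
        public static void main(String[] args) throws Exception {
            File in = new File("aligned.queryname.bam");   // placeholder path
            File out = new File("aligned.coordinate.bam"); // placeholder path

            try (SamReader reader = SamReaderFactory.makeDefault().open(in)) {
                SAMFileHeader header = reader.getFileHeader();
                header.setSortOrder(SAMFileHeader.SortOrder.coordinate);

                // presorted=false tells the writer to sort the records according
                // to the header's sort order before writing them out.
                try (SAMFileWriter writer =
                             new SAMFileWriterFactory().makeSAMOrBAMWriter(header, false, out)) {
                    for (SAMRecord record : reader) {
                        writer.addAlignment(record);
                    }
                }
            }
        }
    }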

    GATK is able to partition BAM data into Spark RDDs across a cluster. I assume the index, if present (our tools look for it in the same directory as the BAM), is also partitioned. Are you saying the index is generated on the fly, after partitioning? You can check out our open-source codebase at https://github.com/broadinstitute/gatk to see how our code handles such situations.

  • Hi @shlee,
    I am really sorry for the delay, but I have been busy over the last few weeks.

    Anyway I will try to be clearer with this picture:

    As you can see, I would like to combine the tools BwaAndMarkDuplicatesPipelineSpark and BQSRPipelineSpark into one single tool, in order to improve the efficiency of the pipeline (avoiding, for example, a disk write).
    I tried to do this with the naive approach reported in my previous comments, but executing that code I obtain this error (as you can see at the end of the stack trace):

    17/11/03 13:02:14 ERROR Utils: Aborting task
    java.lang.IllegalArgumentException: Reference index for 'chr11' not found in sequence dictionary.
    

    Do you think it would be better if I spoke directly with the developers in the GitHub repository?

    Best regards,
    Nicholas

    Issue · GitHub (opened by shlee): #3878, open
  • shlee Cambridge Member, Broadie ✭✭✭✭✭

    Hi @Vzzarr,

    Remember all Spark tools and pipelines are in BETA.

    If you want to minimize I/O, then perhaps consider combining BwaSpark and ReadsPipelineSpark. The latter Spark pipeline takes aligned reads and runs MarkDuplicates, BQSR and HaplotypeCaller. If the listed tools are the only tools the latter pipeline runs, then the pipeline must depend on a coordinate-sorted input.

    As I said before, BWA gives a queryname-grouped output. So in between these two, if our assumptions are correct, then you will have to do a sort, e.g. with SortReadFileSpark.

    [1] BwaSpark --> SortReadFileSpark --> ReadsPipelineSpark

    Another option would be:

    [2] BwaAndMarkDuplicatesPipelineSpark --> SortReadFileSpark --> BQSRPipelineSpark --> HaplotypeCallerSpark.

    The first pipeline [1] does not take advantage of the new MarkDuplicates feature that also marks supplementary and secondary alignments as duplicates within a set.

    The second pipeline [2] appears to apply MarkDuplicates to the queryname-grouped reads. If we assume the pipelines run only the tools named and no other processes (I don't know the details of these pipelines), and that you wish to recapitulate our reference implementation, then [2] would require additional processing. Namely, the tag-fixing part of the SortAndFixTags step appears to be missing from [2]. The tool to use for this is SetNmMdAndUqTags, as the previously described SetNmAndUqTags has since been deprecated.

    I'll see what the developers say.

  • Geraldine_VdAuwera Cambridge, MA Member, Administrator, Broadie admin

    Hi @Vzzarr, as you correctly identified, our support team is not really set up for handling developer support; for something like this you would indeed be better off creating an issue ticket in the GitHub repo. The development team may not have the bandwidth to respond to you in detail, but they may be able to point you to helpful resources. Good luck!
