
cromwell spark backend

I got the following error when running the command java -Dconfig.file=cromwell.config.spark.txt -jar cromwell-28.jar run hc.local.wdl hc.local.json:

 [error] Failed to instantiate Cromwell System. Shutting down Cromwell.
[error] :
cromwell.backend.impl.spark.SparkBackendFactory.<init>(java.lang.String, cromwell.backend.BackendConfigurationDescriptor)

Here is the Spark backend section of my Cromwell config file:

backend {
  default = "Spark"
  providers {
    Spark {
     actor-factory = "cromwell.backend.impl.spark.SparkBackendFactory"
     config {
       # Root directory where Cromwell writes job results.  This directory must be
       # visible and writeable by the Cromwell process as well as the jobs that Cromwell
       # launches.
       root: "cromwell-executions"

       filesystems {
         local {
           localization: [
             "hard-link", "soft-link", "copy"
           ]
         }
        }
          # change (master, deployMode) to (yarn, client), (yarn, cluster)
          #  or (spark://hostname:port, cluster) for spark standalone cluster mode
         master: "spark://localhost:7077"
         deployMode: "cluster"
      }
    }
  }
}

Can anyone help?
Thanks!

Answers

  • jgentry Member, Broadie, Dev

    Pinging @DilliWala_Cric - this looks like a continuation of the previous question

  • @blueskypy Can you share your hc.local.wdl, hc.local.json, and cromwell.config.spark.txt, if possible?

  • @jgentry Quick question, it has been a while since I touched the Spark backend. My assumption is that with the newer Cromwell versions (29 or 30) backward compatibility should still hold, as in I don't have to change anything in the Spark backend to adapt to a new interface or design? In theory it should work as it did when it was last tested?

  • Thanks @DilliWala_Cric for the help! Please see the attached files. The BAM and reference files are from this tutorial. I changed it to HaplotypeCallerSpark and attached the ref.2bit it needs.

  • I didn't include the runtime block below in the task definition because I don't know what the appMainClass should be. Could that be the problem?

    runtime {
        appMainClass: "${entry_point}"
        executorMemory: "${memory}"
        executorCores: "${cores}"
    }
    
  • In addition, can I add other spark-submit options to the runtime block? If I run HaplotypeCallerSpark using gatk-launch only, i.e. without WDL and Cromwell:

    gatk-4.beta.4/gatk-launch HaplotypeCallerSpark  -R f.fasta \
      -I in.bam -O s.raw.indels.snps.vcf \
      --  --sparkRunner SPARK \
      --sparkMaster spark://localhost:7077 \
      --deploy-mode cluster \
      --driver-cores 4 \
      --driver-memory 8g \
      --executor-memory 4g \
      --total-executor-cores 10 \
     --dryRun
    

    It actually runs spark-submit with many options, which may need to be included in the runtime block when running with Cromwell:

    spark-submit --master spark://localhost:7077 --conf spark.driver.userClassPathFirst=true \
     --conf spark.io.compression.codec=lzf --conf spark.driver.maxResultSize=0 \
     --conf spark.executor.extraJavaOptions=-DGATK_STACKTRACE_ON_USER_EXCEPTION=true \
     -Dsamjdk.use_async_io_read_samtools=false -Dsamjdk.use_async_io_write_samtools=false \
     -Dsamjdk.use_async_io_write_tribble=false -Dsamjdk.compression_level=1 -Dsnappy.disable=true  \
     --conf spark.driver.extraJavaOptions=-DGATK_STACKTRACE_ON_USER_EXCEPTION=true \
     -Dsamjdk.use_async_io_read_samtools=false -Dsamjdk.use_async_io_write_samtools=false \
     -Dsamjdk.use_async_io_write_tribble=false -Dsamjdk.compression_level=1 -Dsnappy.disable=true  \
     --conf spark.kryoserializer.buffer.max=512m --conf spark.yarn.executor.memoryOverhead=600 \
     --deploy-mode cluster --driver-cores 4 --driver-memory 8g --executor-memory 4g --total-executor-cores 10 \
     gatk-4.beta.4/gatk-package-4.beta.4-spark.jar HaplotypeCallerSpark -R f.fasta -I in.bam \
     -O s.raw.indels.snps.vcf --sparkMaster spark://localhost:7077
    

    P.S. I'm using a Spark standalone cluster.

  • @blueskypy Thank you for the reply. I am trying to run the Spark backend with the latest Cromwell in client mode, using a simple word-count Spark application, to verify whether the backend is still functional with the latest Cromwell. But I see a similar issue, so let me first resolve that.

    Second, entry point (appMainClass) is one of the supported runtime attributes, because spark-submit has a --class switch to tell it where to start from.
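
    Roughly, the mapping as I remember it is that appMainClass goes to --class, executorMemory to --executor-memory, and executorCores to --total-executor-cores, with master and deploy mode taken from the backend configuration. Just a sketch with placeholder values, not the exact generated script:

    # sketch only -- placeholder values
    spark-submit --master spark://localhost:7077 --deploy-mode cluster \
      --class SimpleApp --executor-memory 4G --total-executor-cores 12 \
      application.jar <args>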

  • blueskypy Member
    edited September 2017

    @DilliWala_Cric Thanks for the quick reply! Regarding the second point, the above gatk-launch script actually runs successfully on my Spark standalone cluster, but the spark-submit command produced by gatk-launch does not have the --class switch.

  • @blueskypy OK. When you say it's working on your Spark standalone cluster, is that through Cromwell or without it? I think --class is not mandatory as long as the application jar knows where to start executing.

  • Thanks @DilliWala_Cric! That 'working' is without Cromwell.

  • @DilliWala_Cric There are some errors in the WDL and JSON files in my previous upload, although those are not the cause of the current issue. Attached are the corrected WDL and JSON files.

  • @blueskypy, I tried to run a simple word count yesterday through the Spark backend in Cromwell. I had to do a minor fix and it was working. I am trying to reach @jgentry for his suggestion on how to reflect that change in the master branch, and whether the README needs any changes as well.

  • blueskypy Member
    edited September 2017

    Thanks @DilliWala_Cric ! Looking forward to the update!

    Also, can I add other spark-submit options to the runtime block, in addition to the four listed in the README?

  • edited September 2017

    @jgentry With the current Cromwell version, if the following WDL and inputs are used with the Spark backend, I see an error stating that the runtime keys are not found.

    task sparkjob_with_yarn_cluster {
        File input_jar
        String output_base
        String entry_point
        Int cores
        String memory

        command {
            ${input_jar} ${output_base}
        }

        runtime {
            appMainClass: "${entry_point}"
            executorMemory: "${memory}"
            executorCores: "${cores}"
        }

        output {
            File out = "${output_base}"
        }
    }

    workflow sparkBETest {
        call sparkjob_with_yarn_cluster
    }
    
    

    and the inputs:

    {
      "sparkBETest.sparkjob_with_yarn_cluster.memory": "4G",
      "sparkBETest.sparkjob_with_yarn_cluster.output_base": "/home/himanshu/Documents/test/sparkOutput",
      "sparkBETest.sparkjob_with_yarn_cluster.entry_point": "SimpleApp",
      "sparkBETest.sparkjob_with_yarn_cluster.deployMode": "client",
      "sparkBETest.sparkjob_with_yarn_cluster.cores": "12",
      "sparkBETest.sparkjob_with_yarn_cluster.input_jar": "/home/himanshu/Documents/test/sparktest_2.11-1.0.jar"
    }
    
    
    Error: 
    
    Runtime attributes evaluation:
    key not found: entry_point
    key not found: memory
    key not found: cores
    akka.actor.ActorInitializationException: akka://cromwell-system/user/SingleWorkflowRunnerActor/WorkflowManagerActor/WorkflowActor-d7f7f13a-5690-4b69-8f14-3acd94f5ad82/WorkflowExecutionActor-d7f7f13a-5690-4b69-8f14-3acd94f5ad82/d7f7f13a-5690-4b69-8f14-3acd94f5ad82-EngineJobExecutionActor-sparkBETest.sparkjob_with_yarn_cluster:NA:1/d7f7f13a-5690-4b69-8f14-3acd94f5ad82-BackendJobExecutionActor-d7f7f13a:sparkBETest.sparkjob_with_yarn_cluster:-1:1: exception during creation
    
  • @blueskypy I have committed minor fixes to bring the Spark backend back to life; it should compile and run Spark jobs as per the current README. Can you give it a shot and see how much progress you make with the current state of the backend?

  • Thanks @DilliWala_Cric! I cloned the GitHub repository and rebuilt the Cromwell jar. Here is my command:
    java -Dconfig.file=cromwell.config.spark.txt -jar cromwell-30-2ee98e9-SNAP.jar run hc.local.wdl -i hc.local.json

    Cromwell started successfully, but I still got an error:

    [2017-09-13 22:28:03,89] [error] SparkClusterProcess reason: Unable to get json out of submission response file
    [2017-09-13 22:28:03,89] [warn] Couldn't find a suitable DSN, defaulting to a Noop one.
    [2017-09-13 22:28:03,90] [info] Using noop to send events.
    [2017-09-13 22:28:03,91] [error] SparkClusterProcess Spark Job failed to submit successfully following reason: Unable to get json out of submission response file

    Please see the attachment for the WDL, JSON, and cromwell.config.spark.txt files.

  • Hi @blueskypy, I am trying to understand the following command.

    command 1:

    command {
        set -euxo pipefail
        ${gatkL} HaplotypeCallerSpark \
            -R ${Ref2bit} \
            -I ${inputBAM} \
            -O ${sampleName}.raw.indels.snps.vcf \
            -- --sparkRunner LOCAL
    }
    
    

    What should your ultimate command be? Like, currently, if you pass something under the command section as follows:

    command 2:

      command {    <spark_application>  <arg1> .... <argN> } 
    
    

    So I am trying to break down command 1 above.

    Command 2 would ultimately be interpolated into the following command:

    spark-submit  --master local  --total-executor-cores 12  --deploy-mode client  --class xxx  --executor-memory 4  <spark_application> <arg1> ....  <argN> 
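
    For instance, plugging in the word-count inputs from my earlier example (again just a sketch of the shape of the command, not a literal copy of the generated script), that would come out roughly as:

    spark-submit --master local --deploy-mode client --class SimpleApp \
      --executor-memory 4G --total-executor-cores 12 \
      /home/himanshu/Documents/test/sparktest_2.11-1.0.jar /home/himanshu/Documents/test/sparkOutput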
    
    
  • Thanks @DilliWala_Cric for your continued effort on this issue! Really appreciated! The command block in my WDL file produces the following command:
    java -Dsamjdk.use_async_io_read_samtools=false -Dsamjdk.use_async_io_write_samtools=true -Dsamjdk.use_async_io_write_tribble=false -Dsamjdk.compression_level=1 -Dsnappy.disable=true -jar /path/to/gatk-4.beta.3-SNAPSHOT/gatk-package-4.beta.3-SNAPSHOT-local.jar HaplotypeCallerSpark -R ${Ref2bit} -I ${inputBAM} -O ${sampleName}.raw.indels.snps.vcf

  • @DilliWala_Cric, after answering your question, I changed the WDL and JSON files a bit so that the command block produces the following:
    /path/to/gatk-4.beta.3-SNAPSHOT/gatk-package-4.beta.3-SNAPSHOT-local.jar HaplotypeCallerSpark -R ${Ref2bit} -I ${inputBAM} -O ${sampleName}.raw.indels.snps.vcf

    But I still got exactly the same error. Here is the new WDL file:

    workflow hcSpark {
            call haplotypeCallerSpark
    }
    
    task haplotypeCallerSpark {
            File GATK_jar
            File Ref2bit
            String sampleName
            File inputBAM
            File bamIndex
            command {
                    ${GATK_jar} HaplotypeCallerSpark \
                            -R ${Ref2bit} \
                            -I ${inputBAM} \
                            -O ${sampleName}.raw.indels.snps.vcf
            }
            runtime {
                    appMainClass: "org.broadinstitute.hellbender.Main"
                    executorMemory: "4G"
                    executorCores: "2"
            }
            output {
                    File rawVCF = "${sampleName}.raw.indels.snps.vcf"
            }
    }
    

    The new JSON file:

    {
      "hcSpark.haplotypeCallerSpark.Ref2bit": "ref/ref.2bit",
      "hcSpark.haplotypeCallerSpark.bamIndex": "bams/mother.bai",
      "hcSpark.haplotypeCallerSpark.GATK_jar": "gatk-4.beta.3-SNAPSHOT/gatk-package-4.beta.3-SNAPSHOT-lo
    cal.jar",
      "hcSpark.haplotypeCallerSpark.inputBAM": "bams/mother.bam",
      "hcSpark.haplotypeCallerSpark.sampleName": "NA12878"
    }
    
  • edited September 2017

    Hi @blueskypy, can you check the script file under the execution folder? If that file exists, then try to run that command from the console. Secondly, do you think the following command should work in theory?

    spark-submit --master spark://<ip>:7077 ... --class ... application.jar args ...
    
    

    I think you are submitting in cluster mode, right? And making changes to the configuration as per [Configuration](https://github.com/broadinstitute/cromwell/blob/develop/README.md#configuring-spark-master-and-deploy-mode)?

  • blueskypy Member
    edited September 2017

    Thanks @DilliWala_Cric! In the script file under the execution folder, the command contains --executor-memory 4 instead of --executor-memory 4g. That error seems to prevent the executor from starting. Can you fix this bug?

    Also, there are many spark-submit options used by the Spark versions of the GATK tools. I wonder if the runtime block could be changed to something like this:

    runtime {
      --class org.broadinstitute.hellbender.Main 
      --driver-cores 4 
      --driver-memory 8g 
      --total-executor-cores 10  
      --executor-memory 4g 
      --conf spark.io.compression.codec=lzf
      ...
            }
    

    So users can put any options into it, and those strings are added to the spark-submit command without any change.

    Also, it seems the current Spark backend treats --class as mandatory; could you remove that requirement, since it's not mandatory for GATK jars?

    Last question: does the Spark backend also check the status of the submitted Spark jobs, so that Cromwell knows when those jobs are done and can move on to the next task?

    Thanks!

  • @blueskypy Thank you for trying that and identifying more requirements as well as an issue. Can you please create an issue in the Cromwell GitHub repo, or what is the way to go, @jgentry? I would fix the executor-memory unit as one task, and handle adding more runtime attributes and making --class optional as another. So can you do me a favor and file them as two separate issues?

    Yes, the Spark backend does support checking the status of a job submitted in client or cluster mode.
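
    As a side note, if you ever want to double-check a standalone cluster-mode submission by hand, spark-submit also has a --status switch. Just a sketch; the master URL and submission ID below are placeholders (the submission ID is reported back when the job is submitted):

    spark-submit --master spark://<master-host>:<port> --status <submission-id>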

  • blueskypy Member
    edited September 2017

    Thanks @DilliWala_Cric! I've submitted the --class and runtime-attribute issues as two separate tickets in the Cromwell GitHub repo.

  • @blueskypy Thank you for creating those issues. Also, are you sure that you don't see the executor memory as 2g in the script? Because we pass whatever you specify in the runtime attributes as-is.

  • blueskypy Member
    edited September 2017

    @DilliWala_Cric It's "4G" in the WDL script:

    runtime {
        appMainClass: "org.broadinstitute.hellbender.Main"
        executorMemory: "4G"
        executorCores: "2"
    }
    

    But in the script file under the execution folder, the command reads --executor-memory 4

  • @DilliWala_Cric, I wonder if you can help me with a question.

    I suppose the Cromwell Spark backend uses spark-submit to submit the command block of each task. Can the command block be any program, for example a bash shell script, or a regular R script that doesn't use the Spark API?

  • @blueskypy Thank you for the clarification on the memory unit not being passed. To answer your question on the command block: yes, you can pass anything, like a script (bash or R), but ultimately the command will be, as you saw in the script file, spark-submit .... <command block from wdl> .... Is that what you were asking?

  • @blueskypy Please check the pull request addressing your request, and let us know if that looks OK to you.

  • @blueskypy Just wondering, did you get a chance to try out your requested changes to the Spark backend yet?
