App-executing steps

CWL apps on the Seven Bridges Platform can be executed directly with Hephaestus' built-in generic steps for task management (e.g. CreateAndRunTask). Alternatively, you can wrap each app in a separate automation step and call Hephaestus from inside it. This requires some extra code that you need to write and maintain for your automation, but it has several advantages:

  • The automation code using these app wrapper steps becomes easier to read and write.
  • Individual task outputs can be passed to downstream steps without causing an execution block.
  • App inputs and outputs can be extended or transformed before and after task execution, respectively.
  • App wrapper steps can be maintained in a central module or package and re-used across multiple automations.
  • Your IDE can auto-complete app names, app inputs, and app outputs.
  • Wrapper steps protect your automation business logic from changes made in underlying CWL apps.

Below is an example of a step that wraps the FastQC CWL app. It starts a FastQC task, waits until the task is finished, and returns task outputs on separate step outputs. Also note how we name the task after the step so that we can later distinguish tasks on the platform that were run for the same app.

from datetime import datetime
 
class FastQC(Step):
    fastq_file = Input(File)
    app = Input(App)
    project = Input(Project)
 
    report_zip = Output(File)
    report_html = Output(File)
 
    def execute(self):
        task = FindOrCreateAndRunTask(
            new_name=self.name_ + " - " + datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            inputs={
                "input_fastq": [self.fastq_file]
            },
            app=self.app,
            in_project=self.project,
        ).finished_task
 
        self.report_zip = task.outputs["report_zip"][0]
        self.report_html = task.outputs["report_html"][0]

This step definition can go inside its own module (e.g. named apps.py), from where it can be imported and used in the following way:

# ... other imports from previous example ...
from apps import FastQC
 
class Main(Step):
    project_name = Input(str)
 
    def execute(self):
 
        # get project, fastq file, and app (not shown)
        ...
 
        fastqc = FastQC(
            fastq_file=fastq_file,
            app=fastqc_app,
            project=my_project
        )
 
        # pass output into downstream step without execution block
        # ExportFile(file=fastqc.report_html, ...)
 
if __name__ == "__main__":
    Automation(Main).run()

Because FastQC is now its own step and the task output dictionary is evaluated inside the execution function of this step, accessing report_html within the task output dictionary no longer causes an execution block. If we had instead used task = CreateAndRunTask(...) directly in our main code without a wrapper step and accessed the HTML output of this task with task.outputs['report_html'], the necessary evaluation of the outputs promise (a dictionary) would have caused an execution block at this point, potentially preventing us from parallelizing multiple FastQC executions within a loop.
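
The effect can be illustrated with a loose analogy in plain Python. The sketch below does not use Hephaestus or the platform at all: it relies only on the standard library's concurrent.futures, and run_task, wrapped_step, and the sleep duration are hypothetical stand-ins. Calling result() inside the loop plays the role of an execution block in the main code, while moving the output evaluation into a separately submitted function lets the loop submit all work before waiting.

```python
import time
from concurrent.futures import ThreadPoolExecutor

TASK_SECONDS = 0.2  # hypothetical task duration


def run_task(sample):
    """Stand-in for a long-running platform task (not a real API call)."""
    time.sleep(TASK_SECONDS)
    return {"report_html": f"{sample}.html"}


def wrapped_step(sample):
    """Stand-in for a wrapper step: the outputs are evaluated in here."""
    outputs = run_task(sample)
    return outputs["report_html"]


samples = ["a", "b", "c"]

with ThreadPoolExecutor(max_workers=3) as pool:
    # Blocking variant: evaluating outputs inside the loop serializes the tasks.
    start = time.perf_counter()
    blocking_reports = []
    for s in samples:
        outputs = pool.submit(run_task, s).result()  # the loop waits here
        blocking_reports.append(outputs["report_html"])
    blocking_elapsed = time.perf_counter() - start

    # Wrapped variant: the loop only collects futures; waiting happens afterwards.
    start = time.perf_counter()
    futures = [pool.submit(wrapped_step, s) for s in samples]
    wrapped_reports = [f.result() for f in futures]
    wrapped_elapsed = time.perf_counter() - start

print(blocking_reports == wrapped_reports)
print(f"blocking: {blocking_elapsed:.2f}s, wrapped: {wrapped_elapsed:.2f}s")
```

Both variants produce the same reports, but the wrapped variant should finish in roughly the time of a single task because nothing inside the loop waits for a result.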

Please refer to the section Dynamic Conditionals within this tutorial to understand what execution blocks are, why they happen, and how they can be avoided.

If you find it inconvenient that both app and project need to be passed into every FastQC invocation, have a look at the use of singletons for one way to resolve this. Another option is to use a step generator (see below).

Post-processing app outputs

Sometimes tasks produce output files that contain information the automation needs to access, for example to decide how to continue execution with a conditional, or simply to pass the extracted information on to downstream steps.

One elegant solution to this problem is to parse the output file directly inside the app wrapper step and provide the extracted information on a separate output. The advantage of this solution is that the execution block caused by waiting for the FastQC output file remains local and does not impact the calling thread, which for example might run a loop.

Below is an example of how the FastQC wrapper step from above can be extended to extract read length information from the HTML report it creates, and how to provide this information as a separate output.

import re
 
class FastQC(Step):
    fastq_file = Input(File)
    app = Input(App)
    project = Input(Project)
     
    report_zip = Output(File)
    report_html = Output(File)
    sequence_length = Output(int)
     
    def execute(self):
        task = FindOrCreateAndRunTask(
            inputs={
                'input_fastq' : [self.fastq_file]
            },
            app=self.app,
            in_project=self.project
        ).finished_task
 
        self.report_zip = task.outputs["report_zip"][0]
        self.report_html = task.outputs["report_html"][0]
        self.sequence_length = self.get_sequence_length(self.report_html)
 
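    def get_sequence_length(self, html_file):
        # Scan the report line by line for the "Sequence length" table cell
        re_seq_len = re.compile(r"Sequence length</td><td>(\d+)")
        for line in html_file.content().split("\n"):
            parts = re_seq_len.search(line)
            if parts:
                return int(parts.group(1))
        # No match found: make the implicit None explicit
        return None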

The method get_sequence_length() uses a regular expression to look for the desired information inside the HTML output file of FastQC. Note that the content of this file is streamed from the platform, without the need to download the file first.
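
The parsing logic can be tried out without platform access. The following sketch applies the same regular expression to a hard-coded HTML fragment. The fragment is an assumed, simplified stand-in for a FastQC report, and the standalone function name find_sequence_length is hypothetical. It returns None explicitly when the pattern is not found.

```python
import re
from typing import Optional

# Same pattern as in the wrapper step above.
RE_SEQ_LEN = re.compile(r"Sequence length</td><td>(\d+)")


def find_sequence_length(html_text: str) -> Optional[int]:
    """Return the first sequence length found in html_text, or None."""
    for line in html_text.split("\n"):
        match = RE_SEQ_LEN.search(line)
        if match:
            return int(match.group(1))
    return None


# Assumed, simplified fragment of a FastQC HTML report (illustrative only).
sample_html = (
    "<table>\n"
    "<tr><td>Total Sequences</td><td>250000</td></tr>\n"
    "<tr><td>Sequence length</td><td>101</td></tr>\n"
    "</table>"
)

print(find_sequence_length(sample_html))
print(find_sequence_length("<p>no statistics here</p>"))
```

One caveat of this pattern: if the report cell contains a range such as 35-101, only the first number is captured, so an automation that depends on the exact read length may need a stricter pattern.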

Step generator

Instead of manually writing a wrapper step around each app, you can use Seven Bridges' step generator, which creates a step class automatically by introspecting the CWL description of a given app.

With the step generator in place, you can execute Seven Bridges apps in the following way:

from step_generator import generate_cwl_step
 
FastQC = generate_cwl_step(
    app="admin/sbg-public-data/fastqc-0-11-4",
    project=execution_project
)
 
fastqc = FastQC(fastq_file=fastq_file)

Alternatively, you can create a step directly from an app object like this:

from step_generator import generate_cwl_step
 
app = SBApi().apps.get("admin/sbg-public-data/fastqc-0-11-4")
FastQC = generate_cwl_step(
    app=app,
    project=execution_project
)
fastqc = FastQC(fastq_file=fastq_file)

The step generator can be installed from Seven Bridges' PyPI server using the following command:

pip install --extra-index-url https://<YOUR-TOKEN-HERE>@pypi.sbgenomics.com step_generator