Pryce Turner
Pryce Turner

Bioinformatics on Flyte: Read Alignment

Flyte presents an excellent solution for the complexities inherent in bioinformatics workflows. Tailored to abstract the intricacies of compute resources, Flyte allows bioinformaticians to focus on defining and executing reproducible workflows without grappling with underlying infrastructure details. Offering scalability and efficient distributed computing, Flyte proves advantageous for processing large datasets in parallel. Its cloud-native design facilitates seamless integration with cloud services, while its extensibility and customization support the incorporation of diverse tools and algorithms. This post is based largely on the evolving variant discovery workflow, specifically focusing on quality control, preprocessing, and alignment. We’ll explore a number of Flyte features which make it uniquely suited to these tasks.

Custom dependencies

While Flyte tasks are predominantly defined with Python, there are a number of abstractions which allow us to use any tool we like. A lot of common bioinformatics tools are written in languages like Java, C++, Perl, and R. As a container and k8s native orchestrator, we can easily define a Dockerfile that incorporates whatever dependencies we may need. You can build out the default Dockerfile that ships with `pyflyte init` to add custom dependencies. The project’s Dockerfile incorporates a number of common bioinformatics tools to create a base image for use in tasks. Here’s samtools for example, a utility for interacting with alignment files:

Copied to clipboard!
# Install samtools
RUN wget https://github.com/samtools/samtools/releases/download/${SAMTOOLS_VER}/samtools-${SAMTOOLS_VER}.tar.bz2 && \
 tar -xjf samtools-${SAMTOOLS_VER}.tar.bz2 && \
 rm samtools-${SAMTOOLS_VER}.tar.bz2 && \
 cd samtools-${SAMTOOLS_VER} && \
 ./configure --prefix=/usr/local && \
 make && \
 make install && \
 cd .. && \
 rm -rf samtools-${SAMTOOLS_VER}

Any additional python dependencies can also be specified in the requirements.txt file, which gets pulled into and installed in the final image. You'll also be able to use fast registration as you iterate on your code to avoid having to rebuild and push an image every time.

Flyte uses an object store, like S3, for storing all of it’s workflow files and any larger inputs and outputs. While we’re used to working with a traditional filesystem in bioinformatics, pointing tools to files and directories explicitly, Flyte makes this much more flexible. Since each task is run in it’s own container, those files and directories can be pulled in and placed wherever at the task boundary, greatly simplifying path handling. Additionally, we can create dataclasses to define your samples at the workflow level to provide a clean and extensible data structure to keep things tidy. Here’s the dataclass for capturing the outputs of fastp:

Copied to clipboard!
@dataclass
class FiltSample(DataClassJSONMixin):
    """
    Represents a filtered sequencing sample with its associated files and a quality report.

    This class defines the structure for representing a filtered sequencing sample. It includes
    attributes for the sample name, paths to the filtered read files (R1 and R2), and a quality
    report.

    Attributes:
        sample (str): The name or identifier of the filtered sequencing sample.
        filt_r1 (FlyteFile): A FlyteFile object representing the path to the filtered R1 read file.
        filt_r2 (FlyteFile): A FlyteFile object representing the path to the filtered R2 read file.
        report (FlyteFile): A FlyteFile object representing the quality report associated with
            the filtered sample.
    """

    sample: str
    filt_r1: FlyteFile
    filt_r2: FlyteFile
    report: FlyteFile

Instead of writing to directories and keeping track of things manually on the commandline, these dataclasses will keep relevant metadata about your samples and let you know where to find them in object storage.

Pre-processing & QC

Not only can we incorporate any tools we need into our workflows, but there are also a number of ways to interact with them. FastQC is a very common tool for gathering QC metrics about raw reads. It's a java tool and included in our Dockerfile. It doesn't have any python bindings, but luckily Flyte lets us run arbitrary ShellTasks with a clean way of passing in inputs and receiving outputs. Just define a script for what you need to do and ShellTask will handle the rest.

Copied to clipboard!
"""
Perform quality control using FastQC.

This function takes a FlyteDirectory object containing raw sequencing data, 
gathers QC metrics using FastQC, and returns a FlyteDirectory object that
can be crawled with MultiQC to generate a report.

Args:
    seq_dir (FlyteDirectory): An S3 prefix containing raw sequencing data to be processed.

Returns:
    qc (FlyteDirectory): A directory containing fastqc report output.
"""
fastqc = ShellTask(
    name="fastqc",
    debug=True,
    container_image=base_image,
    metadata=TaskMetadata(retries=3, cache=True, cache_version="1"),
    script="""
    mkdir {outputs.qc}
    fastqc {inputs.seq_dir}/*.fastq.gz --outdir={outputs.qc}
    """,
    inputs=kwtypes(seq_dir=FlyteDirectory),
    output_locs=[
        OutputLocation(var="qc", var_type=FlyteDirectory, location="/root/qc")
    ],
)

An additional benefit of defining the FastQC output explicitly is that the reports are then saved in the object store, allowing a later task to quantify them longitudinally to see sequencing performance over time.

We can also use the FastQC outputs immediately in our workflow in concert with conditionals to control execution based on input data quality. FastQC generates a summary file with a simple PASS / WARN / FAIL call across a number of different metrics, our workflow can check for any FAIL lines in the summary and automatically halt execution.

Copied to clipboard!
samples = (
    conditional("pass-qc")
    .if_((check == "PASS") | (check == "WARN"))
    .then(prepare_samples(seq_dir=seq_dir))
    .else_()
    .fail("One or more samples failed QC.")
)

This can surface an early failure without wasting valuable compute or anyone's time doing manual review.

Next up, we’ll incorporate fastp, a bioinformatics tool designed for the fast and flexible preprocessing of high-throughput sequencing data. It is specifically developed for tasks such as quality control, adapter removal, and filtering of raw sequencing reads. It can be a bit more memory hungry than Flyte's defaults are set to; luckily we can use Resources directly in the task definition to bump that up and allow it to run efficiently.

Copied to clipboard!
@task(requests=Resources(cpu="1", mem="2Gi"), container_image=base_image)
def pyfastp(rs: RawSample) -> FiltSample:
  ...

Additionally, we can make use of a map task in our workflow to parallelize the processing of fastp across all our samples. Using a map task we can easily take a task designed for 1 input (although multiple are allowed via `functools.partial`) and 1 output, and pass it a list of inputs.

Copied to clipboard!
filtered_samples = map_task(pyfastp)(rs=samples)

These get parallelized seamlessly across pods on the Flyte cluster, freeing up the need to manually manage concurrency.

As a final check before moving onto the alignment, we can define an explicit approval right in the workflow using Flyte’s Human-in-the-Loop functionality. Aggregating reports of all processing done up to this point, and visualizing it via Decks (more on that later), a researcher is able to quickly get a high level view of the work done so far. Please see this more in-depth write up around this functionality for details.

Copied to clipboard!
approve_filter = approve(
    render_multiqc(fqc=fqc_dir, filt_reps=filtered_samples, sams=[]),
    "filter-approval",
    timeout=timedelta(hours=2),
)

The workflow will pause execution and a modal in the console will prompt for approval before moving on to downstream processing. Subsequent steps could be anything from analyzing gene expression to classifying bacterial populations in a metagenomics study. In this case we’ll be moving on to variant calling to identify where a sample differs from the reference. This will provide us with salient locations to investigate disfunction.

Finally, I’d like to call out that we’re using Flyte’s native entity-chaining mechanism via the `>>` operator.

Copied to clipboard!
samples >> approve_filter >> bowtie2_idx
samples >> approve_filter >> hisat2_idx

This explicitly enforces the order of operations without having to rely on the downstream tasks consuming upstream inputs, giving us even more flexibility in how execution proceeds.

Index generation & alignment

Alignment tools commonly require the generation of a tool-specific index from a reference genome. This index allows the alignment algorithm to efficiently query the reference for the highest quality alignment. Index generation can be a very compute intensive step. Luckily, we can take advantage of Flyte's native caching when building that index. It’s of course possible to define that index offline and upload it to the object store beforehand since it will seldom change. However, when rapidly prototyping different tools to include in a pipeline, taking advantage of this built in functionality can be a huge speed boost. We've also defined a `cache_version` in the config that relies on a hash of the reference location in the object store. This means that changing the reference will invalidate the cache and trigger a rebuild, while allowing you to go back to your old reference with impunity. Here’s the ShellTask for Hisat2’s index:

Copied to clipboard!
ref_loc = "s3://my-s3-bucket/my-data/refs/GRCh38.fasta"
ref_hash = str(hash(ref_loc))[:4]
"""
Generate Hisat2 index files from a reference genome.

Args:
    ref (FlyteFile): A FlyteFile object representing the input reference file.

Returns:
    FlyteDirectory: A FlyteDirectory object containing the index files.
"""
hisat2_index = ShellTask(
    name="hisat2-index",
    debug=True,
    metadata=TaskMetadata(retries=3, cache=True, cache_version=ref_hash),
    requests=Resources(cpu="4", mem="10Gi"),
    container_image=base_image,
    script="""
    mkdir {outputs.idx}
    hisat2-build {inputs.ref} {outputs.idx}/hs2_idx
    """,
    inputs=kwtypes(ref=FlyteFile),
    output_locs=[
        OutputLocation(var="idx", var_type=FlyteDirectory, location="/root/idx")
    ],
)

Finally we arrive at the most important step: the alignment itself. It's usually a good idea to evaluate a few different tools to see how they perform with respect to runtime and resource requirements. This is easy with a dynamic workflow, which allows us to pass in an arbitrary number of inputs to be used with whatever tasks we want. Since their DAG is compiled at run time, we can nest dynamic workflows within static workflows, giving us a lot more flexibility in workflow definition. We’ll be comparing 2 popular aligners with fairly different approaches under the hood, bowtie2 and hisat.  In the main workflow you'll pass a list of filtered samples to each tool and be able to capture run statistics in the SamFile dataclass as well as visualize their runtimes in the Flyte console.

Copied to clipboard!
@dynamic
def compare_aligners(
    bt2_idx: FlyteDirectory, hs2_idx: FlyteDirectory, samples: List[FiltSample]
) -> List[List[SamFile]]:
    """
    Compare alignment results using two different aligners for multiple samples.

    This function takes two FlyteDirectory objects representing indices for two different
    aligners, a list of FiltSample objects containing sample data, and compares the
    alignment results for each sample using both aligners. The function returns a
    list of lists, where each inner list contains the alignment results (SamFile objects)
    for a sample ran through each aligner.

    Args:
        bt2_idx (FlyteDirectory): The FlyteDirectory object representing the bowtie2 index.
        hs2_idx (FlyteDirectory): The FlyteDirectory object representing the hisat2 index.
        samples (List[FiltSample]): A list of FiltSample objects containing sample data
            to be processed.

    Returns:
        List[List[SamFile]]: A list of lists, where each inner list contains alignment
            results (SamFile objects) for a sample, with results from both aligners.
    """
    sams = []
    for sample in samples:
        bt2_sam = bowtie2_align_paired_reads(idx=bt2_idx, fs=sample)
        hs2_sam = hisat2_align_paired_reads(idx=hs2_idx, fs=sample)
        pair = [bt2_sam, hs2_sam]
        sams.append(pair)
    return sams

As a last step, we can generate a similar report as we used earlier in the approval step, now enhanced with alignment statistics from both aligners. We’re using MultiQC here, an excellent report aggregation and visualization tool to accomplish this.

Copied to clipboard!
@task(container_image=multiqc_image_spec, disable_deck=False)
def render_multiqc(
    fqc: FlyteDirectory, filt_reps: List[FiltSample], sams: List[List[SamFile]]
) -> FlyteFile:
    """
    Generate MultiQC report by rendering quality and alignment data.

    Takes a FlyteDirectory object containing FastQC reports (`fqc`),
    a list of FiltSample objects containing reports (`filt_reps`), and a
    list of lists of SamFile objects representing alignment results (`sams`). It generates
    a MultiQC report by rendering quality and alignment data and returns a FlyteFile object
    of the report.

    Args:
        fqc (FlyteDirectory): A FlyteDirectory object containing FastQC reports.
        filt_reps (List[FiltSample]): A list of FiltSample objects representing filtered samples.
        sams (List[List[SamFile]]): A list of lists of SamFile objects representing alignment results.

    Returns:
        FlyteFile: A FlyteFile object representing the MultiQC report.
    """
    ldir = Path(current_context().working_directory)

    fqc.download()
    for f in os.listdir(fqc.path):
        src = ldir.joinpath(fqc.path, f)
        dest = ldir.joinpath(ldir, f)
        shutil.move(src, dest)
    logger.debug(f"FastQC reports downloaded to {ldir}")

    for filt_rep in filt_reps:
        filt_rep.report.download()
        shutil.move(filt_rep.report.path, ldir)
    logger.debug(f"FastP reports downloaded to {ldir}")

    for pair in sams:
        pair[0].report.download()
        shutil.move(pair[0].report.path, ldir)
        pair[1].report.download()
        shutil.move(pair[1].report.path, ldir)
    logger.debug(f"Alignment reports for {sams} downloaded to {ldir}")

    final_report = ldir.joinpath("multiqc_report.html")
    mqc_cmd = ["multiqc", str(ldir), "-n", str(final_report)]
    logger.debug(f"Generating MultiQC report at {final_report} with command: {mqc_cmd}")
    subproc_raise(mqc_cmd)

    with open(final_report, "r") as f:
      report_html = f.read()
      
    current_context().default_deck.append(report_html)
    logger.debug("MultiQC report HTML added to default deck")

    return FlyteFile(path=str(final_report))

Although this was in play earlier, it’s worth mentioning that this dependency was added in-line and built on the fly via ImageSpec. This allows you to build or enhance existing Docker images without ever having to touch a Dockerfile. Implementation is as easy as:

Copied to clipboard!
# Add MultiQC to the base image
multiqc_image_spec = ImageSpec(
    name="multiqc",
    packages=["multiqc"],
    registry="ghcr.io/pryce-turner",
    base_image="ghcr.io/pryce-turner/variant-discovery:latest",
)

After gathering all relative metrics from the workflow, we're able to render that report via Decks, giving us rich run statistics without ever leaving the Flyte console!

Screenshot 2023-11-27 at 12.03.54 PM-20231127-200400.png

Summing up

Flyte offers a number of excellent utilities to carry out bioinformatics analyses. You can bring your own tried-and-true tools and quickly compare them without having to shoehorn them into a new format. With an abstracted object store you can forget about messy path handling and manage all your datasets with rich metadata in one single, declarative place. As you move to production, Flyte has a plethora of options for resource management, parallelization, and caching to get the most bang for your buck regarding time and compute. With versioning across the board and strongly typed interfaces between tasks, you can also unburden yourself from time-wasting errors and awkward code revisions. Finally, the Flyte console has a number of powerful conveniences that let you view and interact with your workflow in unprecedented ways. All of these things together make for straightforward, robust research; unlocking what really matters: actionable insights, sooner.