Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

patch - Concatenate Microbial fastq files #3958

Merged
merged 13 commits into from
Nov 25, 2024
24 changes: 12 additions & 12 deletions cg/cli/deliver/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
from cg.services.deliver_files.deliver_files_service.deliver_files_service_factory import (
DeliveryServiceFactory,
)
from cg.services.deliver_files.rsync.service import (
DeliveryRsyncService,
)
from cg.services.deliver_files.rsync.service import DeliveryRsyncService
from cg.store.models import Analysis, Case

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,8 +88,8 @@ def deliver_case(
LOG.error(f"Could not find case with id {case_id}")
return
delivery_service: DeliverFilesService = service_builder.build_delivery_service(
delivery_type=delivery_type if delivery_type else case.data_delivery,
workflow=case.data_analysis,
case=case,
delivery_type=delivery_type,
)
delivery_service.deliver_files_for_case(
case=case, delivery_base_path=Path(inbox), dry_run=dry_run
Expand All @@ -100,8 +98,8 @@ def deliver_case(

@deliver.command(
name="ticket",
help="Deliver all case files for cases in a ticket based on delivery type to the customer inbox on the HPC "
"and start an Rsync job to clinical-delivery. "
help="Deliver all case files for cases in a ticket based on delivery type to the customer"
"inbox on the HPC and start an Rsync job to clinical-delivery. "
"NOTE: the dry-run flag will copy files to the customer inbox on Hasta, "
"but will not perform the Rsync job.",
)
Expand All @@ -116,7 +114,8 @@ def deliver_ticket(
dry_run: bool,
):
"""
Deliver all case files based on delivery type to the customer inbox on the HPC for cases connected to a ticket.
Deliver all case files based on delivery type to the customer inbox on the HPC for cases
connected to a ticket.
"""
inbox: str = context.delivery_path
service_builder: DeliveryServiceFactory = context.delivery_service_factory
Expand All @@ -125,8 +124,8 @@ def deliver_ticket(
LOG.error(f"Could not find case connected to ticket {ticket}")
return
delivery_service: DeliverFilesService = service_builder.build_delivery_service(
delivery_type=delivery_type if delivery_type else cases[0].data_delivery,
workflow=cases[0].data_analysis,
case=cases[0],
delivery_type=delivery_type,
)
delivery_service.deliver_files_for_ticket(
ticket_id=ticket, delivery_base_path=Path(inbox), dry_run=dry_run
Expand Down Expand Up @@ -173,8 +172,8 @@ def deliver_sample_raw_data(
LOG.error(f"Could not find case with id {case_id}")
return
delivery_service: DeliverFilesService = service_builder.build_delivery_service(
case=case,
delivery_type=delivery_type,
workflow=case.data_analysis,
)
delivery_service.deliver_files_for_sample(
case=case, sample_id=sample_id, delivery_base_path=Path(inbox), dry_run=dry_run
Expand All @@ -186,7 +185,8 @@ def deliver_sample_raw_data(
@DRY_RUN
def deliver_auto_raw_data(context: CGConfig, dry_run: bool):
"""
Deliver all case files for the raw data workflow to the customer inbox on the HPC and start a Rsync job.
Deliver all case files for the raw data workflow to the customer inbox on the HPC and start a
Rsync job.
1. get all cases with analysis type fastq that need to be delivered
2. check if their upload has started
3. if not, start the upload
Expand Down
3 changes: 1 addition & 2 deletions cg/cli/deliver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from datetime import datetime
from pathlib import Path

from cg.constants import Workflow
from cg.services.deliver_files.deliver_files_service.deliver_files_service import (
DeliverFilesService,
)
Expand All @@ -27,8 +26,8 @@ def deliver_raw_data_for_analyses(
try:
case: Case = analysis.case
delivery_service: DeliverFilesService = service_builder.build_delivery_service(
case=case,
delivery_type=case.data_delivery,
workflow=Workflow.RAW_DATA,
)

delivery_service.deliver_files_for_case(
Expand Down
2 changes: 1 addition & 1 deletion cg/meta/upload/upload_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ def upload_files_to_customer_inbox(self, case: Case) -> None:
"""Uploads the analysis files to the customer inbox."""
factory_service: DeliveryServiceFactory = self.config.delivery_service_factory
delivery_service: DeliverFilesService = factory_service.build_delivery_service(
case=case,
delivery_type=case.data_delivery,
workflow=case.data_analysis,
)
delivery_service.deliver_files_for_case(
case=case, delivery_base_path=Path(self.config.delivery_path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,13 @@
from cg.services.deliver_files.deliver_files_service.error_handling import (
handle_no_delivery_files_error,
)
from cg.services.deliver_files.file_fetcher.abstract import (
FetchDeliveryFilesService,
)
from cg.services.deliver_files.file_fetcher.abstract import FetchDeliveryFilesService
from cg.services.deliver_files.file_fetcher.models import DeliveryFiles
from cg.services.deliver_files.file_filter.abstract import FilterDeliveryFilesService
from cg.services.deliver_files.file_formatter.abstract import (
DeliveryFileFormattingService,
)
from cg.services.deliver_files.file_formatter.models import (
FormattedFiles,
)
from cg.services.deliver_files.file_formatter.abstract import DeliveryFileFormattingService
from cg.services.deliver_files.file_formatter.models import FormattedFiles
from cg.services.deliver_files.file_mover.service import DeliveryFilesMover
from cg.services.deliver_files.rsync.service import (
DeliveryRsyncService,
)
from cg.services.deliver_files.rsync.service import DeliveryRsyncService
from cg.store.exc import EntryNotFoundError
from cg.store.models import Case
from cg.store.store import Store
Expand Down
Original file line number Diff line number Diff line change
@@ -1,61 +1,41 @@
"""Module for the factory of the deliver files service."""

from typing import Type

from cg.apps.housekeeper.hk import HousekeeperAPI
from cg.apps.tb import TrailblazerAPI
from cg.constants import Workflow, DataDelivery
from cg.constants import DataDelivery, Workflow
from cg.constants.constants import PrepCategory
from cg.services.analysis_service.analysis_service import AnalysisService
from cg.services.deliver_files.file_filter.sample_service import SampleFileFilter
from cg.services.deliver_files.tag_fetcher.bam_service import (
BamDeliveryTagsFetcher,
)
from cg.services.fastq_concatenation_service.fastq_concatenation_service import (
FastqConcatenationService,
)
from cg.services.deliver_files.deliver_files_service.deliver_files_service import (
DeliverFilesService,
)
from cg.services.deliver_files.deliver_files_service.exc import DeliveryTypeNotSupported
from cg.services.deliver_files.tag_fetcher.abstract import (
FetchDeliveryFileTagsService,
)
from cg.services.deliver_files.tag_fetcher.sample_and_case_service import (
SampleAndCaseDeliveryTagsFetcher,
)
from cg.services.deliver_files.file_fetcher.analysis_service import (
AnalysisDeliveryFileFetcher,
)
from cg.services.deliver_files.file_fetcher.abstract import (
FetchDeliveryFilesService,
)
from cg.services.deliver_files.file_fetcher.abstract import FetchDeliveryFilesService
from cg.services.deliver_files.file_fetcher.analysis_raw_data_service import (
RawDataAndAnalysisDeliveryFileFetcher,
)
from cg.services.deliver_files.file_fetcher.raw_data_service import (
RawDataDeliveryFileFetcher,
)
from cg.services.deliver_files.file_formatter.service import (
DeliveryFileFormatter,
)

from cg.services.deliver_files.file_formatter.abstract import (
DeliveryFileFormattingService,
)
from cg.services.deliver_files.file_formatter.utils.case_service import (
CaseFileFormatter,
)
from cg.services.deliver_files.file_fetcher.analysis_service import AnalysisDeliveryFileFetcher
from cg.services.deliver_files.file_fetcher.raw_data_service import RawDataDeliveryFileFetcher
from cg.services.deliver_files.file_filter.sample_service import SampleFileFilter
from cg.services.deliver_files.file_formatter.abstract import DeliveryFileFormattingService
from cg.services.deliver_files.file_formatter.service import DeliveryFileFormatter
from cg.services.deliver_files.file_formatter.utils.case_service import CaseFileFormatter
from cg.services.deliver_files.file_formatter.utils.sample_concatenation_service import (
SampleFileConcatenationFormatter,
)
from cg.services.deliver_files.file_formatter.utils.sample_service import (
SampleFileFormatter,
)
from cg.services.deliver_files.file_mover.service import (
DeliveryFilesMover,
from cg.services.deliver_files.file_formatter.utils.sample_service import SampleFileFormatter
from cg.services.deliver_files.file_mover.service import DeliveryFilesMover
from cg.services.deliver_files.rsync.service import DeliveryRsyncService
from cg.services.deliver_files.tag_fetcher.abstract import FetchDeliveryFileTagsService
from cg.services.deliver_files.tag_fetcher.bam_service import BamDeliveryTagsFetcher
from cg.services.deliver_files.tag_fetcher.sample_and_case_service import (
SampleAndCaseDeliveryTagsFetcher,
)
from cg.services.deliver_files.rsync.service import (
DeliveryRsyncService,
from cg.services.fastq_concatenation_service.fastq_concatenation_service import (
FastqConcatenationService,
)
from cg.store.models import Case
from cg.store.store import Store


Expand All @@ -76,6 +56,36 @@ def __init__(
self.tb_service = tb_service
self.analysis_service = analysis_service

@staticmethod
def _sanitise_delivery_type(delivery_type: DataDelivery) -> DataDelivery:
"""Sanitise the delivery type."""
if delivery_type in [DataDelivery.FASTQ_QC, DataDelivery.FASTQ_SCOUT]:
return DataDelivery.FASTQ
if delivery_type in [DataDelivery.ANALYSIS_SCOUT]:
return DataDelivery.ANALYSIS_FILES
if delivery_type in [
DataDelivery.FASTQ_ANALYSIS_SCOUT,
DataDelivery.FASTQ_QC_ANALYSIS,
]:
return DataDelivery.FASTQ_ANALYSIS
return delivery_type

@staticmethod
def _validate_delivery_type(delivery_type: DataDelivery):
"""Check if the delivery type is supported. Raises DeliveryTypeNotSupported error."""
if delivery_type in [
DataDelivery.FASTQ,
DataDelivery.ANALYSIS_FILES,
DataDelivery.FASTQ_ANALYSIS,
DataDelivery.BAM,
]:
return
raise DeliveryTypeNotSupported(
f"Delivery type {delivery_type} is not supported. Supported delivery types are"
f" {DataDelivery.FASTQ}, {DataDelivery.ANALYSIS_FILES},"
f" {DataDelivery.FASTQ_ANALYSIS}, {DataDelivery.BAM}."
)

@staticmethod
def _get_file_tag_fetcher(delivery_type: DataDelivery) -> FetchDeliveryFileTagsService:
"""Get the file tag fetcher based on the delivery type."""
Expand All @@ -102,38 +112,43 @@ def _get_file_fetcher(self, delivery_type: DataDelivery) -> FetchDeliveryFilesSe
tags_fetcher=file_tag_fetcher,
)

@staticmethod
def _convert_workflow(self, case: Case) -> Workflow:
"""Converts a workflow with the introduction of the microbial-fastq delivery type an
unsupported combination of delivery type and workflow setup is required. This function
makes sure that a raw data workflow with microbial fastq delivery type is treated as a
microsalt workflow so that the microbial-fastq sample files can be concatenated."""
tag: str = case.samples[0].application_version.application.tag
microbial_tags: list[str] = [
application.tag
for application in self.store.get_active_applications_by_prep_category(
prep_category=PrepCategory.MICROBIAL
)
]
if case.data_analysis == Workflow.RAW_DATA and tag in microbial_tags:
return Workflow.MICROSALT
return case.data_analysis

def _get_sample_file_formatter(
workflow: Workflow,
self,
case: Case,
) -> SampleFileFormatter | SampleFileConcatenationFormatter:
"""Get the file formatter service based on the workflow."""
if workflow in [Workflow.MICROSALT]:
converted_workflow: Workflow = self._convert_workflow(case)
if converted_workflow in [Workflow.MICROSALT]:
return SampleFileConcatenationFormatter(FastqConcatenationService())
return SampleFileFormatter()
diitaz93 marked this conversation as resolved.
Show resolved Hide resolved

@staticmethod
def _validate_delivery_type(delivery_type: DataDelivery):
"""Check if the delivery type is supported. Raises DeliveryTypeNotSupported error."""
if delivery_type in [
DataDelivery.FASTQ,
DataDelivery.ANALYSIS_FILES,
DataDelivery.FASTQ_ANALYSIS,
DataDelivery.BAM,
]:
return
raise DeliveryTypeNotSupported(
f"Delivery type {delivery_type} is not supported. Supported delivery types are {DataDelivery.FASTQ}, {DataDelivery.ANALYSIS_FILES}, {DataDelivery.FASTQ_ANALYSIS}, {DataDelivery.BAM}."
)

def build_delivery_service(
self, workflow: Workflow, delivery_type: DataDelivery
self, case: Case, delivery_type: DataDelivery | None = None
) -> DeliverFilesService:
"""Build a delivery service based on the workflow and delivery type."""
delivery_type: DataDelivery = self._sanitise_delivery_type(delivery_type)
"""Build a delivery service based on a case."""
delivery_type: DataDelivery = self._sanitise_delivery_type(
delivery_type if delivery_type else case.data_delivery
)
self._validate_delivery_type(delivery_type)
diitaz93 marked this conversation as resolved.
Show resolved Hide resolved
file_fetcher: FetchDeliveryFilesService = self._get_file_fetcher(delivery_type)
sample_file_formatter: SampleFileFormatter | SampleFileConcatenationFormatter = (
self._get_sample_file_formatter(workflow)
self._get_sample_file_formatter(case)
)
file_formatter: DeliveryFileFormattingService = DeliveryFileFormatter(
case_file_formatter=CaseFileFormatter(), sample_file_formatter=sample_file_formatter
Expand All @@ -148,17 +163,3 @@ def build_delivery_service(
tb_service=self.tb_service,
analysis_service=self.analysis_service,
)

@staticmethod
def _sanitise_delivery_type(delivery_type: DataDelivery) -> DataDelivery:
"""Sanitise the delivery type."""
if delivery_type in [DataDelivery.FASTQ_QC, DataDelivery.FASTQ_SCOUT]:
return DataDelivery.FASTQ
if delivery_type in [DataDelivery.ANALYSIS_SCOUT]:
return DataDelivery.ANALYSIS_FILES
if delivery_type in [
DataDelivery.FASTQ_ANALYSIS_SCOUT,
DataDelivery.FASTQ_QC_ANALYSIS,
]:
return DataDelivery.FASTQ_ANALYSIS
return delivery_type
24 changes: 5 additions & 19 deletions cg/services/deliver_files/file_formatter/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,14 @@
import os
from pathlib import Path

from cg.constants.delivery import INBOX_NAME
from cg.services.deliver_files.file_fetcher.models import (
CaseFile,
DeliveryFiles,
SampleFile,
)
from cg.services.deliver_files.file_formatter.abstract import (
DeliveryFileFormattingService,
)
from cg.services.deliver_files.file_formatter.models import (
FormattedFile,
FormattedFiles,
)
from cg.services.deliver_files.file_formatter.utils.case_service import (
CaseFileFormatter,
)
from cg.services.deliver_files.file_fetcher.models import CaseFile, DeliveryFiles, SampleFile
from cg.services.deliver_files.file_formatter.abstract import DeliveryFileFormattingService
from cg.services.deliver_files.file_formatter.models import FormattedFile, FormattedFiles
from cg.services.deliver_files.file_formatter.utils.case_service import CaseFileFormatter
from cg.services.deliver_files.file_formatter.utils.sample_concatenation_service import (
SampleFileConcatenationFormatter,
)
from cg.services.deliver_files.file_formatter.utils.sample_service import (
SampleFileFormatter,
)
from cg.services.deliver_files.file_formatter.utils.sample_service import SampleFileFormatter

LOG = logging.getLogger(__name__)

Expand Down
13 changes: 13 additions & 0 deletions cg/store/crud/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,19 @@ def get_current_application_version_by_tag(self, tag: str) -> ApplicationVersion
valid_from=dt.datetime.now(),
).first()

def get_active_applications_by_prep_category(
self, prep_category: PrepCategory
) -> list[Application]:
"""Return all active applications by prep category."""
return apply_application_filter(
applications=self._get_query(table=Application),
filter_functions=[
ApplicationFilter.BY_PREP_CATEGORIES,
ApplicationFilter.IS_NOT_ARCHIVED,
],
prep_categories=[prep_category],
).all()

def get_bed_version_by_file_name(self, bed_version_file_name: str) -> BedVersion:
"""Return bed version with file name."""
return apply_bed_version_filter(
Expand Down
Loading
Loading