FE-187 remove copy project pipeline and tests (#332)
* Remove test_copy_project and copy_project_to_new_dataset_job (unused, and it calls copy_project, which is also being deleted), the copy_project pipeline, its solids and their tests, the copy_project test config, and the copy_project import

* linting for style

* Update requirements.txt

* Remove unused copy_project-related config

* Add a concurrency limiter, i.e. if a new push for the PR comes through, stop any previously running jobs

* Add a sleep to load_hca to check whether the e2e issue is time-sensitive

* FE-203 disable test load hca e2e (#333)

---------

Co-authored-by: dsp-fieldeng-bot <[email protected]>
bahill and dsp-fieldeng-bot authored Apr 8, 2024
1 parent 4af7883 commit 8881ec0
Showing 28 changed files with 47 additions and 1,303 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/validate_pull_request_main.yaml
@@ -3,6 +3,9 @@ on:
   pull_request:
     branches:
       - main
+concurrency:
+  group: ${{ github.ref }}
+  cancel-in-progress: true
 jobs:
   pr-validation:
     name: PR Validation
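Note on the workflow change: in GitHub Actions, runs that share a `concurrency.group` key are serialized, and `cancel-in-progress: true` makes a newly triggered run cancel any still-running run in the same group. Keying the group on `${{ github.ref }}` gives each PR branch its own group, which implements the cancel-on-new-push behavior described in the commit message. A common variant (not part of this commit) also folds the workflow name into the key so unrelated workflows on the same ref do not cancel each other:

```yaml
# Hypothetical variant, not from this commit: scope cancellation per
# workflow and per ref, so different workflows on one branch can still
# run side by side.
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true
```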
4 changes: 0 additions & 4 deletions ops/helmfiles/dagster/helmfile.yaml
@@ -80,11 +80,7 @@ releases:
       queuedRunCoordinator:
         maxConcurrentRuns: 10
         tagConcurrencyLimits:
-          - key: dev_refresh_snapshot_backfill
-            limit: 1
           - key: dcp_release
             limit: 5
-          - key: dcp2_prod_migration
-            limit: 3
       envSecrets:
         - name: monster-dagster-secrets
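For context on the removed limits: Dagster's queued run coordinator enforces `tagConcurrencyLimits` per tag key, so queued runs carrying a limited key wait until the number of in-flight runs with that key drops below the limit. A minimal sketch of how a run opts in, assuming the `@job` API from the same Dagster generation as the `to_job` calls in this repo (the op and job names are hypothetical):

```python
from dagster import job, op


@op
def start_release():
    pass


# Runs of this job carry the `dcp_release` tag key, so the coordinator
# keeps at most 5 of them in flight (per the limit kept above); any
# extra runs wait in the queue.
@job(tags={"dcp_release": "true"})
def dcp_release_example_job():
    start_release()
```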
orchestration/hca_orchestration/config/dev_refresh/dev_refresh.py
@@ -2,35 +2,10 @@
 Defines partitioning logic for the Q3 2021 dev refresh
 """
 
-import os
-
-from dagster import file_relative_path, Partition
-from dagster.utils import load_yaml_from_path
+from dagster import Partition
 from dagster_utils.typing import DagsterObjectConfigSchema
 
 
-def run_config_for_dev_refresh_partition(partition: Partition) -> DagsterObjectConfigSchema:
-    path = file_relative_path(
-        __file__, os.path.join("./run_config", "copy_project_run_config.yaml")
-    )
-    run_config: DagsterObjectConfigSchema = load_yaml_from_path(path)
-    run_config["resources"]["hca_project_copying_config"]["config"]["source_hca_project_id"] = partition.value
-    run_config["resources"]["load_tag"]["config"]["load_tag_prefix"] = f"dr"
-
-    return run_config
-
-
-def run_config_for_per_project_dataset_partition(partition: Partition) -> DagsterObjectConfigSchema:
-    path = file_relative_path(
-        __file__, os.path.join("./run_config", "copy_project_new_dataset_run_config.yaml")
-    )
-    run_config: DagsterObjectConfigSchema = load_yaml_from_path(path)
-    run_config["resources"]["hca_project_copying_config"]["config"]["source_hca_project_id"] = partition.value
-    run_config["resources"]["load_tag"]["config"]["load_tag_prefix"] = f"dr"
-
-    return run_config
-
-
 def run_config_for_cut_snapshot_partition(partition: Partition) -> DagsterObjectConfigSchema:
     run_config = {
         "solids": {
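The deleted functions followed the module's partition pattern: map a `Partition` whose value is an HCA project id to a complete run config. A sketch of how such a function is typically attached to a pipeline, assuming the legacy `PartitionSetDefinition` API from this Dagster generation (the partition set name and project ids are illustrative):

```python
from dagster import Partition, PartitionSetDefinition

from hca_orchestration.config.dev_refresh.dev_refresh import (
    run_config_for_cut_snapshot_partition,
)

# Each partition value is an HCA project id; the run-config function
# injects it into the pipeline's resource config at launch time.
cut_snapshot_partitions = PartitionSetDefinition(
    name="cut_snapshot_partitions",
    pipeline_name="cut_snapshot",
    partition_fn=lambda: [Partition("project-id-a"), Partition("project-id-b")],
    run_config_fn_for_partition=run_config_for_cut_snapshot_partition,
)
```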

This file was deleted.

This file was deleted.

orchestration/hca_orchestration/config/prod_migration/prod_migration.py
@@ -5,36 +5,6 @@
 from dagster_utils.typing import DagsterObjectConfigSchema
 
 
-def run_config_for_real_prod_migration_dcp1(partition: Partition) -> DagsterObjectConfigSchema:
-    path = file_relative_path(
-        __file__, os.path.join("./run_config", "dcp1_real_prod_migration.yaml")
-    )
-    run_config: DagsterObjectConfigSchema = load_yaml_from_path(path)
-    run_config["resources"]["hca_project_id"]["config"]["hca_project_id"] = partition.value
-
-    return run_config
-
-
-def run_config_for_real_prod_migration_dcp2(partition: Partition) -> DagsterObjectConfigSchema:
-    path = file_relative_path(
-        __file__, os.path.join("./run_config", "dcp2_real_prod_migration.yaml")
-    )
-    run_config: DagsterObjectConfigSchema = load_yaml_from_path(path)
-    run_config["resources"]["hca_project_id"]["config"]["hca_project_id"] = partition.value
-
-    return run_config
-
-
-def run_config_cut_project_snapshot_job_real_prod_dcp2(partition: Partition) -> DagsterObjectConfigSchema:
-    path = file_relative_path(
-        __file__, os.path.join("./run_config", "dcp2_real_prod_migration_snapshot.yaml")
-    )
-    run_config: DagsterObjectConfigSchema = load_yaml_from_path(path)
-    run_config["resources"]["snapshot_config"]["config"]["source_hca_project_id"] = partition.value
-
-    return run_config
-
-
 def run_config_per_project_snapshot_job(partition: Partition) -> DagsterObjectConfigSchema:
     path = file_relative_path(
         __file__, os.path.join("./run_config", "per_project_snapshot.yaml")
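The surviving functions in this module all share one shape: load a YAML run-config template relative to the module, then overwrite the partition-specific key. A condensed sketch of that pattern (the injected project id is a placeholder, and the `snapshot_config` key mirrors the deleted snapshot function above rather than anything confirmed for the kept function):

```python
import os

from dagster import file_relative_path
from dagster.utils import load_yaml_from_path
from dagster_utils.typing import DagsterObjectConfigSchema

# Load the YAML template that ships next to this module, then inject
# the partition value into the resource config before launching a run.
path = file_relative_path(__file__, os.path.join("./run_config", "per_project_snapshot.yaml"))
run_config: DagsterObjectConfigSchema = load_yaml_from_path(path)
run_config["resources"]["snapshot_config"]["config"]["source_hca_project_id"] = "placeholder-project-id"
```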

This file was deleted.

This file was deleted.

This file was deleted.

2 changes: 0 additions & 2 deletions orchestration/hca_orchestration/pipelines/__init__.py
@@ -1,4 +1,3 @@
-from hca_orchestration.pipelines.copy_project import copy_project
 from hca_orchestration.pipelines.cut_snapshot import cut_snapshot
 from hca_orchestration.pipelines.load_hca import load_hca
 from hca_orchestration.pipelines.set_snapshot_public import set_snapshot_public
@@ -7,7 +6,6 @@
 __all__ = [
     'cut_snapshot',
     'load_hca',
-    'copy_project',
     'set_snapshot_public',
     'validate_ingress_graph'
 ]
28 changes: 0 additions & 28 deletions orchestration/hca_orchestration/pipelines/copy_project.py

This file was deleted.

45 changes: 8 additions & 37 deletions orchestration/hca_orchestration/repositories/common.py
@@ -2,47 +2,18 @@
 import os
 from typing import Any, Optional
 
-from dagster import PipelineDefinition, in_process_executor, run_status_sensor, PipelineRunStatus, \
-    RunStatusSensorContext, pipeline_failure_sensor, PipelineRun, file_relative_path
+from dagster import (
+    PipelineRun,
+    PipelineRunStatus,
+    RunStatusSensorContext,
+    file_relative_path,
+    pipeline_failure_sensor,
+    run_status_sensor,
+)
 from dagster.utils import load_yaml_from_globs
-from dagster_gcp.gcs import gcs_pickle_io_manager
 from dagster_slack import make_slack_on_pipeline_failure_sensor
-from dagster_utils.resources.bigquery import bigquery_client
-from dagster_utils.resources.data_repo.jade_data_repo import jade_data_repo_client
-from dagster_utils.resources.google_storage import google_storage_client
 from slack_sdk import WebClient
 
-from hca_orchestration.config import preconfigure_resource_for_mode
-from hca_orchestration.pipelines import copy_project
-from hca_orchestration.resources.config.scratch import scratch_config
-from hca_orchestration.resources import bigquery_service, load_tag
-from hca_orchestration.resources.hca_project_config import hca_project_copying_config, hca_project_id
-from hca_orchestration.resources.config.datasets import find_or_create_project_dataset
-from hca_orchestration.resources.data_repo_service import data_repo_service
-from hca_orchestration.resources.utils import run_start_time
-
-
-def copy_project_to_new_dataset_job(src_env: str, target_env: str) -> PipelineDefinition:
-    return copy_project.to_job(
-        name=f"copy_project_from_{src_env}_to_{target_env}",
-        description=f"Copies a project from {src_env} to {target_env}",
-        resource_defs={
-            "bigquery_client": bigquery_client,
-            "data_repo_client": preconfigure_resource_for_mode(jade_data_repo_client, target_env),
-            "gcs": google_storage_client,
-            "scratch_config": scratch_config,
-            "bigquery_service": bigquery_service,
-            "hca_project_copying_config": hca_project_copying_config,
-            "target_hca_dataset": find_or_create_project_dataset,
-            "hca_project_id": hca_project_id,
-            "load_tag": load_tag,
-            "data_repo_service": data_repo_service,
-            "io_manager": preconfigure_resource_for_mode(gcs_pickle_io_manager, src_env),
-            "run_start_time": run_start_time,
-        },
-        executor_def=in_process_executor
-    )
-
 
 def config_path(relative_path: str) -> str:
     path: str = file_relative_path(
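The deleted `copy_project_to_new_dataset_job` was a standard graph-to-job binding. For reference, a minimal self-contained sketch of that `to_job` pattern (every name here is hypothetical, not from this repo):

```python
from dagster import graph, in_process_executor, op, resource


@resource
def example_client(_init_context):
    return object()  # stand-in for a real API client


@op(required_resource_keys={"example_client"})
def copy_step(context):
    context.log.info("copying would happen here")


@graph
def example_copy():
    copy_step()


# Same shape as the deleted job: bind resource definitions and an
# executor to a graph to produce a runnable job.
example_copy_job = example_copy.to_job(
    name="example_copy_job",
    resource_defs={"example_client": example_client},
    executor_def=in_process_executor,
)
```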
@@ -17,9 +17,6 @@
     dev_run_config_for_dcp_release_per_project_partition,
     run_config_per_project_public_snapshot_job,
 )
-from hca_orchestration.config.dev_refresh.dev_refresh import (
-    run_config_for_per_project_dataset_partition,
-)
 from hca_orchestration.config.prod_migration.prod_migration import (
     run_config_per_project_snapshot_job_dev,
 )
@@ -35,7 +32,6 @@
     staging_area_validator,
     validate_ingress_graph,
 )
-from hca_orchestration.repositories.common import copy_project_to_new_dataset_job
 from hca_orchestration.resources import bigquery_service, load_tag
 from hca_orchestration.resources.config.dagit import dagit_config
 from hca_orchestration.resources.config.datasets import (
@@ -127,15 +123,12 @@ def per_project_load_hca() -> PipelineDefinition:
 @repository
 def all_jobs() -> list[PipelineDefinition]:
     jobs = [
-        copy_project_to_new_dataset_job("prod", "dev"),
         make_snapshot_public_job("dev", "dev"),
         cut_project_snapshot_job("dev", "dev", "[email protected]"),
         legacy_cut_snapshot_job("dev", "[email protected]"),
         per_project_load_hca(),
         validate_ingress_job()
     ]
-    jobs += configure_partitions_for_pipeline("copy_project_to_new_dataset",
-                                              run_config_for_per_project_dataset_partition)
     jobs += configure_partitions_for_pipeline("make_snapshot_public_job_dev",
                                               run_config_per_project_public_snapshot_job)
     jobs += configure_partitions_for_pipeline("cut_project_snapshot_job_dev",
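With the copy_project entries gone, `all_jobs` above keeps the same two-step shape: a static list of jobs extended by partition-generated jobs. A stripped-down sketch of that repository pattern (the helper names are hypothetical):

```python
from dagster import PipelineDefinition, repository


def static_jobs() -> list[PipelineDefinition]:
    # e.g. make_snapshot_public_job("dev", "dev"), validate_ingress_job(), ...
    return []


def partition_jobs() -> list[PipelineDefinition]:
    # e.g. configure_partitions_for_pipeline("cut_project_snapshot_job_dev", ...)
    return []


@repository
def example_repository() -> list[PipelineDefinition]:
    return static_jobs() + partition_jobs()
```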
