diff --git a/orchestration/hca_manage/manifest.py b/orchestration/hca_manage/manifest.py index a1041a91..32b88f87 100644 --- a/orchestration/hca_manage/manifest.py +++ b/orchestration/hca_manage/manifest.py @@ -92,8 +92,9 @@ def _sanitize_gs_path(path: str) -> str: def _parse_csv(csv_path: str, env: str, project_id_only: bool = False, - include_release_tag: bool = False, release_tag: str = "") -> list[list[str]]: + include_release_tag: bool = False, release_tag: str = "") -> tuple[list[list[str]], list[list[str]]]: keys = set() + public_projects = set() with open(csv_path, "r") as f: reader = csv.reader(f) for row in reader: @@ -101,12 +102,12 @@ def _parse_csv(csv_path: str, env: str, project_id_only: bool = False, logging.debug("Empty path detected, skipping") continue - assert len(row) == 2 + assert len(row) == 3, "CSV must have 3 columns: institution, project_id, and Yes or No for Public" row = [x.strip() for x in row] institution = row[0].upper() project_id = find_project_id_in_str(row[1]) + public = row[2].lower() - key = None if project_id_only: project_id = row[1] key = project_id @@ -126,18 +127,25 @@ def _parse_csv(csv_path: str, env: str, project_id_only: bool = False, key = key + f",{release_tag}" keys.add(key) + # make a separate set of public projects + if public == "no": + public_projects.add(key) + chunked_paths = chunked(keys, MAX_STAGING_AREAS_PER_PARTITION_SET) - return [chunk for chunk in chunked_paths] + chunked_paths_no_ma = chunked(public_projects, MAX_STAGING_AREAS_PER_PARTITION_SET) + return [chunk for chunk in chunked_paths], [chunk for chunk in chunked_paths_no_ma] def parse_and_load_manifest(env: str, csv_path: str, release_tag: str, pipeline_name: str, project_id_only: bool = False, - include_release_tag: bool = False) -> None: + include_release_tag: bool = False, no_ma: bool = False) -> None: chunked_paths = _parse_csv(csv_path, env, project_id_only, include_release_tag, release_tag) + paths_to_use = chunked_paths[1] if no_ma else chunked_paths[0] + print(f"paths_to_use: {paths_to_use}") storage_client = Client() bucket: Bucket = storage_client.bucket(bucket_name=ETL_PARTITION_BUCKETS[env]) - for pos, chunk in enumerate(chunked_paths): + for pos, chunk in enumerate(paths_to_use): assert len(chunk), "At least one import path is required" qualifier = chr(pos + 97) # dcp11_a, dcp11_b, etc. blob_name = f"{pipeline_name}/{release_tag}_{qualifier}_manifest.csv" @@ -146,8 +154,9 @@ def parse_and_load_manifest(env: str, csv_path: str, release_tag: str, if not query_yes_no(f"Manifest {blob.name} already exists for pipeline {pipeline_name}, overwrite?"): return - logging.info(f"Uploading manifest [bucket={bucket.name}, name={blob_name}]") - blob.upload_from_string(data="\n".join(chunk)) + # TODO turn back on + # logging.info(f"Uploading manifest [bucket={bucket.name}, name={blob_name}]") + # blob.upload_from_string(data="\n".join(chunk)) def _get_dagster_client() -> DagsterGraphQLClient: @@ -196,9 +205,11 @@ def load(args: argparse.Namespace) -> None: args.release_tag, f"make_snapshot_public_job_{ENV_PIPELINE_ENDINGS[args.env]}", project_id_only=True, - include_release_tag=True + include_release_tag=True, + no_ma=True ) - _reload_repository(_get_dagster_client()) + # TODO turn back on + # _reload_repository(_get_dagster_client()) def enumerate_manifests(args: argparse.Namespace) -> None: