Skip to content

Commit

Permalink
more linting
Browse files Browse the repository at this point in the history
  • Loading branch information
snovod committed Oct 2, 2023
1 parent 56cee62 commit 9de3bb0
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions orchestration/hca_manage/deduplicate_staging_areas.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
import os
import re
import sys
from typing import Optional

import pandas as pd
from google.cloud import storage
from typing import Optional


STAGING_AREA_BUCKETS = {
"prod": {
Expand All @@ -28,7 +29,7 @@


# Function to return the objects in a staging area bucket
def get_staging_area_objects(bucket_name: str, prefix: str, delimiter: Optional[str] = None) -> list[list]:
def get_staging_area_objects(bucket_name: str, prefix: str, delimiter: Optional[str] = None) -> list[list[str]]:
record_list = []
try:
# List blobs in specified bucket/prefix
Expand All @@ -51,7 +52,7 @@ def get_staging_area_objects(bucket_name: str, prefix: str, delimiter: Optional[


# Function to identify outdated entity files
def identify_outdated_files(record_list: Optional[list[list]]) -> list[storage.blob]:
def identify_outdated_files(record_list: Optional[list[list[str]]]) -> list[storage.blob]:
delete_list = []
if record_list:
# Load records into dataframe, group by path and entity, and order by version descending
Expand Down Expand Up @@ -110,21 +111,29 @@ def create_staging_area_json(bucket_name: str, staging_area_json_prefix: str) ->


# Get staging area
def get_staging_area(staging_area: Optional[str], institution: Optional[str], environment: str, uuid : Optional[str]) -> str:
def get_staging_area(
staging_area: Optional[str],
institution: Optional[str],
environment: str, uuid:
Optional[str]
) -> str:
# If institution is provided then infer staging area from that and uuid
if institution:
# Confirm uuid is passed in if --institution is used
if not uuid:
print("Must provide --uuid if using --institution")
sys.exit(1)
return os.path.join(STAGING_AREA_BUCKETS[environment][institution], uuid)
return staging_area
else:
return staging_area


def check_staging_area_json_exists(bucket: str, prefix: str) -> bool:
storage_client = storage.Client()
gcs_bucket = storage_client.bucket(bucket)
return gcs_bucket.get_blob(prefix)
if gcs_bucket.get_blob(prefix):
return True
return False


# Main function
Expand Down

0 comments on commit 9de3bb0

Please sign in to comment.