From 2425e1f28c431958b20953df90d72cb820d127c2 Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Mon, 28 Aug 2023 09:24:49 -1000 Subject: [PATCH 01/11] add raw s3 functions to parsons --- parsons/aws/s3.py | 275 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 275 insertions(+) diff --git a/parsons/aws/s3.py b/parsons/aws/s3.py index e62080bbbb..e9bc837c8c 100644 --- a/parsons/aws/s3.py +++ b/parsons/aws/s3.py @@ -455,3 +455,278 @@ def transfer_bucket( object_acl.put(ACL="public-read") logger.info(f"Finished syncing {len(key_list)} keys") + + def get_buckets_type(self, regex): + """ + Grabs a type of bucket based on naming convention. + + `Args:` + regex: str + This will most commonly be 'member' or 'vendor' + + `Returns:` + list + list of buckets + + """ + + all_buckets = self.s3.list_buckets() + buckets = [x for x in all_buckets if regex in x.split('-')] + + return buckets + + def s3_to_redshift( + bucket, + key, + rs_table, + if_exists="truncate", + delimiter="tab", + errors=10, + truncate_columns=True, + ): + """ + Moves a tab-delimited file from s3 to Redshift. + + `Args:` + + bucket: str + S3 bucket + + key: str + S3 key + + rs_table: str + Redshift table. This MUST EXIST already. + + if_exists: str + options are: "drop", "append", "truncate", "fail" + + delimiter: str + options are "tab" and "pipe" + + errors: int + the amount of errors you are willing to accept + + truncatecolumns: bool + if you want to truncate long running text + + Returns: + None + """ + + rs = Redshift() + s3 = S3() + + if ".zip" in key: + raise Exception(".zip files won't work with copy. We need unzipped or GZIP.") + + full_key = f"s3://{bucket}/{key}" + logger.info(f"Starting download of {full_key}...") + zip = "gzip" if ".gz" in key else "" + + if delimiter == "tab": + delimiter_line = "delimiter '\t'" + elif delimiter == "pipe": + delimiter_line = "CSV delimiter '|'" + elif delimiter == "comma": + delimiter_line = "CSV" + else: + raise Exception("ERROR: Your options here are tab, pipe, or comma.") + + truncate = "TRUNCATECOLUMNS" if truncate_columns == True else "" + + if rs.table_exists(rs_table): + if if_exists == "fail": + raise Exception("Table already exists, and you said fail. Maybe try drop?") + elif if_exists in ["drop", "truncate"]: + copy_query = f"truncate {rs_table};" + copy_table = rs_table + copy_query_end = None + elif if_exists == "append": + copy_table = f"{rs_table}_temp" + copy_query = f"drop table if exists {copy_table}; create table {copy_table} (like {rs_table});" + copy_query_end = f"insert into {rs_table} (select * from {copy_table})" + else: + raise Exception( + "You chose an option that is not 'fail','truncate','append', or 'drop'." 
+ ) + + copy_query += f""" + -- Load the data + copy {copy_table} from '{full_key}' + credentials 'aws_access_key_id={ACCESS_KEY_ID};aws_secret_access_key={SECRET_ACCESS_KEY}' + emptyasnull + blanksasnull + ignoreheader 1 + {delimiter_line} + acceptinvchars + {zip} + maxerror {errors} + COMPUPDATE true + {truncate} + ; + """ + logger.info(f"copying from s3 into {copy_table}...") + rs.query(copy_query) + + if copy_query_end: + logger.info(f"inserting {copy_table} into {rs_table}...") + rs.query(copy_query_end) + rs.query(f"drop table if exists {copy_table};") + + else: + raise Exception("ERROR: You need to have a table already created to copy into.") + + return None + + def drop_and_save( + rs_table, + bucket, + key, + cascade=True, + manifest=True, + header=True, + delimiter="|", + compression="gzip", + add_quotes=True, + escape=True, + allow_overwrite=True, + parallel=True, + max_file_size="6.2 GB", + aws_region=None, + ): + """ + Description: + This function is used to unload data to s3, and then drop Redshift table. + + Args: + rs_table: str + Redshift table. + + bucket: str + S3 bucket + + key: str + S3 key prefix ahead of table name + + cascade: bool + whether to drop cascade + + ***unload params + + Returns: + None + """ + + rs = Redshift() + s3 = S3() + + query_end = "cascade" if cascade else "" + + rs.unload( + sql=f"select * from {rs_table}", + bucket=bucket, + key_prefix=f"{key}/{rs_table.replace('.','_')}/", + manifest=manifest, + header=header, + delimiter=delimiter, + compression=compression, + add_quotes=add_quotes, + escape=escape, + allow_overwrite=allow_overwrite, + parallel=parallel, + max_file_size=max_file_size, + aws_region=aws_region, + ) + + rs.query(f"drop table if exists {rs_table} {query_end}") + + return None + + def process_s3_keys( + s3, + bucket, + incoming_prefix, + processing_prefix, + dest_prefix, + extension=None, + key_limit=None, + key_offset=0, + ): + """ + Process the keys in an S3 bucket under a specific prefix. + + `Args:` + s3: object + The S3 connector to use. + incoming_prefix: str + The prefix to use to search for keys to process. + processing_prefix: str + The prefix to put files under as they are being processed. + dest_prefix: str + The S3 prefix where the files should be put after processing. + extension: str + `Optional:` The extension of files to look for + key_limit: int + `Optional:` The max number of keys to process; if not specified, will process + all keys + key_offset: int + `Optional:` The offset from the first key found to start processing from; if not + specified, defaults to the first key + `Return:` + A list of the final location of all processed keys. 
+ """ + logger.info("Moving any keys from previous runs back to incoming") + merged_regex = re.compile(f"^{processing_prefix}/\\d+/(.+)$") + + previous_keys = s3.list_keys(bucket, processing_prefix) + logger.info("Found %d keys left over from previous runs", len(previous_keys)) + + for key in previous_keys: + match = merged_regex.match(key) + if match: + remainder = match.group(1) + destination = f"{incoming_prefix}/{remainder}" + logger.info("Moving %s back to %s", key, destination) + s3.transfer_bucket(bucket, key, bucket, destination, remove_original=True) + else: + logger.warning("Could not match processing key to expected regex: %s", key) + + all_keys = s3.list_keys(bucket, incoming_prefix, suffix=extension) + + if key_limit: + all_keys = all_keys[key_offset: key_offset + key_limit] + + job_id = f"{random.randrange(0, 100000):06}" + random_processing_prefix = f"{processing_prefix}/{job_id}" + + logger.info( + "Found %s keys in bucket %s under prefix %s with suffix %s", + len(all_keys), + bucket, + incoming_prefix, + extension, + ) + + moved_keys = [] + for from_key in all_keys: + dest_key = from_key.replace(incoming_prefix, random_processing_prefix) + s3.transfer_bucket(bucket, from_key, bucket, dest_key) + s3.remove_file(bucket, from_key) + moved_keys.append(dest_key) + + for key in moved_keys: + yield key + + date = datetime.datetime.now() + date_string = date.strftime("%Y%m%d") + dated_dest_prefix = f"{dest_prefix}/{date_string}" + + final_keys = [] + for from_key in moved_keys: + dest_key = from_key.replace(random_processing_prefix, dated_dest_prefix) + s3.transfer_bucket(bucket, from_key, bucket, dest_key) + s3.remove_file(bucket, from_key) + final_keys.append(dest_key) + + return final_keys From 705c6f83057620d050aec5a5ebdf2362d3e7a57f Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Wed, 30 Aug 2023 02:59:39 -1000 Subject: [PATCH 02/11] add selected functions to s3.py --- parsons/aws/s3.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/parsons/aws/s3.py b/parsons/aws/s3.py index e9bc837c8c..0582024e7d 100644 --- a/parsons/aws/s3.py +++ b/parsons/aws/s3.py @@ -1,9 +1,12 @@ import re import boto3 from botocore.client import ClientError +from parsons.databases.redshift.redshift import Redshift from parsons.utilities import files import logging import os +import random +import datetime logger = logging.getLogger(__name__) @@ -470,12 +473,13 @@ def get_buckets_type(self, regex): """ - all_buckets = self.s3.list_buckets() + all_buckets = self.list_buckets() buckets = [x for x in all_buckets if regex in x.split('-')] return buckets def s3_to_redshift( + self, bucket, key, rs_table, @@ -515,7 +519,6 @@ def s3_to_redshift( """ rs = Redshift() - s3 = S3() if ".zip" in key: raise Exception(".zip files won't work with copy. We need unzipped or GZIP.") @@ -533,7 +536,10 @@ def s3_to_redshift( else: raise Exception("ERROR: Your options here are tab, pipe, or comma.") - truncate = "TRUNCATECOLUMNS" if truncate_columns == True else "" + if truncate_columns: + truncate = "TRUNCATECOLUMNS" + else: + truncate = "" if rs.table_exists(rs_table): if if_exists == "fail": @@ -550,11 +556,10 @@ def s3_to_redshift( raise Exception( "You chose an option that is not 'fail','truncate','append', or 'drop'." 
) - copy_query += f""" -- Load the data copy {copy_table} from '{full_key}' - credentials 'aws_access_key_id={ACCESS_KEY_ID};aws_secret_access_key={SECRET_ACCESS_KEY}' + credentials 'aws_access_key_id={self.aws_access_key_id};aws_secret_access_key={self.aws_secret_access_key}' emptyasnull blanksasnull ignoreheader 1 @@ -580,6 +585,7 @@ def s3_to_redshift( return None def drop_and_save( + self, rs_table, bucket, key, @@ -596,8 +602,7 @@ def drop_and_save( aws_region=None, ): """ - Description: - This function is used to unload data to s3, and then drop Redshift table. + Unload data to s3, and then drop Redshift table Args: rs_table: str @@ -619,7 +624,6 @@ def drop_and_save( """ rs = Redshift() - s3 = S3() query_end = "cascade" if cascade else "" From ecfa62686051c06358646f17527f18981edbd16f Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Thu, 31 Aug 2023 01:19:47 -1000 Subject: [PATCH 03/11] delte redundant functions and move drop_and_save function to redshift.py --- parsons/aws/s3.py | 106 ------------------------- parsons/databases/redshift/redshift.py | 60 ++++++++++++++ 2 files changed, 60 insertions(+), 106 deletions(-) diff --git a/parsons/aws/s3.py b/parsons/aws/s3.py index 0582024e7d..77ac81c981 100644 --- a/parsons/aws/s3.py +++ b/parsons/aws/s3.py @@ -478,112 +478,6 @@ def get_buckets_type(self, regex): return buckets - def s3_to_redshift( - self, - bucket, - key, - rs_table, - if_exists="truncate", - delimiter="tab", - errors=10, - truncate_columns=True, - ): - """ - Moves a tab-delimited file from s3 to Redshift. - - `Args:` - - bucket: str - S3 bucket - - key: str - S3 key - - rs_table: str - Redshift table. This MUST EXIST already. - - if_exists: str - options are: "drop", "append", "truncate", "fail" - - delimiter: str - options are "tab" and "pipe" - - errors: int - the amount of errors you are willing to accept - - truncatecolumns: bool - if you want to truncate long running text - - Returns: - None - """ - - rs = Redshift() - - if ".zip" in key: - raise Exception(".zip files won't work with copy. We need unzipped or GZIP.") - - full_key = f"s3://{bucket}/{key}" - logger.info(f"Starting download of {full_key}...") - zip = "gzip" if ".gz" in key else "" - - if delimiter == "tab": - delimiter_line = "delimiter '\t'" - elif delimiter == "pipe": - delimiter_line = "CSV delimiter '|'" - elif delimiter == "comma": - delimiter_line = "CSV" - else: - raise Exception("ERROR: Your options here are tab, pipe, or comma.") - - if truncate_columns: - truncate = "TRUNCATECOLUMNS" - else: - truncate = "" - - if rs.table_exists(rs_table): - if if_exists == "fail": - raise Exception("Table already exists, and you said fail. Maybe try drop?") - elif if_exists in ["drop", "truncate"]: - copy_query = f"truncate {rs_table};" - copy_table = rs_table - copy_query_end = None - elif if_exists == "append": - copy_table = f"{rs_table}_temp" - copy_query = f"drop table if exists {copy_table}; create table {copy_table} (like {rs_table});" - copy_query_end = f"insert into {rs_table} (select * from {copy_table})" - else: - raise Exception( - "You chose an option that is not 'fail','truncate','append', or 'drop'." 
- ) - copy_query += f""" - -- Load the data - copy {copy_table} from '{full_key}' - credentials 'aws_access_key_id={self.aws_access_key_id};aws_secret_access_key={self.aws_secret_access_key}' - emptyasnull - blanksasnull - ignoreheader 1 - {delimiter_line} - acceptinvchars - {zip} - maxerror {errors} - COMPUPDATE true - {truncate} - ; - """ - logger.info(f"copying from s3 into {copy_table}...") - rs.query(copy_query) - - if copy_query_end: - logger.info(f"inserting {copy_table} into {rs_table}...") - rs.query(copy_query_end) - rs.query(f"drop table if exists {copy_table};") - - else: - raise Exception("ERROR: You need to have a table already created to copy into.") - - return None - def drop_and_save( self, rs_table, diff --git a/parsons/databases/redshift/redshift.py b/parsons/databases/redshift/redshift.py index 0fc35e2c75..6fc8b15e52 100644 --- a/parsons/databases/redshift/redshift.py +++ b/parsons/databases/redshift/redshift.py @@ -799,6 +799,66 @@ def unload( return self.query(statement) + def drop_and_unload( + self, + rs_table, + bucket, + key, + cascade=True, + manifest=True, + header=True, + delimiter="|", + compression="gzip", + add_quotes=True, + escape=True, + allow_overwrite=True, + parallel=True, + max_file_size="6.2 GB", + aws_region=None, + ): + """ + Unload data to s3, and then drop Redshift table + + Args: + rs_table: str + Redshift table. + + bucket: str + S3 bucket + + key: str + S3 key prefix ahead of table name + + cascade: bool + whether to drop cascade + + ***unload params + + Returns: + None + """ + query_end = "cascade" if cascade else "" + + self.unload( + sql=f"select * from {rs_table}", + bucket=bucket, + key_prefix=f"{key}/{rs_table.replace('.','_')}/", + manifest=manifest, + header=header, + delimiter=delimiter, + compression=compression, + add_quotes=add_quotes, + escape=escape, + allow_overwrite=allow_overwrite, + parallel=parallel, + max_file_size=max_file_size, + aws_region=aws_region, + ) + + self.query(f"drop table if exists {rs_table} {query_end}") + + return None + def generate_manifest( self, buckets, From 47293e6ad5282f8a3a78c9b39db640c368e3e318 Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Thu, 31 Aug 2023 01:20:03 -1000 Subject: [PATCH 04/11] create test file --- parsons/aws/s3_canales_test.py | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 parsons/aws/s3_canales_test.py diff --git a/parsons/aws/s3_canales_test.py b/parsons/aws/s3_canales_test.py new file mode 100644 index 0000000000..76b0154102 --- /dev/null +++ b/parsons/aws/s3_canales_test.py @@ -0,0 +1,7 @@ +# import parsons.aws.s3 +from s3 import S3 + +# aws_access_key_id = 'tmc-engineering-sharinef' +# aws_secret_access_key = 'cmt.cpu_xum3CNU*uyx' + +# s3 = S3() From 25a2d8965999d21594298774a7dd6a685824af94 Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Thu, 31 Aug 2023 05:19:41 -1000 Subject: [PATCH 05/11] add s3 unit tests --- parsons/aws/s3.py | 89 +++++++---------------------------------------- test/test_s3.py | 25 +++++++++++++ 2 files changed, 37 insertions(+), 77 deletions(-) diff --git a/parsons/aws/s3.py b/parsons/aws/s3.py index 77ac81c981..c4803cbdb0 100644 --- a/parsons/aws/s3.py +++ b/parsons/aws/s3.py @@ -459,13 +459,13 @@ def transfer_bucket( logger.info(f"Finished syncing {len(key_list)} keys") - def get_buckets_type(self, regex): + def get_buckets_with_subname(self, bucket_subname): """ Grabs a type of bucket based on naming convention. 
`Args:` - regex: str - This will most commonly be 'member' or 'vendor' + subname: str + This will most commonly be a 'vendor' `Returns:` list @@ -474,75 +474,12 @@ def get_buckets_type(self, regex): """ all_buckets = self.list_buckets() - buckets = [x for x in all_buckets if regex in x.split('-')] + buckets = [x for x in all_buckets if bucket_subname in x.split('-')] return buckets - def drop_and_save( - self, - rs_table, - bucket, - key, - cascade=True, - manifest=True, - header=True, - delimiter="|", - compression="gzip", - add_quotes=True, - escape=True, - allow_overwrite=True, - parallel=True, - max_file_size="6.2 GB", - aws_region=None, - ): - """ - Unload data to s3, and then drop Redshift table - - Args: - rs_table: str - Redshift table. - - bucket: str - S3 bucket - - key: str - S3 key prefix ahead of table name - - cascade: bool - whether to drop cascade - - ***unload params - - Returns: - None - """ - - rs = Redshift() - - query_end = "cascade" if cascade else "" - - rs.unload( - sql=f"select * from {rs_table}", - bucket=bucket, - key_prefix=f"{key}/{rs_table.replace('.','_')}/", - manifest=manifest, - header=header, - delimiter=delimiter, - compression=compression, - add_quotes=add_quotes, - escape=escape, - allow_overwrite=allow_overwrite, - parallel=parallel, - max_file_size=max_file_size, - aws_region=aws_region, - ) - - rs.query(f"drop table if exists {rs_table} {query_end}") - - return None - def process_s3_keys( - s3, + self, bucket, incoming_prefix, processing_prefix, @@ -555,8 +492,6 @@ def process_s3_keys( Process the keys in an S3 bucket under a specific prefix. `Args:` - s3: object - The S3 connector to use. incoming_prefix: str The prefix to use to search for keys to process. processing_prefix: str @@ -577,7 +512,7 @@ def process_s3_keys( logger.info("Moving any keys from previous runs back to incoming") merged_regex = re.compile(f"^{processing_prefix}/\\d+/(.+)$") - previous_keys = s3.list_keys(bucket, processing_prefix) + previous_keys = self.list_keys(bucket, processing_prefix) logger.info("Found %d keys left over from previous runs", len(previous_keys)) for key in previous_keys: @@ -586,11 +521,11 @@ def process_s3_keys( remainder = match.group(1) destination = f"{incoming_prefix}/{remainder}" logger.info("Moving %s back to %s", key, destination) - s3.transfer_bucket(bucket, key, bucket, destination, remove_original=True) + self.transfer_bucket(bucket, key, bucket, destination, remove_original=True) else: logger.warning("Could not match processing key to expected regex: %s", key) - all_keys = s3.list_keys(bucket, incoming_prefix, suffix=extension) + all_keys = self.list_keys(bucket, incoming_prefix, suffix=extension) if key_limit: all_keys = all_keys[key_offset: key_offset + key_limit] @@ -609,8 +544,8 @@ def process_s3_keys( moved_keys = [] for from_key in all_keys: dest_key = from_key.replace(incoming_prefix, random_processing_prefix) - s3.transfer_bucket(bucket, from_key, bucket, dest_key) - s3.remove_file(bucket, from_key) + self.transfer_bucket(bucket, from_key, bucket, dest_key) + self.remove_file(bucket, from_key) moved_keys.append(dest_key) for key in moved_keys: @@ -623,8 +558,8 @@ def process_s3_keys( final_keys = [] for from_key in moved_keys: dest_key = from_key.replace(random_processing_prefix, dated_dest_prefix) - s3.transfer_bucket(bucket, from_key, bucket, dest_key) - s3.remove_file(bucket, from_key) + self.transfer_bucket(bucket, from_key, bucket, dest_key) + self.remove_file(bucket, from_key) final_keys.append(dest_key) return final_keys diff 
--git a/test/test_s3.py b/test/test_s3.py index f64943dd26..1991b0fd51 100644 --- a/test/test_s3.py +++ b/test/test_s3.py @@ -30,10 +30,16 @@ def setUp(self): self.tbl = Table([{"first": "Bob", "last": "Smith"}]) csv_path = self.tbl.to_csv() + self.test_incoming_prefix = "incoming" + self.test_processing_prefix = "processing" + self.test_dest_prefix = "archive" + self.test_key_2 = "test2.csv" self.tbl_2 = Table([{"first": "Jack", "last": "Smith"}]) csv_path_2 = self.tbl_2.to_csv() + self.test_bucket_subname = self.test_bucket.split('-')[0] + # Sometimes it runs into issues putting the file retry = 1 @@ -183,3 +189,22 @@ def test_transfer_bucket(self): result_tbl_2 = Table.from_csv(path_2) assert_matching_tables(self.tbl_2, result_tbl_2) self.assertFalse(self.s3.key_exists(self.test_bucket, self.test_key_2)) + + def test_get_buckets_with_subname(self): + + buckets_with_subname_true = self.s3.get_buckets_type(self.test_bucket_subname) + self.assertTrue(self.test_bucket in buckets_with_subname_true) + + buckets_with_subname_false = self.s3.get_buckets_type('bucketsubnamedoesnotexist') + self.assertFalse(self.test_bucket in buckets_with_subname_false) + + def test_process_s3_keys(self): + + # Put a file in the bucket + csv_path = self.tbl.to_csv() + key = "test/test.csv" + self.s3.put_file(self.test_bucket, key, csv_path) + + processed_key = self.s3.process_s3_keys(self.test_bucket, self.test_incoming_prefix, + self.test_processing_prefix, self.test_dest_prefix) + self.assertTrue(self.s3.key_exists(self.test_bucket, processed_key)) From 51b271e838dd343ccd78187cdcb0478020c16acd Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Thu, 31 Aug 2023 05:48:35 -1000 Subject: [PATCH 06/11] add rs.drop_and_unload unit test --- test/test_redshift.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/test/test_redshift.py b/test/test_redshift.py index 9c45fe66d4..fd664fede5 100644 --- a/test/test_redshift.py +++ b/test/test_redshift.py @@ -644,6 +644,28 @@ def test_unload(self): # Check that files are there self.assertTrue(self.s3.key_exists(self.temp_s3_bucket, "unload_test")) + def test_drop_and_unload(self): + + rs_table_test = f"{self.temp_schema}.test_copy" + # Copy a table to Redshift + self.rs.copy(self.tbl, rs_table_test, if_exists="drop") + + key = "unload_test" + + # Unload a table to S3 + self.rs.drop_and_unload( + rs_table=rs_table_test, + bucket=self.temp_s3_bucket, + key=key, + ) + + key_prefix = f"{key}/{self.tbl.replace('.','_')}/" + + # Check that files are there + self.assertTrue(self.s3.key_exists(self.temp_s3_bucket, key_prefix)) + + self.assertFalse(self.rs.table_exists(rs_table_test)) + def test_to_from_redshift(self): # Test the parsons table methods From 5b4575c04499051d433322640b01419605578df3 Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Wed, 6 Sep 2023 05:50:09 -1000 Subject: [PATCH 07/11] add printing for debugging --- parsons/aws/s3.py | 6 +++++- parsons/aws/s3_canales_test.py | 11 +++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/parsons/aws/s3.py b/parsons/aws/s3.py index c4803cbdb0..60c766c14b 100644 --- a/parsons/aws/s3.py +++ b/parsons/aws/s3.py @@ -1,7 +1,7 @@ import re import boto3 from botocore.client import ClientError -from parsons.databases.redshift.redshift import Redshift +# from parsons.databases.redshift.redshift import Redshift from parsons.utilities import files import logging import os @@ -509,12 +509,16 @@ def process_s3_keys( `Return:` A list of the final location of all processed keys. 
""" + print('what') logger.info("Moving any keys from previous runs back to incoming") merged_regex = re.compile(f"^{processing_prefix}/\\d+/(.+)$") + print('hiya') previous_keys = self.list_keys(bucket, processing_prefix) logger.info("Found %d keys left over from previous runs", len(previous_keys)) + print(previous_keys) + for key in previous_keys: match = merged_regex.match(key) if match: diff --git a/parsons/aws/s3_canales_test.py b/parsons/aws/s3_canales_test.py index 76b0154102..587efb8455 100644 --- a/parsons/aws/s3_canales_test.py +++ b/parsons/aws/s3_canales_test.py @@ -1,6 +1,17 @@ # import parsons.aws.s3 from s3 import S3 +AWS_ACCESS_KEY_ID = 'AKIARJVLBM6AZYDCUQ7Y' + +AWS_SECRET_ACCESS_KEY = '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' +bucket = 'tmc-internal' +s3 = S3(aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY) + +output = s3.process_s3_keys(bucket=bucket, incoming_prefix='test-incoming', + processing_prefix='test-processing', dest_prefix='test-dest') +print(output) +# buckets = s3.get_buckets_with_subname('actblue') +# print(buckets) # aws_access_key_id = 'tmc-engineering-sharinef' # aws_secret_access_key = 'cmt.cpu_xum3CNU*uyx' From 4956241bcb73f6eef367ac01a05c50cd04eb99a9 Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Wed, 6 Sep 2023 06:18:48 -1000 Subject: [PATCH 08/11] remove testing file --- parsons/aws/s3_canales_test.py | 18 ------------------ test/test_s3.py | 11 ----------- 2 files changed, 29 deletions(-) delete mode 100644 parsons/aws/s3_canales_test.py diff --git a/parsons/aws/s3_canales_test.py b/parsons/aws/s3_canales_test.py deleted file mode 100644 index 587efb8455..0000000000 --- a/parsons/aws/s3_canales_test.py +++ /dev/null @@ -1,18 +0,0 @@ -# import parsons.aws.s3 -from s3 import S3 - -AWS_ACCESS_KEY_ID = 'AKIARJVLBM6AZYDCUQ7Y' - -AWS_SECRET_ACCESS_KEY = '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' -bucket = 'tmc-internal' -s3 = S3(aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY) - -output = s3.process_s3_keys(bucket=bucket, incoming_prefix='test-incoming', - processing_prefix='test-processing', dest_prefix='test-dest') -print(output) -# buckets = s3.get_buckets_with_subname('actblue') -# print(buckets) -# aws_access_key_id = 'tmc-engineering-sharinef' -# aws_secret_access_key = 'cmt.cpu_xum3CNU*uyx' - -# s3 = S3() diff --git a/test/test_s3.py b/test/test_s3.py index 1991b0fd51..d752317df6 100644 --- a/test/test_s3.py +++ b/test/test_s3.py @@ -197,14 +197,3 @@ def test_get_buckets_with_subname(self): buckets_with_subname_false = self.s3.get_buckets_type('bucketsubnamedoesnotexist') self.assertFalse(self.test_bucket in buckets_with_subname_false) - - def test_process_s3_keys(self): - - # Put a file in the bucket - csv_path = self.tbl.to_csv() - key = "test/test.csv" - self.s3.put_file(self.test_bucket, key, csv_path) - - processed_key = self.s3.process_s3_keys(self.test_bucket, self.test_incoming_prefix, - self.test_processing_prefix, self.test_dest_prefix) - self.assertTrue(self.s3.key_exists(self.test_bucket, processed_key)) From cd1d6d9ccdbebef244bad7e1757f1b8e1202a0d6 Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Wed, 6 Sep 2023 06:19:24 -1000 Subject: [PATCH 09/11] unsaved changes --- parsons/aws/s3.py | 90 ----------------------------------------------- 1 file changed, 90 deletions(-) diff --git a/parsons/aws/s3.py b/parsons/aws/s3.py index 60c766c14b..80c33918ce 100644 --- a/parsons/aws/s3.py +++ b/parsons/aws/s3.py @@ -477,93 +477,3 @@ def get_buckets_with_subname(self, 
bucket_subname): buckets = [x for x in all_buckets if bucket_subname in x.split('-')] return buckets - - def process_s3_keys( - self, - bucket, - incoming_prefix, - processing_prefix, - dest_prefix, - extension=None, - key_limit=None, - key_offset=0, - ): - """ - Process the keys in an S3 bucket under a specific prefix. - - `Args:` - incoming_prefix: str - The prefix to use to search for keys to process. - processing_prefix: str - The prefix to put files under as they are being processed. - dest_prefix: str - The S3 prefix where the files should be put after processing. - extension: str - `Optional:` The extension of files to look for - key_limit: int - `Optional:` The max number of keys to process; if not specified, will process - all keys - key_offset: int - `Optional:` The offset from the first key found to start processing from; if not - specified, defaults to the first key - `Return:` - A list of the final location of all processed keys. - """ - print('what') - logger.info("Moving any keys from previous runs back to incoming") - merged_regex = re.compile(f"^{processing_prefix}/\\d+/(.+)$") - - print('hiya') - previous_keys = self.list_keys(bucket, processing_prefix) - logger.info("Found %d keys left over from previous runs", len(previous_keys)) - - print(previous_keys) - - for key in previous_keys: - match = merged_regex.match(key) - if match: - remainder = match.group(1) - destination = f"{incoming_prefix}/{remainder}" - logger.info("Moving %s back to %s", key, destination) - self.transfer_bucket(bucket, key, bucket, destination, remove_original=True) - else: - logger.warning("Could not match processing key to expected regex: %s", key) - - all_keys = self.list_keys(bucket, incoming_prefix, suffix=extension) - - if key_limit: - all_keys = all_keys[key_offset: key_offset + key_limit] - - job_id = f"{random.randrange(0, 100000):06}" - random_processing_prefix = f"{processing_prefix}/{job_id}" - - logger.info( - "Found %s keys in bucket %s under prefix %s with suffix %s", - len(all_keys), - bucket, - incoming_prefix, - extension, - ) - - moved_keys = [] - for from_key in all_keys: - dest_key = from_key.replace(incoming_prefix, random_processing_prefix) - self.transfer_bucket(bucket, from_key, bucket, dest_key) - self.remove_file(bucket, from_key) - moved_keys.append(dest_key) - - for key in moved_keys: - yield key - - date = datetime.datetime.now() - date_string = date.strftime("%Y%m%d") - dated_dest_prefix = f"{dest_prefix}/{date_string}" - - final_keys = [] - for from_key in moved_keys: - dest_key = from_key.replace(random_processing_prefix, dated_dest_prefix) - self.transfer_bucket(bucket, from_key, bucket, dest_key) - self.remove_file(bucket, from_key) - final_keys.append(dest_key) - - return final_keys From 3a815c364c0d5259890e85f67a78533eb9d346fa Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Tue, 12 Sep 2023 06:11:11 -1000 Subject: [PATCH 10/11] remove unused packages --- parsons/aws/s3.py | 5 ++--- test/test_s3.py | 6 ++++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/parsons/aws/s3.py b/parsons/aws/s3.py index 80c33918ce..1ee20765bd 100644 --- a/parsons/aws/s3.py +++ b/parsons/aws/s3.py @@ -1,12 +1,11 @@ import re import boto3 from botocore.client import ClientError + # from parsons.databases.redshift.redshift import Redshift from parsons.utilities import files import logging import os -import random -import datetime logger = logging.getLogger(__name__) @@ -474,6 +473,6 @@ def get_buckets_with_subname(self, bucket_subname): """ all_buckets = 
self.list_buckets()
-        buckets = [x for x in all_buckets if bucket_subname in x.split('-')]
+        buckets = [x for x in all_buckets if bucket_subname in x.split("-")]
 
         return buckets
diff --git a/test/test_s3.py b/test/test_s3.py
index d752317df6..fbc8054440 100644
--- a/test/test_s3.py
+++ b/test/test_s3.py
@@ -38,7 +38,7 @@ def setUp(self):
         self.tbl_2 = Table([{"first": "Jack", "last": "Smith"}])
         csv_path_2 = self.tbl_2.to_csv()
 
-        self.test_bucket_subname = self.test_bucket.split('-')[0]
+        self.test_bucket_subname = self.test_bucket.split("-")[0]
 
         # Sometimes it runs into issues putting the file
         retry = 1
@@ -195,5 +195,7 @@ def test_get_buckets_with_subname(self):
         buckets_with_subname_true = self.s3.get_buckets_type(self.test_bucket_subname)
         self.assertTrue(self.test_bucket in buckets_with_subname_true)
 
-        buckets_with_subname_false = self.s3.get_buckets_type('bucketsubnamedoesnotexist')
+        buckets_with_subname_false = self.s3.get_buckets_type(
+            "bucketsubnamedoesnotexist"
+        )
         self.assertFalse(self.test_bucket in buckets_with_subname_false)

From 5f09cd7fd13906c8f54bb9e4fb2331764a0753df Mon Sep 17 00:00:00 2001
From: sharinetmc
Date: Thu, 5 Oct 2023 22:45:46 -1000
Subject: [PATCH 11/11] remove unneeded module

---
 parsons/aws/s3.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/parsons/aws/s3.py b/parsons/aws/s3.py
index 1ee20765bd..66d7254556 100644
--- a/parsons/aws/s3.py
+++ b/parsons/aws/s3.py
@@ -2,7 +2,6 @@
 import boto3
 from botocore.client import ClientError
 
-# from parsons.databases.redshift.redshift import Redshift
 from parsons.utilities import files
 import logging
 import os
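
A minimal usage sketch of the two methods this series ultimately keeps, S3.get_buckets_with_subname() and Redshift.drop_and_unload(), as they might be called from a script once the branch is installed. The bucket, schema, and table names below are placeholders, and credentials are assumed to come from the connectors' usual environment variables:

    # Illustrative sketch only; bucket, schema, and table names are placeholders.
    from parsons import Redshift, S3

    s3 = S3()        # reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the environment
    rs = Redshift()  # reads the REDSHIFT_* connection variables from the environment

    # List every bucket whose hyphen-delimited name contains the token "vendor",
    # e.g. "tmc-vendor-incoming".
    vendor_buckets = s3.get_buckets_with_subname("vendor")

    # Unload a finished table to s3://my-archive-bucket/archives/my_schema_my_table/
    # and then drop the table from Redshift.
    rs.drop_and_unload(
        rs_table="my_schema.my_table",
        bucket="my-archive-bucket",
        key="archives",
    )

drop_and_unload() keeps the unload defaults defined in the method signature (pipe-delimited, gzip-compressed, manifest and header included) and drops the table with CASCADE unless cascade=False is passed.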