Commit: file_content
laysabit committed Apr 26, 2024
1 parent 12cdaed commit 8652454
Showing 1 changed file with 16 additions and 47 deletions.

dags/stellar_etl_airflow/test_sources.py
@@ -71,22 +71,6 @@ def do_query(opType, date):
     return query_job
 
 
-def store_files(blobs, successful_transforms_folders):
-    for key in successful_transforms_folders.keys():
-        matching_files = []
-        # Use a list comprehension with a regex to get the matching blob names
-        for blob in blobs:
-            if re.search(rf"{key}", blob.name):
-                matching_files.append(os.path.basename(blob.name))
-
-        if successful_transforms_folders[key] is None:
-            successful_transforms_folders[key] = matching_files
-        else:
-            successful_transforms_folders[key].extend(matching_files)
-
-    return successful_transforms_folders
-
-
 def get_from_stateTables(**context):
     successful_transforms = {
         "signers": 0,
@@ -151,42 +135,27 @@ def get_from_stateTables(**context):
             )
         )
 
-        # regex to find the name of each table in file names, example files belonging to "...offers.txt"
-        successful_transforms_folders = store_files(
-            blobs, successful_transforms_folders
-        )
-
-    gcs_hook = GCSHook(google_cloud_storage_conn_id="google_cloud_storage_default")
-
-    execution_date_strings = []
-
     for dag_run in execution_dates:
         execution_date_str = dag_run.execution_date.strftime(
             "%Y-%m-%d %H:%M:%S%z"
         ).replace(" ", "T")
         execution_date_str = execution_date_str[:-2] + ":" + execution_date_str[-2:]
 
-        execution_date_strings.append(execution_date_str)
+        gcs_hook = GCSHook(google_cloud_storage_conn_id="google_cloud_storage_default")
 
-    for key in successful_transforms_folders.keys():
-        for files in successful_transforms_folders[key]:
-            for file in files and execution_date in execution_date_strings:
-                file_content = gcs_hook.download(
-                    bucket_name="us-central1-test-hubble-2-5f1f2dbf-bucket",
-                    object_name=f"dag-exported/scheduled__{execution_date}/changes_folder/{file}",
-                )
-                print(f"The file content is: {file_content}")
+        for key in successful_transforms.keys():
+            for blob in blobs:
+                if re.search(rf"{key}", blob.name):
+                    file_content = gcs_hook.download(
+                        bucket_name="us-central1-test-hubble-2-5f1f2dbf-bucket",
+                        object_name=f"dag-exported/scheduled__{execution_date_str}/changes_folder/{os.path.basename(blob.name)}",
+                    )
+                    print(f"The file content is: {file_content}")
 
-                ## Decode the bytes object to a string
-                # file_content = file_content.decode()
+                    ## Decode the bytes object to a string
+                    # file_content = file_content.decode()
 
-                ## Now file_content is a string with the content of the file
-                # lines = file_content.splitlines()
+                    ## Now file_content is a string with the content of the file
+                    # lines = file_content.splitlines()
 
-                ## Count the number of lines that start with "{"
-                # count = sum(1 for line in lines if line.startswith("{"))
+                    ## Count the number of lines that start with "{"
+                    # count = sum(1 for line in lines if line.startswith("{"))
 
-                # print(count)
+                    # print(count)


 def get_from_historyTableExport(**context):
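
A note on the deleted loop header `for file in files and execution_date in execution_date_strings:`: Python's `and` returns one of its operands rather than combining two loop conditions, so when `files` is non-empty the line tries to iterate over a boolean. A small sketch of the pitfall, with hypothetical sample values:

```python
files = ["a.txt", "b.txt"]
execution_date_strings = ["2024-04-26T00:00:00+00:00"]
execution_date = "2024-04-26T00:00:00+00:00"

# "and" returns an operand: "in" binds tighter, so because files is truthy
# the whole expression evaluates to the membership-test result, a bool.
value = files and execution_date in execution_date_strings
print(value)  # True

# The deleted line therefore attempted to iterate over that bool:
try:
    for f in files and execution_date in execution_date_strings:
        pass
except TypeError as exc:
    print(exc)  # 'bool' object is not iterable
```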

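As an aside on the retained timestamp code: strftime's %z emits an offset like +0000, so the block splices in a colon to produce an ISO-8601 string. A minimal sketch, assuming a timezone-aware execution_date as Airflow provides; the sample date is hypothetical:

```python
from datetime import datetime, timezone

# Hypothetical stand-in for dag_run.execution_date (timezone-aware in Airflow).
execution_date = datetime(2024, 4, 26, tzinfo=timezone.utc)

# "%z" yields "+0000"; replace the space with "T" and splice in a colon.
s = execution_date.strftime("%Y-%m-%d %H:%M:%S%z").replace(" ", "T")
s = s[:-2] + ":" + s[-2:]
print(s)  # 2024-04-26T00:00:00+00:00

# For timezone-aware datetimes, isoformat() yields the same string directly.
print(execution_date.isoformat())  # 2024-04-26T00:00:00+00:00
```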

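Taken together with the commented-out lines, the added loop downloads each matching export file, decodes the bytes GCSHook.download returns, and could count the JSON rows. A minimal end-to-end sketch under the same connection id, bucket, and path layout as the diff; the object name offers.txt and the sample timestamp are hypothetical, and it assumes apache-airflow-providers-google with a configured connection:

```python
from airflow.providers.google.cloud.hooks.gcs import GCSHook

execution_date_str = "2024-04-26T00:00:00+00:00"  # hypothetical sample timestamp

gcs_hook = GCSHook(google_cloud_storage_conn_id="google_cloud_storage_default")

# download() returns the object's contents as bytes when no target filename is given.
file_content = gcs_hook.download(
    bucket_name="us-central1-test-hubble-2-5f1f2dbf-bucket",
    object_name=f"dag-exported/scheduled__{execution_date_str}/changes_folder/offers.txt",
)

# Count lines that start with "{", as the commented-out code intended.
lines = file_content.decode().splitlines()
count = sum(1 for line in lines if line.startswith("{"))
print(count)
```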