
Commit

feat: add try catch
daoleno committed Jan 15, 2024
1 parent f259336 commit 55fbd56
Showing 1 changed file with 61 additions and 56 deletions.
117 changes: 61 additions & 56 deletions bq-syncer/sync.py
@@ -133,62 +133,67 @@ def delete_old_dir():


 def process_table(table, conn, dataset_ref, bqclient, idx):
-    table_id = table.table_id
-    table_ref = dataset_ref.table(table_id)
-
-    cursor = conn.cursor()
-
-    # Check if table exists in DuckDB
-    cursor.execute(
-        f"SELECT count(*) FROM information_schema.tables WHERE table_name = '{table_id}'"
-    )
-    if cursor.fetchone()[0] == 0:
-        # Create table in DuckDB
-        logging.info(f"Creating table: {table_id}")
-        table_schema = bqclient.get_table(table_ref).schema
-        converted_schema = convert_schema(table_schema)
-        ddl = f"CREATE TABLE {table_id} ({', '.join(converted_schema)})"
-        cursor.execute(ddl)
-
-    # Retrieve and sync data
-    last_timestamp_result = cursor.execute(
-        f"SELECT MAX(source_timestamp) FROM {table_id}"
-    ).fetchone()
-    last_timestamp = (
-        last_timestamp_result[0] if last_timestamp_result[0] is not None else 0
-    )
-
-    # Build and execute query
-    fields = [
-        f.name
-        if f.name != "datastream_metadata"
-        else "datastream_metadata.source_timestamp"
-        for f in table_schema
-    ]
-    query = f"""
-        SELECT {', '.join(fields)}
-        FROM `{table_ref}`
-        WHERE datastream_metadata.source_timestamp > {last_timestamp}
-        ORDER BY datastream_metadata.source_timestamp ASC
-    """
-    query_job = bqclient.query(query)
-    pageNum = 0
-    for page in query_job.result(page_size=10000).pages:
-        pageNum += 1
-        items = list(page)
-        if items:
-            df = pl.DataFrame(
-                {
-                    field.name: list(data)
-                    for field, data in zip(table_schema, zip(*items))
-                }
-            )
-            cursor.register("df", df)
-            cursor.execute(f"INSERT INTO {table_id} SELECT * FROM df")
-            logging.info(
-                f"Synced table {idx}: {table_id} - page {pageNum} - {len(items)} rows"
-            )
-    logging.info(f"Synced table {idx}: {table_id}")
+    try:
+        table_id = table.table_id
+        table_ref = dataset_ref.table(table_id)
+
+        cursor = conn.cursor()
+
+        # Check if table exists in DuckDB
+        cursor.execute(
+            f"SELECT count(*) FROM information_schema.tables WHERE table_name = '{table_id}'"
+        )
+        if cursor.fetchone()[0] == 0:
+            # Create table in DuckDB
+            logging.info(f"Creating table: {table_id}")
+            table_schema = bqclient.get_table(table_ref).schema
+            converted_schema = convert_schema(table_schema)
+            ddl = f"CREATE TABLE {table_id} ({', '.join(converted_schema)})"
+            cursor.execute(ddl)
+
+        # Retrieve and sync data
+        last_timestamp_result = cursor.execute(
+            f"SELECT MAX(source_timestamp) FROM {table_id}"
+        ).fetchone()
+        last_timestamp = (
+            last_timestamp_result[0] if last_timestamp_result[0] is not None else 0
+        )
+
+        # Build and execute query
+        fields = [
+            f.name
+            if f.name != "datastream_metadata"
+            else "datastream_metadata.source_timestamp"
+            for f in table_schema
+        ]
+        query = f"""
+            SELECT {', '.join(fields)}
+            FROM `{table_ref}`
+            WHERE datastream_metadata.source_timestamp > {last_timestamp}
+            ORDER BY datastream_metadata.source_timestamp ASC
+        """
+        query_job = bqclient.query(query)
+        pageNum = 0
+        for page in query_job.result(page_size=10000).pages:
+            pageNum += 1
+            items = list(page)
+            if items:
+                df = pl.DataFrame(
+                    {
+                        field.name: list(data)
+                        for field, data in zip(table_schema, zip(*items))
+                    }
+                )
+                cursor.register("df", df)
+                cursor.execute(f"INSERT INTO {table_id} SELECT * FROM df")
+                logging.info(
+                    f"Synced table {idx}: {table_id} - page {pageNum} - {len(items)} rows"
+                )
+        logging.info(f"Synced table {idx}: {table_id}")
+    except Exception as e:
+        logging.error(f"An error occurred: {e}")
+    finally:
+        cursor.close()


 def perform_sync_task():
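A note on the pattern this commit introduces: if conn.cursor() itself raises, cursor is never bound, and the finally clause then fails with a NameError of its own. Below is a minimal sketch of a guarded variant, with a hypothetical do_work callable standing in for the body of process_table:

    import logging

    def process_safely(conn, do_work):
        cursor = None  # bind up front so the cleanup path is always defined
        try:
            cursor = conn.cursor()
            do_work(cursor)
        except Exception as e:
            # Log and move on, matching the commit's per-table error isolation.
            logging.error(f"An error occurred: {e}")
        finally:
            if cursor is not None:
                cursor.close()

An equivalent, shorter option is to open the cursor with contextlib.closing inside the try block, which closes it automatically on exit.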

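For reference, the least obvious part of the synced code is the DataFrame construction: zip(*items) transposes a page of row tuples into per-column sequences, which are then zipped against the schema fields. A tiny standalone illustration, with made-up column names in place of the BigQuery schema:

    import polars as pl

    rows = [(1, "a"), (2, "b"), (3, "c")]  # one page of row tuples
    names = ["id", "label"]                # hypothetical column names
    # zip(*rows) yields one tuple per column; build a list for each name.
    df = pl.DataFrame({n: list(col) for n, col in zip(names, zip(*rows))})
    print(df.shape)  # (3, 2)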