diff --git a/bq-syncer/sync.py b/bq-syncer/sync.py
index d07fb92..7bc8c8a 100644
--- a/bq-syncer/sync.py
+++ b/bq-syncer/sync.py
@@ -133,62 +133,70 @@ def delete_old_dir():
 
 
 def process_table(table, conn, dataset_ref, bqclient, idx):
-    table_id = table.table_id
-    table_ref = dataset_ref.table(table_id)
-
-    cursor = conn.cursor()
-
-    # Check if table exists in DuckDB
-    cursor.execute(
-        f"SELECT count(*) FROM information_schema.tables WHERE table_name = '{table_id}'"
-    )
-    if cursor.fetchone()[0] == 0:
-        # Create table in DuckDB
-        logging.info(f"Creating table: {table_id}")
-        table_schema = bqclient.get_table(table_ref).schema
-        converted_schema = convert_schema(table_schema)
-        ddl = f"CREATE TABLE {table_id} ({', '.join(converted_schema)})"
-        cursor.execute(ddl)
-
-    # Retrieve and sync data
-    last_timestamp_result = cursor.execute(
-        f"SELECT MAX(source_timestamp) FROM {table_id}"
-    ).fetchone()
-    last_timestamp = (
-        last_timestamp_result[0] if last_timestamp_result[0] is not None else 0
-    )
-
-    # Build and execute query
-    fields = [
-        f.name
-        if f.name != "datastream_metadata"
-        else "datastream_metadata.source_timestamp"
-        for f in table_schema
-    ]
-    query = f"""
-        SELECT {', '.join(fields)}
-        FROM `{table_ref}`
-        WHERE datastream_metadata.source_timestamp > {last_timestamp}
-        ORDER BY datastream_metadata.source_timestamp ASC
-    """
-    query_job = bqclient.query(query)
-    pageNum = 0
-    for page in query_job.result(page_size=10000).pages:
-        pageNum += 1
-        items = list(page)
-        if items:
-            df = pl.DataFrame(
-                {
-                    field.name: list(data)
-                    for field, data in zip(table_schema, zip(*items))
-                }
-            )
-            cursor.register("df", df)
-            cursor.execute(f"INSERT INTO {table_id} SELECT * FROM df")
-            logging.info(
-                f"Synced table {idx}: {table_id} - page {pageNum} - {len(items)} rows"
-            )
-    logging.info(f"Synced table {idx}: {table_id}")
+    table_id = table.table_id
+    table_ref = dataset_ref.table(table_id)
+    # Create the cursor before the try block so finally can always close it.
+    cursor = conn.cursor()
+    try:
+        # Check if table exists in DuckDB
+        cursor.execute(
+            f"SELECT count(*) FROM information_schema.tables WHERE table_name = '{table_id}'"
+        )
+        # Fetch the schema up front: it is needed below even when the
+        # table already exists (fixes a NameError on incremental syncs).
+        table_schema = bqclient.get_table(table_ref).schema
+        if cursor.fetchone()[0] == 0:
+            # Create table in DuckDB
+            logging.info(f"Creating table: {table_id}")
+            converted_schema = convert_schema(table_schema)
+            ddl = f"CREATE TABLE {table_id} ({', '.join(converted_schema)})"
+            cursor.execute(ddl)
+
+        # Retrieve and sync data
+        last_timestamp_result = cursor.execute(
+            f"SELECT MAX(source_timestamp) FROM {table_id}"
+        ).fetchone()
+        last_timestamp = (
+            last_timestamp_result[0] if last_timestamp_result[0] is not None else 0
+        )
+
+        # Build and execute query
+        fields = [
+            f.name
+            if f.name != "datastream_metadata"
+            else "datastream_metadata.source_timestamp"
+            for f in table_schema
+        ]
+        query = f"""
+            SELECT {', '.join(fields)}
+            FROM `{table_ref}`
+            WHERE datastream_metadata.source_timestamp > {last_timestamp}
+            ORDER BY datastream_metadata.source_timestamp ASC
+        """
+        query_job = bqclient.query(query)
+        pageNum = 0
+        for page in query_job.result(page_size=10000).pages:
+            pageNum += 1
+            items = list(page)
+            if items:
+                df = pl.DataFrame(
+                    {
+                        field.name: list(data)
+                        for field, data in zip(table_schema, zip(*items))
+                    }
+                )
+                cursor.register("df", df)
+                cursor.execute(f"INSERT INTO {table_id} SELECT * FROM df")
+                logging.info(
+                    f"Synced table {idx}: {table_id} - page {pageNum} - {len(items)} rows"
+                )
+        logging.info(f"Synced table {idx}: {table_id}")
+    except Exception as e:
+        logging.error(f"Error syncing table {table_id}: {e}")
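+        # The error is logged and not re-raised, so a failure in one
+        # table does not propagate to the caller.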
+    finally:
+        cursor.close()
 
 
 def perform_sync_task():