From 4030cd08b28f974cc3a6c085e1597dd431d5bc64 Mon Sep 17 00:00:00 2001 From: daoleno Date: Wed, 20 Dec 2023 03:06:48 +0800 Subject: [PATCH] feat: update page size for query result, add logging for data sync, write parquet real time --- bq-syncer/sync_parquet.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/bq-syncer/sync_parquet.py b/bq-syncer/sync_parquet.py index ef47c41..166ef39 100644 --- a/bq-syncer/sync_parquet.py +++ b/bq-syncer/sync_parquet.py @@ -114,18 +114,21 @@ def sync_table(table_item, index, total_tables): query = f"SELECT {', '.join(fields)} FROM `{table_ref}` ORDER BY RAND() LIMIT 1000" query_job = bqclient.query(query) - iterator = query_job.result(page_size=10000) + iterator = query_job.result(page_size=100000) + # Configure Arrow writer to append to the .parquet file pages_received = 0 for page in iterator.pages: pages_received += 1 items = list(page) + page_size = len(items) - if len(items) == 0: + if page_size == 0: print( f"[{datetime.now()}] No data received for table {index}/{total_tables}: {table_id}" ) continue + # Create DataFrame with explicit types data = { field: [item.get(field, None) for item in items] for field, _ in fields @@ -157,7 +160,13 @@ def sync_table(table_item, index, total_tables): ) df = pl.concat([df_old, df]) - df.write_parquet(parquet_file_path) + df.write_parquet(parquet_file_path) + else: + df.write_parquet(parquet_file_path) + + print( + f"[{datetime.now()}] Data sync of table {index}/{total_tables}: {table_id} - Received page {pages_received} with size {page_size}." + ) print( f"[{datetime.now()}] Data sync of table {index}/{total_tables}: {table_id} completed."