
Commit

feat: update page size for query result, add logging for data sync, write parquet real time
daoleno committed Dec 19, 2023
1 parent f3a007e commit 4030cd0
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions bq-syncer/sync_parquet.py
@@ -114,18 +114,21 @@ def sync_table(table_item, index, total_tables):
     query = f"SELECT {', '.join(fields)} FROM `{table_ref}` ORDER BY RAND() LIMIT 1000"

     query_job = bqclient.query(query)
-    iterator = query_job.result(page_size=10000)
+    iterator = query_job.result(page_size=100000)

     # Configure Arrow writer to append to the .parquet file
+    pages_received = 0
     for page in iterator.pages:
+        pages_received += 1
         items = list(page)
+        page_size = len(items)

-        if len(items) == 0:
+        if page_size == 0:
             print(
                 f"[{datetime.now()}] No data received for table {index}/{total_tables}: {table_id}"
             )
             continue

         # Create DataFrame with explicit types
         data = {
             field: [item.get(field, None) for item in items] for field, _ in fields
@@ -157,7 +160,13 @@ def sync_table(table_item, index, total_tables):
             )

             df = pl.concat([df_old, df])
-        df.write_parquet(parquet_file_path)
+            df.write_parquet(parquet_file_path)
+        else:
+            df.write_parquet(parquet_file_path)
+
+        print(
+            f"[{datetime.now()}] Data sync of table {index}/{total_tables}: {table_id} - Received page {pages_received} with size {page_size}."
+        )

     print(
         f"[{datetime.now()}] Data sync of table {index}/{total_tables}: {table_id} completed."
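
Read together, the two hunks move the parquet write inside the page loop: each page from the larger BigQuery result pages is converted to a DataFrame and flushed to disk immediately, with a log line per page. Below is a minimal sketch of the resulting flow, not the exact file contents; the os.path.exists() check guarding the concat, the (name, dtype) shape of `fields`, and all values in the usage block are assumptions, since they sit outside the visible hunks.

# Sketch only: approximates the per-page loop in bq-syncer/sync_parquet.py after this commit.
# Assumptions (not shown in the diff): `fields` is a list of (name, dtype) pairs and an
# os.path.exists() check guards the concat with the existing parquet file.
import os
from datetime import datetime

import polars as pl
from google.cloud import bigquery


def sync_page_by_page(bqclient, table_ref, table_id, fields, parquet_file_path, index, total_tables):
    query = f"SELECT {', '.join(name for name, _ in fields)} FROM `{table_ref}` ORDER BY RAND() LIMIT 1000"
    iterator = bqclient.query(query).result(page_size=100000)  # larger pages per this commit

    pages_received = 0
    for page in iterator.pages:
        pages_received += 1
        items = list(page)
        page_size = len(items)
        if page_size == 0:
            print(f"[{datetime.now()}] No data received for table {index}/{total_tables}: {table_id}")
            continue

        # Build one DataFrame per page and write it out immediately ("real time").
        data = {name: [item.get(name) for item in items] for name, _ in fields}
        df = pl.DataFrame(data)

        if os.path.exists(parquet_file_path):
            # Append by concatenating with what is already on disk, then rewriting the file.
            df = pl.concat([pl.read_parquet(parquet_file_path), df])
        df.write_parquet(parquet_file_path)

        print(
            f"[{datetime.now()}] Data sync of table {index}/{total_tables}: {table_id} "
            f"- Received page {pages_received} with size {page_size}."
        )

    print(f"[{datetime.now()}] Data sync of table {index}/{total_tables}: {table_id} completed.")


if __name__ == "__main__":
    # Hypothetical values for illustration only; needs GCP credentials in the environment.
    client = bigquery.Client()
    sync_page_by_page(
        client,
        table_ref="project.dataset.table",
        table_id="table",
        fields=[("id", "STRING"), ("block_number", "INT64")],
        parquet_file_path="output/table.parquet",
        index=1,
        total_tables=1,
    )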
