refactor: Redshift batch export uses spmc consumer #26897

Merged
4 changes: 4 additions & 0 deletions posthog/settings/temporal.py
@@ -25,6 +25,10 @@
BATCH_EXPORT_BIGQUERY_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES: int = get_from_env(
"BATCH_EXPORT_BIGQUERY_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES", 0, type_cast=int
)
BATCH_EXPORT_REDSHIFT_UPLOAD_CHUNK_SIZE_BYTES: int = 1024 * 1024 * 8 # 8MB
BATCH_EXPORT_REDSHIFT_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES: int = get_from_env(
"BATCH_EXPORT_REDSHIFT_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES", 1024 * 1024 * 300, type_cast=int
)
BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES: int = 1024 * 1024 * 50 # 50MB
BATCH_EXPORT_HTTP_BATCH_SIZE: int = 5000
BATCH_EXPORT_BUFFER_QUEUE_MAX_SIZE_BYTES: int = 1024 * 1024 * 300 # 300MB
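
The new Redshift queue limit follows the same get_from_env pattern as the BigQuery setting above it, so it can be overridden per deployment through the environment, while the 8MB upload chunk size is a plain constant. Below is a small self-contained sketch of that override; the stand-in get_from_env only assumes the call shape visible in the diff, and the 64MB value is an arbitrary example, not a recommendation.

import os

def get_from_env(key: str, default: int, *, type_cast) -> int:
    # Stand-in with the same call shape as the settings helper used in the diff:
    # read the environment variable if set, otherwise fall back to the default.
    value = os.environ.get(key)
    return type_cast(value) if value is not None else default

# Operator override, e.g. exported in the worker's environment.
os.environ["BATCH_EXPORT_REDSHIFT_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES"] = str(1024 * 1024 * 64)

queue_max_size_bytes = get_from_env(
    "BATCH_EXPORT_REDSHIFT_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES", 1024 * 1024 * 300, type_cast=int
)
print(queue_max_size_bytes)  # 67108864 (64MB); without the override, the 300MB default applies
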
32 changes: 20 additions & 12 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -40,7 +40,7 @@
Consumer,
Producer,
RecordBatchQueue,
run_consumer_loop,
run_consumer,
wait_for_schema_or_producer,
)
from posthog.temporal.batch_exports.temporary_file import (
@@ -519,12 +519,19 @@ def __init__(
heartbeater: Heartbeater,
heartbeat_details: BigQueryHeartbeatDetails,
data_interval_start: dt.datetime | str | None,
data_interval_end: dt.datetime | str,
writer_format: WriterFormat,
bigquery_client: BigQueryClient,
bigquery_table: bigquery.Table,
table_schema: list[BatchExportField],
table_schema: list[bigquery.SchemaField],
):
super().__init__(heartbeater, heartbeat_details, data_interval_start, writer_format)
super().__init__(
heartbeater=heartbeater,
heartbeat_details=heartbeat_details,
data_interval_start=data_interval_start,
data_interval_end=data_interval_end,
writer_format=writer_format,
)
self.bigquery_client = bigquery_client
self.bigquery_table = bigquery_table
self.table_schema = table_schema
@@ -629,11 +636,10 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
include_events=inputs.include_events,
extra_query_parameters=extra_query_parameters,
)
records_completed = 0

record_batch_schema = await wait_for_schema_or_producer(queue, producer_task)
if record_batch_schema is None:
return records_completed
return 0

record_batch_schema = pa.schema(
# NOTE: For some reason, some batches set non-nullable fields as non-nullable, whereas other
@@ -700,21 +706,23 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
create=can_perform_merge,
delete=can_perform_merge,
) as bigquery_stage_table:
records_completed = await run_consumer_loop(
queue=queue,
consumer_cls=BigQueryConsumer,
producer_task=producer_task,
consumer = BigQueryConsumer(
heartbeater=heartbeater,
heartbeat_details=details,
data_interval_end=data_interval_end,
data_interval_start=data_interval_start,
schema=record_batch_schema,
writer_format=WriterFormat.PARQUET if can_perform_merge else WriterFormat.JSONL,
max_bytes=settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES,
json_columns=() if can_perform_merge else json_columns,
bigquery_client=bq_client,
bigquery_table=bigquery_stage_table if can_perform_merge else bigquery_table,
table_schema=stage_schema if can_perform_merge else schema,
)
records_completed = await run_consumer(
consumer=consumer,
queue=queue,
producer_task=producer_task,
schema=record_batch_schema,
max_bytes=settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES,
json_columns=() if can_perform_merge else json_columns,
writer_file_kwargs={"compression": "zstd"} if can_perform_merge else {},
multiple_files=True,
)
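
For reviewers unfamiliar with the batch export internals, the hunk above shows the shape of the refactor this PR applies to Redshift: the activity now constructs its destination-specific Consumer itself and hands it to a generic run_consumer helper, instead of passing a consumer class plus mixed keyword arguments to run_consumer_loop. The toy asyncio sketch below illustrates that split; the Producer/Consumer/run_consumer names come from the import hunk above, but the internals (plain asyncio.Queue, sentinel value, a consume method) are illustrative assumptions, not the real spmc implementation.

import asyncio

async def produce(queue: asyncio.Queue) -> None:
    # Stand-in for the Producer that streams record batches into the queue.
    for record_count in (100, 200, 300):
        await queue.put(record_count)
    await queue.put(None)  # sentinel: no more batches

class ToyConsumer:
    # Stand-in for a destination-specific consumer (BigQueryConsumer, RedshiftConsumer, ...);
    # the real ones flush each batch to the destination instead of just counting records.
    def __init__(self, destination: str) -> None:
        self.destination = destination
        self.records_completed = 0

    async def consume(self, queue: asyncio.Queue) -> int:
        while (batch := await queue.get()) is not None:
            self.records_completed += batch
        return self.records_completed

async def run_consumer(consumer: ToyConsumer, queue: asyncio.Queue, producer_task: asyncio.Task) -> int:
    # Mirrors the new call shape: the caller already built the consumer, so this
    # helper only drives the consume loop and surfaces any producer failure.
    records_completed = await consumer.consume(queue)
    await producer_task
    return records_completed

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    producer_task = asyncio.create_task(produce(queue))
    consumer = ToyConsumer(destination="redshift")  # destination-specific construction happens here
    print(await run_consumer(consumer=consumer, queue=queue, producer_task=producer_task))  # 600

asyncio.run(main())
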