Commit 014db52

refactor: Redshift batch export uses spmc consumer
tomasfarias committed Dec 13, 2024
1 parent 46dc898 commit 014db52
Showing 4 changed files with 263 additions and 73 deletions.
4 changes: 4 additions & 0 deletions posthog/settings/temporal.py
@@ -25,6 +25,10 @@
BATCH_EXPORT_BIGQUERY_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES: int = get_from_env(
"BATCH_EXPORT_BIGQUERY_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES", 0, type_cast=int
)
BATCH_EXPORT_REDSHIFT_UPLOAD_CHUNK_SIZE_BYTES: int = 1024 * 1024 * 8 # 8MB
BATCH_EXPORT_REDSHIFT_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES: int = get_from_env(
"BATCH_EXPORT_REDSHIFT_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES", 1024 * 1024 * 300, type_cast=int
)
BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES: int = 1024 * 1024 * 50 # 50MB
BATCH_EXPORT_HTTP_BATCH_SIZE: int = 5000
BATCH_EXPORT_BUFFER_QUEUE_MAX_SIZE_BYTES: int = 1024 * 1024 * 300 # 300MB
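Two Redshift-specific knobs are introduced here: the 8 MB upload chunk size caps how much rendered SQL the consumer buffers before each INSERT flush, and the 300 MB queue cap bounds how many record batches the producer may hold in memory ahead of the consumer (both are consumed further down in this commit). As a quick reference, the snippet below mirrors how the queue cap is resolved, under the assumption that get_from_env simply reads the environment variable and falls back to the default; the chunk size is a plain constant and is not environment-configurable.

import os

# Defaults introduced by this commit.
UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 8  # 8MB of rendered SQL per INSERT flush
QUEUE_CAP_DEFAULT_BYTES = 1024 * 1024 * 300  # 300MB of buffered record batches

# Assumed equivalent of get_from_env("BATCH_EXPORT_REDSHIFT_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES", ..., type_cast=int).
QUEUE_MAX_SIZE_BYTES = int(
    os.environ.get("BATCH_EXPORT_REDSHIFT_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES", QUEUE_CAP_DEFAULT_BYTES)
)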
173 changes: 102 additions & 71 deletions posthog/temporal/batch_exports/redshift_batch_export.py
@@ -1,4 +1,3 @@
import asyncio
import collections.abc
import contextlib
import dataclasses
@@ -9,6 +8,7 @@
import psycopg
import pyarrow as pa
import structlog
from django.conf import settings
from psycopg import sql
from temporalio import activity, workflow
from temporalio.common import RetryPolicy
@@ -28,9 +28,7 @@
default_fields,
execute_batch_export_insert_activity,
get_data_interval,
raise_on_produce_task_failure,
start_batch_export_run,
start_produce_batch_export_record_batches,
)
from posthog.temporal.batch_exports.heartbeat import (
BatchExportRangeHeartbeatDetails,
@@ -44,6 +42,14 @@
PostgreSQLClient,
PostgreSQLField,
)
from posthog.temporal.batch_exports.spmc import (
Consumer,
Producer,
RecordBatchQueue,
run_consumer,
wait_for_schema_or_producer,
)
from posthog.temporal.batch_exports.temporary_file import BatchExportTemporaryFile, WriterFormat
from posthog.temporal.batch_exports.utils import (
JsonType,
apeek_first_and_rewind,
@@ -395,6 +401,66 @@ async def flush_to_redshift(batch):
return total_rows_exported


class RedshiftConsumer(Consumer):
def __init__(
self,
heartbeater: Heartbeater,
heartbeat_details: RedshiftHeartbeatDetails,
data_interval_start: dt.datetime | str | None,
data_interval_end: dt.datetime | str,
redshift_client: RedshiftClient,
redshift_table: str,
):
"""Implementation of a record batch consumer for Redshift batch export.
This consumer will execute an INSERT query on every flush using provided
Redshift client. The recommended way to insert multiple values into Redshift
is using a COPY statement (see:
https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html). However,
Redshift cannot COPY from local files like Postgres, but only from files in
S3 or executing commands in SSH hosts. Setting that up would add complexity
and require more configuration from the user compared to the old Redshift
export plugin. For these reasons, we are going with basic INSERT statements
for now, but should eventually migrate to COPY from S3 for performance.
"""
super().__init__(
heartbeater,
heartbeat_details,
data_interval_start,
data_interval_end,
writer_format=WriterFormat.REDSHIFT_INSERT,
)
self.redshift_client = redshift_client
self.redshift_table = redshift_table

async def flush(
self,
batch_export_file: BatchExportTemporaryFile,
records_since_last_flush: int,
bytes_since_last_flush: int,
flush_counter: int,
last_date_range: DateRange,
is_last: bool,
error: Exception | None,
):
await self.logger.adebug(
"Loading %s records in query of size %s bytes to Redshift table '%s'",
records_since_last_flush,
bytes_since_last_flush,
self.redshift_table,
)

async with self.redshift_client.async_client_cursor() as cursor:
async with self.redshift_client.connection.transaction():
await cursor.execute(batch_export_file.read())

await self.logger.adebug("Loaded %s to Redshift table '%s'", records_since_last_flush, self.redshift_table)
self.rows_exported_counter.add(records_since_last_flush)
self.bytes_exported_counter.add(bytes_since_last_flush)

self.heartbeat_details.track_done_range(last_date_range, self.data_interval_start)
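
For illustration of the COPY-from-S3 path that the docstring above flags as the eventual goal, a rough sketch follows. Nothing in it is part of this commit: the DSN, schema, table, S3 prefix, IAM role and the choice of Parquet are hypothetical placeholders, and the real consumer instead executes the pre-rendered INSERT statements read from the temporary file, as flush() does above.

# Hypothetical sketch only: a COPY-based load from S3 into Redshift.
# Placeholder values throughout; this is not part of the commit.
import asyncio

import psycopg
from psycopg import sql


async def copy_from_s3(dsn: str, schema: str, table: str, s3_prefix: str, iam_role: str) -> None:
    """Load files previously staged in S3 into a Redshift table with a single COPY."""
    async with await psycopg.AsyncConnection.connect(dsn) as connection:
        async with connection.cursor() as cursor:
            query = sql.SQL("COPY {table} FROM {prefix} IAM_ROLE {role} FORMAT AS PARQUET").format(
                table=sql.Identifier(schema, table),
                prefix=sql.Literal(s3_prefix),
                role=sql.Literal(iam_role),
            )
            await cursor.execute(query)


# Example invocation (placeholder values):
# asyncio.run(copy_from_s3(
#     "postgresql://user:password@my-cluster:5439/dev",
#     "analytics", "events",
#     "s3://my-bucket/batch-exports/2024-12-13/",
#     "arn:aws:iam::123456789012:role/redshift-copy",
# ))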


@dataclasses.dataclass
class RedshiftInsertInputs(PostgresInsertInputs):
"""Inputs for Redshift insert activity.
@@ -438,7 +504,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records
async with (
Heartbeater() as heartbeater,
set_status_to_running_task(run_id=inputs.run_id, logger=logger),
get_client(team_id=inputs.team_id) as client,
get_client(team_id=inputs.team_id, max_block_size=10) as client,
):
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")
Expand Down Expand Up @@ -474,41 +540,28 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records
data_interval_end = dt.datetime.fromisoformat(inputs.data_interval_end)
full_range = (data_interval_start, data_interval_end)

queue, produce_task = start_produce_batch_export_record_batches(
client=client,
queue = RecordBatchQueue(max_size_bytes=settings.BATCH_EXPORT_REDSHIFT_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES)
producer = Producer(clickhouse_client=client)
producer_task = producer.start(
queue=queue,
model_name=model_name,
is_backfill=inputs.is_backfill,
team_id=inputs.team_id,
full_range=full_range,
done_ranges=done_ranges,
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
fields=fields,
destination_default_fields=redshift_default_fields(),
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
extra_query_parameters=extra_query_parameters,
)

get_schema_task = asyncio.create_task(queue.get_schema())
await asyncio.wait(
[get_schema_task, produce_task],
return_when=asyncio.FIRST_COMPLETED,
)

# Finishing producing happens sequentially after putting to queue and setting the schema.
# So, either we finished producing and setting the schema tasks, or we finished without
# putting anything in the queue.
if get_schema_task.done():
# In the first case, we'll land here.
# The schema is available, and the queue is not empty, so we can start the batch export.
record_batch_schema = get_schema_task.result()
else:
# In the second case, we'll land here: We finished producing without putting anything.
# Since we finished producing with an empty queue, there is nothing to batch export.
# We could have also failed, so we need to re-raise that exception to allow a retry if
# that's the case.
await raise_on_produce_task_failure(produce_task)
record_batch_schema = await wait_for_schema_or_producer(queue, producer_task)
if record_batch_schema is None:
return 0

record_batch_schema = pa.schema(
[field.with_nullable(True) for field in record_batch_schema if field.name != "_inserted_at"]
)
known_super_columns = ["properties", "set", "set_once", "person_properties"]
if inputs.properties_data_type != "varchar":
properties_type = "SUPER"
@@ -564,52 +617,30 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records
):
schema_columns = {field[0] for field in table_fields}

def map_to_record(row: dict) -> tuple[dict, dt.datetime]:
"""Map row to a record to insert to Redshift."""
record = {k: v for k, v in row.items() if k in schema_columns}

for column in known_super_columns:
if record.get(column, None) is not None:
# TODO: We should be able to save a json.loads here.
record[column] = remove_escaped_whitespace_recursive(json.loads(record[column]))

if isinstance(row["_inserted_at"], int):
inserted_at = dt.datetime.fromtimestamp(row["_inserted_at"])
else:
inserted_at = row["_inserted_at"]

return record, inserted_at

async def record_generator() -> (
collections.abc.AsyncGenerator[tuple[dict[str, typing.Any], dt.datetime], None]
):
while not queue.empty() or not produce_task.done():
try:
record_batch = queue.get_nowait()
except asyncio.QueueEmpty:
if produce_task.done():
await logger.adebug(
"Empty queue with no more events being produced, closing consumer loop"
)
return
else:
await asyncio.sleep(0.1)
continue

for record in record_batch.to_pylist():
yield map_to_record(record)

records_completed = await insert_records_to_redshift(
record_generator(),
redshift_client,
inputs.schema,
redshift_stage_table if requires_merge else redshift_table,
consumer = RedshiftConsumer(
heartbeater=heartbeater,
use_super=properties_type == "SUPER",
known_super_columns=known_super_columns,
heartbeat_details=details,
data_interval_start=data_interval_start,
data_interval_end=data_interval_end,
data_interval_start=data_interval_start,
redshift_client=redshift_client,
redshift_table=redshift_stage_table if requires_merge else redshift_table,
)
records_completed = await run_consumer(
consumer=consumer,
queue=queue,
producer_task=producer_task,
schema=record_batch_schema,
max_bytes=settings.BATCH_EXPORT_REDSHIFT_UPLOAD_CHUNK_SIZE_BYTES,
json_columns=known_super_columns,
writer_file_kwargs={
"redshift_table": redshift_stage_table if requires_merge else redshift_table,
"redshift_schema": inputs.schema,
"table_columns": schema_columns,
"known_json_columns": set(known_super_columns),
"use_super": properties_type == "SUPER",
"redshift_client": redshift_client,
},
multiple_files=True,
)

if requires_merge:
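
Stepping back, the substance of the refactor is that insert_into_redshift_activity no longer hand-rolls an asyncio polling loop and row-mapping generator; it now delegates to the shared spmc helpers (RecordBatchQueue, Producer, wait_for_schema_or_producer, run_consumer) and supplies only a destination-specific RedshiftConsumer.flush. The self-contained toy below illustrates just the shape of that producer/consumer flow; it does not use the posthog.temporal.batch_exports.spmc API, and all names, sizes and thresholds are made up.

import asyncio


async def produce(queue: asyncio.Queue, batches: list[bytes]) -> None:
    """Stand-in for the real Producer, which streams record batches from ClickHouse."""
    for batch in batches:
        await queue.put(batch)


async def consume(queue: asyncio.Queue, producer_task: asyncio.Task, max_bytes: int) -> int:
    """Drain the queue, flushing whenever the buffered bytes reach max_bytes."""
    buffered: list[bytes] = []
    buffered_bytes = 0
    consumed = 0

    async def flush() -> None:
        nonlocal buffered, buffered_bytes
        # The real RedshiftConsumer.flush runs the rendered INSERT statements
        # against Redshift; here we only report what would have been written.
        print(f"flushing {len(buffered)} batches totalling {buffered_bytes} bytes")
        buffered, buffered_bytes = [], 0

    while not queue.empty() or not producer_task.done():
        try:
            batch = queue.get_nowait()
        except asyncio.QueueEmpty:
            await asyncio.sleep(0.1)
            continue

        buffered.append(batch)
        buffered_bytes += len(batch)
        consumed += 1

        if buffered_bytes >= max_bytes:
            await flush()

    if buffered:
        await flush()
    return consumed


async def main() -> None:
    queue: asyncio.Queue[bytes] = asyncio.Queue()
    producer_task = asyncio.create_task(produce(queue, [b"x" * 4096 for _ in range(10)]))
    consumed = await consume(queue, producer_task, max_bytes=8192)
    await producer_task
    print(f"consumed {consumed} record batches")


if __name__ == "__main__":
    asyncio.run(main())

In the real pipeline the flush threshold is BATCH_EXPORT_REDSHIFT_UPLOAD_CHUNK_SIZE_BYTES and the queue itself is bounded by BATCH_EXPORT_REDSHIFT_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES, which is what gives the export a predictable memory ceiling.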
