From eaf9de75658b780413fe7870806677fd7964543f Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Mon, 16 Dec 2024 20:41:36 +0100 Subject: [PATCH] feat: snapshot library CH column migration (#26867) Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: James Greenhill --- .../0095_add_snapshot_library_tracking.py | 26 +++++++++++++++++++ .../test/__snapshots__/test_schema.ambr | 20 ++++++++++---- .../session_replay_event_migrations_sql.py | 21 +++++++++++++++ .../sql/session_replay_event_sql.py | 9 +++++-- 4 files changed, 69 insertions(+), 7 deletions(-) create mode 100644 posthog/clickhouse/migrations/0095_add_snapshot_library_tracking.py diff --git a/posthog/clickhouse/migrations/0095_add_snapshot_library_tracking.py b/posthog/clickhouse/migrations/0095_add_snapshot_library_tracking.py new file mode 100644 index 0000000000000..42391c66fa059 --- /dev/null +++ b/posthog/clickhouse/migrations/0095_add_snapshot_library_tracking.py @@ -0,0 +1,26 @@ +from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions +from posthog.session_recordings.sql.session_replay_event_migrations_sql import ( + DROP_SESSION_REPLAY_EVENTS_TABLE_MV_SQL, + DROP_KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL, + ADD_LIBRARY_SESSION_REPLAY_EVENTS_TABLE_SQL, + ADD_LIBRARY_WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL, + ADD_LIBRARY_DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL, +) +from posthog.session_recordings.sql.session_replay_event_sql import ( + SESSION_REPLAY_EVENTS_TABLE_MV_SQL, + KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL, +) + +operations = [ + # we have to drop materialized view first so that we're no longer pulling from kafka + # then we drop the kafka table + run_sql_with_exceptions(DROP_SESSION_REPLAY_EVENTS_TABLE_MV_SQL()), + run_sql_with_exceptions(DROP_KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL()), + # now we can alter the target tables + run_sql_with_exceptions(ADD_LIBRARY_SESSION_REPLAY_EVENTS_TABLE_SQL()), + 
run_sql_with_exceptions(ADD_LIBRARY_DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL()), + run_sql_with_exceptions(ADD_LIBRARY_WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL()), + # and then recreate the materialized views and kafka tables + run_sql_with_exceptions(KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL()), + run_sql_with_exceptions(SESSION_REPLAY_EVENTS_TABLE_MV_SQL()), +] diff --git a/posthog/clickhouse/test/__snapshots__/test_schema.ambr b/posthog/clickhouse/test/__snapshots__/test_schema.ambr index b559f673293ec..406f73008fd28 100644 --- a/posthog/clickhouse/test/__snapshots__/test_schema.ambr +++ b/posthog/clickhouse/test/__snapshots__/test_schema.ambr @@ -481,7 +481,8 @@ size Int64, event_count Int64, message_count Int64, - snapshot_source LowCardinality(Nullable(String)) + snapshot_source LowCardinality(Nullable(String)), + snapshot_library Nullable(String) ) ENGINE = Kafka('test.kafka.broker:9092', 'clickhouse_session_replay_events_test', 'group1', 'JSONEachRow') ''' @@ -1496,7 +1497,8 @@ size Int64, event_count Int64, message_count Int64, - snapshot_source LowCardinality(Nullable(String)) + snapshot_source LowCardinality(Nullable(String)), + snapshot_library Nullable(String) ) ENGINE = Kafka('kafka:9092', 'clickhouse_session_replay_events_test', 'group1', 'JSONEachRow') ''' @@ -2198,8 +2200,10 @@ -- often very useful in incidents or debugging -- because we batch events we expect message_count to be lower than event_count event_count SimpleAggregateFunction(sum, Int64), - -- which source the snapshots came from Android, iOS, Mobile, Web. Web if absent + -- which source the snapshots came from Mobile or Web. Web if absent snapshot_source AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')), + -- knowing something is mobile isn't enough, we need to know if e.g. 
RN or flutter + snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')), _timestamp SimpleAggregateFunction(max, DateTime) ) ENGINE = Distributed('posthog', 'posthog_test', 'sharded_session_replay_events', sipHash64(distinct_id)) @@ -2221,6 +2225,7 @@ `console_error_count` Int64, `size` Int64, `message_count` Int64, `event_count` Int64, `snapshot_source` AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')), + `snapshot_library` AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')), `_timestamp` Nullable(DateTime) ) AS SELECT @@ -2252,6 +2257,7 @@ sum(message_count) as message_count, sum(event_count) as event_count, argMinState(snapshot_source, first_timestamp) as snapshot_source, + argMinState(snapshot_library, first_timestamp) as snapshot_library, max(_timestamp) as _timestamp FROM posthog_test.kafka_session_replay_events group by session_id, team_id @@ -2787,8 +2793,10 @@ -- often very useful in incidents or debugging -- because we batch events we expect message_count to be lower than event_count event_count SimpleAggregateFunction(sum, Int64), - -- which source the snapshots came from Android, iOS, Mobile, Web. Web if absent + -- which source the snapshots came from Mobile or Web. Web if absent snapshot_source AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')), + -- knowing something is mobile isn't enough, we need to know if e.g. 
RN or flutter + snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')), _timestamp SimpleAggregateFunction(max, DateTime) ) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events', '{replica}') @@ -3977,8 +3985,10 @@ -- often very useful in incidents or debugging -- because we batch events we expect message_count to be lower than event_count event_count SimpleAggregateFunction(sum, Int64), - -- which source the snapshots came from Android, iOS, Mobile, Web. Web if absent + -- which source the snapshots came from Mobile or Web. Web if absent snapshot_source AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')), + -- knowing something is mobile isn't enough, we need to know if e.g. RN or flutter + snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')), _timestamp SimpleAggregateFunction(max, DateTime) ) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events', '{replica}') diff --git a/posthog/session_recordings/sql/session_replay_event_migrations_sql.py b/posthog/session_recordings/sql/session_replay_event_migrations_sql.py index 0d875e0e4d9ef..684206e6aef5b 100644 --- a/posthog/session_recordings/sql/session_replay_event_migrations_sql.py +++ b/posthog/session_recordings/sql/session_replay_event_migrations_sql.py @@ -135,3 +135,24 @@ table_name=SESSION_REPLAY_EVENTS_DATA_TABLE(), cluster=settings.CLICKHOUSE_CLUSTER, ) + +# migration to add library column to the session replay table +ALTER_SESSION_REPLAY_ADD_LIBRARY_COLUMN = """ + ALTER TABLE {table_name} on CLUSTER '{cluster}' + ADD COLUMN IF NOT EXISTS snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')) +""" + +ADD_LIBRARY_DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: ALTER_SESSION_REPLAY_ADD_LIBRARY_COLUMN.format( + 
table_name="session_replay_events", + cluster=settings.CLICKHOUSE_CLUSTER, +) + +ADD_LIBRARY_WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: ALTER_SESSION_REPLAY_ADD_LIBRARY_COLUMN.format( + table_name="writable_session_replay_events", + cluster=settings.CLICKHOUSE_CLUSTER, +) + +ADD_LIBRARY_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: ALTER_SESSION_REPLAY_ADD_LIBRARY_COLUMN.format( + table_name=SESSION_REPLAY_EVENTS_DATA_TABLE(), + cluster=settings.CLICKHOUSE_CLUSTER, +) diff --git a/posthog/session_recordings/sql/session_replay_event_sql.py b/posthog/session_recordings/sql/session_replay_event_sql.py index 91dd2ff191fe9..09c8f343540e8 100644 --- a/posthog/session_recordings/sql/session_replay_event_sql.py +++ b/posthog/session_recordings/sql/session_replay_event_sql.py @@ -35,7 +35,8 @@ size Int64, event_count Int64, message_count Int64, - snapshot_source LowCardinality(Nullable(String)) + snapshot_source LowCardinality(Nullable(String)), + snapshot_library Nullable(String) ) ENGINE = {engine} """ @@ -75,8 +76,10 @@ -- often very useful in incidents or debugging -- because we batch events we expect message_count to be lower than event_count event_count SimpleAggregateFunction(sum, Int64), - -- which source the snapshots came from Android, iOS, Mobile, Web. Web if absent + -- which source the snapshots came from Mobile or Web. Web if absent snapshot_source AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')), + -- knowing something is mobile isn't enough, we need to know if e.g. 
RN or flutter + snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')), _timestamp SimpleAggregateFunction(max, DateTime) ) ENGINE = {engine} """ @@ -147,6 +150,7 @@ sum(message_count) as message_count, sum(event_count) as event_count, argMinState(snapshot_source, first_timestamp) as snapshot_source, +argMinState(snapshot_library, first_timestamp) as snapshot_library, max(_timestamp) as _timestamp FROM {database}.kafka_session_replay_events group by session_id, team_id @@ -169,6 +173,7 @@ `console_error_count` Int64, `size` Int64, `message_count` Int64, `event_count` Int64, `snapshot_source` AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')), +`snapshot_library` AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')), `_timestamp` Nullable(DateTime) )""", )