Skip to content

Commit

Permalink
feat: snapshot library CH column migration (#26867)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: James Greenhill <[email protected]>
  • Loading branch information
3 people authored Dec 16, 2024
1 parent ed0ba35 commit eaf9de7
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.session_recordings.sql.session_replay_event_migrations_sql import (
DROP_SESSION_REPLAY_EVENTS_TABLE_MV_SQL,
DROP_KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL,
ADD_LIBRARY_SESSION_REPLAY_EVENTS_TABLE_SQL,
ADD_LIBRARY_WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL,
ADD_LIBRARY_DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL,
)
from posthog.session_recordings.sql.session_replay_event_sql import (
SESSION_REPLAY_EVENTS_TABLE_MV_SQL,
KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL,
)

# Migration operations to add the snapshot_library column to the session
# replay tables. Ordering matters: ingestion must be paused (MV + Kafka
# table dropped) before the target tables are altered, then recreated.
operations = [
    # we have to drop the materialized view first so that we're no longer pulling from kafka
    # then we drop the kafka table
    run_sql_with_exceptions(DROP_SESSION_REPLAY_EVENTS_TABLE_MV_SQL()),
    run_sql_with_exceptions(DROP_KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL()),
    # now we can alter the target tables (sharded, distributed, and writable variants)
    run_sql_with_exceptions(ADD_LIBRARY_SESSION_REPLAY_EVENTS_TABLE_SQL()),
    run_sql_with_exceptions(ADD_LIBRARY_DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL()),
    run_sql_with_exceptions(ADD_LIBRARY_WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL()),
    # and then recreate the materialized views and kafka tables so ingestion resumes
    run_sql_with_exceptions(KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL()),
    run_sql_with_exceptions(SESSION_REPLAY_EVENTS_TABLE_MV_SQL()),
]
20 changes: 15 additions & 5 deletions posthog/clickhouse/test/__snapshots__/test_schema.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,8 @@
size Int64,
event_count Int64,
message_count Int64,
snapshot_source LowCardinality(Nullable(String))
snapshot_source LowCardinality(Nullable(String)),
snapshot_library Nullable(String)
) ENGINE = Kafka('test.kafka.broker:9092', 'clickhouse_session_replay_events_test', 'group1', 'JSONEachRow')

'''
Expand Down Expand Up @@ -1496,7 +1497,8 @@
size Int64,
event_count Int64,
message_count Int64,
snapshot_source LowCardinality(Nullable(String))
snapshot_source LowCardinality(Nullable(String)),
snapshot_library Nullable(String)
) ENGINE = Kafka('kafka:9092', 'clickhouse_session_replay_events_test', 'group1', 'JSONEachRow')

'''
Expand Down Expand Up @@ -2198,8 +2200,10 @@
-- often very useful in incidents or debugging
-- because we batch events we expect message_count to be lower than event_count
event_count SimpleAggregateFunction(sum, Int64),
-- which source the snapshots came from Android, iOS, Mobile, Web. Web if absent
-- which source the snapshots came from Mobile or Web. Web if absent
snapshot_source AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')),
-- knowing something is mobile isn't enough, we need to know if e.g. RN or flutter
snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')),
_timestamp SimpleAggregateFunction(max, DateTime)
) ENGINE = Distributed('posthog', 'posthog_test', 'sharded_session_replay_events', sipHash64(distinct_id))

Expand All @@ -2221,6 +2225,7 @@
`console_error_count` Int64, `size` Int64, `message_count` Int64,
`event_count` Int64,
`snapshot_source` AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')),
`snapshot_library` AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')),
`_timestamp` Nullable(DateTime)
)
AS SELECT
Expand Down Expand Up @@ -2252,6 +2257,7 @@
sum(message_count) as message_count,
sum(event_count) as event_count,
argMinState(snapshot_source, first_timestamp) as snapshot_source,
argMinState(snapshot_library, first_timestamp) as snapshot_library,
max(_timestamp) as _timestamp
FROM posthog_test.kafka_session_replay_events
group by session_id, team_id
Expand Down Expand Up @@ -2787,8 +2793,10 @@
-- often very useful in incidents or debugging
-- because we batch events we expect message_count to be lower than event_count
event_count SimpleAggregateFunction(sum, Int64),
-- which source the snapshots came from Android, iOS, Mobile, Web. Web if absent
-- which source the snapshots came from Mobile or Web. Web if absent
snapshot_source AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')),
-- knowing something is mobile isn't enough, we need to know if e.g. RN or flutter
snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')),
_timestamp SimpleAggregateFunction(max, DateTime)
) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events', '{replica}')

Expand Down Expand Up @@ -3977,8 +3985,10 @@
-- often very useful in incidents or debugging
-- because we batch events we expect message_count to be lower than event_count
event_count SimpleAggregateFunction(sum, Int64),
-- which source the snapshots came from Android, iOS, Mobile, Web. Web if absent
-- which source the snapshots came from Mobile or Web. Web if absent
snapshot_source AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')),
-- knowing something is mobile isn't enough, we need to know if e.g. RN or flutter
snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')),
_timestamp SimpleAggregateFunction(max, DateTime)
) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events', '{replica}')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,24 @@
table_name=SESSION_REPLAY_EVENTS_DATA_TABLE(),
cluster=settings.CLICKHOUSE_CLUSTER,
)

# Migration to add the snapshot_library column to the session replay tables.
# The same ALTER template is applied to the distributed read table, the
# writable table, and the sharded data table; IF NOT EXISTS makes it
# idempotent so re-running the migration is safe.
ALTER_SESSION_REPLAY_ADD_LIBRARY_COLUMN = """
ALTER TABLE {table_name} on CLUSTER '{cluster}'
ADD COLUMN IF NOT EXISTS snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC'))
"""


# NOTE: these were lambdas; plain functions are the PEP 8 (E731) idiom and
# keep the same zero-argument callable interface for migration operations.
def ADD_LIBRARY_DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL():
    """Return the ALTER statement for the distributed (read) table."""
    return ALTER_SESSION_REPLAY_ADD_LIBRARY_COLUMN.format(
        table_name="session_replay_events",
        cluster=settings.CLICKHOUSE_CLUSTER,
    )


def ADD_LIBRARY_WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL():
    """Return the ALTER statement for the writable (ingest) table."""
    return ALTER_SESSION_REPLAY_ADD_LIBRARY_COLUMN.format(
        table_name="writable_session_replay_events",
        cluster=settings.CLICKHOUSE_CLUSTER,
    )


def ADD_LIBRARY_SESSION_REPLAY_EVENTS_TABLE_SQL():
    """Return the ALTER statement for the sharded data table."""
    return ALTER_SESSION_REPLAY_ADD_LIBRARY_COLUMN.format(
        table_name=SESSION_REPLAY_EVENTS_DATA_TABLE(),
        cluster=settings.CLICKHOUSE_CLUSTER,
    )
9 changes: 7 additions & 2 deletions posthog/session_recordings/sql/session_replay_event_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
size Int64,
event_count Int64,
message_count Int64,
snapshot_source LowCardinality(Nullable(String))
snapshot_source LowCardinality(Nullable(String)),
snapshot_library Nullable(String)
) ENGINE = {engine}
"""

Expand Down Expand Up @@ -75,8 +76,10 @@
-- often very useful in incidents or debugging
-- because we batch events we expect message_count to be lower than event_count
event_count SimpleAggregateFunction(sum, Int64),
-- which source the snapshots came from Android, iOS, Mobile, Web. Web if absent
-- which source the snapshots came from Mobile or Web. Web if absent
snapshot_source AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')),
-- knowing something is mobile isn't enough, we need to know if e.g. RN or flutter
snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')),
_timestamp SimpleAggregateFunction(max, DateTime)
) ENGINE = {engine}
"""
Expand Down Expand Up @@ -147,6 +150,7 @@
sum(message_count) as message_count,
sum(event_count) as event_count,
argMinState(snapshot_source, first_timestamp) as snapshot_source,
argMinState(snapshot_library, first_timestamp) as snapshot_library,
max(_timestamp) as _timestamp
FROM {database}.kafka_session_replay_events
group by session_id, team_id
Expand All @@ -169,6 +173,7 @@
`console_error_count` Int64, `size` Int64, `message_count` Int64,
`event_count` Int64,
`snapshot_source` AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')),
`snapshot_library` AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')),
`_timestamp` Nullable(DateTime)
)""",
)
Expand Down

0 comments on commit eaf9de7

Please sign in to comment.