
feat: add support for coordinator schemas #28031

Merged · 23 commits into master · Jan 31, 2025

Conversation

@Daesgar (Contributor) commented Jan 29, 2025

Problem

We run ClickHouse topologies in which nodes have different roles. The schema a node needs depends on its role, but we currently have no way to run different schema creation statements per role.

Changes

Add support for managing schemas on coordinator nodes. To achieve this:

  • Use the ClickhouseCluster class so we can run statements on any combination of nodes from the cluster
  • Use the posthog_migrations cluster, which contains all nodes, including coordinators. The cluster is only used to populate ClickhouseCluster info, so I added a new setting (CLICKHOUSE_MIGRATIONS_CLUSTER) to reference it and keep all the migrations running as expected. HostInfo now carries the node role (worker / coordinator) and cluster type (online / offline), which are fed from macros.
    • To read those macros correctly, I changed the query from system.clusters to clusterAllReplicas.
  • The migration function can now receive an additional parameter specifying which node role to target (see the sketch after this list)
  • By default, migrations run on all workers, to stay compatible with the previous behaviour
    • I will probably revisit this: most migrations run ON CLUSTER statements, so we would be issuing the same ON CLUSTER query from every worker. This should not be an issue, since all existing migrations are already applied and the local default cluster has only one node, but it is still redundant.
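
As an illustration of the new parameter, here is a minimal sketch of what a role-targeted migration could look like. The helper name run_sql_with_exceptions, its import path, and the DDL are assumptions made for the example, not confirmed API from this PR.

```python
# Hypothetical migration file; helper name and signature are assumptions.
from posthog.clickhouse.client.connection import NodeRole
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions  # assumed helper

COORDINATOR_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS coordinator_only_table (key String, value String)
ENGINE = MergeTree()
ORDER BY key
"""

operations = [
    # Default behaviour: run on all workers, as before this change.
    run_sql_with_exceptions(COORDINATOR_TABLE_SQL),
    # New behaviour: run only on coordinator nodes.
    run_sql_with_exceptions(COORDINATOR_TABLE_SQL, node_role=NodeRole.COORDINATOR),
]
```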

Before shipping this I need to:

👉 Stay up-to-date with PostHog coding conventions for a smoother review.

Does this work well for both Cloud and self-hosted?

Yes.

How did you test this code?

Tested locally by running all the migrations and adding new dummy ones to ensure they are created on the expected nodes.

Also tested the ClickHouse configuration for the hobby deployment to ensure that migrations run correctly.

@Daesgar Daesgar changed the title WIP feat: add support for coordinator schemas feat: add support for coordinator schemas Jan 29, 2025
@Daesgar Daesgar marked this pull request as ready for review January 30, 2025 12:40
@Daesgar Daesgar requested a review from a team as a code owner January 30, 2025 12:40
@orian orian self-requested a review January 30, 2025 12:43
@greptile-apps (bot) left a comment


PR Summary

This PR adds support for managing ClickHouse schemas across different node roles (coordinator/worker) in distributed clusters.

  • Added new posthog_migrations cluster in config.d/worker.xml and config.d/coordinator.xml to handle schema changes across all node types
  • Introduced NodeRole enum (ALL/COORDINATOR/WORKER) in client/connection.py to control migration execution targets
  • Added host_cluster_type and host_cluster_role macros in ClickHouse configs to identify node roles
  • Modified ClickhouseCluster class to filter operations by node role using map_all_hosts(node_role=NodeRole.X) (see the sketch after this list)
  • Added CLICKHOUSE_MIGRATIONS_CLUSTER setting (defaults to 'posthog_migrations') to manage schema changes across all nodes
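
A rough sketch of the role filtering described above; the enum values and the lowercase host_cluster_role comparison mirror the test snippet further down in this thread, while the standalone filter function is illustrative rather than the actual ClickhouseCluster code.

```python
from enum import Enum


class NodeRole(Enum):
    ALL = "ALL"
    COORDINATOR = "COORDINATOR"
    WORKER = "WORKER"


def filter_hosts_by_role(hosts, node_role: NodeRole):
    """Keep only hosts whose host_cluster_role macro matches the requested role."""
    if node_role == NodeRole.ALL:
        return list(hosts)
    return [host for host in hosts if host.host_cluster_role == node_role.value.lower()]
```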


20 file(s) reviewed, 11 comment(s)

Comment on lines +85 to +95
volumes:
  # this new entrypoint file is to fix a bug detailed here https://github.com/ClickHouse/ClickHouse/pull/59991
  # revert this when we upgrade clickhouse
  - ./docker/clickhouse/entrypoint.sh:/entrypoint.sh
  - ./posthog/idl:/idl
  - ./docker/clickhouse/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
  - ./docker/clickhouse/config.xml:/etc/clickhouse-server/config.xml
  - ./docker/clickhouse/config.d/coordinator.xml:/etc/clickhouse-server/config.d/coordinator.xml
  - ./docker/clickhouse/users-dev.xml:/etc/clickhouse-server/users.xml
  - ./docker/clickhouse/user_defined_function.xml:/etc/clickhouse-server/user_defined_function.xml
  - ./posthog/user_scripts:/var/lib/clickhouse/user_scripts


style: consider deduplicating volume mounts between clickhouse and clickhouse-coordinator services using YAML anchors

.flox/env/manifest.toml (comment outdated and resolved)
</remote_servers>

<macros>
<shard>02</shard>


logic: shard number '02' may conflict with existing shards if not carefully coordinated across the cluster

def run_migration():
    if node_role == NodeRole.ALL:
        logger.info(" Running migration on coordinators and workers")
        return cluster.map_all_hosts(lambda client: client.execute(sql)).result()


logic: No error handling for failed migrations across nodes - could leave cluster in inconsistent state
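
One way to surface such failures is to aggregate per-host errors before raising. The sketch below uses plain concurrent.futures instead of the repo's FuturesMap helper, so names and structure are illustrative only.

```python
from concurrent.futures import ThreadPoolExecutor


def run_on_hosts(hosts, make_client, sql):
    """Run `sql` on every host and report all failing hosts instead of stopping at the first error."""
    failures = {}
    with ThreadPoolExecutor() as executor:
        # Submit one task per host; the default argument pins each host to its lambda.
        futures = {host: executor.submit(lambda h=host: make_client(h).execute(sql)) for host in hosts}
        for host, future in futures.items():
            try:
                future.result()
            except Exception as exc:
                failures[host] = exc
    if failures:
        raise RuntimeError(f"Migration failed on {len(failures)} host(s): {failures}")
```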

Comment on lines +104 to 107
SELECT host_address, port, shard_num, replica_num, getMacro('hostClusterType') as host_cluster_type, getMacro('hostClusterRole') as host_cluster_role
FROM clusterAllReplicas(%(name)s, system.clusters)
WHERE name = %(name)s and is_local
ORDER BY shard_num, replica_num


logic: Using is_local filter with clusterAllReplicas may return no results if default_database is set in remote_servers config. Ensure this is removed as mentioned in PR description.

@Daesgar (Author)


PR ready for that!
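
For context, the columns selected by the query above map onto per-host records roughly as follows; the real HostInfo in posthog.clickhouse.cluster may be shaped differently, so this dataclass is only an illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class HostRecord:  # stand-in for the real HostInfo
    host_address: str
    port: int
    shard_num: int
    replica_num: int
    host_cluster_type: str  # from getMacro('hostClusterType'), e.g. "online" / "offline"
    host_cluster_role: str  # from getMacro('hostClusterRole'), e.g. "worker" / "coordinator"


def hosts_from_rows(rows):
    """Build host records from the clusterAllReplicas query results, one per row."""
    return [HostRecord(*row) for row in rows]
```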

    ) -> ClickhouseCluster:
        extra_hosts = []
        for host_config in map(copy, CLICKHOUSE_PER_TEAM_SETTINGS.values()):
-            extra_hosts.append(ConnectionInfo(host_config.pop("host")))
+            extra_hosts.append(ConnectionInfo(host_config.pop("host"), None))


style: Setting port to None for extra_hosts while adding port support could cause connection issues if the default port is not correct

@@ -53,12 +54,14 @@ def handle(self, *args, **options):
        self.migrate(CLICKHOUSE_HTTP_URL, options)

    def migrate(self, host, options):
        # Infi only creates the DB in one node, but not the rest. Create it before running migrations.
        self._create_database_if_not_exists(CLICKHOUSE_DATABASE, CLICKHOUSE_MIGRATIONS_CLUSTER)


style: Database creation should be wrapped in a try/catch block to handle potential connection failures gracefully

Comment on lines 113 to 114
        with default_client() as client:
            client.execute(f"CREATE DATABASE IF NOT EXISTS {database} ON CLUSTER '{cluster}'")


logic: Raw SQL string interpolation creates potential SQL injection risk. Use parameterized query instead

Suggested change
-        with default_client() as client:
-            client.execute(f"CREATE DATABASE IF NOT EXISTS {database} ON CLUSTER '{cluster}'")
+        with default_client() as client:
+            client.execute("CREATE DATABASE IF NOT EXISTS %(database)s ON CLUSTER %(cluster)s", {'database': database, 'cluster': cluster})

@Daesgar (Author)


The driver only supports parameter substitution for SELECT and INSERT queries. :/
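
Given that limitation, a common fallback is to validate the identifiers before interpolating them into the DDL string. This is a general pattern under that assumption, not code from this PR:

```python
import re

# Conservative identifier check; adjust if database or cluster names can contain other characters.
IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def create_database_on_cluster(client, database: str, cluster: str) -> None:
    for identifier in (database, cluster):
        if not IDENTIFIER_RE.match(identifier):
            raise ValueError(f"Unsafe identifier: {identifier!r}")
    client.execute(f"CREATE DATABASE IF NOT EXISTS {database} ON CLUSTER '{cluster}'")
```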



Some reptile comments are pure guesses, but others are quite good ;)

Comment on lines +93 to +98
def mock_get_task_function(_, host: HostInfo, fn: Callable[[Client], T]) -> Callable[[], T]:
    if host.host_cluster_role == NodeRole.WORKER.value.lower():
        times_called[NodeRole.WORKER] += 1
    elif host.host_cluster_role == NodeRole.COORDINATOR.value.lower():
        times_called[NodeRole.COORDINATOR] += 1
    return lambda: fn(Mock())


style: mock_get_task_function is using a private method name with double underscores. Consider using the public interface or documenting why the private method needs to be mocked.
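
For reference, if the double-underscore method does need to be stubbed, Python's name mangling means it must be patched under its mangled attribute name. A hedged sketch reusing the names from the snippet above:

```python
# ClickhouseCluster, cluster, and mock_get_task_function are assumed from the test snippet above.
from unittest.mock import patch

with patch.object(ClickhouseCluster, "_ClickhouseCluster__get_task_function", mock_get_task_function):
    cluster.map_all_hosts(lambda client: client.execute("SELECT 1")).result()
```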


cluster = ClickhouseCluster(bootstrap_client_mock)

times_called: defaultdict[NodeRole, int] = defaultdict(int)


style: defaultdict usage here could mask errors if NodeRole values are incorrect. Consider using a regular dict with explicit initialization.

@@ -130,15 +150,23 @@ def any_host(self, fn: Callable[[Client], T]) -> Future[T]:
        host = self.__hosts[0]
        return executor.submit(self.__get_task_function(host, fn))

-    def map_all_hosts(self, fn: Callable[[Client], T], concurrency: int | None = None) -> FuturesMap[HostInfo, T]:
+    def map_all_hosts(


map_hosts would be more appropriate, as it's more of map_hosts_by_role ;)

@Daesgar (Author)


100%, I'll update that

@orian (Contributor) left a comment


This was a nontrivial learning read ;-)

Some of the 10-line Python list comprehensions are quite hard for me to read.

@Daesgar Daesgar enabled auto-merge (squash) January 31, 2025 08:39
@Daesgar Daesgar merged commit 9b30f8d into master Jan 31, 2025
95 checks passed
@Daesgar Daesgar deleted the coordinator-migrations branch January 31, 2025 09:00
@sentry-io (bot) commented Jan 31, 2025

Suspect Issues

This pull request was deployed and Sentry observed the following issues:

  • ‼️ ServerException: DB::Exception: All connection tries failed. Log: (posthog.clickhouse.cluster in __init__)
  • ‼️ ServerException: DB::Exception: Received from ch23.posthog.net:9000. DB::Exception: Too many simultaneous queries.... (posthog.clickhouse.cluster in __init__)
  • ‼️ ServerException: DB::Exception: All connection tries failed. Log: (posthog.clickhouse.cluster in __init__)
  • ‼️ ServerException: DB::Exception: There was an error on [ch20.posthog.net:9000]: Code: 202. DB::Exception: Too many ... (posthog.management.commands.migrate_clickhouse ...)
  • ‼️ ServerException: DB::Exception: Received from ch23.posthog.net:9000. DB::Exception: Too many simultaneous queries.... (posthog.clickhouse.cluster in __init__)

