Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Hide replicas from CLUSTER subcmds in managed mode #4174

Merged
merged 2 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ ABSL_FLAG(std::string, cluster_node_id, "",
"ID within a cluster, used for slot assignment. MUST be unique. If empty, uses master "
"replication ID (random string)");

ABSL_FLAG(bool, managed_service_info, false,
"Hides some implementation details from users when true (i.e. in managed service env)");

ABSL_DECLARE_FLAG(int32_t, port);
ABSL_DECLARE_FLAG(uint16_t, announce_port);

Expand Down Expand Up @@ -122,10 +125,12 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co

info.master = {.id = id_, .ip = preferred_endpoint, .port = preferred_port};

for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) {
info.replicas.push_back({.id = replica.id,
.ip = replica.address,
.port = static_cast<uint16_t>(replica.listening_port)});
if (cntx->conn()->IsPrivileged() || !absl::GetFlag(FLAGS_managed_service_info)) {
for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) {
info.replicas.push_back({.id = replica.id,
.ip = replica.address,
.port = static_cast<uint16_t>(replica.listening_port)});
}
}
} else {
// TODO: We currently don't save the master's ID in the replica
Expand Down
45 changes: 33 additions & 12 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def is_local_host(ip: str) -> bool:

info = answer[2]
assert len(info) == 3
ip_addr = str(info[0], "utf-8")
ip_addr = info[0]
assert is_local_host(ip_addr)
assert info[1] == port

Expand All @@ -244,29 +244,29 @@ def is_local_host(ip: str) -> bool:
replica = replicas[i - 3]
rep_info = answer[i]
assert len(rep_info) == 3
ip_addr = str(rep_info[0], "utf-8")
ip_addr = rep_info[0]
assert is_local_host(ip_addr)
assert rep_info[1] == replica.port
assert rep_info[2] == replica.id

return True


@dfly_args({"proactor_threads": 4, "cluster_mode": "emulated"})
# --managed_service_info means that Dragonfly is running in a managed service, so some details
# are hidden from users, see https://github.com/dragonflydb/dragonfly/issues/4173
@dfly_args({"proactor_threads": 4, "cluster_mode": "emulated", "managed_service_info": "true"})
async def test_emulated_cluster_with_replicas(df_factory):
master = df_factory.create(port=BASE_PORT)
master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000)
replicas = [df_factory.create(port=BASE_PORT + i, logtostdout=True) for i in range(1, 3)]

df_factory.start_all([master, *replicas])

c_master = aioredis.Redis(port=master.port)
master_id = (await c_master.execute_command("CLUSTER MYID")).decode("utf-8")
c_master = master.client()
c_master_admin = master.admin_client()
master_id = await c_master.execute_command("CLUSTER MYID")

c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
replica_ids = [
(await c_replica.execute_command("CLUSTER MYID")).decode("utf-8")
for c_replica in c_replicas
]
c_replicas = [replica.client() for replica in replicas]
replica_ids = [(await c_replica.execute_command("CLUSTER MYID")) for c_replica in c_replicas]

for replica, c_replica in zip(replicas, c_replicas):
res = await c_replica.execute_command("CLUSTER SLOTS")
Expand All @@ -279,7 +279,7 @@ async def test_emulated_cluster_with_replicas(df_factory):
# Connect replicas to master
for replica, c_replica in zip(replicas, c_replicas):
rc = await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
assert str(rc, "utf-8") == "OK"
assert rc == "OK"

await asyncio.sleep(0.5)

Expand All @@ -290,6 +290,13 @@ async def test_emulated_cluster_with_replicas(df_factory):
)

res = await c_master.execute_command("CLUSTER SLOTS")
assert verify_slots_result(
port=master.port,
answer=res[0],
replicas=[],
)

res = await c_master_admin.execute_command("CLUSTER SLOTS")
assert verify_slots_result(
port=master.port,
answer=res[0],
Expand All @@ -308,6 +315,20 @@ async def test_emulated_cluster_with_replicas(df_factory):
"node_id": master_id,
"slots": [["0", "16383"]],
},
}

assert await c_master_admin.execute_command("CLUSTER NODES") == {
f"127.0.0.1:{master.port}": {
"connected": True,
"epoch": "0",
"flags": "myself,master",
"last_ping_sent": "0",
"last_pong_rcvd": "0",
"master_id": "-",
"migrations": [],
"node_id": master_id,
"slots": [["0", "16383"]],
},
f"127.0.0.1:{replicas[0].port}": {
"connected": True,
"epoch": "0",
Expand Down
Loading