Skip to content

Commit

Permalink
Add system nodes for keeper
Browse files Browse the repository at this point in the history
  • Loading branch information
lzydmxy committed Jun 12, 2024
1 parent 9a57a66 commit f291f4b
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 0 deletions.
16 changes: 16 additions & 0 deletions src/Service/KeeperCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,22 @@ namespace RK
using RunnerId = size_t;
using ThreadPoolPtr = std::shared_ptr<ThreadPool>;

const String CLICKHOUSE_KEEPER_SYSTEM_PATH = "/keeper";
const String CLICKHOUSE_KEEPER_API_VERSION_PATH = CLICKHOUSE_KEEPER_SYSTEM_PATH + "/api_version";

enum class KeeperApiVersion : uint8_t
{
ZOOKEEPER_COMPATIBLE = 0,
WITH_FILTERED_LIST,
WITH_MULTI_READ
};

inline constexpr auto CURRENT_KEEPER_API_VERSION = KeeperApiVersion::WITH_MULTI_READ;

const String ZOOKEEPER_SYSTEM_PATH= "/zookeeper";
const String ZOOKEEPER_CONFIG_NODE= ZOOKEEPER_SYSTEM_PATH + "/config";


struct RequestId;

/// Attached session id to request
Expand Down
13 changes: 13 additions & 0 deletions src/Service/KeeperServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@ KeeperServer::KeeperServer(
checkAndGetSuperdigest(settings->super_digest),
MAX_OBJECT_NODE_SIZE,
request_processor_);

#ifdef COMPATIBLE_MODE_ZOOKEEPER
auto cluster_config = state_manager->getClusterConfig();

String data;
for (const auto & s : cluster_config->get_servers())
{
data += fmt::format("server.{}={}:participant\n", s->get_id(), s->get_endpoint());
}
data += "version=0";
state_machine->getStore().getNode(ZOOKEEPER_CONFIG_NODE)->data = data;
#endif

}
namespace
{
Expand Down
23 changes: 23 additions & 0 deletions src/Service/KeeperStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1596,4 +1596,27 @@ uint64_t KeeperStore::getApproximateDataSize() const
return size_bytes;
}

void KeeperStore::initializeSystemNodes()
{
auto add_node = [&](const String & path)
{
if (!data_tree.count(path))
{
data_tree.emplace(path, std::make_shared<KeeperNode>());
getNode(getParentPath(path))->children.insert(getBaseName(path));
}
};

#ifdef COMPATIBLE_MODE_ZOOKEEPER
add_node(ZOOKEEPER_SYSTEM_PATH);
add_node(ZOOKEEPER_CONFIG_NODE);

#else
add_node(CLICKHOUSE_KEEPER_SYSTEM_PATH);
add_node(CLICKHOUSE_KEEPER_API_VERSION_PATH);

data_tree.get(CLICKHOUSE_KEEPER_API_VERSION_PATH)->data = toString(static_cast<uint8_t>(CURRENT_KEEPER_API_VERSION));
#endif
}

}
3 changes: 3 additions & 0 deletions src/Service/KeeperStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,14 @@ class KeeperStore
{
watch_manager.dumpWatches(buf);
}

void dumpWatchesByPath(WriteBufferFromOwnString & buf) const
{
watch_manager.dumpWatchesByPath(buf);
}

void initializeSystemNodes();

mutable std::shared_mutex auth_mutex;
SessionAndAuth session_and_auth;

Expand Down
2 changes: 2 additions & 0 deletions src/Service/NuRaftStateMachine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ NuRaftStateMachine::NuRaftStateMachine(
store.getSessionIDCounter(),
store.getZxid());

store.initializeSystemNodes();

LOG_INFO(log, "Starting background creating snapshot thread.");
snap_thread = ThreadFromGlobalPool([this] { snapThread(); });
}
Expand Down
13 changes: 13 additions & 0 deletions tests/integration/test_back_to_back/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,3 +718,16 @@ def call(total):
assert dumb_watch_triggered_counter == watches_must_be_triggered
finally:
close_zk_clients([fake_zk])


def test_system_nodes(started_cluster):
genuine_zk = fake_zk = None
try:
genuine_zk = get_genuine_zk()
fake_zk = get_fake_zk()
cluster_config = (b'server.1=node1:8103:participant\nserver.2=node2:8103:participant\nserver.3=node3:8103'
b':participant\nversion=0')
assert fake_zk.get('/zookeeper/config')[0] == cluster_config
finally:
close_zk_clients([genuine_zk, fake_zk])

0 comments on commit f291f4b

Please sign in to comment.