Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove raftkeeper 1.0 only code #352

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/how-to-monitor-and-manage.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ max_stored_snapshots=5
shutdown_timeout=5000
startup_timeout=6000000
raft_logs_level=information
rotate_log_storage_interval=100000
log_fsync_mode=fsync_parallel
log_fsync_interval=1000
nuraft_thread_size=16
Expand Down
2 changes: 1 addition & 1 deletion src/Service/ConnectionHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <Poco/Thread.h>
#include <Poco/Util/ServerApplication.h>

#include <Common/IO/ReadBufferFromString.h>
#include <Common/IO/WriteBufferFromString.h>
#include <Network/SocketAcceptor.h>
#include <Network/SocketNotification.h>
Expand Down Expand Up @@ -62,7 +63,6 @@ class ConnectionHandler
void onSocketError(const Notification &);

/// current connection statistics
ConnectionStats getConnectionStats() const;
void dumpStats(WriteBufferFromOwnString & buf, bool brief);

/// reset current connection statistics
Expand Down
1 change: 1 addition & 0 deletions src/Service/ForwardConnectionHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <Poco/Net/StreamSocket.h>
#include <Poco/Thread.h>

#include <Common/IO/ReadBufferFromString.h>
#include <Network/SocketAcceptor.h>
#include <Network/SocketNotification.h>
#include <Network/SocketReactor.h>
Expand Down
3 changes: 1 addition & 2 deletions src/Service/FourLetterCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <Poco/Environment.h>
#include <Poco/Path.h>
#include <Poco/String.h>
#include "Common/StringUtils.h"
#include <Common/config_version.h>
#include <Common/getCurrentProcessFDCount.h>
#include <Common/getMaxFileDescriptorCount.h>
Expand Down Expand Up @@ -258,7 +257,7 @@ String MonitorCommand::run()
print(ret, "watch_count", state_machine.getTotalWatchesCount());
print(ret, "ephemerals_count", state_machine.getTotalEphemeralNodesCount());
print(ret, "approximate_data_size", state_machine.getApproximateDataSize());
print(ret, "in_snapshot", state_machine.getSnapshoting());
print(ret, "in_snapshot", state_machine.isCreatingSnapshot());

#if defined(__linux__) || defined(__APPLE__)
print(ret, "open_file_descriptor_count", getCurrentProcessFDCount());
Expand Down
6 changes: 3 additions & 3 deletions src/Service/KeeperServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ ptr<nuraft::cmd_result<ptr<buffer>>> KeeperServer::pushRequestBatch(const std::v
{
LOG_DEBUG(log, "Push batch requests of size {}", request_batch.size());
std::vector<ptr<buffer>> entries;
for (const auto & request_session : request_batch)
for (const auto & request : request_batch)
{
LOG_TRACE(log, "Push request {}", request_session.toSimpleString());
entries.push_back(serializeKeeperRequest(request_session));
LOG_TRACE(log, "Push request {}", request.toSimpleString());
entries.push_back(serializeKeeperRequest(request));
}
/// append_entries write request
ptr<nuraft::cmd_result<ptr<buffer>>> result = raft_instance->append_entries(entries);
Expand Down
20 changes: 8 additions & 12 deletions src/Service/KeeperUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ ptr<buffer> serializeKeeperRequest(const RequestForSession & request)
return out.getBuffer();
}

RequestForSession deserializeKeeperRequest(nuraft::buffer & data)
ptr<RequestForSession> deserializeKeeperRequest(nuraft::buffer & data)
{
ptr<RequestForSession> request = cs_new<RequestForSession>();
ReadBufferFromNuRaftBuffer buffer(data);
RequestForSession request_for_session;
readIntBinary(request_for_session.session_id, buffer);
readIntBinary(request->session_id, buffer);

int32_t length;
Coordination::read(length, buffer);
Expand All @@ -64,17 +64,13 @@ RequestForSession deserializeKeeperRequest(nuraft::buffer & data)
// bool is_internal;
// Coordination::read(is_internal, buffer);

request_for_session.request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request_for_session.request->xid = xid;
request_for_session.request->readImpl(buffer);
request->request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request->request->xid = xid;
request->request->readImpl(buffer);

if (!buffer.eof())
Coordination::read(request_for_session.create_time, buffer);
else /// backward compatibility
request_for_session.create_time
= std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
Coordination::read(request->create_time, buffer);

return request_for_session;
return request;
}

ptr<log_entry> cloneLogEntry(const ptr<log_entry> & entry)
Expand Down
15 changes: 11 additions & 4 deletions src/Service/KeeperUtils.h
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
#pragma once

#include <fstream>
#include <time.h>
#include <Service/Crc32.h>
#include <ZooKeeper/IKeeper.h>
#include <ZooKeeper/ZooKeeperCommon.h>
#include <libnuraft/log_entry.hxx>
#include <libnuraft/nuraft.hxx>
#include <common/logger_useful.h>
#include <Service/KeeperCommon.h>


namespace RK
{

inline UInt64 getCurrentTimeMilliseconds()
{
return duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
}

inline UInt64 getCurrentTimeMicroseconds()
{
return duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
}

/// Serialize and deserialize ZooKeeper request to log
nuraft::ptr<nuraft::buffer> serializeKeeperRequest(const RequestForSession & request);
RequestForSession deserializeKeeperRequest(nuraft::buffer & data);
nuraft::ptr<RequestForSession> deserializeKeeperRequest(nuraft::buffer & data);

nuraft::ptr<nuraft::log_entry> cloneLogEntry(const nuraft::ptr<nuraft::log_entry> & entry);

Expand Down
8 changes: 2 additions & 6 deletions src/Service/LastCommittedIndexManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ namespace ErrorCodes
extern const int CANNOT_SEEK_THROUGH_FILE;
}

inline UInt64 getCurrentTimeMicroseconds()
{
return std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
}

LastCommittedIndexManager::LastCommittedIndexManager(const String & log_dir) : log(&Poco::Logger::get("LastCommittedIndexManager"))
{
Expand All @@ -35,7 +31,7 @@ LastCommittedIndexManager::LastCommittedIndexManager(const String & log_dir) : l
throwFromErrno("Failed to open committed log index file", ErrorCodes::CANNOT_OPEN_FILE);

previous_persist_time = getCurrentTimeMicroseconds();
persist_thread = ThreadFromGlobalPool([this] { persistThread(); });
bg_persist_thread = ThreadFromGlobalPool([this] { persistThread(); });
}

LastCommittedIndexManager::~LastCommittedIndexManager()
Expand Down Expand Up @@ -117,7 +113,7 @@ void LastCommittedIndexManager::shutDown()
if (!is_shut_down)
{
is_shut_down = true;
persist_thread.join();
bg_persist_thread.join();

::close(persist_file_fd);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Service/LastCommittedIndexManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class LastCommittedIndexManager
UInt64 static constexpr PERSIST_INTERVAL_US = 100 * 1000;
std::string_view static constexpr FILE_NAME = "last_committed_index.bin";

ThreadFromGlobalPool persist_thread;
ThreadFromGlobalPool bg_persist_thread;
std::atomic<bool> is_shut_down{false};

String persist_file_name;
Expand Down
5 changes: 0 additions & 5 deletions src/Service/Metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@
namespace RK
{

inline UInt64 getCurrentTimeMilliseconds()
{
return duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
};


/**
* Uses the reservoir sampling algorithm to sample statistical values
Expand Down
7 changes: 3 additions & 4 deletions src/Service/NuRaftLogSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
#include <stdio.h>
#include <unistd.h>

#include <common/find_symbols.h>

#include <Poco/DateTime.h>
#include <Poco/DateTimeFormatter.h>
#include <Poco/File.h>
Expand All @@ -15,6 +13,7 @@
#include <Common/Exception.h>
#include <Common/Stopwatch.h>

#include <Service/Crc32.h>
#include <Service/KeeperUtils.h>
#include <Service/NuRaftLogSnapshot.h>
#include <Service/ReadBufferFromNuRaftBuffer.h>
Expand Down Expand Up @@ -776,12 +775,12 @@ bool KeeperSnapshotManager::receiveSnapshotMeta(snapshot & meta)
return true;
}

bool KeeperSnapshotManager::existSnapshot(const snapshot & meta)
bool KeeperSnapshotManager::existSnapshot(const snapshot & meta) const
{
return snapshots.find(getSnapshotStoreMapKey(meta)) != snapshots.end();
}

bool KeeperSnapshotManager::existSnapshotObject(const snapshot & meta, ulong obj_id)
bool KeeperSnapshotManager::existSnapshotObject(const snapshot & meta, ulong obj_id) const
{
auto it = snapshots.find(getSnapshotStoreMapKey(meta));
if (it == snapshots.end())
Expand Down
4 changes: 2 additions & 2 deletions src/Service/NuRaftLogSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,10 @@ class KeeperSnapshotManager
bool saveSnapshotObject(snapshot & meta, ulong obj_id, buffer & buffer);

/// whether snapshot exists
bool existSnapshot(const snapshot & meta);
bool existSnapshot(const snapshot & meta) const;

/// whether snapshot object exists
bool existSnapshotObject(const snapshot & meta, ulong obj_id);
bool existSnapshotObject(const snapshot & meta, ulong obj_id) const;

/// load snapshot object, invoked when leader should send snapshot to others.
bool loadSnapshotObject(const snapshot & meta, ulong obj_id, ptr<buffer> & buffer);
Expand Down
Loading
Loading