Skip to content

Commit

Permalink
Remove raftkeeper 1.0 only code (#352)
Browse files Browse the repository at this point in the history
  • Loading branch information
JackyWoo authored Sep 9, 2024
1 parent 97d58cd commit 78c8285
Show file tree
Hide file tree
Showing 22 changed files with 203 additions and 375 deletions.
1 change: 0 additions & 1 deletion docs/how-to-monitor-and-manage.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ max_stored_snapshots=5
shutdown_timeout=5000
startup_timeout=6000000
raft_logs_level=information
rotate_log_storage_interval=100000
log_fsync_mode=fsync_parallel
log_fsync_interval=1000
nuraft_thread_size=16
Expand Down
2 changes: 1 addition & 1 deletion src/Service/ConnectionHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <Poco/Thread.h>
#include <Poco/Util/ServerApplication.h>

#include <Common/IO/ReadBufferFromString.h>
#include <Common/IO/WriteBufferFromString.h>
#include <Network/SocketAcceptor.h>
#include <Network/SocketNotification.h>
Expand Down Expand Up @@ -62,7 +63,6 @@ class ConnectionHandler
void onSocketError(const Notification &);

/// current connection statistics
ConnectionStats getConnectionStats() const;
void dumpStats(WriteBufferFromOwnString & buf, bool brief);

/// reset current connection statistics
Expand Down
1 change: 1 addition & 0 deletions src/Service/ForwardConnectionHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <Poco/Net/StreamSocket.h>
#include <Poco/Thread.h>

#include <Common/IO/ReadBufferFromString.h>
#include <Network/SocketAcceptor.h>
#include <Network/SocketNotification.h>
#include <Network/SocketReactor.h>
Expand Down
3 changes: 1 addition & 2 deletions src/Service/FourLetterCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <Poco/Environment.h>
#include <Poco/Path.h>
#include <Poco/String.h>
#include "Common/StringUtils.h"
#include <Common/config_version.h>
#include <Common/getCurrentProcessFDCount.h>
#include <Common/getMaxFileDescriptorCount.h>
Expand Down Expand Up @@ -258,7 +257,7 @@ String MonitorCommand::run()
print(ret, "watch_count", state_machine.getTotalWatchesCount());
print(ret, "ephemerals_count", state_machine.getTotalEphemeralNodesCount());
print(ret, "approximate_data_size", state_machine.getApproximateDataSize());
print(ret, "in_snapshot", state_machine.getSnapshoting());
print(ret, "in_snapshot", state_machine.isCreatingSnapshot());

#if defined(__linux__) || defined(__APPLE__)
print(ret, "open_file_descriptor_count", getCurrentProcessFDCount());
Expand Down
6 changes: 3 additions & 3 deletions src/Service/KeeperServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ ptr<nuraft::cmd_result<ptr<buffer>>> KeeperServer::pushRequestBatch(const std::v
{
LOG_DEBUG(log, "Push batch requests of size {}", request_batch.size());
std::vector<ptr<buffer>> entries;
for (const auto & request_session : request_batch)
for (const auto & request : request_batch)
{
LOG_TRACE(log, "Push request {}", request_session.toSimpleString());
entries.push_back(serializeKeeperRequest(request_session));
LOG_TRACE(log, "Push request {}", request.toSimpleString());
entries.push_back(serializeKeeperRequest(request));
}
/// append_entries write request
ptr<nuraft::cmd_result<ptr<buffer>>> result = raft_instance->append_entries(entries);
Expand Down
20 changes: 8 additions & 12 deletions src/Service/KeeperUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ ptr<buffer> serializeKeeperRequest(const RequestForSession & request)
return out.getBuffer();
}

RequestForSession deserializeKeeperRequest(nuraft::buffer & data)
ptr<RequestForSession> deserializeKeeperRequest(nuraft::buffer & data)
{
ptr<RequestForSession> request = cs_new<RequestForSession>();
ReadBufferFromNuRaftBuffer buffer(data);
RequestForSession request_for_session;
readIntBinary(request_for_session.session_id, buffer);
readIntBinary(request->session_id, buffer);

int32_t length;
Coordination::read(length, buffer);
Expand All @@ -64,17 +64,13 @@ RequestForSession deserializeKeeperRequest(nuraft::buffer & data)
// bool is_internal;
// Coordination::read(is_internal, buffer);

request_for_session.request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request_for_session.request->xid = xid;
request_for_session.request->readImpl(buffer);
request->request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request->request->xid = xid;
request->request->readImpl(buffer);

if (!buffer.eof())
Coordination::read(request_for_session.create_time, buffer);
else /// backward compatibility
request_for_session.create_time
= std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
Coordination::read(request->create_time, buffer);

return request_for_session;
return request;
}

ptr<log_entry> cloneLogEntry(const ptr<log_entry> & entry)
Expand Down
15 changes: 11 additions & 4 deletions src/Service/KeeperUtils.h
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
#pragma once

#include <fstream>
#include <time.h>
#include <Service/Crc32.h>
#include <ZooKeeper/IKeeper.h>
#include <ZooKeeper/ZooKeeperCommon.h>
#include <libnuraft/log_entry.hxx>
#include <libnuraft/nuraft.hxx>
#include <common/logger_useful.h>
#include <Service/KeeperCommon.h>


namespace RK
{

inline UInt64 getCurrentTimeMilliseconds()
{
return duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
}

inline UInt64 getCurrentTimeMicroseconds()
{
return duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
}

/// Serialize and deserialize ZooKeeper request to log
nuraft::ptr<nuraft::buffer> serializeKeeperRequest(const RequestForSession & request);
RequestForSession deserializeKeeperRequest(nuraft::buffer & data);
nuraft::ptr<RequestForSession> deserializeKeeperRequest(nuraft::buffer & data);

nuraft::ptr<nuraft::log_entry> cloneLogEntry(const nuraft::ptr<nuraft::log_entry> & entry);

Expand Down
8 changes: 2 additions & 6 deletions src/Service/LastCommittedIndexManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ namespace ErrorCodes
extern const int CANNOT_SEEK_THROUGH_FILE;
}

inline UInt64 getCurrentTimeMicroseconds()
{
return std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
}

LastCommittedIndexManager::LastCommittedIndexManager(const String & log_dir) : log(&Poco::Logger::get("LastCommittedIndexManager"))
{
Expand All @@ -35,7 +31,7 @@ LastCommittedIndexManager::LastCommittedIndexManager(const String & log_dir) : l
throwFromErrno("Failed to open committed log index file", ErrorCodes::CANNOT_OPEN_FILE);

previous_persist_time = getCurrentTimeMicroseconds();
persist_thread = ThreadFromGlobalPool([this] { persistThread(); });
bg_persist_thread = ThreadFromGlobalPool([this] { persistThread(); });
}

LastCommittedIndexManager::~LastCommittedIndexManager()
Expand Down Expand Up @@ -117,7 +113,7 @@ void LastCommittedIndexManager::shutDown()
if (!is_shut_down)
{
is_shut_down = true;
persist_thread.join();
bg_persist_thread.join();

::close(persist_file_fd);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Service/LastCommittedIndexManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class LastCommittedIndexManager
UInt64 static constexpr PERSIST_INTERVAL_US = 100 * 1000;
std::string_view static constexpr FILE_NAME = "last_committed_index.bin";

ThreadFromGlobalPool persist_thread;
ThreadFromGlobalPool bg_persist_thread;
std::atomic<bool> is_shut_down{false};

String persist_file_name;
Expand Down
5 changes: 0 additions & 5 deletions src/Service/Metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@
namespace RK
{

inline UInt64 getCurrentTimeMilliseconds()
{
return duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
};


/**
* Uses the reservoir sampling algorithm to sample statistical values
Expand Down
7 changes: 3 additions & 4 deletions src/Service/NuRaftLogSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
#include <stdio.h>
#include <unistd.h>

#include <common/find_symbols.h>

#include <Poco/DateTime.h>
#include <Poco/DateTimeFormatter.h>
#include <Poco/File.h>
Expand All @@ -15,6 +13,7 @@
#include <Common/Exception.h>
#include <Common/Stopwatch.h>

#include <Service/Crc32.h>
#include <Service/KeeperUtils.h>
#include <Service/NuRaftLogSnapshot.h>
#include <Service/ReadBufferFromNuRaftBuffer.h>
Expand Down Expand Up @@ -776,12 +775,12 @@ bool KeeperSnapshotManager::receiveSnapshotMeta(snapshot & meta)
return true;
}

bool KeeperSnapshotManager::existSnapshot(const snapshot & meta)
bool KeeperSnapshotManager::existSnapshot(const snapshot & meta) const
{
return snapshots.find(getSnapshotStoreMapKey(meta)) != snapshots.end();
}

bool KeeperSnapshotManager::existSnapshotObject(const snapshot & meta, ulong obj_id)
bool KeeperSnapshotManager::existSnapshotObject(const snapshot & meta, ulong obj_id) const
{
auto it = snapshots.find(getSnapshotStoreMapKey(meta));
if (it == snapshots.end())
Expand Down
4 changes: 2 additions & 2 deletions src/Service/NuRaftLogSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,10 @@ class KeeperSnapshotManager
bool saveSnapshotObject(snapshot & meta, ulong obj_id, buffer & buffer);

/// whether snapshot exists
bool existSnapshot(const snapshot & meta);
bool existSnapshot(const snapshot & meta) const;

/// whether snapshot object exists
bool existSnapshotObject(const snapshot & meta, ulong obj_id);
bool existSnapshotObject(const snapshot & meta, ulong obj_id) const;

/// load snapshot object, invoked when leader should send snapshot to others.
bool loadSnapshotObject(const snapshot & meta, ulong obj_id, ptr<buffer> & buffer);
Expand Down
Loading

0 comments on commit 78c8285

Please sign in to comment.