Skip to content

Commit

Permalink
Fix logical error last_segment not exist
Browse files Browse the repository at this point in the history
  • Loading branch information
JackyWoo committed Sep 2, 2024
1 parent d2936b8 commit 5c3a35d
Show file tree
Hide file tree
Showing 13 changed files with 39 additions and 142 deletions.
6 changes: 0 additions & 6 deletions src/Service/KeeperDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,12 +375,6 @@ void KeeperDispatcher::unRegisterSessionResponseCallbackWithoutLock(int64_t id)
session_response_callbacks.erase(it);
}

[[maybe_unused]] void KeeperDispatcher::registerUserResponseCallBack(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected)
{
std::unique_lock<std::shared_mutex> write_lock(response_callbacks_mutex);
registerUserResponseCallBackWithoutLock(session_id, callback, is_reconnected);
}

void KeeperDispatcher::registerUserResponseCallBackWithoutLock(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected)
{
if (session_id == 0)
Expand Down
1 change: 0 additions & 1 deletion src/Service/KeeperDispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ class KeeperDispatcher : public std::enable_shared_from_this<KeeperDispatcher>
void unRegisterForwarderResponseCallBack(ForwardClientId client_id);

/// Register response callback for user request
[[maybe_unused]] void registerUserResponseCallBack(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected = false);
void registerUserResponseCallBackWithoutLock(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected = false);
void unregisterUserResponseCallBack(int64_t session_id);
void unregisterUserResponseCallBackWithoutLock(int64_t session_id);
Expand Down
14 changes: 0 additions & 14 deletions src/Service/KeeperServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,20 +150,6 @@ void KeeperServer::handleRemoteSession(int64_t session_id, int64_t expiration_ti
state_machine->getStore().handleRemoteSession(session_id, expiration_time);
}

[[maybe_unused]] int64_t KeeperServer::getSessionTimeout(int64_t session_id)
{
LOG_DEBUG(log, "New session timeout for {}", session_id);
if (state_machine->getStore().containsSession(session_id))
{
return state_machine->getStore().getSessionAndTimeOut().find(session_id)->second;
}
else
{
LOG_WARNING(log, "Not found session timeout for {}", session_id);
return -1;
}
}

bool KeeperServer::isLeader() const
{
return raft_instance->is_leader();
Expand Down
4 changes: 0 additions & 4 deletions src/Service/KeeperServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ class KeeperServer
/// it will update the snapshot itself.
void handleRemoteSession(int64_t session_id, int64_t expiration_time);

/// Get the initialized timeout for a session.
/// Return initialized timeout, or -1 if session not exist.
[[maybe_unused]] int64_t getSessionTimeout(int64_t session_id);

/// will invoke waitInit
void startup();

Expand Down
10 changes: 5 additions & 5 deletions src/Service/NuRaftFileLogStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,12 @@ ulong NuRaftFileLogStore::append(ptr<log_entry> & entry)

void NuRaftFileLogStore::write_at(ulong index, ptr<log_entry> & entry)
{
if (segment_store->writeAt(index, entry) == index)
log_queue.clear();
segment_store->writeAt(index, entry);

log_queue.clear();
last_log_entry = entry;

/// notify parallel fsync thread
/// log store file fsync
if (log_fsync_mode == FsyncMode::FSYNC_PARALLEL && entry->get_val_type() != log_val_type::app_log)
parallel_fsync_event->set();

Expand Down Expand Up @@ -255,8 +255,8 @@ ptr<log_entry> NuRaftFileLogStore::entry_at(ulong index)

ulong NuRaftFileLogStore::term_at(ulong index)
{
if (entry_at(index))
return entry_at(index)->get_term();
if (auto entry = entry_at(index))
return entry->get_term();
return 0;
}

Expand Down
60 changes: 23 additions & 37 deletions src/Service/NuRaftLogSegment.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#include <Service/NuRaftLogSegment.h>

#include <charconv>
#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/uio.h>
#include <unistd.h>
#include <charconv>

#include <Poco/File.h>

Expand Down Expand Up @@ -33,16 +33,6 @@ namespace ErrorCodes

using namespace nuraft;

[[maybe_unused]] int ftruncateUninterrupted(int fd, off_t length)
{
int rc;
do
{
rc = ftruncate(fd, length);
} while (rc == -1 && errno == EINTR);
return rc;
}

bool compareSegment(const ptr<NuRaftLogSegment> & lhs, const ptr<NuRaftLogSegment> & rhs)
{
return lhs->firstIndex() < rhs->firstIndex();
Expand Down Expand Up @@ -661,14 +651,13 @@ UInt64 LogSegmentStore::appendEntry(const ptr<log_entry> & entry)
return open_segment->appendEntry(entry, last_log_index);
}

UInt64 LogSegmentStore::writeAt(UInt64 index, const ptr<log_entry> & entry)
void LogSegmentStore::writeAt(UInt64 index, const ptr<log_entry> & entry)
{
truncateLog(index - 1);
if (index == lastLogIndex() + 1)
return appendEntry(entry);

LOG_WARNING(log, "writeAt log index {} failed, firstLogIndex {}, lastLogIndex {}.", index, firstLogIndex(), lastLogIndex());
return -1;
appendEntry(entry);
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Fail to write log at {}, log store index range [{}, {}].", index, firstLogIndex(), lastLogIndex());
}

ptr<log_entry> LogSegmentStore::getEntry(UInt64 index) const
Expand All @@ -680,18 +669,15 @@ ptr<log_entry> LogSegmentStore::getEntry(UInt64 index) const
return seg->getEntry(index);
}

void LogSegmentStore::getEntries(UInt64 start_index, UInt64 end_index, const ptr<std::vector<ptr<log_entry>>> & entries)
std::vector<ptr<log_entry>> LogSegmentStore::getEntries(UInt64 start_index, UInt64 end_index) const
{
if (entries == nullptr)
{
LOG_ERROR(log, "Entry vector is nullptr.");
return;
}
std::vector<ptr<log_entry>> entries;
for (UInt64 index = start_index; index <= end_index; index++)
{
auto entry_pt = getEntry(index);
entries->push_back(entry_pt);
entries.push_back(entry_pt);
}
return entries;
}

int LogSegmentStore::removeSegment(UInt64 first_index_kept)
Expand Down Expand Up @@ -813,9 +799,6 @@ bool LogSegmentStore::truncateLog(UInt64 last_index_kept)
}
}

if (!last_segment)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found a segment to truncate, last_index_kept {}.", last_index_kept);

/// remove files
for (auto & to_removed : to_removed_segments)
{
Expand All @@ -824,20 +807,23 @@ bool LogSegmentStore::truncateLog(UInt64 last_index_kept)
to_removed = nullptr;
}

bool is_open_before_truncate = last_segment->isOpen();
bool removed_something = last_segment->truncate(last_index_kept);
if (last_segment)
{
bool is_open_before_truncate = last_segment->isOpen();
bool removed_something = last_segment->truncate(last_index_kept);

if (!removed_something && last_segment->lastIndex() != last_index_kept)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Truncate log to last_index_kept {}, but nothing removed from log segment {}.", last_index_kept, last_segment->getFileName());
if (!removed_something && last_segment->lastIndex() != last_index_kept)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Truncate log to last_index_kept {}, but nothing removed from log segment {}.", last_index_kept, last_segment->getFileName());

if (!is_open_before_truncate && !last_segment->isOpen())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Truncate a closed log segment {}, but the truncated log segment is not open.", last_segment->getFileName());
if (!is_open_before_truncate && !last_segment->isOpen())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Truncate a closed log segment {}, but the truncated log segment is not open.", last_segment->getFileName());

if (!is_open_before_truncate)
{
open_segment = last_segment;
if (!closed_segments.empty())
closed_segments.erase(closed_segments.end() - 1);
if (!is_open_before_truncate)
{
open_segment = last_segment;
if (!closed_segments.empty())
closed_segments.erase(closed_segments.end() - 1);
}
}

last_log_index.store(last_index_kept, std::memory_order_release);
Expand Down
10 changes: 6 additions & 4 deletions src/Service/NuRaftLogSegment.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,11 @@ class LogSegmentStore final
UInt64 appendEntry(const ptr<log_entry> & entry);

/// First truncate log whose index is larger than or equals with index of entry, then append it.
UInt64 writeAt(UInt64 index, const ptr<log_entry> & entry);
void writeAt(UInt64 index, const ptr<log_entry> & entry);
ptr<log_entry> getEntry(UInt64 index) const;

/// Just for test, collection entries in [start_index, end_index]
void getEntries(UInt64 start_index, UInt64 end_index, const ptr<std::vector<ptr<log_entry>>> & entries);
/// Just for test
std::vector<ptr<log_entry>> getEntries(UInt64 start_index, UInt64 end_index) const;

/// Remove segments from storage's head, logs in [1, first_index_kept) will be discarded, usually invoked when compaction.
/// return number of segments removed
Expand All @@ -219,7 +219,7 @@ class LogSegmentStore final
/// Return true if some logs are removed
bool truncateLog(UInt64 last_index_kept);

/// get closed segments, only for tests
/// Just for tests
Segments getClosedSegments()
{
std::shared_lock read_lock(seg_mutex);
Expand All @@ -232,8 +232,10 @@ class LogSegmentStore final
private:
/// open a new segment, invoked when init
void openNewSegmentIfNeeded();

/// list segments, invoked when init
void loadSegmentMetaData();

/// load listed segments, invoked when init
void loadSegments();

Expand Down
5 changes: 0 additions & 5 deletions src/Service/NuRaftStateMachine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,6 @@ nuraft::ptr<nuraft::buffer> NuRaftStateMachine::commit(const ulong log_idx, buff
return commit(log_idx, data, false);
}

[[maybe_unused]] void NuRaftStateMachine::processReadRequest(const RequestForSession & request_for_session)
{
store.processRequest(responses_queue, request_for_session);
}

std::vector<int64_t> NuRaftStateMachine::getDeadSessions()
{
return store.getDeadSessions();
Expand Down
3 changes: 0 additions & 3 deletions src/Service/NuRaftStateMachine.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,6 @@ class NuRaftStateMachine : public nuraft::state_machine

KeeperStore & getStore() { return store; }

/// process read request
[[maybe_unused]] void processReadRequest(const RequestForSession & request_for_session);

/// get expired session
std::vector<int64_t> getDeadSessions();

Expand Down
54 changes: 0 additions & 54 deletions src/Service/SnapshotCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,60 +229,6 @@ void serializeAclsV2(const NumToACLMap & acl_map, String path, UInt32 save_batch
LOG_INFO(log, "Finish create snapshot acl object, acl size {}, path {}", acl_map.size(), path);
}

[[maybe_unused]] size_t serializeEphemeralsV2(KeeperStore::Ephemerals & ephemerals, std::mutex & mutex, String path, UInt32 save_batch_size)
{
Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore"));
LOG_INFO(log, "Begin create snapshot ephemeral object, node size {}, path {}", ephemerals.size(), path);

ptr<SnapshotBatchBody> batch;

std::lock_guard lock(mutex);

if (ephemerals.empty())
{
LOG_INFO(log, "Ephemeral nodes size is 0");
return 0;
}

auto out = cs_new<WriteBufferFromFile>(path);
uint64_t index = 0;
for (auto & ephemeral_it : ephemerals)
{
/// flush and rebuild batch
if (index % save_batch_size == 0)
{
/// skip flush the first batch
if (index != 0)
{
/// write data in batch to file
saveBatchV2(out, batch);
}
batch = cs_new<SnapshotBatchBody>();
batch->type = SnapshotBatchType::SNAPSHOT_TYPE_DATA_EPHEMERAL;
}

/// append to batch
WriteBufferFromNuraftBuffer buf;
Coordination::write(ephemeral_it.first, buf);
Coordination::write(ephemeral_it.second.size(), buf);

for (const auto & node_path : ephemeral_it.second)
{
Coordination::write(node_path, buf);
}

ptr<buffer> data = buf.getBuffer();
data->pos(0);
batch->add(String(reinterpret_cast<char *>(data->data_begin()), data->size()));

index++;
}

/// flush the last batch
saveBatchV2(out, batch);
out->close();
return 1;
}

void serializeSessionsV2(SessionAndTimeout & session_and_timeout, SessionAndAuth & session_and_auth, UInt32 save_batch_size, SnapshotVersion version, String & path)
{
Expand Down
2 changes: 0 additions & 2 deletions src/Service/SnapshotCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ std::pair<size_t, UInt32>
saveBatchAndUpdateCheckSumV2(ptr<WriteBufferFromFile> & out, ptr<SnapshotBatchBody> & batch, UInt32 checksum);

void serializeAclsV2(const NumToACLMap & acls, String path, UInt32 save_batch_size, SnapshotVersion version);
[[maybe_unused]] size_t
serializeEphemeralsV2(KeeperStore::Ephemerals & ephemerals, std::mutex & mutex, String path, UInt32 save_batch_size);

/// Serialize sessions and return the next_session_id before serialize
void serializeSessionsV2(SessionAndTimeout & session_and_timeout, SessionAndAuth & session_and_auth, UInt32 save_batch_size, SnapshotVersion version, String & path);
Expand Down
10 changes: 4 additions & 6 deletions src/Service/tests/gtest_raft_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,10 @@ TEST(RaftLog, getEntries)
String data("CREATE TABLE table1;");
ASSERT_EQ(appendEntry(log_store, term, key, data), i + 1);
}
ptr<std::vector<ptr<log_entry>>> ret = cs_new<std::vector<ptr<log_entry>>>();
log_store->getEntries(1, 3, ret);
ASSERT_EQ(ret->size(), 3);
ret->clear();
log_store->getEntries(4, 8, ret);
ASSERT_EQ(ret->size(), 5);
std::vector<ptr<log_entry>> ret = log_store->getEntries(1, 3);
ASSERT_EQ(ret.size(), 3);
ret = log_store->getEntries(4, 8);
ASSERT_EQ(ret.size(), 5);
log_store->close();
cleanDirectory(log_dir);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Service/tests/raft_test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class TestServer : public Poco::Util::Application, public Loggers
};

static const String LOG_DIR = "./test_raft_log";
[[maybe_unused]] static const String SNAP_DIR = "./test_raft_snapshot";
static const String SNAP_DIR = "./test_raft_snapshot";

void cleanAll();
void cleanDirectory(const String & log_dir, bool remove_dir = true);
Expand Down

0 comments on commit 5c3a35d

Please sign in to comment.