Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix logical error: last_segment not exist #347

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/Service/KeeperDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,12 +375,6 @@ void KeeperDispatcher::unRegisterSessionResponseCallbackWithoutLock(int64_t id)
session_response_callbacks.erase(it);
}

[[maybe_unused]] void KeeperDispatcher::registerUserResponseCallBack(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected)
{
std::unique_lock<std::shared_mutex> write_lock(response_callbacks_mutex);
registerUserResponseCallBackWithoutLock(session_id, callback, is_reconnected);
}

void KeeperDispatcher::registerUserResponseCallBackWithoutLock(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected)
{
if (session_id == 0)
Expand Down
1 change: 0 additions & 1 deletion src/Service/KeeperDispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ class KeeperDispatcher : public std::enable_shared_from_this<KeeperDispatcher>
void unRegisterForwarderResponseCallBack(ForwardClientId client_id);

/// Register response callback for user request
[[maybe_unused]] void registerUserResponseCallBack(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected = false);
void registerUserResponseCallBackWithoutLock(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected = false);
void unregisterUserResponseCallBack(int64_t session_id);
void unregisterUserResponseCallBackWithoutLock(int64_t session_id);
Expand Down
14 changes: 0 additions & 14 deletions src/Service/KeeperServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,20 +150,6 @@ void KeeperServer::handleRemoteSession(int64_t session_id, int64_t expiration_ti
state_machine->getStore().handleRemoteSession(session_id, expiration_time);
}

[[maybe_unused]] int64_t KeeperServer::getSessionTimeout(int64_t session_id)
{
LOG_DEBUG(log, "New session timeout for {}", session_id);
if (state_machine->getStore().containsSession(session_id))
{
return state_machine->getStore().getSessionAndTimeOut().find(session_id)->second;
}
else
{
LOG_WARNING(log, "Not found session timeout for {}", session_id);
return -1;
}
}

bool KeeperServer::isLeader() const
{
return raft_instance->is_leader();
Expand Down
4 changes: 0 additions & 4 deletions src/Service/KeeperServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ class KeeperServer
/// it will update the snapshot itself.
void handleRemoteSession(int64_t session_id, int64_t expiration_time);

/// Get the initialized timeout for a session.
/// Return initialized timeout, or -1 if session not exist.
[[maybe_unused]] int64_t getSessionTimeout(int64_t session_id);

/// will invoke waitInit
void startup();

Expand Down
10 changes: 5 additions & 5 deletions src/Service/NuRaftFileLogStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,12 @@ ulong NuRaftFileLogStore::append(ptr<log_entry> & entry)

void NuRaftFileLogStore::write_at(ulong index, ptr<log_entry> & entry)
{
if (segment_store->writeAt(index, entry) == index)
log_queue.clear();
segment_store->writeAt(index, entry);

log_queue.clear();
last_log_entry = entry;

/// notify parallel fsync thread
/// log store file fsync
if (log_fsync_mode == FsyncMode::FSYNC_PARALLEL && entry->get_val_type() != log_val_type::app_log)
parallel_fsync_event->set();

Expand Down Expand Up @@ -255,8 +255,8 @@ ptr<log_entry> NuRaftFileLogStore::entry_at(ulong index)

ulong NuRaftFileLogStore::term_at(ulong index)
{
if (entry_at(index))
return entry_at(index)->get_term();
if (auto entry = entry_at(index))
return entry->get_term();
return 0;
}

Expand Down
60 changes: 23 additions & 37 deletions src/Service/NuRaftLogSegment.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#include <Service/NuRaftLogSegment.h>

#include <charconv>
#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/uio.h>
#include <unistd.h>
#include <charconv>

#include <Poco/File.h>

Expand Down Expand Up @@ -33,16 +33,6 @@ namespace ErrorCodes

using namespace nuraft;

[[maybe_unused]] int ftruncateUninterrupted(int fd, off_t length)
{
int rc;
do
{
rc = ftruncate(fd, length);
} while (rc == -1 && errno == EINTR);
return rc;
}

bool compareSegment(const ptr<NuRaftLogSegment> & lhs, const ptr<NuRaftLogSegment> & rhs)
{
return lhs->firstIndex() < rhs->firstIndex();
Expand Down Expand Up @@ -661,14 +651,13 @@ UInt64 LogSegmentStore::appendEntry(const ptr<log_entry> & entry)
return open_segment->appendEntry(entry, last_log_index);
}

UInt64 LogSegmentStore::writeAt(UInt64 index, const ptr<log_entry> & entry)
void LogSegmentStore::writeAt(UInt64 index, const ptr<log_entry> & entry)
{
truncateLog(index - 1);
if (index == lastLogIndex() + 1)
return appendEntry(entry);

LOG_WARNING(log, "writeAt log index {} failed, firstLogIndex {}, lastLogIndex {}.", index, firstLogIndex(), lastLogIndex());
return -1;
appendEntry(entry);
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Fail to write log at {}, log store index range [{}, {}].", index, firstLogIndex(), lastLogIndex());
}

ptr<log_entry> LogSegmentStore::getEntry(UInt64 index) const
Expand All @@ -680,18 +669,15 @@ ptr<log_entry> LogSegmentStore::getEntry(UInt64 index) const
return seg->getEntry(index);
}

void LogSegmentStore::getEntries(UInt64 start_index, UInt64 end_index, const ptr<std::vector<ptr<log_entry>>> & entries)
std::vector<ptr<log_entry>> LogSegmentStore::getEntries(UInt64 start_index, UInt64 end_index) const
{
if (entries == nullptr)
{
LOG_ERROR(log, "Entry vector is nullptr.");
return;
}
std::vector<ptr<log_entry>> entries;
for (UInt64 index = start_index; index <= end_index; index++)
{
auto entry_pt = getEntry(index);
entries->push_back(entry_pt);
entries.push_back(entry_pt);
}
return entries;
}

int LogSegmentStore::removeSegment(UInt64 first_index_kept)
Expand Down Expand Up @@ -813,9 +799,6 @@ bool LogSegmentStore::truncateLog(UInt64 last_index_kept)
}
}

if (!last_segment)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found a segment to truncate, last_index_kept {}.", last_index_kept);

/// remove files
for (auto & to_removed : to_removed_segments)
{
Expand All @@ -824,20 +807,23 @@ bool LogSegmentStore::truncateLog(UInt64 last_index_kept)
to_removed = nullptr;
}

bool is_open_before_truncate = last_segment->isOpen();
bool removed_something = last_segment->truncate(last_index_kept);
if (last_segment)
{
bool is_open_before_truncate = last_segment->isOpen();
bool removed_something = last_segment->truncate(last_index_kept);

if (!removed_something && last_segment->lastIndex() != last_index_kept)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Truncate log to last_index_kept {}, but nothing removed from log segment {}.", last_index_kept, last_segment->getFileName());
if (!removed_something && last_segment->lastIndex() != last_index_kept)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Truncate log to last_index_kept {}, but nothing removed from log segment {}.", last_index_kept, last_segment->getFileName());

if (!is_open_before_truncate && !last_segment->isOpen())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Truncate a closed log segment {}, but the truncated log segment is not open.", last_segment->getFileName());
if (!is_open_before_truncate && !last_segment->isOpen())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Truncate a closed log segment {}, but the truncated log segment is not open.", last_segment->getFileName());

if (!is_open_before_truncate)
{
open_segment = last_segment;
if (!closed_segments.empty())
closed_segments.erase(closed_segments.end() - 1);
if (!is_open_before_truncate)
{
open_segment = last_segment;
if (!closed_segments.empty())
closed_segments.erase(closed_segments.end() - 1);
}
}

last_log_index.store(last_index_kept, std::memory_order_release);
Expand Down
10 changes: 6 additions & 4 deletions src/Service/NuRaftLogSegment.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,11 @@ class LogSegmentStore final
UInt64 appendEntry(const ptr<log_entry> & entry);

/// First truncate log whose index is larger than or equals with index of entry, then append it.
UInt64 writeAt(UInt64 index, const ptr<log_entry> & entry);
void writeAt(UInt64 index, const ptr<log_entry> & entry);
ptr<log_entry> getEntry(UInt64 index) const;

/// Just for test, collection entries in [start_index, end_index]
void getEntries(UInt64 start_index, UInt64 end_index, const ptr<std::vector<ptr<log_entry>>> & entries);
/// Just for test
std::vector<ptr<log_entry>> getEntries(UInt64 start_index, UInt64 end_index) const;

/// Remove segments from storage's head, logs in [1, first_index_kept) will be discarded, usually invoked when compaction.
/// return number of segments removed
Expand All @@ -219,7 +219,7 @@ class LogSegmentStore final
/// Return true if some logs are removed
bool truncateLog(UInt64 last_index_kept);

/// get closed segments, only for tests
/// Just for tests
Segments getClosedSegments()
{
std::shared_lock read_lock(seg_mutex);
Expand All @@ -232,8 +232,10 @@ class LogSegmentStore final
private:
/// open a new segment, invoked when init
void openNewSegmentIfNeeded();

/// list segments, invoked when init
void loadSegmentMetaData();

/// load listed segments, invoked when init
void loadSegments();

Expand Down
5 changes: 0 additions & 5 deletions src/Service/NuRaftStateMachine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,6 @@ nuraft::ptr<nuraft::buffer> NuRaftStateMachine::commit(const ulong log_idx, buff
return commit(log_idx, data, false);
}

[[maybe_unused]] void NuRaftStateMachine::processReadRequest(const RequestForSession & request_for_session)
{
store.processRequest(responses_queue, request_for_session);
}

std::vector<int64_t> NuRaftStateMachine::getDeadSessions()
{
return store.getDeadSessions();
Expand Down
3 changes: 0 additions & 3 deletions src/Service/NuRaftStateMachine.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,6 @@ class NuRaftStateMachine : public nuraft::state_machine

KeeperStore & getStore() { return store; }

/// process read request
[[maybe_unused]] void processReadRequest(const RequestForSession & request_for_session);

/// get expired session
std::vector<int64_t> getDeadSessions();

Expand Down
54 changes: 0 additions & 54 deletions src/Service/SnapshotCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,60 +229,6 @@ void serializeAclsV2(const NumToACLMap & acl_map, String path, UInt32 save_batch
LOG_INFO(log, "Finish create snapshot acl object, acl size {}, path {}", acl_map.size(), path);
}

[[maybe_unused]] size_t serializeEphemeralsV2(KeeperStore::Ephemerals & ephemerals, std::mutex & mutex, String path, UInt32 save_batch_size)
{
Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore"));
LOG_INFO(log, "Begin create snapshot ephemeral object, node size {}, path {}", ephemerals.size(), path);

ptr<SnapshotBatchBody> batch;

std::lock_guard lock(mutex);

if (ephemerals.empty())
{
LOG_INFO(log, "Ephemeral nodes size is 0");
return 0;
}

auto out = cs_new<WriteBufferFromFile>(path);
uint64_t index = 0;
for (auto & ephemeral_it : ephemerals)
{
/// flush and rebuild batch
if (index % save_batch_size == 0)
{
/// skip flush the first batch
if (index != 0)
{
/// write data in batch to file
saveBatchV2(out, batch);
}
batch = cs_new<SnapshotBatchBody>();
batch->type = SnapshotBatchType::SNAPSHOT_TYPE_DATA_EPHEMERAL;
}

/// append to batch
WriteBufferFromNuraftBuffer buf;
Coordination::write(ephemeral_it.first, buf);
Coordination::write(ephemeral_it.second.size(), buf);

for (const auto & node_path : ephemeral_it.second)
{
Coordination::write(node_path, buf);
}

ptr<buffer> data = buf.getBuffer();
data->pos(0);
batch->add(String(reinterpret_cast<char *>(data->data_begin()), data->size()));

index++;
}

/// flush the last batch
saveBatchV2(out, batch);
out->close();
return 1;
}

void serializeSessionsV2(SessionAndTimeout & session_and_timeout, SessionAndAuth & session_and_auth, UInt32 save_batch_size, SnapshotVersion version, String & path)
{
Expand Down
2 changes: 0 additions & 2 deletions src/Service/SnapshotCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ std::pair<size_t, UInt32>
saveBatchAndUpdateCheckSumV2(ptr<WriteBufferFromFile> & out, ptr<SnapshotBatchBody> & batch, UInt32 checksum);

void serializeAclsV2(const NumToACLMap & acls, String path, UInt32 save_batch_size, SnapshotVersion version);
[[maybe_unused]] size_t
serializeEphemeralsV2(KeeperStore::Ephemerals & ephemerals, std::mutex & mutex, String path, UInt32 save_batch_size);

/// Serialize sessions and return the next_session_id before serialize
void serializeSessionsV2(SessionAndTimeout & session_and_timeout, SessionAndAuth & session_and_auth, UInt32 save_batch_size, SnapshotVersion version, String & path);
Expand Down
10 changes: 4 additions & 6 deletions src/Service/tests/gtest_raft_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,10 @@ TEST(RaftLog, getEntries)
String data("CREATE TABLE table1;");
ASSERT_EQ(appendEntry(log_store, term, key, data), i + 1);
}
ptr<std::vector<ptr<log_entry>>> ret = cs_new<std::vector<ptr<log_entry>>>();
log_store->getEntries(1, 3, ret);
ASSERT_EQ(ret->size(), 3);
ret->clear();
log_store->getEntries(4, 8, ret);
ASSERT_EQ(ret->size(), 5);
std::vector<ptr<log_entry>> ret = log_store->getEntries(1, 3);
ASSERT_EQ(ret.size(), 3);
ret = log_store->getEntries(4, 8);
ASSERT_EQ(ret.size(), 5);
log_store->close();
cleanDirectory(log_dir);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Service/tests/raft_test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class TestServer : public Poco::Util::Application, public Loggers
};

static const String LOG_DIR = "./test_raft_log";
[[maybe_unused]] static const String SNAP_DIR = "./test_raft_snapshot";
static const String SNAP_DIR = "./test_raft_snapshot";

void cleanAll();
void cleanDirectory(const String & log_dir, bool remove_dir = true);
Expand Down
Loading