Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stale log index when there is an snapshot but no log in disk #311

Merged
merged 1 commit into from
Jun 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/Service/NuRaftLogSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,13 +706,11 @@ int NuRaftLogSegment::truncate(const UInt64 last_index_kept)
return ret;
}

ptr<LogSegmentStore> LogSegmentStore::segment_store = nullptr;

ptr<LogSegmentStore> LogSegmentStore::getInstance(const String & log_dir_, bool force_new)
{
static ptr<LogSegmentStore> segment_store;
if (segment_store == nullptr || force_new)
segment_store = cs_new<LogSegmentStore>(log_dir_);

return segment_store;
}

Expand Down
9 changes: 3 additions & 6 deletions src/Service/NuRaftLogSegment.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class NuRaftLogSegment
class LogSegmentStore
{
public:
using SegmentVector = std::vector<ptr<NuRaftLogSegment>>;
using Segments = std::vector<ptr<NuRaftLogSegment>>;

static constexpr UInt32 MAX_SEGMENT_FILE_SIZE = 1000 * 1024 * 1024; //1G, 0.3K/Log, 3M logs
static constexpr UInt32 MAX_SEGMENT_COUNT = 50; //50G
Expand Down Expand Up @@ -296,7 +296,7 @@ class LogSegmentStore
int reset(UInt64 next_log_index);

/// get closed segments
SegmentVector & getClosedSegments() { return segments; }
Segments & getClosedSegments() { return segments; }

/// get file format version
LogVersion getVersion(UInt64 index);
Expand All @@ -312,9 +312,6 @@ class LogSegmentStore
/// find segment by log index
int getSegment(UInt64 log_index, ptr<NuRaftLogSegment> & ptr);

/// global instance
static ptr<LogSegmentStore> segment_store;

/// file log store directory
String log_dir;

Expand All @@ -331,7 +328,7 @@ class LogSegmentStore
Poco::Logger * log;

/// closed segments
SegmentVector segments;
Segments segments;

/// open segments
ptr<NuRaftLogSegment> open_segment;
Expand Down
2 changes: 1 addition & 1 deletion src/Service/NuRaftStateMachine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ NuRaftStateMachine::NuRaftStateMachine(
{
LOG_INFO(log, "No previous last commit idx found, skip replaying logs.");
}
else if (previous_last_commit_id < last_committed_idx)
else if (previous_last_commit_id <= last_committed_idx)
{
LOG_WARNING(
log,
Expand Down
27 changes: 27 additions & 0 deletions tests/integration/test_snapshots/test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import random
import string
import time

import pytest

Expand Down Expand Up @@ -72,6 +73,7 @@ def test_state_after_restart(started_cluster, node):
finally:
close_zk_clients([node_zk, node_zk2])


@pytest.mark.parametrize(
'node',
[
Expand Down Expand Up @@ -115,3 +117,28 @@ def test_ephemeral_after_restart(started_cluster, node):
assert list(sorted(existing_children)) == list(sorted(node_zk2.get_children("/test_ephemeral_after_restart")))
finally:
close_zk_clients([node_zk, node_zk2])


@pytest.mark.parametrize(
'node',
[
cluster.add_instance('node5', main_configs=['configs/enable_keeper.xml'], with_zookeeper=True, stay_alive=True),
cluster.add_instance('node6', main_configs=['configs/enable_async_snapshot_keeper.xml'], with_zookeeper=True, stay_alive=True)
]
)
def test_restart_with_no_log(started_cluster, node):
node_zk = node_zk2 = None
try:
node_zk = node.get_fake_zk()
node_zk.create("/test_restart_with_no_log", b"somevalue")

node.send_4lw_cmd(cmd="csnp")
time.sleep(1) # wait for snapshot to be taken

node.restart_raftkeeper(kill=True)
node.wait_for_join_cluster()

node_zk2 = node.get_fake_zk()
assert node_zk2.exists("/test_restart_with_no_log")
finally:
close_zk_clients([node_zk, node_zk2])
Loading