Skip to content

Commit

Permalink
Fix stale log index when there is an snapshot but no log in disk
Browse files Browse the repository at this point in the history
  • Loading branch information
JackyWoo committed Jun 15, 2024
1 parent b5ec8d1 commit ac939f8
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 10 deletions.
4 changes: 1 addition & 3 deletions src/Service/NuRaftLogSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,13 +706,11 @@ int NuRaftLogSegment::truncate(const UInt64 last_index_kept)
return ret;
}

ptr<LogSegmentStore> LogSegmentStore::segment_store = nullptr;

ptr<LogSegmentStore> LogSegmentStore::getInstance(const String & log_dir_, bool force_new)
{
static ptr<LogSegmentStore> segment_store;
if (segment_store == nullptr || force_new)
segment_store = cs_new<LogSegmentStore>(log_dir_);

return segment_store;
}

Expand Down
9 changes: 3 additions & 6 deletions src/Service/NuRaftLogSegment.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class NuRaftLogSegment
class LogSegmentStore
{
public:
using SegmentVector = std::vector<ptr<NuRaftLogSegment>>;
using Segments = std::vector<ptr<NuRaftLogSegment>>;

static constexpr UInt32 MAX_SEGMENT_FILE_SIZE = 1000 * 1024 * 1024; //1G, 0.3K/Log, 3M logs
static constexpr UInt32 MAX_SEGMENT_COUNT = 50; //50G
Expand Down Expand Up @@ -296,7 +296,7 @@ class LogSegmentStore
int reset(UInt64 next_log_index);

/// get closed segments
SegmentVector & getClosedSegments() { return segments; }
Segments & getClosedSegments() { return segments; }

/// get file format version
LogVersion getVersion(UInt64 index);
Expand All @@ -312,9 +312,6 @@ class LogSegmentStore
/// find segment by log index
int getSegment(UInt64 log_index, ptr<NuRaftLogSegment> & ptr);

/// global instance
static ptr<LogSegmentStore> segment_store;

/// file log store directory
String log_dir;

Expand All @@ -331,7 +328,7 @@ class LogSegmentStore
Poco::Logger * log;

/// closed segments
SegmentVector segments;
Segments segments;

/// open segments
ptr<NuRaftLogSegment> open_segment;
Expand Down
2 changes: 1 addition & 1 deletion src/Service/NuRaftStateMachine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ NuRaftStateMachine::NuRaftStateMachine(
{
LOG_INFO(log, "No previous last commit idx found, skip replaying logs.");
}
else if (previous_last_commit_id < last_committed_idx)
else if (previous_last_commit_id <= last_committed_idx)
{
LOG_WARNING(
log,
Expand Down
27 changes: 27 additions & 0 deletions tests/integration/test_snapshots/test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import random
import string
import time

import pytest

Expand Down Expand Up @@ -72,6 +73,7 @@ def test_state_after_restart(started_cluster, node):
finally:
close_zk_clients([node_zk, node_zk2])


@pytest.mark.parametrize(
'node',
[
Expand Down Expand Up @@ -115,3 +117,28 @@ def test_ephemeral_after_restart(started_cluster, node):
assert list(sorted(existing_children)) == list(sorted(node_zk2.get_children("/test_ephemeral_after_restart")))
finally:
close_zk_clients([node_zk, node_zk2])


@pytest.mark.parametrize(
'node',
[
cluster.add_instance('node5', main_configs=['configs/enable_keeper.xml'], with_zookeeper=True, stay_alive=True),
cluster.add_instance('node6', main_configs=['configs/enable_async_snapshot_keeper.xml'], with_zookeeper=True, stay_alive=True)
]
)
def test_restart_with_no_log(started_cluster, node):
node_zk = node_zk2 = None
try:
node_zk = node.get_fake_zk()
node_zk.create("/test_restart_with_no_log", b"somevalue")

node.send_4lw_cmd(cmd="csnp")
time.sleep(1) # wait for snapshot to be taken

node.restart_raftkeeper(kill=True)
node.wait_for_join_cluster()

node_zk2 = node.get_fake_zk()
assert node_zk2.exists("/test_restart_with_no_log")
finally:
close_zk_clients([node_zk, node_zk2])

0 comments on commit ac939f8

Please sign in to comment.