raftstore: region initial size depends on the split resource (tikv#15456) (tikv#15509)

close tikv#15457

There are several triggers that will split a region; this change distinguishes two cases:
1. Load-based splits (by size, keys, load, etc.). In these cases, the new region should contain data after the split, so it inherits a share of the source region's size estimate (see the sketch below).
2. TiDB splitting tables or partitioned tables, e.g. `create table test.t1(id int, b int) shard_row_id_bits=4 partition by hash(id) partitions 2000`. In these cases, the new region shouldn't contain any data after the split, so it should not inherit a share of the source region's size.
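To make the difference concrete, here is a minimal sketch (simplified and hypothetical, not the exact TiKV code) of how the size estimate for regions produced by a split depends on `share_source_region_size`, mirroring the `on_ready_split_region` change in `peer.rs` below:

```rust
// Sketch only: the function name and shape are illustrative, but the logic
// mirrors the peer.rs change in this commit.
fn estimate_new_region_stats(
    approximate_size: Option<u64>,
    approximate_keys: Option<u64>,
    new_region_count: u64,
    share_source_region_size: bool,
) -> (Option<u64>, Option<u64>) {
    if share_source_region_size {
        // Load/size-based splits: each new region is assumed to hold roughly
        // 1/N of the source region's data, so it inherits a proportional
        // share of the size and key estimates.
        (
            approximate_size.map(|v| v / new_region_count),
            approximate_keys.map(|v| v / new_region_count),
        )
    } else {
        // Pre-splits create empty regions: report no estimate rather than a
        // misleading share of the source region's size.
        (None, None)
    }
}

fn main() {
    // Hypothetical numbers, for illustration only.
    assert_eq!(
        estimate_new_region_stats(Some(96), Some(20), 2, true),
        (Some(48), Some(10))
    );
    assert_eq!(
        estimate_new_region_stats(Some(96), Some(20), 2, false),
        (None, None)
    );
}
```

In this diff, only the split checker (`split_check.rs`) passes `true`; the PD worker's messages, the raftkv split extension, and the test helper pass `false`.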

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: bufferflies <[email protected]>

Co-authored-by: buffer <[email protected]>
Co-authored-by: bufferflies <[email protected]>
ti-chi-bot and bufferflies committed Sep 6, 2023
1 parent a6b6f6f commit 2cede16
Showing 8 changed files with 126 additions and 9 deletions.
7 changes: 7 additions & 0 deletions components/raftstore/src/store/fsm/apply.rs
@@ -244,6 +244,7 @@ pub enum ExecResult<S> {
regions: Vec<Region>,
derived: Region,
new_split_regions: HashMap<u64, NewSplitPeer>,
share_source_region_size: bool,
},
PrepareMerge {
region: Region,
@@ -2436,6 +2437,9 @@ where
admin_req
.mut_splits()
.set_right_derive(split.get_right_derive());
admin_req
.mut_splits()
.set_share_source_region_size(split.get_share_source_region_size());
admin_req.mut_splits().mut_requests().push(split);
// This method is executed only when there are unapplied entries after being
// restarted. So there will be no callback, it's OK to return a response
@@ -2480,6 +2484,7 @@ where
derived.mut_region_epoch().set_version(new_version);

let right_derive = split_reqs.get_right_derive();
let share_source_region_size = split_reqs.get_share_source_region_size();
let mut regions = Vec::with_capacity(new_region_cnt + 1);
// Note that the split requests only contain ids for new regions, so we need
// to handle new regions and old region separately.
@@ -2644,6 +2649,7 @@ where
regions,
derived,
new_split_regions,
share_source_region_size,
}),
))
}
@@ -6216,6 +6222,7 @@ mod tests {
regions,
derived: _,
new_split_regions: _,
share_source_region_size: _,
} = apply_res.exec_res.front().unwrap()
{
let r8 = regions.get(0).unwrap();
43 changes: 34 additions & 9 deletions components/raftstore/src/store/fsm/peer.rs
@@ -1034,8 +1034,15 @@ where
split_keys,
callback,
source,
share_source_region_size,
} => {
self.on_prepare_split_region(region_epoch, split_keys, callback, &source);
self.on_prepare_split_region(
region_epoch,
split_keys,
callback,
&source,
share_source_region_size,
);
}
CasualMessage::ComputeHashResult {
index,
@@ -3876,6 +3883,7 @@ where
derived: metapb::Region,
regions: Vec<metapb::Region>,
new_split_regions: HashMap<u64, apply::NewSplitPeer>,
share_source_region_size: bool,
) {
fail_point!("on_split", self.ctx.store_id() == 3, |_| {});

@@ -3897,8 +3905,15 @@

// Roughly estimate the size and keys for new regions.
let new_region_count = regions.len() as u64;
let estimated_size = self.fsm.peer.approximate_size.map(|v| v / new_region_count);
let estimated_keys = self.fsm.peer.approximate_keys.map(|v| v / new_region_count);
let mut share_size = None;
let mut share_keys = None;
// If share_source_region_size is true, the new regions contain data from the
// origin region, so they share its approximate size and keys.
if share_source_region_size {
share_size = self.fsm.peer.approximate_size.map(|v| v / new_region_count);
share_keys = self.fsm.peer.approximate_keys.map(|v| v / new_region_count);
}

let mut meta = self.ctx.store_meta.lock().unwrap();
meta.set_region(
&self.ctx.coprocessor_host,
@@ -3913,8 +3928,10 @@

let is_leader = self.fsm.peer.is_leader();
if is_leader {
self.fsm.peer.approximate_size = estimated_size;
self.fsm.peer.approximate_keys = estimated_keys;
if share_source_region_size {
self.fsm.peer.approximate_size = share_size;
self.fsm.peer.approximate_keys = share_keys;
}
self.fsm.peer.heartbeat_pd(self.ctx);
// Notify pd immediately to let it update the region meta.
info!(
@@ -4043,8 +4060,8 @@ where
new_peer.has_ready |= campaigned;

if is_leader {
new_peer.peer.approximate_size = estimated_size;
new_peer.peer.approximate_keys = estimated_keys;
new_peer.peer.approximate_size = share_size;
new_peer.peer.approximate_keys = share_keys;
*new_peer.peer.txn_ext.pessimistic_locks.write() = locks;
// The new peer is likely to become leader, send a heartbeat immediately to
// reduce client query miss.
@@ -4866,7 +4883,13 @@ where
derived,
regions,
new_split_regions,
} => self.on_ready_split_region(derived, regions, new_split_regions),
share_source_region_size,
} => self.on_ready_split_region(
derived,
regions,
new_split_regions,
share_source_region_size,
),
ExecResult::PrepareMerge { region, state } => {
self.on_ready_prepare_merge(region, state)
}
@@ -5488,7 +5511,7 @@ where
return;
}

fail_point!("on_split_region_check_tick");
fail_point!("on_split_region_check_tick", |_| {});
self.register_split_region_check_tick();

// To avoid frequent scan, we only add new scan tasks if all previous tasks
@@ -5546,6 +5569,7 @@ where
split_keys: Vec<Vec<u8>>,
cb: Callback<EK::Snapshot>,
source: &str,
share_source_region_size: bool,
) {
info!(
"on split";
@@ -5564,6 +5588,7 @@
split_keys,
peer: self.fsm.peer.peer.clone(),
right_derive: self.ctx.cfg.right_derive_when_split,
share_source_region_size,
callback: cb,
};
if let Err(ScheduleError::Stopped(t)) = self.ctx.pd_scheduler.schedule(task) {
1 change: 1 addition & 0 deletions components/raftstore/src/store/msg.rs
@@ -529,6 +529,7 @@ pub enum CasualMessage<EK: KvEngine> {
split_keys: Vec<Vec<u8>>,
callback: Callback<EK::Snapshot>,
source: Cow<'static, str>,
share_source_region_size: bool,
},

/// Hash result of ComputeHash command.
19 changes: 19 additions & 0 deletions components/raftstore/src/store/worker/pd.rs
@@ -136,6 +136,7 @@ where
peer: metapb::Peer,
// If true, right Region derives origin region_id.
right_derive: bool,
share_source_region_size: bool,
callback: Callback<EK::Snapshot>,
},
AskBatchSplit {
@@ -144,6 +145,7 @@
peer: metapb::Peer,
// If true, right Region derives origin region_id.
right_derive: bool,
share_source_region_size: bool,
callback: Callback<EK::Snapshot>,
},
AutoSplit {
@@ -996,6 +998,7 @@ where
split_key: Vec<u8>,
peer: metapb::Peer,
right_derive: bool,
share_source_region_size: bool,
callback: Callback<EK::Snapshot>,
task: String,
) {
@@ -1017,6 +1020,7 @@
resp.get_new_region_id(),
resp.take_new_peer_ids(),
right_derive,
share_source_region_size,
);
let region_id = region.get_id();
let epoch = region.take_region_epoch();
@@ -1066,6 +1070,7 @@ where
mut split_keys: Vec<Vec<u8>>,
peer: metapb::Peer,
right_derive: bool,
share_source_region_size: bool,
callback: Callback<EK::Snapshot>,
task: String,
remote: Remote<yatp::task::future::TaskCell>,
@@ -1091,6 +1096,7 @@
split_keys,
resp.take_ids().into(),
right_derive,
share_source_region_size,
);
let region_id = region.get_id();
let epoch = region.take_region_epoch();
@@ -1119,6 +1125,7 @@ where
split_key: split_keys.pop().unwrap(),
peer,
right_derive,
share_source_region_size,
callback,
};
if let Err(ScheduleError::Stopped(t)) = scheduler.schedule(task) {
@@ -1540,6 +1547,7 @@ where
split_keys: split_region.take_keys().into(),
callback: Callback::None,
source: "pd".into(),
share_source_region_size: false,
}
} else {
CasualMessage::HalfSplitRegion {
@@ -1919,12 +1927,14 @@ where
split_key,
peer,
right_derive,
share_source_region_size,
callback,
} => self.handle_ask_split(
region,
split_key,
peer,
right_derive,
share_source_region_size,
callback,
String::from("ask_split"),
),
@@ -1933,6 +1943,7 @@
split_keys,
peer,
right_derive,
share_source_region_size,
callback,
} => Self::handle_ask_batch_split(
self.router.clone(),
@@ -1942,6 +1953,7 @@
split_keys,
peer,
right_derive,
share_source_region_size,
callback,
String::from("batch_split"),
self.remote.clone(),
@@ -1967,6 +1979,7 @@
vec![split_key],
split_info.peer,
true,
false,
Callback::None,
String::from("auto_split"),
remote.clone(),
@@ -2252,24 +2265,30 @@ fn new_split_region_request(
new_region_id: u64,
peer_ids: Vec<u64>,
right_derive: bool,
share_source_region_size: bool,
) -> AdminRequest {
let mut req = AdminRequest::default();
req.set_cmd_type(AdminCmdType::Split);
req.mut_split().set_split_key(split_key);
req.mut_split().set_new_region_id(new_region_id);
req.mut_split().set_new_peer_ids(peer_ids);
req.mut_split().set_right_derive(right_derive);
req.mut_split()
.set_share_source_region_size(share_source_region_size);
req
}

fn new_batch_split_region_request(
split_keys: Vec<Vec<u8>>,
ids: Vec<pdpb::SplitId>,
right_derive: bool,
share_source_region_size: bool,
) -> AdminRequest {
let mut req = AdminRequest::default();
req.set_cmd_type(AdminCmdType::BatchSplit);
req.mut_splits().set_right_derive(right_derive);
req.mut_splits()
.set_share_source_region_size(share_source_region_size);
let mut requests = Vec::with_capacity(ids.len());
for (mut id, key) in ids.into_iter().zip(split_keys) {
let mut split = SplitRequest::default();
1 change: 1 addition & 0 deletions components/raftstore/src/store/worker/split_check.rs
@@ -692,5 +692,6 @@ where
split_keys,
callback: Callback::None,
source: source.into(),
share_source_region_size: true,
}
}
1 change: 1 addition & 0 deletions components/test_raftstore/src/cluster.rs
@@ -1396,6 +1396,7 @@ impl<T: Simulator> Cluster<T> {
split_keys: vec![split_key],
callback: cb,
source: "test".into(),
share_source_region_size: false,
},
)
.unwrap();
1 change: 1 addition & 0 deletions src/server/raftkv/raft_extension.rs
@@ -121,6 +121,7 @@ where
split_keys,
callback: raftstore::store::Callback::write(cb),
source: source.into(),
share_source_region_size: false,
};
let res = self.router.send_casual_msg(region_id, req);
Box::pin(async move {
62 changes: 62 additions & 0 deletions tests/failpoints/cases/test_split_region.rs
@@ -17,6 +17,7 @@ use kvproto::{
Mutation, Op, PessimisticLockRequest, PrewriteRequest, PrewriteRequestPessimisticAction::*,
},
metapb::Region,
pdpb::CheckPolicy,
raft_serverpb::RaftMessage,
tikvpb::TikvClient,
};
@@ -261,6 +262,67 @@ impl Filter for PrevoteRangeFilter {
}
}

#[test]
fn test_region_size_after_split() {
let mut cluster = new_node_cluster(0, 1);
cluster.cfg.raft_store.right_derive_when_split = true;
cluster.cfg.raft_store.split_region_check_tick_interval = ReadableDuration::millis(100);
cluster.cfg.raft_store.pd_heartbeat_tick_interval = ReadableDuration::millis(100);
cluster.cfg.raft_store.region_split_check_diff = Some(ReadableSize(10));
let region_max_size = 1440;
let region_split_size = 960;
cluster.cfg.coprocessor.region_max_size = Some(ReadableSize(region_max_size));
cluster.cfg.coprocessor.region_split_size = ReadableSize(region_split_size);
let pd_client = cluster.pd_client.clone();
pd_client.disable_default_operator();
let _r = cluster.run_conf_change();

// Insert 20 key-value pairs into the cluster,
// with keys from 000000001 to 000000020.
let mut range = 1..;
put_till_size(&mut cluster, region_max_size - 100, &mut range);
sleep_ms(100);
// Disable the split check.
fail::cfg("on_split_region_check_tick", "return").unwrap();
let max_key = put_till_size(&mut cluster, region_max_size, &mut range);
// Split by key: region 1 is split into region 1 and region 2.
// region 1: ["000000010", "")
// region 2: ["", "000000010")
let region = pd_client.get_region(&max_key).unwrap();
cluster.must_split(&region, b"000000010");
let size = cluster
.pd_client
.get_region_approximate_size(region.get_id())
.unwrap_or_default();
assert!(size >= region_max_size - 100, "{}", size);

let region = pd_client.get_region(b"000000009").unwrap();
let size1 = cluster
.pd_client
.get_region_approximate_size(region.get_id())
.unwrap_or_default();
assert_eq!(0, size1, "{}", size1);

// Split region 1 by the size check into region 1 and region 3;
// region 3 will contain about half of the region's data.
let region = pd_client.get_region(&max_key).unwrap();
pd_client.split_region(region.clone(), CheckPolicy::Scan, vec![]);
sleep_ms(200);
let size2 = cluster
.pd_client
.get_region_approximate_size(region.get_id())
.unwrap_or_default();
assert!(size > size2, "{}:{}", size, size2);
fail::remove("on_split_region_check_tick");

let region = pd_client.get_region(b"000000010").unwrap();
let size3 = cluster
.pd_client
.get_region_approximate_size(region.get_id())
.unwrap_or_default();
assert!(size3 > 0, "{}", size3);
}

// Test if a peer is created from splitting when another initialized peer with
// the same region id has already existed. In previous implementation, it can be
// created and panic will happen because there are two initialized peer with the
