
Commit

This is an automated cherry-pick of tikv#15456
close tikv#15457

Signed-off-by: ti-chi-bot <[email protected]>
bufferflies authored and ti-chi-bot committed Sep 5, 2023
1 parent f11bf5c commit 73ce8b5
Showing 17 changed files with 205 additions and 19 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

33 changes: 26 additions & 7 deletions components/raftstore-v2/src/operation/command/admin/split.rs
@@ -73,6 +73,9 @@ pub struct SplitResult {
     // The index of the derived region in `regions`.
     pub derived_index: usize,
     pub tablet_index: u64,
+    // New regions will share the source region's size if true; otherwise the
+    // new regions' size will be 0.
+    pub share_source_region_size: bool,
     // Hack: in the common case we should use a generic, but split is an
     // infrequent event whose performance is not critical, and using `Any`
     // avoids polluting all existing code.
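
The flag's arithmetic, shown as a minimal self-contained sketch (the helper below is hypothetical; it mirrors the share_size/share_keys logic added later in this diff):

// Hypothetical sketch of the flag's semantics, not part of the patch.
fn initial_estimate(parent: Option<u64>, new_region_count: u64, share: bool) -> Option<u64> {
    if share {
        // Each region produced by the split inherits an equal share of the
        // parent's approximate size.
        parent.map(|v| v / new_region_count)
    } else {
        // The new regions are treated as empty (size 0) until the next
        // split check refreshes the estimate.
        None
    }
}

fn main() {
    // A 96 MiB parent split into 3 regions.
    assert_eq!(initial_estimate(Some(96 << 20), 3, true), Some(32 << 20));
    assert_eq!(initial_estimate(Some(96 << 20), 3, false), None);
}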
@@ -146,6 +149,9 @@ pub struct RequestSplit {
     pub epoch: RegionEpoch,
     pub split_keys: Vec<Vec<u8>>,
     pub source: Cow<'static, str>,
+    // New regions will share the source region's size if true; otherwise the
+    // new regions' size will be 0.
+    pub share_source_region_size: bool,
 }
 
 #[derive(Debug)]
@@ -207,6 +213,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
         {
             return true;
         }
+        fail_point!("on_split_region_check_tick", |_| true);
         if ctx.schedulers.split_check.is_busy() {
             return false;
         }
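
A test can trip the new failpoint through the `fail` crate to make the tick short-circuit; a rough sketch, assuming a failpoints-enabled build (only the failpoint name comes from this diff):

// Sketch of test-side usage (assumes the `fail` crate and failpoints feature).
fn main() {
    // With the "return" action, `fail_point!("on_split_region_check_tick", |_| true)`
    // evaluates its closure and the check returns `true` unconditionally.
    fail::cfg("on_split_region_check_tick", "return").unwrap();

    // ... drive the peer under test here ...

    fail::remove("on_split_region_check_tick");
}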
@@ -297,7 +304,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
             ch.set_result(cmd_resp::new_error(e));
             return;
         }
-        self.ask_batch_split_pd(ctx, rs.split_keys, ch);
+        self.ask_batch_split_pd(ctx, rs.split_keys, rs.share_source_region_size, ch);
     }
 
     pub fn on_request_half_split<T>(
@@ -435,6 +442,7 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
         let derived_req = &[derived_req];
 
         let right_derive = split_reqs.get_right_derive();
+        let share_source_region_size = split_reqs.get_share_source_region_size();
         let reqs = if right_derive {
             split_reqs.get_requests().iter().chain(derived_req)
         } else {
@@ -521,6 +529,7 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
                 derived_index,
                 tablet_index: log_index,
                 tablet: Box::new(tablet),
+                share_source_region_size,
             }),
         ))
     }
@@ -556,6 +565,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
         fail_point!("on_split", self.peer().get_store_id() == 3, |_| {});
 
         let derived = &res.regions[res.derived_index];
+        let share_source_region_size = res.share_source_region_size;
         let region_id = derived.get_id();
 
         let region_locks = self.txn_context().split(&res.regions, derived);
@@ -586,8 +596,14 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
 
         let new_region_count = res.regions.len() as u64;
         let control = self.split_flow_control_mut();
-        let estimated_size = control.approximate_size.map(|v| v / new_region_count);
-        let estimated_keys = control.approximate_keys.map(|v| v / new_region_count);
+        // If share_source_region_size is true, the new regions may contain
+        // data from the source region, so they share its size estimate.
+        let mut share_size = None;
+        let mut share_keys = None;
+        if share_source_region_size {
+            share_size = control.approximate_size.map(|v| v / new_region_count);
+            share_keys = control.approximate_keys.map(|v| v / new_region_count);
+        }
 
         self.post_split();
 
@@ -605,8 +621,11 @@
             // After split, the peer may need to update its metrics.
             let control = self.split_flow_control_mut();
             control.may_skip_split_check = false;
-            control.approximate_size = estimated_size;
-            control.approximate_keys = estimated_keys;
+            if share_source_region_size {
+                control.approximate_size = share_size;
+                control.approximate_keys = share_keys;
+            }
+
             self.add_pending_tick(PeerTick::SplitRegionCheck);
         }
         self.storage_mut().set_has_dirty_data(true);
@@ -651,8 +670,8 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
                 derived_region_id: region_id,
                 check_split: last_region_id == new_region_id,
                 scheduled: false,
-                approximate_size: estimated_size,
-                approximate_keys: estimated_keys,
+                approximate_size: share_size,
+                approximate_keys: share_keys,
                 locks,
             }));
 
2 changes: 2 additions & 0 deletions components/raftstore-v2/src/operation/pd.rs
@@ -213,13 +213,15 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
         &self,
         ctx: &StoreContext<EK, ER, T>,
         split_keys: Vec<Vec<u8>>,
+        share_source_region_size: bool,
         ch: CmdResChannel,
     ) {
         let task = pd::Task::AskBatchSplit {
             region: self.region().clone(),
             split_keys,
             peer: self.peer().clone(),
             right_derive: ctx.cfg.right_derive_when_split,
+            share_source_region_size,
             ch,
         };
         if let Err(e) = ctx.schedulers.pd.schedule(task) {
2 changes: 1 addition & 1 deletion components/raftstore-v2/src/router/imp.rs
@@ -51,7 +51,7 @@ impl<EK: KvEngine, ER: RaftEngine> raftstore::coprocessor::StoreHandle for Store
         split_keys: Vec<Vec<u8>>,
         source: Cow<'static, str>,
     ) {
-        let (msg, _) = PeerMsg::request_split(region_epoch, split_keys, source.to_string());
+        let (msg, _) = PeerMsg::request_split(region_epoch, split_keys, source.to_string(), true);
         let res = self.send(region_id, msg);
         if let Err(e) = res {
             warn!(
24 changes: 24 additions & 0 deletions components/raftstore-v2/src/router/message.rs
@@ -282,6 +282,7 @@ impl PeerMsg {
         epoch: metapb::RegionEpoch,
         split_keys: Vec<Vec<u8>>,
         source: String,
+        share_source_region_size: bool,
     ) -> (Self, CmdResSubscriber) {
         let (ch, sub) = CmdResChannel::pair();
         (
@@ -290,12 +291,35 @@
                     epoch,
                     split_keys,
                     source: source.into(),
+                    share_source_region_size,
                 },
                 ch,
             },
             sub,
         )
     }
+
+    #[cfg(feature = "testexport")]
+    pub fn request_split_with_callback(
+        epoch: metapb::RegionEpoch,
+        split_keys: Vec<Vec<u8>>,
+        source: String,
+        f: Box<dyn FnOnce(&mut kvproto::raft_cmdpb::RaftCmdResponse) + Send>,
+    ) -> (Self, CmdResSubscriber) {
+        let (ch, sub) = CmdResChannel::with_callback(f);
+        (
+            PeerMsg::RequestSplit {
+                request: RequestSplit {
+                    epoch,
+                    split_keys,
+                    source: source.into(),
+                    share_source_region_size: false,
+                },
+                ch,
+            },
+            sub,
+        )
+    }
 }
 
 #[derive(Debug)]
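
Both constructors follow the same pattern: pair the request with a one-shot result channel and return the subscriber to the caller. A self-contained sketch of that shape using only std types (all names are stand-ins, not the raftstore-v2 API):

use std::sync::mpsc;

// Stand-ins for the real RequestSplit / PeerMsg / CmdResChannel types.
struct RequestSplit {
    split_keys: Vec<Vec<u8>>,
    share_source_region_size: bool,
}

enum PeerMsg {
    RequestSplit {
        request: RequestSplit,
        ch: mpsc::Sender<Result<(), String>>,
    },
}

fn request_split(
    split_keys: Vec<Vec<u8>>,
    share_source_region_size: bool,
) -> (PeerMsg, mpsc::Receiver<Result<(), String>>) {
    let (ch, sub) = mpsc::channel();
    let msg = PeerMsg::RequestSplit {
        request: RequestSplit {
            split_keys,
            share_source_region_size,
        },
        ch,
    };
    (msg, sub)
}

fn main() {
    let (msg, sub) = request_split(vec![b"k".to_vec()], true);
    // A router would deliver `msg` to the target peer; simulate the reply here.
    let PeerMsg::RequestSplit { request, ch } = msg;
    assert!(request.share_source_region_size);
    ch.send(Ok(())).unwrap();
    assert_eq!(sub.recv().unwrap(), Ok(()));
}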
11 changes: 10 additions & 1 deletion components/raftstore-v2/src/worker/pd/mod.rs
@@ -65,6 +65,7 @@ pub enum Task {
         split_keys: Vec<Vec<u8>>,
         peer: metapb::Peer,
         right_derive: bool,
+        share_source_region_size: bool,
         ch: CmdResChannel,
     },
     ReportBatchSplit {
@@ -276,7 +277,15 @@
                 peer,
                 right_derive,
                 ch,
-            } => self.handle_ask_batch_split(region, split_keys, peer, right_derive, ch),
+                share_source_region_size,
+            } => self.handle_ask_batch_split(
+                region,
+                split_keys,
+                peer,
+                right_derive,
+                share_source_region_size,
+                ch,
+            ),
             Task::ReportBatchSplit { regions } => self.handle_report_batch_split(regions),
             Task::AutoSplit { split_infos } => self.handle_auto_split(split_infos),
             Task::UpdateMaxTimestamp {
1 change: 1 addition & 0 deletions components/raftstore-v2/src/worker/pd/region.rs
@@ -288,6 +288,7 @@
                     epoch,
                     split_keys: split_region.take_keys().into(),
                     source: "pd".into(),
+                    share_source_region_size: false,
                 },
                 ch,
             }
8 changes: 8 additions & 0 deletions components/raftstore-v2/src/worker/pd/split.rs
@@ -17,10 +17,13 @@ fn new_batch_split_region_request(
     split_keys: Vec<Vec<u8>>,
     ids: Vec<pdpb::SplitId>,
     right_derive: bool,
+    share_source_region_size: bool,
 ) -> AdminRequest {
     let mut req = AdminRequest::default();
     req.set_cmd_type(AdminCmdType::BatchSplit);
     req.mut_splits().set_right_derive(right_derive);
+    req.mut_splits()
+        .set_share_source_region_size(share_source_region_size);
     let mut requests = Vec::with_capacity(ids.len());
     for (mut id, key) in ids.into_iter().zip(split_keys) {
         let mut split = SplitRequest::default();
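
For illustration, the request this helper builds for two split keys could look as follows; a sketch assuming kvproto's raft_cmdpb types (the SplitRequest setters are assumptions based on the elided loop body, and real region/peer ids come from PD's AskBatchSplit response):

// Sketch (assumed kvproto usage): a BatchSplit admin request with the new flag.
use kvproto::raft_cmdpb::{AdminCmdType, AdminRequest, SplitRequest};

fn example_request() -> AdminRequest {
    let mut req = AdminRequest::default();
    req.set_cmd_type(AdminCmdType::BatchSplit);
    req.mut_splits().set_right_derive(true);
    // PD-triggered splits pass `false`: the new regions start with size 0.
    req.mut_splits().set_share_source_region_size(false);
    for key in [b"b".to_vec(), b"m".to_vec()] {
        let mut split = SplitRequest::default();
        // New region id and peer ids allocated by PD would also be set here.
        split.set_split_key(key);
        req.mut_splits().mut_requests().push(split);
    }
    req
}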
@@ -46,6 +49,7 @@
         split_keys: Vec<Vec<u8>>,
         peer: metapb::Peer,
         right_derive: bool,
+        share_source_region_size: bool,
         ch: CmdResChannel,
     ) {
         Self::ask_batch_split_imp(
@@ -57,6 +61,7 @@
             split_keys,
             peer,
             right_derive,
+            share_source_region_size,
             Some(ch),
         );
     }
@@ -70,6 +75,7 @@
         split_keys: Vec<Vec<u8>>,
         peer: metapb::Peer,
         right_derive: bool,
+        share_source_region_size: bool,
         ch: Option<CmdResChannel>,
     ) {
         if split_keys.is_empty() {
@@ -98,6 +104,7 @@
             split_keys,
             resp.take_ids().into(),
             right_derive,
+            share_source_region_size,
         );
         let region_id = region.get_id();
         let epoch = region.take_region_epoch();
@@ -148,6 +155,7 @@
             vec![split_key],
             split_info.peer,
             true,
+            false,
             None,
         );
         // Try to split the region on half within the given key
1 change: 1 addition & 0 deletions components/raftstore/src/router.rs
@@ -331,6 +331,7 @@ impl<EK: KvEngine, ER: RaftEngine> crate::coprocessor::StoreHandle for RaftRoute
             split_keys,
             callback: Callback::None,
             source,
+            share_source_region_size: true,
         },
     ) {
         warn!(
7 changes: 7 additions & 0 deletions components/raftstore/src/store/fsm/apply.rs
@@ -272,6 +272,7 @@ pub enum ExecResult<S> {
         regions: Vec<Region>,
         derived: Region,
         new_split_regions: HashMap<u64, NewSplitPeer>,
+        share_source_region_size: bool,
     },
     PrepareMerge {
         region: Region,
@@ -2515,6 +2516,9 @@
         admin_req
             .mut_splits()
             .set_right_derive(split.get_right_derive());
+        admin_req
+            .mut_splits()
+            .set_share_source_region_size(split.get_share_source_region_size());
         admin_req.mut_splits().mut_requests().push(split);
         // This method is executed only when there are unapplied entries after being
         // restarted. So there will be no callback, it's OK to return a response
@@ -2559,6 +2563,7 @@
         derived.mut_region_epoch().set_version(new_version);
 
         let right_derive = split_reqs.get_right_derive();
+        let share_source_region_size = split_reqs.get_share_source_region_size();
         let mut regions = Vec::with_capacity(new_region_cnt + 1);
         // Note that the split requests only contain ids for new regions, so we need
         // to handle new regions and old region separately.
@@ -2723,6 +2728,7 @@
                 regions,
                 derived,
                 new_split_regions,
+                share_source_region_size,
             }),
         ))
     }
@@ -7075,6 +7081,7 @@ mod tests {
             regions,
             derived: _,
             new_split_regions: _,
+            share_source_region_size: _,
         } = apply_res.exec_res.front().unwrap()
         {
             let r8 = regions.get(0).unwrap();
