Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only aggregate heaviest fork at the coordinator. #3115

16 changes: 6 additions & 10 deletions wen-restart/proto/wen_restart.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,12 @@ message HeaviestForkRecord {
uint64 total_active_stake = 3;
uint32 shred_version = 4;
uint64 wallclock = 5;
}

message HeaviestForkAggregateFinal {
uint64 total_active_stake = 1;
uint64 total_active_stake_seen_supermajority = 2;
uint64 total_active_stake_agreed_with_me = 3;
string from = 6;
}

message HeaviestForkAggregateRecord {
map<string, HeaviestForkRecord> received = 1;
optional HeaviestForkAggregateFinal final_result = 2;
repeated HeaviestForkRecord received = 1;
uint64 total_active_stake = 2;
}

message GenerateSnapshotRecord {
Expand All @@ -70,6 +65,7 @@ message WenRestartProgress {
optional LastVotedForkSlotsAggregateRecord last_voted_fork_slots_aggregate = 3;
optional HeaviestForkRecord my_heaviest_fork = 4;
optional HeaviestForkAggregateRecord heaviest_fork_aggregate = 5;
optional GenerateSnapshotRecord my_snapshot = 6;
map<string, ConflictMessage> conflict_message = 7;
optional HeaviestForkRecord coordinator_heaviest_fork = 6;
optional GenerateSnapshotRecord my_snapshot = 7;
map<string, ConflictMessage> conflict_message = 8;
}
129 changes: 39 additions & 90 deletions wen-restart/src/heaviest_fork_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use {
};

pub(crate) struct HeaviestForkAggregate {
supermajority_threshold: f64,
my_shred_version: u16,
my_pubkey: Pubkey,
// We use the epoch_stakes of the Epoch our heaviest bank is in. Proceed and exit only if
Expand All @@ -21,14 +20,6 @@ pub(crate) struct HeaviestForkAggregate {
heaviest_forks: HashMap<Pubkey, RestartHeaviestFork>,
block_stake_map: HashMap<(Slot, Hash), u64>,
active_peers: HashSet<Pubkey>,
active_peers_seen_supermajority: HashSet<Pubkey>,
}

#[derive(Clone, Debug, PartialEq)]
pub struct HeaviestForkFinalResult {
pub block_stake_map: HashMap<(Slot, Hash), u64>,
pub total_active_stake: u64,
pub total_active_stake_seen_supermajority: u64,
}

#[derive(Debug, PartialEq)]
Expand All @@ -42,7 +33,6 @@ pub enum HeaviestForkAggregateResult {

impl HeaviestForkAggregate {
pub(crate) fn new(
wait_for_supermajority_threshold_percent: u64,
my_shred_version: u16,
epoch_stakes: &EpochStakes,
my_heaviest_fork_slot: Slot,
Expand All @@ -57,23 +47,20 @@ impl HeaviestForkAggregate {
epoch_stakes.node_id_to_stake(my_pubkey).unwrap_or(0),
);
Self {
supermajority_threshold: wait_for_supermajority_threshold_percent as f64 / 100.0,
my_shred_version,
my_pubkey: *my_pubkey,
epoch_stakes: epoch_stakes.clone(),
heaviest_forks: HashMap::new(),
block_stake_map,
active_peers,
active_peers_seen_supermajority: HashSet::new(),
}
}

pub(crate) fn aggregate_from_record(
&mut self,
key_string: &str,
record: &HeaviestForkRecord,
) -> Result<HeaviestForkAggregateResult> {
let from = Pubkey::from_str(key_string)?;
let from = Pubkey::from_str(&record.from)?;
let bankhash = Hash::from_str(&record.bankhash)?;
let restart_heaviest_fork = RestartHeaviestFork {
from,
Expand All @@ -100,7 +87,6 @@ impl HeaviestForkAggregate {
}
if current_heaviest_fork == new_heaviest_fork
|| current_heaviest_fork.wallclock > new_heaviest_fork.wallclock
|| current_heaviest_fork.observed_stake == new_heaviest_fork.observed_stake
{
return HeaviestForkAggregateResult::AlreadyExists;
}
Expand All @@ -110,14 +96,14 @@ impl HeaviestForkAggregate {
total_active_stake: new_heaviest_fork.observed_stake,
shred_version: new_heaviest_fork.shred_version as u32,
wallclock: new_heaviest_fork.wallclock,
from: new_heaviest_fork.from.to_string(),
})
}

pub(crate) fn aggregate(
&mut self,
received_heaviest_fork: RestartHeaviestFork,
) -> HeaviestForkAggregateResult {
let total_stake = self.epoch_stakes.total_stake();
let from = &received_heaviest_fork.from;
let sender_stake = self.epoch_stakes.node_id_to_stake(from).unwrap_or(0);
if sender_stake == 0 {
Expand Down Expand Up @@ -161,22 +147,11 @@ impl HeaviestForkAggregate {
total_active_stake: received_heaviest_fork.observed_stake,
shred_version: received_heaviest_fork.shred_version as u32,
wallclock: received_heaviest_fork.wallclock,
from: from.to_string(),
})
};
self.heaviest_forks
.insert(*from, received_heaviest_fork.clone());
if received_heaviest_fork.observed_stake as f64 / total_stake as f64
>= self.supermajority_threshold
{
self.active_peers_seen_supermajority.insert(*from);
}
if !self
.active_peers_seen_supermajority
.contains(&self.my_pubkey)
&& self.total_active_stake() as f64 / total_stake as f64 >= self.supermajority_threshold
{
self.active_peers_seen_supermajority.insert(self.my_pubkey);
}
result
}

Expand All @@ -186,14 +161,6 @@ impl HeaviestForkAggregate {
})
}

pub(crate) fn total_active_stake_seen_supermajority(&self) -> u64 {
self.active_peers_seen_supermajority
.iter()
.fold(0, |sum: u64, pubkey| {
sum.saturating_add(self.epoch_stakes.node_id_to_stake(pubkey).unwrap_or(0))
})
}

pub(crate) fn block_stake_map(self) -> HashMap<(Slot, Hash), u64> {
self.block_stake_map
}
Expand Down Expand Up @@ -244,7 +211,6 @@ mod tests {
let heaviest_hash = Hash::new_unique();
TestAggregateInitResult {
heaviest_fork_aggregate: HeaviestForkAggregate::new(
75,
SHRED_VERSION,
root_bank.epoch_stakes(root_bank.epoch()).unwrap(),
heaviest_slot,
Expand Down Expand Up @@ -285,6 +251,7 @@ mod tests {
total_active_stake: 100,
shred_version: SHRED_VERSION as u32,
wallclock: timestamp1,
from: pubkey.to_string(),
}),
);
}
Expand Down Expand Up @@ -316,6 +283,7 @@ mod tests {
total_active_stake: 100,
shred_version: SHRED_VERSION as u32,
wallclock: now,
from: new_active_validator.to_string(),
}),
);
let expected_total_active_stake = (initial_num_active_validators + 2) as u64 * 100;
Expand Down Expand Up @@ -399,19 +367,14 @@ mod tests {
total_active_stake: 1400,
shred_version: SHRED_VERSION as u32,
wallclock: now,
from: pubkey.to_string(),
}),
);
}
assert_eq!(
test_state.heaviest_fork_aggregate.total_active_stake(),
1400
);
assert_eq!(
test_state
.heaviest_fork_aggregate
.total_active_stake_seen_supermajority(),
0
);

// test that when 75% of the stake is seeing supermajority,
// the active percent seeing supermajority is 75%.
Expand All @@ -435,6 +398,7 @@ mod tests {
total_active_stake: 1500,
shred_version: SHRED_VERSION as u32,
wallclock: now,
from: pubkey.to_string(),
}),
);
}
Expand All @@ -443,14 +407,6 @@ mod tests {
test_state.heaviest_fork_aggregate.total_active_stake(),
1500
);
// I myself is seeing supermajority as well, with the 14 validators
// reporting 70%, the total active stake seeing supermajority is 1500 (75%).
assert_eq!(
test_state
.heaviest_fork_aggregate
.total_active_stake_seen_supermajority(),
1500
);

// test that message from my pubkey is ignored.
assert_eq!(
Expand Down Expand Up @@ -483,12 +439,13 @@ mod tests {
bankhash: test_state.heaviest_hash.to_string(),
shred_version: SHRED_VERSION as u32,
total_active_stake: 100,
from: from.to_string(),
};
assert_eq!(test_state.heaviest_fork_aggregate.total_active_stake(), 100);
assert_eq!(
test_state
.heaviest_fork_aggregate
.aggregate_from_record(&from.to_string(), &record,)
.aggregate_from_record(&record)
.unwrap(),
HeaviestForkAggregateResult::Inserted(record.clone()),
);
Expand Down Expand Up @@ -528,6 +485,7 @@ mod tests {
bankhash: test_state.heaviest_hash.to_string(),
shred_version: SHRED_VERSION as u32,
total_active_stake: 200,
from: from.to_string(),
}),
);

Expand Down Expand Up @@ -576,42 +534,39 @@ mod tests {
assert_eq!(test_state.heaviest_fork_aggregate.total_active_stake(), 200);

// Record from validator with zero stake should be ignored.
let zero_stake_validator = Pubkey::new_unique();
assert_eq!(
test_state
.heaviest_fork_aggregate
.aggregate_from_record(
&Pubkey::new_unique().to_string(),
&HeaviestForkRecord {
wallclock: timestamp(),
slot: test_state.heaviest_slot,
bankhash: test_state.heaviest_hash.to_string(),
shred_version: SHRED_VERSION as u32,
total_active_stake: 100,
}
)
.aggregate_from_record(&HeaviestForkRecord {
wallclock: timestamp(),
slot: test_state.heaviest_slot,
bankhash: test_state.heaviest_hash.to_string(),
shred_version: SHRED_VERSION as u32,
total_active_stake: 100,
from: zero_stake_validator.to_string(),
})
.unwrap(),
HeaviestForkAggregateResult::ZeroStakeIgnored,
);
// percentage doesn't change since the previous aggregate is ignored.
assert_eq!(test_state.heaviest_fork_aggregate.total_active_stake(), 200);

// Record from my pubkey should be ignored.
let my_pubkey = test_state.validator_voting_keypairs[MY_INDEX]
.node_keypair
.pubkey();
assert_eq!(
test_state
.heaviest_fork_aggregate
.aggregate_from_record(
&test_state.validator_voting_keypairs[MY_INDEX]
.node_keypair
.pubkey()
.to_string(),
&HeaviestForkRecord {
wallclock: timestamp(),
slot: test_state.heaviest_slot,
bankhash: test_state.heaviest_hash.to_string(),
shred_version: SHRED_VERSION as u32,
total_active_stake: 100,
}
)
.aggregate_from_record(&HeaviestForkRecord {
wallclock: timestamp(),
slot: test_state.heaviest_slot,
bankhash: test_state.heaviest_hash.to_string(),
shred_version: SHRED_VERSION as u32,
total_active_stake: 100,
from: my_pubkey.to_string(),
})
.unwrap(),
HeaviestForkAggregateResult::AlreadyExists,
);
Expand All @@ -620,46 +575,40 @@ mod tests {
#[test]
fn test_aggregate_from_record_failures() {
let mut test_state = test_aggregate_init();
let from = test_state.validator_voting_keypairs[0]
.node_keypair
.pubkey();
let mut heaviest_fork_record = HeaviestForkRecord {
wallclock: timestamp(),
slot: test_state.heaviest_slot,
bankhash: test_state.heaviest_hash.to_string(),
shred_version: SHRED_VERSION as u32,
total_active_stake: 100,
from: from.to_string(),
};
// First test that this is a valid record.
assert_eq!(
test_state
.heaviest_fork_aggregate
.aggregate_from_record(
&test_state.validator_voting_keypairs[0]
.node_keypair
.pubkey()
.to_string(),
&heaviest_fork_record,
)
.aggregate_from_record(&heaviest_fork_record,)
.unwrap(),
HeaviestForkAggregateResult::Inserted(heaviest_fork_record.clone()),
);
// Then test that it fails if the record is invalid.

// Invalid pubkey.
heaviest_fork_record.from = "invalid_pubkey".to_string();
assert!(test_state
.heaviest_fork_aggregate
.aggregate_from_record("invalid_pubkey", &heaviest_fork_record,)
.aggregate_from_record(&heaviest_fork_record,)
.is_err());

// Invalid hash.
heaviest_fork_record.from = from.to_string();
heaviest_fork_record.bankhash.clear();
assert!(test_state
.heaviest_fork_aggregate
.aggregate_from_record(
&test_state.validator_voting_keypairs[0]
.node_keypair
.pubkey()
.to_string(),
&heaviest_fork_record,
)
.aggregate_from_record(&heaviest_fork_record,)
.is_err());
}
}
Loading
Loading