Skip to content

Commit

Permalink
Form the second QC at the end of an epoch
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaszrzasik committed Nov 27, 2024
1 parent 8d399ca commit b632b9f
Show file tree
Hide file tree
Showing 12 changed files with 265 additions and 64 deletions.
1 change: 1 addition & 0 deletions crates/hotshot/src/tasks/task_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
network: Arc::clone(&handle.hotshot.network),
membership: (*handle.hotshot.memberships).clone().into(),
vote_collectors: BTreeMap::default(),
next_epoch_vote_collectors: BTreeMap::default(),
timeout_vote_collectors: BTreeMap::default(),
cur_view: handle.cur_view().await,
cur_view_time: Utc::now().timestamp(),
Expand Down
46 changes: 35 additions & 11 deletions crates/task-impls/src/consensus/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,30 @@

use std::{sync::Arc, time::Duration};

use super::ConsensusTaskState;
use crate::{
consensus::Versions, events::HotShotEvent, helpers::broadcast_event,
vote_collection::handle_vote,
};
use async_broadcast::Sender;
use chrono::Utc;
use hotshot_types::simple_vote::HasEpoch;
use hotshot_types::vote::Vote;
use hotshot_types::{
event::{Event, EventType},
simple_vote::{QuorumVote2, TimeoutData, TimeoutVote},
traits::{
election::Membership,
node_implementation::{ConsensusTime, NodeImplementation, NodeType},
},
utils::EpochTransitionIndicator,
vote::HasViewNumber,
};
use tokio::{spawn, time::sleep};
use tracing::instrument;
use utils::anytrace::*;
use vbs::version::StaticVersionType;

use super::ConsensusTaskState;
use crate::{
consensus::Versions, events::HotShotEvent, helpers::broadcast_event,
vote_collection::handle_vote,
};

/// Handle a `QuorumVoteRecv` event.
pub(crate) async fn handle_quorum_vote_recv<
TYPES: NodeType,
Expand All @@ -46,7 +48,7 @@ pub(crate) async fn handle_quorum_vote_recv<
.is_high_qc_for_last_block();
let we_are_leader = task_state
.membership
.leader(vote.view_number() + 1, task_state.cur_epoch)?
.leader(vote.view_number() + 1, vote.data.epoch)?
== task_state.public_key;
ensure!(
in_transition || we_are_leader,
Expand All @@ -56,20 +58,43 @@ pub(crate) async fn handle_quorum_vote_recv<
)
);

let transition_indicator = if in_transition {
EpochTransitionIndicator::InTransition
} else {
EpochTransitionIndicator::NotInTransition
};
handle_vote(
&mut task_state.vote_collectors,
vote,
task_state.public_key.clone(),
&task_state.membership,
task_state.cur_epoch,
task_state.id,
&event,
sender,
&task_state.upgrade_lock,
!in_transition,
transition_indicator.clone(),
)
.await?;

// If the vote sender belongs to the next epoch, collect it separately to form the second QC
if task_state
.membership
.has_stake(&vote.signing_key(), vote.epoch() + 1)
{
handle_vote(
&mut task_state.next_epoch_vote_collectors,
&vote.clone().into(),
task_state.public_key.clone(),
&task_state.membership,
task_state.id,
&event,
sender,
&task_state.upgrade_lock,
transition_indicator,
)
.await?;
}

Ok(())
}

Expand Down Expand Up @@ -101,12 +126,11 @@ pub(crate) async fn handle_timeout_vote_recv<
vote,
task_state.public_key.clone(),
&task_state.membership,
task_state.cur_epoch,
task_state.id,
&event,
sender,
&task_state.upgrade_lock,
true,
EpochTransitionIndicator::NotInTransition,
)
.await?;

Expand Down
10 changes: 10 additions & 0 deletions crates/task-impls/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use async_broadcast::{Receiver, Sender};
use async_trait::async_trait;
use either::Either;
use hotshot_task::task::TaskState;
use hotshot_types::simple_certificate::NextEpochQuorumCertificate2;
use hotshot_types::simple_vote::NextEpochQuorumVote2;
use hotshot_types::{
consensus::OuterConsensus,
event::Event,
Expand Down Expand Up @@ -55,6 +57,14 @@ pub struct ConsensusTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V:
/// A map of `QuorumVote` collector tasks.
pub vote_collectors: VoteCollectorsMap<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>, V>,

/// A map of `QuorumVote` collector tasks. They collect votes from the nodes in the next epoch.
pub next_epoch_vote_collectors: VoteCollectorsMap<
TYPES,
NextEpochQuorumVote2<TYPES>,
NextEpochQuorumCertificate2<TYPES>,
V,
>,

/// A map of `TimeoutVote` collector tasks.
pub timeout_vote_collectors:
VoteCollectorsMap<TYPES, TimeoutVote<TYPES>, TimeoutCertificate<TYPES>, V>,
Expand Down
4 changes: 2 additions & 2 deletions crates/task-impls/src/da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
use async_trait::async_trait;
use hotshot_task::task::TaskState;
use hotshot_types::utils::EpochTransitionIndicator;
use hotshot_types::{
consensus::{Consensus, OuterConsensus},
data::{DaProposal, PackedBundle},
Expand Down Expand Up @@ -277,12 +278,11 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYP
vote,
self.public_key.clone(),
&self.membership,
epoch,
self.id,
&event,
&event_stream,
&self.upgrade_lock,
true,
EpochTransitionIndicator::NotInTransition,
)
.await?;
}
Expand Down
22 changes: 18 additions & 4 deletions crates/task-impls/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

use std::fmt::Display;

use crate::view_sync::ViewSyncPhase;
use async_broadcast::Sender;
use either::Either;
use hotshot_task::task::TaskEvent;
use hotshot_types::simple_certificate::NextEpochQuorumCertificate2;
use hotshot_types::{
data::{
DaProposal, Leaf2, PackedBundle, QuorumProposal2, UpgradeProposal, VidDisperse,
Expand All @@ -35,8 +37,6 @@ use hotshot_types::{
};
use vec1::Vec1;

use crate::view_sync::ViewSyncPhase;

impl<TYPES: NodeType> TaskEvent for HotShotEvent<TYPES> {
fn shutdown_event() -> Self {
HotShotEvent::Shutdown
Expand Down Expand Up @@ -124,6 +124,8 @@ pub enum HotShotEvent<TYPES: NodeType> {
QcFormed(Either<QuorumCertificate<TYPES>, TimeoutCertificate<TYPES>>),
/// The next leader has collected enough votes to form a QC; emitted by the next leader in the consensus task; an internal event only
Qc2Formed(Either<QuorumCertificate2<TYPES>, TimeoutCertificate<TYPES>>),
/// The next leader has collected enough votes from the next epoch nodes to form a QC; emitted by the next leader in the consensus task; an internal event only
NextEpochQc2Formed(Either<NextEpochQuorumCertificate2<TYPES>, TimeoutCertificate<TYPES>>),
/// The DA leader has collected enough votes to form a DAC; emitted by the DA leader in the DA task; sent to the entire network via the networking task
DacSend(DaCertificate<TYPES>, TYPES::SignatureKey),
/// The current view has changed; emitted by the replica in the consensus task or replica in the view sync task; received by almost all other tasks
Expand Down Expand Up @@ -279,6 +281,10 @@ impl<TYPES: NodeType> HotShotEvent<TYPES> {
either::Left(qc) => Some(qc.view_number()),
either::Right(tc) => Some(tc.view_number()),
},
HotShotEvent::NextEpochQc2Formed(cert) => match cert {
either::Left(qc) => Some(qc.view_number()),
either::Right(tc) => Some(tc.view_number()),
},
HotShotEvent::ViewSyncCommitVoteSend(vote)
| HotShotEvent::ViewSyncCommitVoteRecv(vote) => Some(vote.view_number()),
HotShotEvent::ViewSyncPreCommitVoteRecv(vote)
Expand Down Expand Up @@ -402,8 +408,16 @@ impl<TYPES: NodeType> Display for HotShotEvent<TYPES> {
either::Right(tc) => write!(f, "QcFormed(view_number={:?})", tc.view_number()),
},
HotShotEvent::Qc2Formed(cert) => match cert {
either::Left(qc) => write!(f, "QcFormed(view_number={:?})", qc.view_number()),
either::Right(tc) => write!(f, "QcFormed(view_number={:?})", tc.view_number()),
either::Left(qc) => write!(f, "QcFormed2(view_number={:?})", qc.view_number()),
either::Right(tc) => write!(f, "QcFormed2(view_number={:?})", tc.view_number()),
},
HotShotEvent::NextEpochQc2Formed(cert) => match cert {
either::Left(qc) => {
write!(f, "NextEpochQc2Formed(view_number={:?})", qc.view_number())
}
either::Right(tc) => {
write!(f, "NextEpochQc2Formed(view_number={:?})", tc.view_number())
}
},
HotShotEvent::DacSend(cert, _) => {
write!(f, "DacSend(view_number={:?})", cert.view_number())
Expand Down
2 changes: 1 addition & 1 deletion crates/task-impls/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> NetworkRequestState<TYPES, I
.await
}

/// Returns true if we got the data we wanted, a shutdown even was received, or the view has moved on.
/// Returns true if we got the data we wanted, a shutdown event was received, or the view has moved on.
async fn cancel_vid_request_task(
consensus: &OuterConsensus<TYPES>,
sender: &Sender<Arc<HotShotEvent<TYPES>>>,
Expand Down
15 changes: 7 additions & 8 deletions crates/task-impls/src/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@

use std::{marker::PhantomData, sync::Arc, time::SystemTime};

use crate::{
events::HotShotEvent,
helpers::broadcast_event,
vote_collection::{handle_vote, VoteCollectorsMap},
};
use async_broadcast::{Receiver, Sender};
use async_trait::async_trait;
use committable::Committable;
use hotshot_task::task::TaskState;
use hotshot_types::utils::EpochTransitionIndicator;
use hotshot_types::{
constants::{
UPGRADE_BEGIN_OFFSET, UPGRADE_DECIDE_BY_OFFSET, UPGRADE_FINISH_OFFSET,
Expand All @@ -31,12 +37,6 @@ use tracing::instrument;
use utils::anytrace::*;
use vbs::version::StaticVersionType;

use crate::{
events::HotShotEvent,
helpers::broadcast_event,
vote_collection::{handle_vote, VoteCollectorsMap},
};

/// Tracks state of an upgrade task
pub struct UpgradeTaskState<TYPES: NodeType, V: Versions> {
/// Output events to application
Expand Down Expand Up @@ -238,12 +238,11 @@ impl<TYPES: NodeType, V: Versions> UpgradeTaskState<TYPES, V> {
vote,
self.public_key.clone(),
&self.quorum_membership,
self.cur_epoch,
self.id,
&event,
&tx,
&self.upgrade_lock,
true,
EpochTransitionIndicator::NotInTransition,
)
.await?;
}
Expand Down
10 changes: 4 additions & 6 deletions crates/task-impls/src/view_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
use async_trait::async_trait;
use hotshot_task::task::TaskState;
use hotshot_types::utils::EpochTransitionIndicator;
use hotshot_types::{
message::{GeneralConsensusMessage, UpgradeLock},
simple_certificate::{
Expand Down Expand Up @@ -318,15 +319,14 @@ impl<TYPES: NodeType, V: Versions> ViewSyncTaskState<TYPES, V> {
public_key: self.public_key.clone(),
membership: Arc::clone(&self.membership),
view: vote_view,
epoch: self.cur_epoch,
id: self.id,
};
let vote_collector = create_vote_accumulator(
&info,
event,
&event_stream,
self.upgrade_lock.clone(),
true,
EpochTransitionIndicator::NotInTransition,
)
.await?;

Expand Down Expand Up @@ -363,7 +363,6 @@ impl<TYPES: NodeType, V: Versions> ViewSyncTaskState<TYPES, V> {
public_key: self.public_key.clone(),
membership: Arc::clone(&self.membership),
view: vote_view,
epoch: self.cur_epoch,
id: self.id,
};

Expand All @@ -372,7 +371,7 @@ impl<TYPES: NodeType, V: Versions> ViewSyncTaskState<TYPES, V> {
event,
&event_stream,
self.upgrade_lock.clone(),
true,
EpochTransitionIndicator::NotInTransition,
)
.await?;
relay_map.insert(relay, vote_collector);
Expand Down Expand Up @@ -408,15 +407,14 @@ impl<TYPES: NodeType, V: Versions> ViewSyncTaskState<TYPES, V> {
public_key: self.public_key.clone(),
membership: Arc::clone(&self.membership),
view: vote_view,
epoch: self.cur_epoch,
id: self.id,
};
let vote_collector = create_vote_accumulator(
&info,
event,
&event_stream,
self.upgrade_lock.clone(),
true,
EpochTransitionIndicator::NotInTransition,
)
.await;
if let Ok(vote_task) = vote_collector {
Expand Down
Loading

0 comments on commit b632b9f

Please sign in to comment.