[qs] dedup requests to batch fetcher and simplify payload manager #15766

Open · wants to merge 1 commit into base: main
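This change removes the per-payload fetch bookkeeping (the shared futures and status cells embedded in the consensus payload types below) and instead deduplicates requests inside the batch fetcher. As a rough illustration of that deduplication idea only (not code from this PR; the key type and fetch body are placeholders), a fetcher can keep one shared in-flight future per batch digest and hand every concurrent caller a clone of it:

// Illustrative sketch, not this PR's implementation: concurrent requests for the
// same batch share a single in-flight future instead of issuing duplicate fetches.
use futures::future::{BoxFuture, FutureExt, Shared};
use std::{collections::HashMap, sync::Mutex};

type Txns = Vec<u8>; // stand-in for Vec<SignedTransaction>
type SharedFetch = Shared<BoxFuture<'static, Txns>>;

#[derive(Default)]
struct DedupFetcher {
    // One entry per batch currently being fetched, keyed by its digest.
    in_flight: Mutex<HashMap<[u8; 32], SharedFetch>>,
}

impl DedupFetcher {
    /// Return the in-flight fetch for `digest`, starting one if none exists.
    fn fetch(&self, digest: [u8; 32]) -> SharedFetch {
        let mut in_flight = self.in_flight.lock().unwrap();
        in_flight
            .entry(digest)
            .or_insert_with(|| {
                let fut: BoxFuture<'static, Txns> = async move {
                    // A real fetcher would request the batch from peers here
                    // and evict the entry once the fetch completes.
                    Vec::new()
                }
                .boxed();
                fut.shared()
            })
            .clone()
    }
}

The types removed below (DataStatus and DataFetchFut) previously carried this kind of coordination state inside the payload structs themselves.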
44 changes: 2 additions & 42 deletions consensus/consensus-types/src/common.rs
@@ -12,8 +12,6 @@ use aptos_crypto::{
HashValue,
};
use aptos_crypto_derive::CryptoHasher;
use aptos_executor_types::ExecutorResult;
use aptos_infallible::Mutex;
use aptos_logger::prelude::*;
use aptos_types::{
account_address::AccountAddress, transaction::SignedTransaction,
@@ -25,10 +23,8 @@ use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
fmt::{self, Write},
sync::Arc,
u64,
};
use tokio::sync::oneshot;

/// The round of a block is a consensus-internal counter, which starts with 0 and increases
/// monotonically. It is used for the protocol safety and liveness (please see the detailed
@@ -127,51 +123,22 @@ pub struct RejectedTransactionSummary {
pub reason: DiscardedVMStatus,
}

#[derive(Debug)]
pub enum DataStatus {
Cached(Vec<SignedTransaction>),
Requested(
Vec<(
HashValue,
oneshot::Receiver<ExecutorResult<Vec<SignedTransaction>>>,
)>,
),
}

impl DataStatus {
pub fn extend(&mut self, other: DataStatus) {
match (self, other) {
(DataStatus::Requested(v1), DataStatus::Requested(v2)) => v1.extend(v2),
(_, _) => unreachable!(),
}
}

pub fn take(&mut self) -> DataStatus {
std::mem::replace(self, DataStatus::Requested(vec![]))
}
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct ProofWithData {
pub proofs: Vec<ProofOfStore>,
#[serde(skip)]
pub status: Arc<Mutex<Option<DataStatus>>>,
}

impl PartialEq for ProofWithData {
fn eq(&self, other: &Self) -> bool {
self.proofs == other.proofs && Arc::as_ptr(&self.status) == Arc::as_ptr(&other.status)
self.proofs == other.proofs
}
}

impl Eq for ProofWithData {}

impl ProofWithData {
pub fn new(proofs: Vec<ProofOfStore>) -> Self {
Self {
proofs,
status: Arc::new(Mutex::new(None)),
}
Self { proofs }
}

pub fn empty() -> Self {
@@ -180,14 +147,7 @@ impl ProofWithData {

#[allow(clippy::unwrap_used)]
pub fn extend(&mut self, other: ProofWithData) {
let other_data_status = other.status.lock().as_mut().unwrap().take();
self.proofs.extend(other.proofs);
let mut status = self.status.lock();
if status.is_none() {
*status = Some(other_data_status);
} else {
status.as_mut().unwrap().extend(other_data_status);
}
}

pub fn len(&self) -> usize {
47 changes: 0 additions & 47 deletions consensus/consensus-types/src/payload.rs
@@ -3,19 +3,12 @@

use crate::proof_of_store::{BatchInfo, ProofOfStore};
use anyhow::ensure;
use aptos_executor_types::ExecutorResult;
use aptos_infallible::Mutex;
use aptos_types::{transaction::SignedTransaction, PeerId};
use core::fmt;
use futures::{
future::{BoxFuture, Shared},
FutureExt,
};
use serde::{Deserialize, Serialize};
use std::{
fmt::Debug,
ops::{Deref, DerefMut},
sync::Arc,
};

pub type OptBatches = BatchPointer<BatchInfo>;
@@ -32,37 +25,9 @@ pub trait TDataInfo {
fn signers(&self, ordered_authors: &[PeerId]) -> Vec<PeerId>;
}

pub struct DataFetchFut {
pub iteration: u32,
pub responders: Vec<Arc<Mutex<Vec<PeerId>>>>,
pub fut: Shared<BoxFuture<'static, ExecutorResult<Vec<SignedTransaction>>>>,
}

impl fmt::Debug for DataFetchFut {
fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}

impl DataFetchFut {
pub fn extend(&mut self, other: DataFetchFut) {
let self_fut = self.fut.clone();
self.fut = async move {
let result1 = self_fut.await?;
let result2 = other.fut.await?;
let result = [result1, result2].concat();
Ok(result)
}
.boxed()
.shared();
}
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct BatchPointer<T> {
pub batch_summary: Vec<T>,
#[serde(skip)]
pub data_fut: Arc<Mutex<Option<DataFetchFut>>>,
}

impl<T> BatchPointer<T>
@@ -72,21 +37,11 @@ where
pub fn new(metadata: Vec<T>) -> Self {
Self {
batch_summary: metadata,
data_fut: Arc::new(Mutex::new(None)),
}
}

pub fn extend(&mut self, other: BatchPointer<T>) {
let other_data_status = other.data_fut.lock().take().expect("must be initialized");
self.batch_summary.extend(other.batch_summary);
let mut status = self.data_fut.lock();
*status = match &mut *status {
None => Some(other_data_status),
Some(status) => {
status.extend(other_data_status);
return;
},
};
}

pub fn num_txns(&self) -> usize {
@@ -115,15 +70,13 @@
fn from(value: Vec<T>) -> Self {
Self {
batch_summary: value,
data_fut: Arc::new(Mutex::new(None)),
}
}
}

impl<T: PartialEq> PartialEq for BatchPointer<T> {
fn eq(&self, other: &Self) -> bool {
self.batch_summary == other.batch_summary
&& Arc::as_ptr(&self.data_fut) == Arc::as_ptr(&other.data_fut)
}
}

7 changes: 5 additions & 2 deletions consensus/src/block_storage/block_store.rs
@@ -371,8 +371,11 @@ impl BlockStore {
);

if let Some(payload) = block.payload() {
self.payload_manager
.prefetch_payload_data(payload, block.timestamp_usecs());
self.payload_manager.prefetch_payload_data(
payload,
block.author().expect("Payload block must have author"),
block.timestamp_usecs(),
);
}

let pipelined_block = PipelinedBlock::new_ordered(block.clone());
6 changes: 5 additions & 1 deletion consensus/src/block_storage/sync_manager.rs
@@ -399,7 +399,11 @@ impl BlockStore {
for (i, block) in blocks.iter().enumerate() {
assert_eq!(block.id(), quorum_certs[i].certified_block().id());
if let Some(payload) = block.payload() {
payload_manager.prefetch_payload_data(payload, block.timestamp_usecs());
payload_manager.prefetch_payload_data(
payload,
block.author().expect("payload block must have author"),
block.timestamp_usecs(),
);
}
}

7 changes: 5 additions & 2 deletions consensus/src/dag/dag_store.rs
@@ -526,8 +526,11 @@ impl DagStore {
self.storage.save_certified_node(&node)?;

debug!("Added node {}", node.id());
self.payload_manager
.prefetch_payload_data(node.payload(), node.metadata().timestamp());
self.payload_manager.prefetch_payload_data(
node.payload(),
*node.author(),
node.metadata().timestamp(),
);

self.dag.write().add_validated_node(node)
}
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/helpers.rs
@@ -23,7 +23,7 @@ pub(super) struct MockPayloadManager {}

#[async_trait]
impl TPayloadManager for MockPayloadManager {
fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {}
fn prefetch_payload_data(&self, _payload: &Payload, _author: Author, _timestamp: u64) {}

fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec<Payload>) {}

7 changes: 5 additions & 2 deletions consensus/src/epoch_manager.rs
@@ -1646,8 +1646,11 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
proposal_event @ VerifiedEvent::ProposalMsg(_) => {
if let VerifiedEvent::ProposalMsg(p) = &proposal_event {
if let Some(payload) = p.proposal().payload() {
payload_manager
.prefetch_payload_data(payload, p.proposal().timestamp_usecs());
payload_manager.prefetch_payload_data(
payload,
p.proposer(),
p.proposal().timestamp_usecs(),
);
}
pending_blocks.lock().insert_block(p.proposal().clone());
}
115 changes: 115 additions & 0 deletions consensus/src/payload_manager/co_payload_manager.rs
@@ -0,0 +1,115 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
consensus_observer::{
network::observer_message::ConsensusObserverMessage,
observer::payload_store::BlockPayloadStatus,
publisher::consensus_publisher::ConsensusPublisher,
},
payload_manager::TPayloadManager,
};
use aptos_bitvec::BitVec;
use aptos_consensus_types::{
block::Block,
common::{Author, Payload, Round},
};
use aptos_crypto::HashValue;
use aptos_executor_types::{ExecutorError::InternalError, *};
use aptos_infallible::Mutex;
use aptos_types::transaction::SignedTransaction;
use async_trait::async_trait;
use std::{
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
};

/// Returns the transactions for the consensus observer payload manager
async fn get_transactions_for_observer(
block: &Block,
block_payloads: &Arc<Mutex<BTreeMap<(u64, Round), BlockPayloadStatus>>>,
consensus_publisher: &Option<Arc<ConsensusPublisher>>,
) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
// The data should already be available (as consensus observer will only ever
// forward a block to the executor once the data has been received and verified).
let block_payload = match block_payloads.lock().entry((block.epoch(), block.round())) {
Entry::Occupied(mut value) => match value.get_mut() {
BlockPayloadStatus::AvailableAndVerified(block_payload) => block_payload.clone(),
BlockPayloadStatus::AvailableAndUnverified(_) => {
// This shouldn't happen (the payload should already be verified)
let error = format!(
"Payload data for block epoch {}, round {} is unverified!",
block.epoch(),
block.round()
);
return Err(InternalError { error });
},
},
Entry::Vacant(_) => {
// This shouldn't happen (the payload should already be present)
let error = format!(
"Missing payload data for block epoch {}, round {}!",
block.epoch(),
block.round()
);
return Err(InternalError { error });
},
};

// If the payload is valid, publish it to any downstream observers
let transaction_payload = block_payload.transaction_payload();
if let Some(consensus_publisher) = consensus_publisher {
let message = ConsensusObserverMessage::new_block_payload_message(
block.gen_block_info(HashValue::zero(), 0, None),
transaction_payload.clone(),
);
consensus_publisher.publish_message(message);
}

// Return the transactions and the transaction limit
Ok((
transaction_payload.transactions(),
transaction_payload.limit(),
))
}

pub struct ConsensusObserverPayloadManager {
txns_pool: Arc<Mutex<BTreeMap<(u64, Round), BlockPayloadStatus>>>,
consensus_publisher: Option<Arc<ConsensusPublisher>>,
}

impl ConsensusObserverPayloadManager {
pub fn new(
txns_pool: Arc<Mutex<BTreeMap<(u64, Round), BlockPayloadStatus>>>,
consensus_publisher: Option<Arc<ConsensusPublisher>>,
) -> Self {
Self {
txns_pool,
consensus_publisher,
}
}
}

#[async_trait]
impl TPayloadManager for ConsensusObserverPayloadManager {
fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec<Payload>) {
// noop
}

fn prefetch_payload_data(&self, _payload: &Payload, _author: Author, _timestamp: u64) {
// noop
}

fn check_payload_availability(&self, _block: &Block) -> Result<(), BitVec> {
unreachable!("this method isn't used in ConsensusObserver")
}

async fn get_transactions(
&self,
block: &Block,
_block_signers: Option<BitVec>,
) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
return get_transactions_for_observer(block, &self.txns_pool, &self.consensus_publisher)
.await;
}
}
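For orientation, the TPayloadManager interface implied by the implementations in this diff looks roughly as follows. The trait definition itself is not among the changed files shown here, so this is a reconstruction; the Send + Sync bound and the doc comments are assumptions.

// Sketch reconstructed from the implementations above, not the verbatim trait.
use aptos_bitvec::BitVec;
use aptos_consensus_types::{
    block::Block,
    common::{Author, Payload},
};
use aptos_executor_types::ExecutorResult;
use aptos_types::transaction::SignedTransaction;
use async_trait::async_trait;

#[async_trait]
pub trait TPayloadManager: Send + Sync {
    /// Prefetch transaction data for a payload, now keyed by the proposing author
    /// as well as the block timestamp.
    fn prefetch_payload_data(&self, payload: &Payload, author: Author, timestamp: u64);

    /// Notify the manager that payloads up to `block_timestamp` have been committed.
    fn notify_commit(&self, block_timestamp: u64, payloads: Vec<Payload>);

    /// Check whether the payload data for `block` is available locally.
    fn check_payload_availability(&self, block: &Block) -> Result<(), BitVec>;

    /// Return the transactions for `block`, plus an optional transaction limit.
    async fn get_transactions(
        &self,
        block: &Block,
        block_signers: Option<BitVec>,
    ) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)>;
}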