Skip to content

Commit

Permalink
[qs] batch fetcher with dedup
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Jan 17, 2025
1 parent 1d8460a commit 580c794
Show file tree
Hide file tree
Showing 16 changed files with 457 additions and 608 deletions.
44 changes: 2 additions & 42 deletions consensus/consensus-types/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ use aptos_crypto::{
HashValue,
};
use aptos_crypto_derive::CryptoHasher;
use aptos_executor_types::ExecutorResult;
use aptos_infallible::Mutex;
use aptos_logger::prelude::*;
use aptos_types::{
account_address::AccountAddress, transaction::SignedTransaction,
Expand All @@ -25,10 +23,8 @@ use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
fmt::{self, Write},
sync::Arc,
u64,
};
use tokio::sync::oneshot;

/// The round of a block is a consensus-internal counter, which starts with 0 and increases
/// monotonically. It is used for the protocol safety and liveness (please see the detailed
Expand Down Expand Up @@ -127,51 +123,22 @@ pub struct RejectedTransactionSummary {
pub reason: DiscardedVMStatus,
}

#[derive(Debug)]
pub enum DataStatus {
Cached(Vec<SignedTransaction>),
Requested(
Vec<(
HashValue,
oneshot::Receiver<ExecutorResult<Vec<SignedTransaction>>>,
)>,
),
}

impl DataStatus {
pub fn extend(&mut self, other: DataStatus) {
match (self, other) {
(DataStatus::Requested(v1), DataStatus::Requested(v2)) => v1.extend(v2),
(_, _) => unreachable!(),
}
}

pub fn take(&mut self) -> DataStatus {
std::mem::replace(self, DataStatus::Requested(vec![]))
}
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct ProofWithData {
pub proofs: Vec<ProofOfStore>,
#[serde(skip)]
pub status: Arc<Mutex<Option<DataStatus>>>,
}

impl PartialEq for ProofWithData {
fn eq(&self, other: &Self) -> bool {
self.proofs == other.proofs && Arc::as_ptr(&self.status) == Arc::as_ptr(&other.status)
self.proofs == other.proofs
}
}

impl Eq for ProofWithData {}

impl ProofWithData {
pub fn new(proofs: Vec<ProofOfStore>) -> Self {
Self {
proofs,
status: Arc::new(Mutex::new(None)),
}
Self { proofs }
}

pub fn empty() -> Self {
Expand All @@ -180,14 +147,7 @@ impl ProofWithData {

#[allow(clippy::unwrap_used)]
pub fn extend(&mut self, other: ProofWithData) {
let other_data_status = other.status.lock().as_mut().unwrap().take();
self.proofs.extend(other.proofs);
let mut status = self.status.lock();
if status.is_none() {
*status = Some(other_data_status);
} else {
status.as_mut().unwrap().extend(other_data_status);
}
}

pub fn len(&self) -> usize {
Expand Down
47 changes: 0 additions & 47 deletions consensus/consensus-types/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,12 @@

use crate::proof_of_store::{BatchInfo, ProofOfStore};
use anyhow::ensure;
use aptos_executor_types::ExecutorResult;
use aptos_infallible::Mutex;
use aptos_types::{transaction::SignedTransaction, PeerId};
use core::fmt;
use futures::{
future::{BoxFuture, Shared},
FutureExt,
};
use serde::{Deserialize, Serialize};
use std::{
fmt::Debug,
ops::{Deref, DerefMut},
sync::Arc,
};

pub type OptBatches = BatchPointer<BatchInfo>;
Expand All @@ -32,37 +25,9 @@ pub trait TDataInfo {
fn signers(&self, ordered_authors: &[PeerId]) -> Vec<PeerId>;
}

pub struct DataFetchFut {
pub iteration: u32,
pub responders: Vec<Arc<Mutex<Vec<PeerId>>>>,
pub fut: Shared<BoxFuture<'static, ExecutorResult<Vec<SignedTransaction>>>>,
}

impl fmt::Debug for DataFetchFut {
fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}

impl DataFetchFut {
pub fn extend(&mut self, other: DataFetchFut) {
let self_fut = self.fut.clone();
self.fut = async move {
let result1 = self_fut.await?;
let result2 = other.fut.await?;
let result = [result1, result2].concat();
Ok(result)
}
.boxed()
.shared();
}
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct BatchPointer<T> {
pub batch_summary: Vec<T>,
#[serde(skip)]
pub data_fut: Arc<Mutex<Option<DataFetchFut>>>,
}

impl<T> BatchPointer<T>
Expand All @@ -72,21 +37,11 @@ where
pub fn new(metadata: Vec<T>) -> Self {
Self {
batch_summary: metadata,
data_fut: Arc::new(Mutex::new(None)),
}
}

pub fn extend(&mut self, other: BatchPointer<T>) {
let other_data_status = other.data_fut.lock().take().expect("must be initialized");
self.batch_summary.extend(other.batch_summary);
let mut status = self.data_fut.lock();
*status = match &mut *status {
None => Some(other_data_status),
Some(status) => {
status.extend(other_data_status);
return;
},
};
}

pub fn num_txns(&self) -> usize {
Expand Down Expand Up @@ -115,15 +70,13 @@ where
fn from(value: Vec<T>) -> Self {
Self {
batch_summary: value,
data_fut: Arc::new(Mutex::new(None)),
}
}
}

impl<T: PartialEq> PartialEq for BatchPointer<T> {
fn eq(&self, other: &Self) -> bool {
self.batch_summary == other.batch_summary
&& Arc::as_ptr(&self.data_fut) == Arc::as_ptr(&other.data_fut)
}
}

Expand Down
7 changes: 5 additions & 2 deletions consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,11 @@ impl BlockStore {
);

if let Some(payload) = block.payload() {
self.payload_manager
.prefetch_payload_data(payload, block.timestamp_usecs());
self.payload_manager.prefetch_payload_data(
payload,
block.author().expect("Payload block must have author"),
block.timestamp_usecs(),
);
}

let pipelined_block = PipelinedBlock::new_ordered(block.clone());
Expand Down
6 changes: 5 additions & 1 deletion consensus/src/block_storage/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,11 @@ impl BlockStore {
for (i, block) in blocks.iter().enumerate() {
assert_eq!(block.id(), quorum_certs[i].certified_block().id());
if let Some(payload) = block.payload() {
payload_manager.prefetch_payload_data(payload, block.timestamp_usecs());
payload_manager.prefetch_payload_data(
payload,
block.author().expect("payload block must have author"),
block.timestamp_usecs(),
);
}
}

Expand Down
7 changes: 5 additions & 2 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,8 +526,11 @@ impl DagStore {
self.storage.save_certified_node(&node)?;

debug!("Added node {}", node.id());
self.payload_manager
.prefetch_payload_data(node.payload(), node.metadata().timestamp());
self.payload_manager.prefetch_payload_data(
node.payload(),
*node.author(),
node.metadata().timestamp(),
);

self.dag.write().add_validated_node(node)
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub(super) struct MockPayloadManager {}

#[async_trait]
impl TPayloadManager for MockPayloadManager {
fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {}
fn prefetch_payload_data(&self, _payload: &Payload, _author: Author, _timestamp: u64) {}

fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec<Payload>) {}

Expand Down
7 changes: 5 additions & 2 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1646,8 +1646,11 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
proposal_event @ VerifiedEvent::ProposalMsg(_) => {
if let VerifiedEvent::ProposalMsg(p) = &proposal_event {
if let Some(payload) = p.proposal().payload() {
payload_manager
.prefetch_payload_data(payload, p.proposal().timestamp_usecs());
payload_manager.prefetch_payload_data(
payload,
p.proposer(),
p.proposal().timestamp_usecs(),
);
}
pending_blocks.lock().insert_block(p.proposal().clone());
}
Expand Down
115 changes: 115 additions & 0 deletions consensus/src/payload_manager/co_payload_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
consensus_observer::{
network::observer_message::ConsensusObserverMessage,
observer::payload_store::BlockPayloadStatus,
publisher::consensus_publisher::ConsensusPublisher,
},
payload_manager::TPayloadManager,
};
use aptos_bitvec::BitVec;
use aptos_consensus_types::{
block::Block,
common::{Author, Payload, Round},
};
use aptos_crypto::HashValue;
use aptos_executor_types::{ExecutorError::InternalError, *};
use aptos_infallible::Mutex;
use aptos_types::transaction::SignedTransaction;
use async_trait::async_trait;
use std::{
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
};

/// Returns the transactions for the consensus observer payload manager
async fn get_transactions_for_observer(
block: &Block,
block_payloads: &Arc<Mutex<BTreeMap<(u64, Round), BlockPayloadStatus>>>,
consensus_publisher: &Option<Arc<ConsensusPublisher>>,
) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
// The data should already be available (as consensus observer will only ever
// forward a block to the executor once the data has been received and verified).
let block_payload = match block_payloads.lock().entry((block.epoch(), block.round())) {
Entry::Occupied(mut value) => match value.get_mut() {
BlockPayloadStatus::AvailableAndVerified(block_payload) => block_payload.clone(),
BlockPayloadStatus::AvailableAndUnverified(_) => {
// This shouldn't happen (the payload should already be verified)
let error = format!(
"Payload data for block epoch {}, round {} is unverified!",
block.epoch(),
block.round()
);
return Err(InternalError { error });
},
},
Entry::Vacant(_) => {
// This shouldn't happen (the payload should already be present)
let error = format!(
"Missing payload data for block epoch {}, round {}!",
block.epoch(),
block.round()
);
return Err(InternalError { error });
},
};

// If the payload is valid, publish it to any downstream observers
let transaction_payload = block_payload.transaction_payload();
if let Some(consensus_publisher) = consensus_publisher {
let message = ConsensusObserverMessage::new_block_payload_message(
block.gen_block_info(HashValue::zero(), 0, None),
transaction_payload.clone(),
);
consensus_publisher.publish_message(message);
}

// Return the transactions and the transaction limit
Ok((
transaction_payload.transactions(),
transaction_payload.limit(),
))
}

pub struct ConsensusObserverPayloadManager {
txns_pool: Arc<Mutex<BTreeMap<(u64, Round), BlockPayloadStatus>>>,
consensus_publisher: Option<Arc<ConsensusPublisher>>,
}

impl ConsensusObserverPayloadManager {
pub fn new(
txns_pool: Arc<Mutex<BTreeMap<(u64, Round), BlockPayloadStatus>>>,
consensus_publisher: Option<Arc<ConsensusPublisher>>,
) -> Self {
Self {
txns_pool,
consensus_publisher,
}
}
}

#[async_trait]
impl TPayloadManager for ConsensusObserverPayloadManager {
fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec<Payload>) {
// noop
}

fn prefetch_payload_data(&self, _payload: &Payload, _author: Author, _timestamp: u64) {
// noop
}

fn check_payload_availability(&self, _block: &Block) -> Result<(), BitVec> {
unreachable!("this method isn't used in ConsensusObserver")
}

async fn get_transactions(
&self,
block: &Block,
_block_signers: Option<BitVec>,
) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
return get_transactions_for_observer(block, &self.txns_pool, &self.consensus_publisher)
.await;
}
}
Loading

0 comments on commit 580c794

Please sign in to comment.