Skip to content

Commit

Permalink
runtime: Remove incorrectly implemented abort requests
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Nov 13, 2023
1 parent a196009 commit e022fa9
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 28 deletions.
26 changes: 2 additions & 24 deletions runtime/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
//! Runtime call dispatcher.
use std::{
convert::TryInto,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Condvar, Mutex,
},
sync::{Arc, Condvar, Mutex},
thread,
};

Expand Down Expand Up @@ -150,15 +147,13 @@ struct State {
#[derive(Debug)]
enum Command {
Request(u64, Body),
Abort(mpsc::Sender<()>),
}

/// Runtime call dispatcher.
pub struct Dispatcher {
logger: Logger,
queue_tx: mpsc::Sender<Command>,
identity: Arc<Identity>,
abort_batch: Arc<AtomicBool>,

state: Mutex<Option<ProtocolState>>,
state_cond: Condvar,
Expand All @@ -179,7 +174,6 @@ impl Dispatcher {
logger: get_logger("runtime/dispatcher"),
queue_tx: tx,
identity,
abort_batch: Arc::new(AtomicBool::new(false)),
state: Mutex::new(None),
state_cond: Condvar::new(),
tokio_runtime,
Expand Down Expand Up @@ -212,17 +206,6 @@ impl Dispatcher {
Ok(())
}

/// Signals to dispatcher that it should abort and waits for the abort to
/// complete.
pub fn abort_and_wait(&self) -> AnyResult<()> {
self.abort_batch.store(true, Ordering::SeqCst);
// Queue an abort command and wait for it to be processed.
let (tx, mut rx) = mpsc::channel(1);
self.queue_tx.blocking_send(Command::Abort(tx))?;
rx.blocking_recv();
Ok(())
}

fn run(self: &Arc<Self>, initializer: Box<dyn Initializer>, mut rx: mpsc::Receiver<Command>) {
// Wait for the state to be available.
let ProtocolState {
Expand All @@ -249,10 +232,9 @@ impl Dispatcher {
consensus_verifier: &consensus_verifier,
};
let post_init_state = initializer.init(pre_init_state);
let mut txn_dispatcher = post_init_state
let txn_dispatcher = post_init_state
.txn_dispatcher
.unwrap_or_else(|| Box::<TxnNoopDispatcher>::default());
txn_dispatcher.set_abort_batch_flag(self.abort_batch.clone());

let state = State {
protocol: protocol.clone(),
Expand Down Expand Up @@ -294,10 +276,6 @@ impl Dispatcher {
protocol.send_response(id, response).unwrap();
});
}
Command::Abort(tx) => {
// Request to abort processing.
tx.send(()).await.unwrap();
}
}
}
});
Expand Down
5 changes: 1 addition & 4 deletions runtime/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,7 @@ impl Protocol {
}
Body::RuntimeAbortRequest {} => {
info!(self.logger, "Received worker abort request");
self.ensure_initialized()?;
self.dispatcher.abort_and_wait()?;
info!(self.logger, "Handled worker abort request");
Ok(Some(Body::RuntimeAbortResponse {}))
Err(ProtocolError::MethodNotSupported.into())
}

// Attestation-related requests.
Expand Down

0 comments on commit e022fa9

Please sign in to comment.