Skip to content

Commit

Permalink
daemon: decouple transaction validation from the preparation phase
Browse files Browse the repository at this point in the history
Refactor the code such that the configuration being committed is
fully validated before starting the two-phase transaction protocol.

This eliminates the unnecessary overhead associated with creating
and aborting a transaction when the configuration being committed
is invalid.

Signed-off-by: Renato Westphal <[email protected]>
  • Loading branch information
rwestphal committed May 9, 2023
1 parent 504e1ed commit f472e95
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 5 deletions.
40 changes: 40 additions & 0 deletions holo-daemon/src/northbound/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,11 @@ impl Northbound {
) -> Result<u32> {
let candidate = Arc::new(candidate);

// Validate the candidate configuration.
self.validate_notify(&candidate)
.await
.map_err(Error::TransactionValidation)?;

// Compute diff between the running config and the candidate config.
let diff = self
.running_config
Expand Down Expand Up @@ -409,11 +414,46 @@ impl Northbound {
&changes,
)
.await;

Err(Error::TransactionPreparation(error))
}
}
}

// Request all data providers to validate the candidate configuration.
async fn validate_notify(
&mut self,
candidate: &Arc<DataTree>,
) -> std::result::Result<(), northbound::error::Error> {
let mut handles = Vec::new();

// Spawn one task per data provider.
for daemon_tx in self.providers.iter() {
// Prepare request.
let (responder_tx, responder_rx) = oneshot::channel();
let request = papi::daemon::Request::Validate(
papi::daemon::ValidateRequest {
config: candidate.clone(),
responder: Some(responder_tx),
},
);

// Spawn task to send the request and receive the response.
let daemon_tx = daemon_tx.clone();
let handle = tokio::spawn(async move {
daemon_tx.send(request).await.unwrap();
responder_rx.await.unwrap()
});
handles.push(handle);
}
// Wait for all tasks to complete.
for handle in handles {
handle.await.unwrap()?;
}

Ok(())
}

// Notifies all data providers of the configuration changes associated to an
// on-going transaction.
async fn commit_phase_notify(
Expand Down
13 changes: 13 additions & 0 deletions holo-northbound/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub mod daemon {
pub enum Request {
// Request to get all loaded YANG callbacks.
GetCallbacks(GetCallbacksRequest),
// Request to validate a candidate configuration.
Validate(ValidateRequest),
// Request to change the running configuration.
Commit(CommitRequest),
// Request to get state data.
Expand All @@ -42,6 +44,17 @@ pub mod daemon {
pub callbacks: HashSet<CallbackKey>,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct ValidateRequest {
#[serde(with = "holo_yang::serde::data_tree::arc")]
pub config: Arc<DataTree>,
#[serde(skip)]
pub responder: Option<Responder<Result<ValidateResponse, Error>>>,
}

#[derive(Debug)]
pub struct ValidateResponse {}

#[derive(Debug, Deserialize, Serialize)]
pub struct CommitRequest {
pub phase: CommitPhase,
Expand Down
39 changes: 34 additions & 5 deletions holo-northbound/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ pub trait Provider: ProviderBase {
None
}

fn relay_validation(&self) -> Vec<NbDaemonSender> {
vec![]
}

fn relay_changes(
&self,
_changes: ConfigChanges,
Expand Down Expand Up @@ -615,6 +619,36 @@ pub fn changes_from_diff(diff: &DataDiff) -> ConfigChanges {
changes
}

pub(crate) async fn process_validate<P>(
provider: &mut P,
config: Arc<DataTree>,
) -> Result<api::daemon::ValidateResponse, Error>
where
P: Provider,
{
// Validate local subtree.
validate_configuration::<P>(provider, &config).await?;

// Validate nested subtrees.
for nb_tx in provider.relay_validation() {
// Send request to child task.
let (responder_tx, responder_rx) = oneshot::channel();
let relayed_req = api::daemon::ValidateRequest {
config: config.clone(),
responder: Some(responder_tx),
};
nb_tx
.send(api::daemon::Request::Validate(relayed_req))
.await
.unwrap();

// Receive response.
let _ = responder_rx.await.unwrap()?;
}

Ok(api::daemon::ValidateResponse {})
}

pub(crate) async fn process_commit<P>(
provider: &mut P,
phase: CommitPhase,
Expand All @@ -626,11 +660,6 @@ pub(crate) async fn process_commit<P>(
where
P: Provider,
{
// Perform code-level validation before the preparation phase.
if phase == CommitPhase::Prepare {
validate_configuration::<P>(provider, &new_config).await?;
}

// Move to a separate vector the changes that need to be relayed.
let callbacks = P::callbacks().unwrap();
let relayed_changes = changes
Expand Down
7 changes: 7 additions & 0 deletions holo-northbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ pub async fn process_northbound_msg<Provider>(
responder.send(response).unwrap();
}
}
api::daemon::Request::Validate(request) => {
let response =
configuration::process_validate(provider, request.config).await;
if let Some(responder) = request.responder {
responder.send(response).unwrap();
}
}
api::daemon::Request::Commit(request) => {
let response = configuration::process_commit(
provider,
Expand Down
4 changes: 4 additions & 0 deletions holo-routing/src/northbound/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ impl Provider for Master {
.collect::<Vec<_>>()
}

fn relay_validation(&self) -> Vec<NbDaemonSender> {
self.instances.values().cloned().collect()
}

async fn process_event(&mut self, event: Event) {
match event {
Event::InstanceStart { protocol, name } => {
Expand Down

0 comments on commit f472e95

Please sign in to comment.