Skip to content

Commit

Permalink
oximeter integration (#178)
Browse files Browse the repository at this point in the history
  • Loading branch information
rcgoodfellow authored Mar 19, 2024
1 parent 1d8b818 commit de065a8
Show file tree
Hide file tree
Showing 44 changed files with 3,486 additions and 732 deletions.
1,540 changes: 968 additions & 572 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,14 @@ ciborium = "0.2"
http = "0.2"
humantime = "2.1"
rand = "0.8"
backoff = "0.4"
mg-common = { path = "mg-common" }
chrono = { version = "0.4", features = ["serde"] }
# Crates from the omicron repository, all tracking its `main` branch.
oximeter = { git = "https://github.com/oxidecomputer/omicron", branch = "main" }
oximeter-producer = { git = "https://github.com/oxidecomputer/omicron", branch = "main" }
omicron-common = { git = "https://github.com/oxidecomputer/omicron", branch = "main" }
internal-dns = { git = "https://github.com/oxidecomputer/omicron", branch = "main" }
uuid = { version = "1.7", features = ["serde", "v4"] }

[workspace.dependencies.opte-ioctl]
git = "https://github.com/oxidecomputer/opte"
Expand Down
33 changes: 30 additions & 3 deletions bfd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use slog::{warn, Logger};
use sm::StateMachine;
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;

pub mod bidi;
Expand Down Expand Up @@ -76,10 +78,29 @@ impl Daemon {
}
}

/// Event counters for a single BFD session.
///
/// Every field is an [`AtomicU64`], so a counter can be incremented through a
/// shared reference. A session creates one instance and hands a clone of the
/// surrounding `Arc` to its state machine, which does the counting.
/// `Default` produces a value with every counter at zero.
#[derive(Debug, Default)]
pub struct SessionCounters {
    /// Control packets successfully handed to the outgoing send channel.
    pub control_packets_sent: AtomicU64,
    /// Control packets for which the send on the outgoing channel returned
    /// an error.
    pub control_packet_send_failures: AtomicU64,
    /// Control packets received from the peer.
    // NOTE(review): no increment site for this counter is visible in the
    // state machine code reviewed here -- confirm where it is updated.
    pub control_packets_received: AtomicU64,
    /// Received messages whose state field was `AdminDown`.
    pub admin_down_status_received: AtomicU64,
    /// Received messages whose state field was `Down`.
    pub down_status_received: AtomicU64,
    /// Received messages whose state field was `Init`.
    pub init_status_received: AtomicU64,
    /// Received messages whose state field was `Up`.
    pub up_status_received: AtomicU64,
    /// Received messages whose state field did not map to a known peer state.
    pub unknown_status_received: AtomicU64,
    /// State machine transitions into `Init`.
    pub transition_to_init: AtomicU64,
    /// State machine transitions into `Down` or `AdminDown` (both are
    /// counted here).
    pub transition_to_down: AtomicU64,
    /// State machine transitions into `Up`.
    pub transition_to_up: AtomicU64,
    /// Times the receive timeout elapsed without a message from the peer.
    pub timeout_expired: AtomicU64,
    /// Times receiving from the endpoint channel failed for a reason other
    /// than the timeout elapsing.
    pub message_receive_error: AtomicU64,
    /// Messages that were not expected in the current state.
    // NOTE(review): no increment site for this counter is visible in the
    // state machine code reviewed here -- confirm the exact meaning.
    pub unexpected_message: AtomicU64,
}

/// A session holds a BFD state machine for a particular peer.
///
/// Alongside the state machine it carries the session's mode and the event
/// counters that the state machine updates while it runs.
pub struct Session {
/// The BFD state machine driving this session.
pub sm: StateMachine,
/// The mode this session operates in (see `SessionMode`).
pub mode: SessionMode,
/// Event counters for this session. `Session::new` creates them and gives
/// the state machine a clone of the same `Arc`, so values read here reflect
/// what the state machine has recorded.
pub counters: Arc<SessionCounters>,
}

impl Session {
Expand All @@ -94,10 +115,16 @@ impl Session {
db: rdb::Db,
log: Logger,
) -> Self {
let mut sm =
StateMachine::new(addr, required_rx, detection_multiplier, log);
let counters = Arc::new(SessionCounters::default());
let mut sm = StateMachine::new(
addr,
required_rx,
detection_multiplier,
counters.clone(),
log,
);
sm.run(ep, db);
Session { sm, mode }
Session { sm, mode, counters }
}
}

Expand Down
128 changes: 105 additions & 23 deletions bfd/src/sm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use crate::err;
use crate::packet::Control;
use crate::packet::{Control, State as PacketState};
use crate::{
bidi, inf, packet, trc, util::update_peer_info, wrn, BfdPeerState, PeerInfo,
};
use crate::{err, SessionCounters};
use anyhow::{anyhow, Result};
use slog::{warn, Logger};
use std::net::IpAddr;
Expand Down Expand Up @@ -48,6 +48,7 @@ pub struct StateMachine {
required_rx: Duration,
detection_multiplier: u8,
kill_switch: Arc<AtomicBool>,
counters: Arc<SessionCounters>,
log: Logger,
}

Expand All @@ -64,6 +65,7 @@ impl StateMachine {
peer: IpAddr,
required_rx: Duration,
detection_multiplier: u8,
counters: Arc<SessionCounters>,
log: Logger,
) -> Self {
let state = Down::new(peer, log.clone());
Expand All @@ -73,6 +75,7 @@ impl StateMachine {
required_rx,
detection_multiplier,
kill_switch: Arc::new(AtomicBool::new(false)),
counters,
log,
}
}
Expand Down Expand Up @@ -122,6 +125,7 @@ impl StateMachine {
let peer = self.peer;
let kill_switch = self.kill_switch.clone();
let log = self.log.clone();
let counters = self.counters.clone();
spawn(move || loop {
let prev = state.read().unwrap().state();
let (st, ep) = match state.read().unwrap().run(
Expand All @@ -130,6 +134,7 @@ impl StateMachine {
remote.clone(),
kill_switch.clone(),
db.clone(),
counters.clone(),
) {
Ok(result) => result,
Err(_) => break,
Expand All @@ -143,6 +148,23 @@ impl StateMachine {
}

if prev != new {
match new {
BfdPeerState::AdminDown | BfdPeerState::Down => {
counters
.transition_to_down
.fetch_add(1, Ordering::Relaxed);
}
BfdPeerState::Init => {
counters
.transition_to_init
.fetch_add(1, Ordering::Relaxed);
}
BfdPeerState::Up => {
counters
.transition_to_up
.fetch_add(1, Ordering::Relaxed);
}
}
inf!(log, prev, peer; "transition -> {:?}", new);
}
});
Expand All @@ -162,6 +184,7 @@ impl StateMachine {
let peer = self.peer;
let stop = self.kill_switch.clone();
let log = self.log.clone();
let counters = self.counters.clone();
// State does not change for the lifetime of the trait so it's safe to
// just copy it out of self for sending into the spawned thread. The
// reason this is a dynamic method at all is to get runtime polymorphic
Expand Down Expand Up @@ -203,6 +226,13 @@ impl StateMachine {

if let Err(e) = sender.send((peer, pkt)) {
wrn!(log, st, peer; "send: {}", e);
counters
.control_packet_send_failures
.fetch_add(1, Ordering::Relaxed);
} else {
counters
.control_packets_sent
.fetch_add(1, Ordering::Relaxed);
}
});
}
Expand Down Expand Up @@ -255,6 +285,7 @@ pub(crate) trait State: Sync + Send {
remote: Arc<Mutex<PeerInfo>>,
kill_switch: Arc<AtomicBool>,
db: rdb::Db,
counters: Arc<SessionCounters>,
) -> Result<(Box<dyn State>, BfdEndpoint)>;

/// Return the `BfdPeerState` associated with the implementor of this trait.
Expand Down Expand Up @@ -297,13 +328,42 @@ pub(crate) trait State: Sync + Send {
local: PeerInfo,
remote: &Arc<Mutex<PeerInfo>>,
log: Logger,
counters: Arc<SessionCounters>,
) -> Result<RecvResult> {
match endpoint.rx.recv_timeout(
local.required_min_rx * local.detection_multiplier.into(),
) {
Ok((addr, msg)) => {
trc!(log, self.state(), self.peer(); "recv: {:?}", msg);

match msg.state() {
PacketState::Peer(BfdPeerState::AdminDown) => {
counters
.admin_down_status_received
.fetch_add(1, Ordering::Relaxed);
}
PacketState::Peer(BfdPeerState::Down) => {
counters
.down_status_received
.fetch_add(1, Ordering::Relaxed);
}
PacketState::Peer(BfdPeerState::Init) => {
counters
.init_status_received
.fetch_add(1, Ordering::Relaxed);
}
PacketState::Peer(BfdPeerState::Up) => {
counters
.up_status_received
.fetch_add(1, Ordering::Relaxed);
}
PacketState::Unknown(_) => {
counters
.unknown_status_received
.fetch_add(1, Ordering::Relaxed);
}
}

update_peer_info(remote, &msg);

if msg.poll() {
Expand All @@ -320,6 +380,7 @@ pub(crate) trait State: Sync + Send {
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
wrn!(log, self.state(), self.peer(); "timeout expired");
counters.timeout_expired.fetch_add(1, Ordering::Relaxed);
let next = Down::new(self.peer(), log.clone());
Ok(RecvResult::TransitionTo(Box::new(next)))
}
Expand All @@ -331,6 +392,9 @@ pub(crate) trait State: Sync + Send {
"recv: {}, exiting recieve loop",
e
);
counters
.message_receive_error
.fetch_add(1, Ordering::Relaxed);
Err(anyhow::anyhow!("recv channel closed"))
}
}
Expand Down Expand Up @@ -372,6 +436,7 @@ impl State for Down {
remote: Arc<Mutex<PeerInfo>>,
kill_switch: Arc<AtomicBool>,
db: rdb::Db,
counters: Arc<SessionCounters>,
) -> Result<(Box<dyn State>, BfdEndpoint)> {
match self.peer {
IpAddr::V4(addr) => db.disable_nexthop4(addr),
Expand All @@ -384,13 +449,18 @@ impl State for Down {
}
loop {
// Get an incoming message
let (_addr, msg) =
match self.recv(&endpoint, local, &remote, self.log.clone())? {
RecvResult::MessageFrom((addr, control)) => (addr, control),
RecvResult::TransitionTo(state) => {
return Ok((state, endpoint))
}
};
let (_addr, msg) = match self.recv(
&endpoint,
local,
&remote,
self.log.clone(),
counters.clone(),
)? {
RecvResult::MessageFrom((addr, control)) => (addr, control),
RecvResult::TransitionTo(state) => {
return Ok((state, endpoint))
}
};

if kill_switch.load(Ordering::Relaxed) {
return Err(anyhow!("killed"));
Expand Down Expand Up @@ -455,16 +525,22 @@ impl State for Init {
remote: Arc<Mutex<PeerInfo>>,
kill_switch: Arc<AtomicBool>,
_db: rdb::Db,
counters: Arc<SessionCounters>,
) -> Result<(Box<dyn State>, BfdEndpoint)> {
loop {
// Get an incoming message
let (_addr, msg) =
match self.recv(&endpoint, local, &remote, self.log.clone())? {
RecvResult::MessageFrom((addr, control)) => (addr, control),
RecvResult::TransitionTo(state) => {
return Ok((state, endpoint))
}
};
let (_addr, msg) = match self.recv(
&endpoint,
local,
&remote,
self.log.clone(),
counters.clone(),
)? {
RecvResult::MessageFrom((addr, control)) => (addr, control),
RecvResult::TransitionTo(state) => {
return Ok((state, endpoint))
}
};

if kill_switch.load(Ordering::Relaxed) {
return Err(anyhow!("killed"));
Expand Down Expand Up @@ -527,6 +603,7 @@ impl State for Up {
remote: Arc<Mutex<PeerInfo>>,
kill_switch: Arc<AtomicBool>,
db: rdb::Db,
counters: Arc<SessionCounters>,
) -> Result<(Box<dyn State>, BfdEndpoint)> {
match self.peer {
IpAddr::V4(addr) => db.enable_nexthop4(addr),
Expand All @@ -539,13 +616,18 @@ impl State for Up {
}
loop {
// Get an incoming message
let (_addr, msg) =
match self.recv(&endpoint, local, &remote, self.log.clone())? {
RecvResult::MessageFrom((addr, control)) => (addr, control),
RecvResult::TransitionTo(state) => {
return Ok((state, endpoint))
}
};
let (_addr, msg) = match self.recv(
&endpoint,
local,
&remote,
self.log.clone(),
counters.clone(),
)? {
RecvResult::MessageFrom((addr, control)) => (addr, control),
RecvResult::TransitionTo(state) => {
return Ok((state, endpoint))
}
};

if kill_switch.load(Ordering::Relaxed) {
return Err(anyhow!("killed"));
Expand Down
Loading

0 comments on commit de065a8

Please sign in to comment.