Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): introduce status_kind in the Context #133

Merged
merged 7 commits into from
Jul 17, 2024
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased] - ReleaseDate
### Added
- core: `Context::status_kind` API, now actors can read `ActorStatusKind` from the context ([#133]).
- core: `is_*` methods on `ActorStatusKind` for each variant ([#133]).
- Specify MSRV as 1.76.
- logger: log truncation up to the `max_line_size` configuration parameter ([#128]).
- core: directly accept never returning functions in `ActorGroup::exec()` ([#127]).
Expand Down
110 changes: 17 additions & 93 deletions elfo-core/src/actor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{fmt, mem, sync::Arc};
use std::{
mem,
sync::{atomic, Arc},
};

use futures_intrusive::sync::ManualResetEvent;
use metrics::{decrement_gauge, increment_counter, increment_gauge};
Expand All @@ -7,6 +10,7 @@ use serde::{Deserialize, Serialize};
use tracing::{error, info, warn};

use crate::{
actor_status::{ActorStatus, ActorStatusKind, AtomicActorStatusKind},
envelope::Envelope,
errors::{SendError, TrySendError},
group::TerminationPolicy,
Expand All @@ -29,95 +33,6 @@ pub struct ActorMeta {
pub key: String,
}

// === ActorStatus ===

/// Represents the current status of an actor.
/// See [The Actoromicon](https://actoromicon.rs/ch03-01-actor-lifecycle.html) for details.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ActorStatus {
kind: ActorStatusKind,
details: Option<String>,
}

impl ActorStatus {
pub const ALARMING: ActorStatus = ActorStatus::new(ActorStatusKind::Alarming);
pub(crate) const FAILED: ActorStatus = ActorStatus::new(ActorStatusKind::Failed);
pub const INITIALIZING: ActorStatus = ActorStatus::new(ActorStatusKind::Initializing);
pub const NORMAL: ActorStatus = ActorStatus::new(ActorStatusKind::Normal);
pub(crate) const TERMINATED: ActorStatus = ActorStatus::new(ActorStatusKind::Terminated);
pub const TERMINATING: ActorStatus = ActorStatus::new(ActorStatusKind::Terminating);

const fn new(kind: ActorStatusKind) -> Self {
Self {
kind,
details: None,
}
}

/// Creates a new status with the same kind and provided details.
pub fn with_details(&self, details: impl fmt::Display) -> Self {
ActorStatus {
kind: self.kind,
details: Some(details.to_string()),
}
}

/// Returns the corresponding [`ActorStatusKind`] for this status.
pub fn kind(&self) -> ActorStatusKind {
self.kind
}

/// Returns details for this status, if provided.
pub fn details(&self) -> Option<&str> {
self.details.as_deref()
}

pub(crate) fn is_failed(&self) -> bool {
self.kind == ActorStatusKind::Failed
}

fn is_finished(&self) -> bool {
use ActorStatusKind::*;
matches!(self.kind, Failed | Terminated)
}
}

impl fmt::Display for ActorStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.details {
Some(details) => write!(f, "{:?}: {}", self.kind, details),
None => write!(f, "{:?}", self.kind),
}
}
}

// === ActorStatusKind ===

/// A list specifying statuses of actors. It's used with the [`ActorStatus`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ActorStatusKind {
Normal,
Initializing,
Terminating,
Terminated,
Alarming,
Failed,
}

impl ActorStatusKind {
fn as_str(&self) -> &'static str {
match self {
ActorStatusKind::Normal => "Normal",
ActorStatusKind::Initializing => "Initializing",
ActorStatusKind::Terminating => "Terminating",
ActorStatusKind::Terminated => "Terminated",
ActorStatusKind::Alarming => "Alarming",
ActorStatusKind::Failed => "Failed",
}
}
}

// === ActorStartInfo ===

/// A struct holding information related to an actor start.
Expand Down Expand Up @@ -182,6 +97,7 @@ pub(crate) struct Actor {
termination_policy: TerminationPolicy,
mailbox: Mailbox,
request_table: RequestTable,
status_kind: AtomicActorStatusKind,
control: RwLock<Control>,
finished: ManualResetEvent, // TODO: remove in favor of `status_subscription`?
status_subscription: Arc<SubscriptionManager>,
Expand All @@ -206,6 +122,7 @@ impl Actor {
status_subscription: Arc<SubscriptionManager>,
) -> Self {
Actor {
status_kind: AtomicActorStatusKind::from(ActorStatusKind::Initializing),
meta,
termination_policy,
mailbox: Mailbox::new(mailbox_config),
Expand Down Expand Up @@ -306,8 +223,15 @@ impl Actor {
self.control.write().restart_policy = policy;
}

pub(crate) fn status_kind(&self) -> ActorStatusKind {
nerodono marked this conversation as resolved.
Show resolved Hide resolved
self.status_kind.load(atomic::Ordering::Acquire)
}

// Note that this method should be called inside a right scope.
pub(crate) fn set_status(&self, status: ActorStatus) {
self.status_kind
.store(status.kind(), atomic::Ordering::Release);

let mut control = self.control.write();
let prev_status = mem::replace(&mut control.status, status.clone());

Expand All @@ -318,7 +242,7 @@ impl Actor {
self.send_status_to_subscribers(&control);
drop(control);

if status.is_finished() {
if status.kind().is_finished() {
self.close();
// Drop all messages to release requests immediately.
self.mailbox.drop_all();
Expand All @@ -328,10 +252,10 @@ impl Actor {
log_status(&status);

if status.kind != prev_status.kind {
if !prev_status.is_finished() {
if !prev_status.kind().is_finished() {
decrement_gauge!("elfo_active_actors", 1., "status" => prev_status.kind.as_str());
}
if !status.is_finished() {
if !status.kind().is_finished() {
increment_gauge!("elfo_active_actors", 1., "status" => status.kind.as_str());
}

Expand Down
149 changes: 149 additions & 0 deletions elfo-core/src/actor_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::sync::atomic::{self, AtomicU8};
use std::{fmt, mem};

use serde::{Deserialize, Serialize};

// === ActorStatus ===

/// Represents the current status of an actor.
/// See [The Actoromicon](https://actoromicon.rs/ch03-01-actor-lifecycle.html) for details.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ActorStatus {
pub(crate) kind: ActorStatusKind,
pub(crate) details: Option<String>,
}

impl ActorStatus {
pub const ALARMING: ActorStatus = ActorStatus::new(ActorStatusKind::Alarming);
pub(crate) const FAILED: ActorStatus = ActorStatus::new(ActorStatusKind::Failed);
pub const INITIALIZING: ActorStatus = ActorStatus::new(ActorStatusKind::Initializing);
pub const NORMAL: ActorStatus = ActorStatus::new(ActorStatusKind::Normal);
pub(crate) const TERMINATED: ActorStatus = ActorStatus::new(ActorStatusKind::Terminated);
pub const TERMINATING: ActorStatus = ActorStatus::new(ActorStatusKind::Terminating);

const fn new(kind: ActorStatusKind) -> Self {
Self {
kind,
details: None,
}
}

/// Creates a new status with the same kind and provided details.
pub fn with_details(&self, details: impl fmt::Display) -> Self {
ActorStatus {
kind: self.kind,
details: Some(details.to_string()),
}
}

/// Returns the corresponding [`ActorStatusKind`] for this status.
pub fn kind(&self) -> ActorStatusKind {
self.kind
}

/// Returns details for this status, if provided.
pub fn details(&self) -> Option<&str> {
self.details.as_deref()
}
}

impl fmt::Display for ActorStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.details {
Some(details) => write!(f, "{:?}: {}", self.kind, details),
None => write!(f, "{:?}", self.kind),
}
}
}

// === ActorStatusKind ===

/// A list specifying statuses of actors. It's used with the [`ActorStatus`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[repr(u8)]
pub enum ActorStatusKind {
Normal,
Initializing,
Terminating,
Terminated,
Alarming,
Failed,
}

impl ActorStatusKind {
#[inline]
pub const fn is_normal(&self) -> bool {
matches!(self, Self::Normal)
}

#[inline]
pub const fn is_initializing(&self) -> bool {
matches!(self, Self::Initializing)
}

#[inline]
pub const fn is_terminating(&self) -> bool {
matches!(self, Self::Terminating)
}

#[inline]
pub const fn is_terminated(&self) -> bool {
matches!(self, Self::Terminated)
}

#[inline]
pub const fn is_alarming(&self) -> bool {
matches!(self, Self::Alarming)
}

#[inline]
pub const fn is_failed(&self) -> bool {
matches!(self, Self::Failed)
}

#[inline]
pub const fn is_finished(&self) -> bool {
self.is_failed() || self.is_terminated()
}
}

impl ActorStatusKind {
pub(crate) fn as_str(&self) -> &'static str {
match self {
ActorStatusKind::Normal => "Normal",
ActorStatusKind::Initializing => "Initializing",
ActorStatusKind::Terminating => "Terminating",
ActorStatusKind::Terminated => "Terminated",
ActorStatusKind::Alarming => "Alarming",
ActorStatusKind::Failed => "Failed",
}
}
}

// === AtomicActorStatusKind ===

#[derive(Debug)]
#[repr(transparent)]
pub(crate) struct AtomicActorStatusKind(AtomicU8);

impl From<ActorStatusKind> for AtomicActorStatusKind {
fn from(value: ActorStatusKind) -> Self {
Self(AtomicU8::new(value as _))
}
}

impl AtomicActorStatusKind {
pub(crate) fn store(&self, kind: ActorStatusKind, ordering: atomic::Ordering) {
self.0.store(kind as u8, ordering);
}

pub(crate) fn load(&self, ordering: atomic::Ordering) -> ActorStatusKind {
let result = self.0.load(ordering);

// SAFETY: `ActorStatusKind` has `#[repr(u8)]` annotation. The only
// place where value may be changed is `Self::store`, which consumes `ActorStatusKind`, thus,
// guarantees that possibly invalid value cannot be stored
unsafe { mem::transmute::<u8, ActorStatusKind>(result) }
}
}
29 changes: 28 additions & 1 deletion elfo-core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use tracing::{info, trace};
use elfo_utils::unlikely;

use crate::{
actor::{Actor, ActorStartInfo, ActorStatus},
actor::{Actor, ActorStartInfo},
actor_status::ActorStatus,
addr::Addr,
address_book::AddressBook,
config::AnyConfig,
Expand All @@ -26,6 +27,7 @@ use crate::{
routers::Singleton,
scope,
source::{SourceHandle, Sources, UnattachedSource},
ActorStatusKind,
};

use self::stats::Stats;
Expand Down Expand Up @@ -105,6 +107,31 @@ impl<C, K> Context<C, K> {
ward!(self.actor.as_ref().and_then(|o| o.as_actor())).set_status(status);
}

/// Gets the actor's status kind.
nerodono marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Example
/// ```
/// # use elfo_core as elfo;
/// # fn exec(ctx: elfo::Context) {
/// // if actor is terminating.
/// assert!(ctx.status_kind().is_terminating());
/// // if actor is alarming.
/// assert!(ctx.status_kind().is_alarming());
/// // and so on...
/// # }
/// ```
/// # Panics
///
/// Panics when called on pruned context.
pub fn status_kind(&self) -> ActorStatusKind {
self.actor
.as_ref()
.expect("called `status_kind()` on pruned context")
.as_actor()
.expect("invariant")
.status_kind()
}

/// Overrides the group's default mailbox capacity, which set in the config.
///
/// Note: after restart the actor will be created from scratch, so this
Expand Down
3 changes: 2 additions & 1 deletion elfo-core/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use crate::{
};

use crate::{
actor::{Actor, ActorMeta, ActorStartInfo, ActorStatus},
actor::{Actor, ActorMeta, ActorStartInfo},
actor_status::ActorStatus,
addr::{Addr, GroupNo},
config::SystemConfig,
context::Context,
Expand Down
Loading