Skip to content

Commit

Permalink
Implement Tower EventListener and EventListenerLayer (#3887)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Sep 28, 2023
1 parent 46e65f8 commit 9cc02a9
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 8 deletions.
20 changes: 12 additions & 8 deletions quickwit/quickwit-common/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl EventBroker {
.inner
.subscriptions
.lock()
.expect("The lock should never be poisoned.");
.expect("the lock should not be poisoned");

if !subscriptions.contains::<EventSubscriptions<E>>() {
subscriptions.insert::<EventSubscriptions<E>>(HashMap::new());
Expand Down Expand Up @@ -99,7 +99,7 @@ impl EventBroker {
.inner
.subscriptions
.lock()
.expect("The lock should never be poisoned.");
.expect("the lock should not be poisoned");

if let Some(typed_subscriptions) = subscriptions.get::<EventSubscriptions<E>>() {
for subscription in typed_subscriptions.values() {
Expand Down Expand Up @@ -141,7 +141,7 @@ where E: Event
let mut subscriptions = broker
.subscriptions
.lock()
.expect("The lock should never be poisoned.");
.expect("the lock should not be poisoned");
if let Some(typed_subscriptions) = subscriptions.get_mut::<EventSubscriptions<E>>() {
typed_subscriptions.remove(&self.subscription_id);
}
Expand Down Expand Up @@ -178,20 +178,24 @@ mod tests {

#[tokio::test]
async fn test_event_broker() {
let broker = EventBroker::default();
let event_broker = EventBroker::default();
let counter = Arc::new(AtomicUsize::new(0));
let subscriber = MySubscriber {
counter: counter.clone(),
};
let subscription = broker.subscribe(subscriber);
let subscription_handle = event_broker.subscribe(subscriber);

let event = MyEvent { value: 42 };
broker.publish(event);
event_broker.publish(event);

tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
assert_eq!(counter.load(Ordering::Relaxed), 42);

subscription.cancel();
subscription_handle.cancel();

let event = MyEvent { value: 1337 };
broker.publish(event);
event_broker.publish(event);

tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
assert_eq!(counter.load(Ordering::Relaxed), 42);
}
Expand Down
173 changes: 173 additions & 0 deletions quickwit/quickwit-common/src/tower/event_listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright (C) 2023 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{ready, Future};
use pin_project::pin_project;
use tower::{Layer, Service};

use crate::pubsub::{Event, EventBroker};

pub struct EventListener<S> {
inner: S,
event_broker: EventBroker,
}

impl<S> EventListener<S> {
pub fn new(inner: S, event_broker: EventBroker) -> Self {
Self {
inner,
event_broker,
}
}
}

impl<S, R> Service<R> for EventListener<S>
where
S: Service<R>,
R: Event,
{
type Response = S::Response;
type Error = S::Error;
type Future = ResponseFuture<S::Future, R>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, request: R) -> Self::Future {
let inner = self.inner.call(request.clone());
ResponseFuture {
inner,
event_broker: self.event_broker.clone(),
request: Some(request),
}
}
}

#[derive(Debug, Clone)]
pub struct EventListenerLayer {
event_broker: EventBroker,
}

impl EventListenerLayer {
pub fn new(event_broker: EventBroker) -> Self {
Self { event_broker }
}
}

impl<S> Layer<S> for EventListenerLayer {
type Service = EventListener<S>;

fn layer(&self, service: S) -> Self::Service {
EventListener::new(service, self.event_broker.clone())
}
}

/// Response future for [`EventListener`].
#[pin_project]
pub struct ResponseFuture<F, R> {
#[pin]
inner: F,
event_broker: EventBroker,
request: Option<R>,
}

impl<R, F, T, E> Future for ResponseFuture<F, R>
where
R: Event,
F: Future<Output = Result<T, E>>,
{
type Output = Result<T, E>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let response = ready!(this.inner.poll(cx));

if response.is_ok() {
this.event_broker
.publish(this.request.take().expect("request should be set"));
}
Poll::Ready(Ok(response?))
}
}

#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;

use super::*;
use crate::pubsub::EventSubscriber;

#[derive(Debug, Clone, Copy)]
struct MyEvent {
return_ok: bool,
}

impl Event for MyEvent {}

#[derive(Debug, Clone)]
struct MySubscriber {
counter: Arc<AtomicUsize>,
}

#[async_trait]
impl EventSubscriber<MyEvent> for MySubscriber {
async fn handle_event(&mut self, _event: MyEvent) {
self.counter.fetch_add(1, Ordering::Relaxed);
}
}

#[tokio::test]
async fn test_event_listener() {
let event_broker = EventBroker::default();
let counter = Arc::new(AtomicUsize::new(0));
let subscriber = MySubscriber {
counter: counter.clone(),
};
let _subscription_handle = event_broker.subscribe::<MyEvent>(subscriber);

let layer = EventListenerLayer::new(event_broker);

let mut service = layer.layer(tower::service_fn(|request: MyEvent| async move {
if request.return_ok {
Ok(())
} else {
Err(())
}
}));
let request = MyEvent { return_ok: false };
service.call(request).await.unwrap_err();

tokio::time::sleep(Duration::from_millis(1)).await;
assert_eq!(counter.load(Ordering::Relaxed), 0);

let request = MyEvent { return_ok: true };
service.call(request).await.unwrap();

tokio::time::sleep(Duration::from_millis(1)).await;
assert_eq!(counter.load(Ordering::Relaxed), 1);
}
}
2 changes: 2 additions & 0 deletions quickwit/quickwit-common/src/tower/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod box_service;
mod buffer;
mod change;
mod estimate_rate;
mod event_listener;
mod metrics;
mod pool;
mod rate;
Expand All @@ -38,6 +39,7 @@ pub use box_service::BoxService;
pub use buffer::{Buffer, BufferError, BufferLayer};
pub use change::Change;
pub use estimate_rate::{EstimateRate, EstimateRateLayer};
pub use event_listener::{EventListener, EventListenerLayer};
use futures::Future;
pub use metrics::{PrometheusMetrics, PrometheusMetricsLayer};
pub use pool::Pool;
Expand Down

0 comments on commit 9cc02a9

Please sign in to comment.