Skip to content

Commit

Permalink
experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Feb 24, 2025
1 parent ae551e6 commit eb53f62
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 170 deletions.
20 changes: 11 additions & 9 deletions graph/proto/tracing.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ syntax = "proto3";

package graph.tracing.v1;

service Stream {
rpc QueryTrace(Request) returns (stream Trace);
}
service Stream { rpc QueryTrace(Request) returns (stream Response); }

message Request {
int32 deployment_id = 1;
}
message Request { int32 deployment_id = 1; }

message Response { repeated Trace traces = 1; }

message Trace {
int32 deployment_id = 1;
string query = 2;
uint64 duration_millis = 3;
int32 deployment_id = 1;
string query = 2;
uint64 duration_millis = 3;
uint32 children = 4;
optional uint64 conn_wait_millis = 5;
optional uint64 permit_wait_millis = 6;
optional uint64 entity_count = 7;
}
139 changes: 85 additions & 54 deletions graph/src/components/tracing.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use futures03::TryFutureExt;
use slog::{o, Logger};
use tokio::sync::{mpsc, watch::Receiver, RwLock};

use crate::prelude::LoggerFactory;

use super::store::DeploymentId;

const DEFAULT_BUFFER_SIZE: usize = 100;
#[cfg(not(test))]
const INDEXER_WATCHER_INTERVAL: Duration = Duration::from_secs(10);
#[cfg(test)]
const INDEXER_WATCHER_INTERVAL: Duration = Duration::from_millis(100);

#[derive(Debug, Clone)]
pub struct Subscriptions<T> {
Expand All @@ -33,59 +33,48 @@ pub struct TracingControl<T> {
default_buffer_size: usize,
}

// impl<T: Send + Clone + 'static> Default for TracingControl<T> {
// fn default() -> Self {
// let subscriptions = Subscriptions::default();
// let subs = subscriptions.clone();
// let watcher = std::thread::spawn(move || {
// let runtime = tokio::runtime::Builder::new_multi_thread()
// .enable_all()
// .build()
// .unwrap();
// runtime.block_on()
// })
// .join()
// .unwrap()
// .unwrap();

// Self {
// subscriptions,
// default_buffer_size: DEFAULT_BUFFER_SIZE,
// watcher,
// }
// }
// }

impl<T: Send + Clone + 'static> TracingControl<T> {
pub async fn start() -> Self {
/// Starts a new tracing control instance.If an async runtime is not available, a new one will be created.
pub fn start() -> Self {
Self::new(DEFAULT_BUFFER_SIZE)
}

pub fn new(buffer_size: usize) -> Self {
let subscriptions = Subscriptions::default();
let subs = subscriptions.clone();
let watcher = indexer_watcher::new_watcher(
#[cfg(test)]
Duration::from_millis(100),
#[cfg(not(test))]
Duration::from_secs(30),
move || {
let subs = subs.clone();

async move { Ok(subs.inner.read().await.clone()) }
},
)
.await

let watcher = std::thread::spawn(move || {
let handle = tokio::runtime::Handle::try_current().unwrap_or(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.handle()
.clone(),
);

handle.block_on(async move {
indexer_watcher::new_watcher(INDEXER_WATCHER_INTERVAL, move || {
let subs = subs.clone();

async move { Ok(subs.inner.read().await.clone()) }
})
.await
})
})
.join()
.unwrap()
.unwrap();

Self {
watcher,
subscriptions,
default_buffer_size: DEFAULT_BUFFER_SIZE,
default_buffer_size: buffer_size,
}
}
// pub fn new(default_buffer_size: Option<usize>) -> Self {
// Self {
// default_buffer_size: default_buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
// ..Default::default()
// }
// }

/// Returns a producer for a given deployment ID. If the producer is closed, it will return None.
/// The producer could still be closed in the meantime.
pub fn producer(&self, key: DeploymentId) -> Option<mpsc::Sender<T>> {
self.watcher
.borrow()
Expand All @@ -94,6 +83,8 @@ impl<T: Send + Clone + 'static> TracingControl<T> {
.filter(|sender| !sender.is_closed())
}

/// Creates a new subscription for a given deployment ID with a given buffer size. If a subscription already
/// exists, it will be replaced.
pub async fn subscribe_with_chan_size(
&self,
key: DeploymentId,
Expand All @@ -118,26 +109,53 @@ impl<T: Send + Clone + 'static> TracingControl<T> {
mod test {

use anyhow::anyhow;
use tokio::time::{self, Instant};
use tokio_retry::Retry;

use super::*;
use std::{future::IntoFuture, sync::Arc};
use std::sync::Arc;

#[tokio::test]
async fn test_watcher() {
let x = time::Instant::now();
let x = indexer_watcher::new_watcher(Duration::from_millis(10), move || {
let x = x.clone();

async move {
let now = Instant::now();
Ok(now.duration_since(x))
}
})
.await
.unwrap();

Retry::spawn(vec![Duration::from_secs(10); 3].into_iter(), move || {
let x = x.clone();
async move {
let count = x.borrow().clone();
println!("{}", count.as_millis());
Err::<Duration, anyhow::Error>(anyhow!("millis: {}", count.as_millis()))
}
})
.await
.unwrap();
}

#[tokio::test]
async fn test_tracing_control() {
let control: TracingControl<()> = TracingControl::start().await;
let control: TracingControl<()> = TracingControl::start();
let control = Arc::new(control);

// produce before subscription
let tx = control.producer(DeploymentId(123));
assert!(tx.is_none());

// drop the subscription
let rx = control.subscribe(DeploymentId(123));
let rx = control.subscribe(DeploymentId(123)).await;

let c = control.clone();
// check subscription is none because channel is closed
let tx = Retry::spawn(vec![Duration::from_secs(5); 10].into_iter(), move || {
let tx = Retry::spawn(vec![INDEXER_WATCHER_INTERVAL; 2].into_iter(), move || {
let control = c.clone();
async move {
match control.producer(DeploymentId(123)) {
Expand All @@ -158,9 +176,22 @@ mod test {
assert!(tx.is_none());

// re-create subscription
let _rx = control.subscribe(DeploymentId(123));
let _rx = control.subscribe(DeploymentId(123)).await;

// check old subscription was replaced
let tx = control.producer(DeploymentId(123));
assert!(!tx.unwrap().is_closed())
let c = control.clone();
let tx = Retry::spawn(vec![INDEXER_WATCHER_INTERVAL; 2].into_iter(), move || {
let tx = c.producer(DeploymentId(123));
async move {
match tx {
Some(sender) if !sender.is_closed() => Ok(sender),
Some(_) => Err(anyhow!("Sender is closed")),
None => Err(anyhow!("Sender not created yet")),
}
}
})
.await
.unwrap();
assert!(!tx.is_closed())
}
}
20 changes: 17 additions & 3 deletions graph/src/grpc/pb/graph.tracing.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,27 @@ pub struct Request {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Response {
#[prost(message, repeated, tag = "1")]
pub traces: ::prost::alloc::vec::Vec<Trace>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Trace {
#[prost(int32, tag = "1")]
pub deployment_id: i32,
#[prost(string, tag = "2")]
pub query: ::prost::alloc::string::String,
#[prost(uint64, tag = "3")]
pub duration_millis: u64,
#[prost(uint32, tag = "4")]
pub children: u32,
#[prost(uint64, optional, tag = "5")]
pub conn_wait_millis: ::core::option::Option<u64>,
#[prost(uint64, optional, tag = "6")]
pub permit_wait_millis: ::core::option::Option<u64>,
#[prost(uint64, optional, tag = "7")]
pub entity_count: ::core::option::Option<u64>,
}
/// Generated client implementations.
pub mod stream_client {
Expand Down Expand Up @@ -104,7 +118,7 @@ pub mod stream_client {
&mut self,
request: impl tonic::IntoRequest<super::Request>,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::Trace>>,
tonic::Response<tonic::codec::Streaming<super::Response>>,
tonic::Status,
> {
self.inner
Expand Down Expand Up @@ -136,7 +150,7 @@ pub mod stream_server {
pub trait Stream: Send + Sync + 'static {
/// Server streaming response type for the QueryTrace method.
type QueryTraceStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::Trace, tonic::Status>,
Item = std::result::Result<super::Response, tonic::Status>,
>
+ Send
+ 'static;
Expand Down Expand Up @@ -229,7 +243,7 @@ pub mod stream_server {
struct QueryTraceSvc<T: Stream>(pub Arc<T>);
impl<T: Stream> tonic::server::ServerStreamingService<super::Request>
for QueryTraceSvc<T> {
type Response = super::Trace;
type Response = super::Response;
type ResponseStream = T::QueryTraceStream;
type Future = BoxFuture<
tonic::Response<Self::ResponseStream>,
Expand Down
16 changes: 1 addition & 15 deletions node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use clap::Parser as _;
use git_testament::{git_testament, render_testament};
use graph::components::store::DeploymentId;
use graph::futures01::Future as _;
use graph::futures03::compat::Future01CompatExt;
use graph::futures03::future::TryFutureExt;
Expand Down Expand Up @@ -31,8 +30,8 @@ use graph_server_json_rpc::JsonRpcServer;
use graph_server_metrics::PrometheusMetricsServer;
use graph_server_websocket::SubscriptionServer as GraphQLSubscriptionServer;
use graph_store_postgres::connection_pool::ConnectionPool;
use graph_store_postgres::Store;
use graph_store_postgres::{register_jobs as register_store_jobs, NotificationSender};
use graph_store_postgres::{Store, TRACING_CONTROL};
use graphman_server::GraphmanServer;
use graphman_server::GraphmanServerConfig;
use std::io::{BufRead, BufReader};
Expand Down Expand Up @@ -564,19 +563,6 @@ async fn main() {
}
});

let mut rx = TRACING_CONTROL.subscribe(DeploymentId(1)).await;
loop {
let trace = rx.recv().await;
match trace {
Some(trace) => {
info!(&logger, "#### trace: {:?}", trace);
}
None => {
break;
}
}
}

graph::futures03::future::pending::<()>().await;
}

Expand Down
Loading

0 comments on commit eb53f62

Please sign in to comment.