Skip to content

Commit

Permalink
Fix service Info to include endpoints info
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Sep 22, 2023
1 parent 740b415 commit 02fd086
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 9 deletions.
12 changes: 12 additions & 0 deletions async-nats/src/service/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,15 @@ pub struct Stats {
/// Queue group to which this endpoint is assigned to.
pub queue_group: String,
}

#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
pub struct Info {
/// Name of the endpoint.
pub name: String,
/// Endpoint subject.
pub subject: String,
/// Queue group to which this endpoint is assigned.
pub queue_group: String,
/// Endpoint-specific metadata.
pub metadata: HashMap<String, String>,
}
28 changes: 19 additions & 9 deletions async-nats/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ pub struct Info {
pub description: Option<String>,
/// Service version.
pub version: String,
/// All service endpoints.
pub subjects: Vec<String>,
/// Additional metadata
pub metadata: HashMap<String, String>,
/// Info about all service endpoints.
pub endpoints: Vec<endpoint::Info>,
}

/// Configuration of the [Service].
Expand Down Expand Up @@ -322,6 +322,10 @@ impl Service {
"service name is not a valid string (only A-Z, a-z, 0-9, _, - are allowed)",
)));
}
let endpoints_state = Arc::new(Mutex::new(Endpoints {
endpoints: HashMap::new(),
}));

let queue_group = config
.queue_group
.unwrap_or(DEFAULT_QUEUE_GROUP.to_string());
Expand All @@ -334,15 +338,12 @@ impl Service {
id: id.clone(),
description: config.description.clone(),
version: config.version.clone(),
subjects: Vec::default(),
metadata: config.metadata.clone().unwrap_or_default(),
endpoints: Vec::new(),
};

let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);

let endpoints = HashMap::new();
let endpoints_state = Arc::new(Mutex::new(Endpoints { endpoints }));

// create subscriptions for all verbs.
let mut pings =
verb_subscription(client.clone(), Verb::Ping, config.name.clone(), id.clone()).await?;
Expand All @@ -355,7 +356,6 @@ impl Service {
let handle = tokio::task::spawn({
let mut stats_callback = config.stats_handler;
let info = info.clone();
let subjects = subjects.clone();
let endpoints_state = endpoints_state.clone();
let client = client.clone();
async move {
Expand All @@ -371,10 +371,20 @@ impl Service {
client.publish(ping.reply.unwrap(), pong.into()).await?;
},
Some(info_request) = infos.next() => {
let subjects = subjects.clone();
let info = info.clone();

let endpoints: Vec<endpoint::Info> = {
endpoints_state.lock().unwrap().endpoints.values().map(|value| {
endpoint::Info {
name: value.name.to_owned(),
subject: value.subject.to_owned(),
queue_group: value.queue_group.to_owned(),
metadata: value.metadata.to_owned()
}
}).collect()
};
let info = Info {
subjects: subjects.lock().unwrap().to_vec(),
endpoints,
..info
};
let info_json = serde_json::to_vec(&info).map(Bytes::from)?;
Expand Down
39 changes: 39 additions & 0 deletions async-nats/tests/service_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,45 @@ mod service {
assert_eq!(responses.take(2).count().await, 2);
}

#[tokio::test]
async fn info() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

let service = client
.service_builder()
.start("service", "1.0.0")
.await
.unwrap();

let endpoint_info = service::endpoint::Info {
name: "endpoint_1".to_string(),
subject: "subject".to_string(),
queue_group: "queue".to_string(),
metadata: HashMap::from([("key".to_string(), "value".to_string())]),
};

service
.endpoint_builder()
.name(&endpoint_info.name)
.metadata(endpoint_info.metadata.clone())
.queue_group(&endpoint_info.queue_group)
.add(&endpoint_info.subject)
.await
.unwrap();

let info: service::Info = serde_json::from_slice(
&client
.request("$SRV.INFO".into(), "".into())
.await
.unwrap()
.payload,
)
.unwrap();

assert_eq!(&endpoint_info, info.endpoints.first().unwrap());
}

#[tokio::test]
#[cfg(not(target_os = "windows"))]
async fn cross_clients_tests() {
Expand Down

0 comments on commit 02fd086

Please sign in to comment.