Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nats: db proposal #42

Merged
merged 38 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
f3812d3
add nats util crate
JettTech Dec 11, 2024
f3085c4
clean-up
JettTech Dec 11, 2024
7d72ac5
clean comments
JettTech Dec 11, 2024
54ce72c
edit trait
JettTech Dec 11, 2024
ea1c583
add mongo consts, update err types
JettTech Dec 12, 2024
e07f57d
resolve borrow
JettTech Dec 13, 2024
02bc2c1
add all tests
JettTech Dec 13, 2024
c3f46a8
elevate rust setup dir
JettTech Dec 13, 2024
6597c31
update server tests
JettTech Dec 16, 2024
bf6f165
add back db proposal
JettTech Dec 18, 2024
8042def
clean db
JettTech Dec 18, 2024
b0cd5ae
clearn
JettTech Dec 18, 2024
0f95cf4
tidy
JettTech Dec 18, 2024
71ed439
Merge branch 'nats-baseline' into nats-crates
JettTech Dec 18, 2024
fa5efb6
Merge branch 'nats-baseline' into nats-crates
JettTech Dec 18, 2024
634364c
update aliases
JettTech Dec 18, 2024
40ff0e4
Merge branch 'nats-baseline' into nats-crates
JettTech Dec 19, 2024
ad3da0d
Merge branch 'main' into nats-crates
JettTech Dec 20, 2024
07a710a
Merge branch 'nats-crates' of github.com:Holo-Host/holo-host into nat…
JettTech Jan 7, 2025
f5d631b
Merge branch 'main' into nats-crates
JettTech Jan 7, 2025
ff868de
Merge branch 'main' into nats-crates
JettTech Jan 7, 2025
796b97c
Merge branch 'nats-crates' of github.com:Holo-Host/holo-host into nat…
JettTech Jan 7, 2025
43e1502
update gitignore
JettTech Jan 7, 2025
e2e2a4e
clean up
JettTech Jan 7, 2025
819df09
update file descriptions, clean
JettTech Jan 7, 2025
af56c80
edit workload and host schmeas
JettTech Jan 7, 2025
1d38a02
adjusments
JettTech Jan 7, 2025
41253be
add serde skip and optional mongo ids on schema structs, update workl…
JettTech Jan 8, 2025
1485022
clean
JettTech Jan 8, 2025
97f1fb4
Merge branch 'main' into nats-crates
JettTech Jan 10, 2025
86fa6d0
Merge branch 'main' into nats-crates
JettTech Jan 13, 2025
a1f7360
update js client names and types, make consumer accept endpoint gener…
JettTech Jan 15, 2025
4aab1e8
update service lib to explicitly state return type
JettTech Jan 15, 2025
5a6461a
clean up resp msg handling
JettTech Jan 15, 2025
6160244
clean up helper
JettTech Jan 15, 2025
6cb6c01
add id to workload_status
JettTech Jan 15, 2025
eea5cb6
Merge branch 'main' into nats-crates
JettTech Jan 15, 2025
670136d
fix types
JettTech Jan 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ target/
# nats
rust/*/tmp
rust/*/jwt
rust/*/*/test_leaf_server/*
rust/*/*/test_leaf_server.conf
rust/*/*/leaf_server.conf
rust/*/*/resolver.conf
Expand Down
32 changes: 31 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions rust/clients/host_agent/src/gen_leaf_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use util_libs::nats_server::{self, JetStreamConfig, LeafNodeRemote, LeafServer,
const LEAF_SERVE_NAME: &str = "test_leaf_server";
const LEAF_SERVER_CONFIG_PATH: &str = "test_leaf_server.conf";

pub async fn run(_user_creds_path: &str) {
pub async fn run(user_creds_path: &str) {
let leaf_server_remote_conn_url = nats_server::get_hub_server_url();
let leaf_client_conn_domain = "127.0.0.1";
let leaf_client_conn_port = 4111;
Expand All @@ -27,7 +27,7 @@ pub async fn run(_user_creds_path: &str) {
let leaf_node_remotes = vec![LeafNodeRemote {
// sys account user (automated)
url: leaf_server_remote_conn_url.to_string(),
credentials_path: None, // Some(user_creds_path.to_string()),
credentials_path: Some(user_creds_path.to_string()),
}];

// Create a new Leaf Server instance
Expand Down
31 changes: 13 additions & 18 deletions rust/clients/host_agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
/*
This client is associated with the:
- WORKLOAD account
- hpos user
This client is associated with the:
- WORKLOAD account
- hpos user

// This client is responsible for:
- subscribing to workload streams
- installing new workloads
- removing workloads
- send workload status upon request
This client is responsible for subscribing the host agent to workload stream endpoints:
- installing new workloads
- removing workloads
- sending active periodic workload reports
- sending workload status upon request
*/

// mod auth;
// mod utils;
mod workloads;
mod workload_manager;
use anyhow::Result;
use clap::Parser;
use dotenv::dotenv;
Expand Down Expand Up @@ -57,12 +54,10 @@ async fn main() -> Result<(), AgentCliError> {
}

async fn daemonize() -> Result<(), async_nats::Error> {
// let user_creds_path = auth::initializer::run().await?;
let user_creds_path = "placeholder_creds_that_will_not_be_read".to_string();
gen_leaf_server::run(&user_creds_path).await;

let user_creds_path = nats_js_client::get_nats_client_creds("HOLO", "HPOS", "hpos");
workloads::manager::run(&user_creds_path).await?;

// let (host_pubkey, host_creds_path) = auth::initializer::run().await?;
let host_creds_path = nats_js_client::get_nats_client_creds("HOLO", "HPOS", "hpos");
let host_pubkey = "host_id_placeholder>";
gen_leaf_server::run(&host_creds_path).await;
workload_manager::run(host_pubkey, &host_creds_path).await?;
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,33 @@
- subscribing to workload streams
- installing new workloads
- removing workloads
- send workload status upon request
- sending active periodic workload reports
- sending workload status upon request
- sending active periodic workload reports
*/

use super::endpoints;
use anyhow::{anyhow, Result};
// use mongodb::{options::ClientOptions, Client as MongoDBClient};
use std::time::Duration;
use mongodb::{options::ClientOptions, Client as MongoDBClient};
use std::{sync::Arc, time::Duration};
use util_libs::{
// db::mongodb::get_mongodb_url,
db::mongodb::get_mongodb_url,
js_stream_service::JsServiceParamsPartial,
nats_js_client::{self, EndpointType, JsClient},
nats_js_client::{self, EndpointType, },
};
use workload::{
WorkloadApi, WORKLOAD_SRV_DESC, WORKLOAD_SRV_NAME, WORKLOAD_SRV_SUBJ, WORKLOAD_SRV_VERSION,
};
use async_nats::Message;

const HOST_AGENT_CLIENT_NAME: &str = "Host Agent";
const HOST_AGENT_CLIENT_INBOX_PREFIX: &str = "_host_inbox";
const HOST_AGENT_INBOX_PREFIX: &str = "_host_inbox";

// TODO: Use _user_creds_path for auth once we add in the more resilient auth pattern.
pub async fn run(user_creds_path: &str) -> Result<(), async_nats::Error> {
// TODO: Use _host_creds_path for auth once we add in the more resilient auth pattern.
pub async fn run(host_pubkey: &str, host_creds_path: &str) -> Result<(), async_nats::Error> {
log::info!("HPOS Agent Client: Connecting to server...");
// ==================== NATS Setup ====================
log::info!("user_creds_path : {}", user_creds_path);
log::info!("host_creds_path : {}", host_creds_path);
log::info!("host_pubkey : {}", host_pubkey);

// ==================== NATS Setup ====================
// Connect to Nats server
let nats_url = nats_js_client::get_nats_url();
log::info!("nats_url : {}", nats_url);
Expand All @@ -49,34 +50,33 @@ pub async fn run(user_creds_path: &str) -> Result<(), async_nats::Error> {

// Spin up Nats Client and loaded in the Js Stream Service
let host_workload_client =
nats_js_client::DefaultJsClient::new(nats_js_client::NewDefaultJsClientParams {
nats_js_client::JsClient::new(nats_js_client::NewJsClientParams {
nats_url,
name: HOST_AGENT_CLIENT_NAME.to_string(),
inbox_prefix: format!(
"{}_{}",
HOST_AGENT_CLIENT_INBOX_PREFIX, "<host_id_placeholder>"
HOST_AGENT_INBOX_PREFIX, host_pubkey
),
service_params: vec![workload_stream_service_params],
credentials_path: None, // Some(user_creds_path.to_string()),
credentials_path: Some(host_creds_path.to_string()),
opts: vec![nats_js_client::with_event_listeners(event_listeners)],
ping_interval: Some(Duration::from_secs(10)),
request_timeout: Some(Duration::from_secs(5)),
})
.await?;

// ==================== DB Setup ====================

// Create a new MongoDB Client and connect it to the cluster
// let mongo_uri = get_mongodb_url();
// let client_options = ClientOptions::parse(mongo_uri).await?;
// let client = MongoDBClient::with_options(client_options)?;
let mongo_uri = get_mongodb_url();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we really be calling into MongoDB from the host_agent code? The agent should communicate with the services running centrally, which in turn is the only touch point to the central MongoDB instance. All communication from the agent goes through the API, which is now using NATS as a transport and won't go direct to things like MongoDB.

Copy link
Contributor Author

@JettTech JettTech Jan 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're 100% correct. This was added here for the convenience of testing in the first iteration. The host agents will not have a mongodb instance in production. I actually already have a PR out that both adds the workload api for the orchestrator and separates out the api for the orchestrator and host such that only the orchestrator service api has access to mongodb.

let client_options = ClientOptions::parse(mongo_uri).await?;
let client = MongoDBClient::with_options(client_options)?;

// Generate the Workload API with access to db
let workload_api = WorkloadApi::new().await?;
let workload_api = WorkloadApi::new(&client).await?;

// ==================== API ENDPOINTS ====================
// Register Workload Streams for Host Agent to consume
// (subjects should be published by orchestrator or nats-db-connector)
// NB: Subjects are published by orchestrator or nats-db-connector
let workload_service = host_workload_client
.get_js_service(WORKLOAD_SRV_NAME.to_string())
.await
Expand All @@ -85,44 +85,48 @@ pub async fn run(user_creds_path: &str) -> Result<(), async_nats::Error> {
))?;

workload_service
.add_local_consumer(
.add_local_consumer::<workload::types::ApiResult>(
"start_workload",
"start",
EndpointType::Async(endpoints::start_workload(&workload_api).await),
EndpointType::Async(workload_api.call(|api: WorkloadApi, msg: Arc<Message>| {
async move {
api.start_workload(msg).await
}
})),
None,
)
.await?;

workload_service
.add_local_consumer(
"signal_status_update",
"signal_status_update",
EndpointType::Async(endpoints::signal_status_update(&workload_api).await),
.add_local_consumer::<workload::types::ApiResult>(
"send_workload_status",
"send_status",
EndpointType::Async(workload_api.call(|api: WorkloadApi, msg: Arc<Message>| {
async move {
api.send_workload_status(msg).await
}
})),
None,
)
.await?;

workload_service
.add_local_consumer(
"remove_workload",
"remove",
EndpointType::Async(endpoints::remove_workload(&workload_api).await),
.add_local_consumer::<workload::types::ApiResult>(
"uninstall_workload",
"uninstall",
EndpointType::Async(workload_api.call(|api: WorkloadApi, msg: Arc<Message>| {
async move {
api.uninstall_workload(msg).await
}
})),
None,
)
.await?;

// Only exit program when explicitly requested
tokio::signal::ctrl_c().await?;

// TODO: bring this back with actual implementation
// log::warn!("CTRL+C detected. Please press CTRL+C again within 5 seconds to confirm exit...");
// tokio::select! {
// _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)) => { log::warn!("Resuming service.") },
// _ = tokio::signal::ctrl_c() => log::error!("Shutting down."),
// }

// Close client and drain internal buffer before exiting to make sure all messages are sent
host_workload_client.close().await?;

Ok(())
}
45 changes: 0 additions & 45 deletions rust/clients/host_agent/src/workloads/endpoints.rs

This file was deleted.

2 changes: 0 additions & 2 deletions rust/clients/host_agent/src/workloads/mod.rs

This file was deleted.

2 changes: 2 additions & 0 deletions rust/services/workload/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ env_logger = { workspace = true }
log = { workspace = true }
dotenv = { workspace = true }
thiserror = { workspace = true }
semver = "1.0.24"
rand = "0.8.5"
mongodb = "3.1"
bson = { version = "2.6.1", features = ["chrono-0_4"] }
url = { version = "2", features = ["serde"] }
Expand Down
Loading