Skip to content

Commit

Permalink
[fix] Fix Bugs (#93)
Browse files Browse the repository at this point in the history
* fix bug where wrong key is used and modified mock worker to be more explict

* fixed bug where heartbeat would start if monitoring is disabled
  • Loading branch information
warittornc authored Nov 1, 2024
1 parent 91701e2 commit 19af41d
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 71 deletions.
21 changes: 12 additions & 9 deletions bothan-api/server-cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,19 @@ async fn init_crypto_server(
let manager = Arc::new(manager);
let cloned_manager = manager.clone();

tokio::spawn(async move {
loop {
// Heartbeat is fixed at 1 minute.
tokio::time::sleep(Duration::from_secs(60)).await;
match cloned_manager.post_heartbeat().await {
Ok(_) => info!("heartbeat sent"),
Err(e) => error!("failed to send heartbeat: {e}"),
// Only spawn heartbeat if monitoring is enabled
if config.monitoring.enabled {
tokio::spawn(async move {
loop {
// Heartbeat is fixed at 1 minute.
tokio::time::sleep(Duration::from_secs(60)).await;
match cloned_manager.post_heartbeat().await {
Ok(_) => info!("heartbeat sent"),
Err(e) => error!("failed to send heartbeat: {e}"),
}
}
}
});
});
}

Ok(Arc::new(CryptoQueryServer::new(manager)))
}
Expand Down
193 changes: 131 additions & 62 deletions bothan-core/src/manager/crypto_asset_info/price/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async fn process_source_query<'a>(
) -> Result<Option<(String, Decimal)>, MissingPrerequisiteError> {
let source_id = &source_query.source_id;
let query_id = &source_query.query_id;
match worker.get_asset(source_id).await {
match worker.get_asset(query_id).await {
Ok(AssetState::Available(a)) if a.timestamp.ge(&stale_cutoff) => {
// Create a record for the specific source
source_records.push((
Expand Down Expand Up @@ -221,6 +221,7 @@ fn compute_source_routes(

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;

use num_traits::One;
Expand All @@ -237,65 +238,70 @@ mod tests {

#[derive(Default)]
struct MockWorker {
value: Decimal,
timestamp: i64,
expected_results: HashMap<String, AssetState>,
}

#[async_trait::async_trait]
impl AssetWorker for MockWorker {
async fn get_asset(&self, id: &str) -> Result<AssetState, StoreError> {
Ok(AssetState::Available(AssetInfo::new(
id.to_string(),
self.value,
self.timestamp,
)))
}

async fn set_query_ids(&self, _: Vec<String>) -> Result<(), SetQueryIDError> {
Ok(())
impl MockWorker {
fn add_expected_query(&mut self, id: String, asset_state: AssetState) {
self.expected_results.insert(id, asset_state);
}
}

#[derive(Default)]
struct MockUnavailableWorker {}

#[async_trait::async_trait]
impl AssetWorker for MockUnavailableWorker {
async fn get_asset(&self, _: &str) -> Result<AssetState, StoreError> {
Ok(AssetState::Unsupported)
impl AssetWorker for MockWorker {
async fn get_asset(&self, id: &str) -> Result<AssetState, StoreError> {
Ok(self
.expected_results
.get(id)
.unwrap_or_else(|| panic!("unexpected id {} received", id))
.clone())
}

async fn set_query_ids(&self, _: Vec<String>) -> Result<(), SetQueryIDError> {
Ok(())
}
}

fn mock_workers<'a>(ids: Vec<String>) -> WorkerMap<'a> {
ids.into_iter()
.map(|id| {
(
id.clone(),
Arc::new(MockWorker::default()) as Arc<dyn AssetWorker>,
)
})
.collect()
}

fn mock_unavailable_workers<'a>(ids: Vec<String>) -> WorkerMap<'a> {
ids.into_iter()
.map(|id| {
(
id.clone(),
Arc::new(MockUnavailableWorker::default()) as Arc<dyn AssetWorker>,
)
})
fn mock_workers<'a, K: Into<String>>(workers: Vec<(K, MockWorker)>) -> WorkerMap<'a> {
workers
.into_iter()
.map(|(id, worker)| (id.into(), Arc::new(worker) as Arc<dyn AssetWorker>))
.collect()
}

#[tokio::test]
async fn test_get_signal_price_states() {
let ids = vec!["CS:BTC-USD".to_string(), "CS:USDT-USD".to_string()];
let workers = mock_workers(vec!["binance".to_string(), "coingecko".to_string()]);

let mut binance_worker = MockWorker::default();
binance_worker.add_expected_query(
"btcusdt".to_string(),
AssetState::Available(AssetInfo::new(
"btcusdt".to_string(),
Decimal::new(69000, 0),
11000,
)),
);

let mut coingecko_worker = MockWorker::default();
coingecko_worker.add_expected_query(
"bitcoin".to_string(),
AssetState::Available(AssetInfo::new(
"bitcoin".to_string(),
Decimal::new(70000, 0),
11001,
)),
);
coingecko_worker.add_expected_query(
"tether".to_string(),
AssetState::Available(AssetInfo::new("tether".to_string(), Decimal::one(), 11002)),
);

let workers = mock_workers(vec![
("binance", binance_worker),
("coingecko", coingecko_worker),
]);

let registry = valid_mock_registry().validate().unwrap();
let stale_cutoff = 0;
let mut records = PriceSignalComputationRecords::default();
Expand All @@ -304,17 +310,27 @@ mod tests {
get_signal_price_states(ids, &workers, &registry, stale_cutoff, &mut records).await;

let expected_res = vec![
PriceState::Available(Decimal::default()),
PriceState::Available(Decimal::default()),
PriceState::Available(Decimal::new(69500, 0)),
PriceState::Available(Decimal::one()),
];
assert_eq!(res, expected_res);
}

#[tokio::test]
async fn test_get_signal_price_states_with_unavailable() {
let ids = vec!["CS:BTC-USD".to_string(), "CS:USDT-USD".to_string()];
let workers =
mock_unavailable_workers(vec!["binance".to_string(), "coingecko".to_string()]);
let mut binance_worker = MockWorker::default();
binance_worker.add_expected_query("btcusdt".to_string(), AssetState::Unsupported);

let mut coingecko_worker = MockWorker::default();
coingecko_worker.add_expected_query("bitcoin".to_string(), AssetState::Unsupported);
coingecko_worker.add_expected_query("tether".to_string(), AssetState::Unsupported);

let workers = mock_workers(vec![
("binance", binance_worker),
("coingecko", coingecko_worker),
]);

let registry = valid_mock_registry().validate().unwrap();
let stale_cutoff = 0;
let mut records = PriceSignalComputationRecords::default();
Expand All @@ -333,17 +349,46 @@ mod tests {
"CS:USDT-USD".to_string(),
"CS:DNE-USD".to_string(),
];
let workers = mock_workers(vec!["binance".to_string(), "coingecko".to_string()]);

let mut binance_worker = MockWorker::default();
binance_worker.add_expected_query(
"btcusdt".to_string(),
AssetState::Available(AssetInfo::new(
"btcusdt".to_string(),
Decimal::new(69000, 0),
11000,
)),
);

let mut coingecko_worker = MockWorker::default();
coingecko_worker.add_expected_query(
"bitcoin".to_string(),
AssetState::Available(AssetInfo::new(
"bitcoin".to_string(),
Decimal::new(70000, 0),
11001,
)),
);
coingecko_worker.add_expected_query(
"tether".to_string(),
AssetState::Available(AssetInfo::new("tether".to_string(), Decimal::one(), 11002)),
);

let workers = mock_workers(vec![
("binance", binance_worker),
("coingecko", coingecko_worker),
]);

let registry = valid_mock_registry().validate().unwrap();
let stale_cutoff = 0;
let stale_cutoff = 10000;
let mut records = PriceSignalComputationRecords::default();

let res =
get_signal_price_states(ids, &workers, &registry, stale_cutoff, &mut records).await;

let expected_res = vec![
PriceState::Available(Decimal::default()),
PriceState::Available(Decimal::default()),
PriceState::Available(Decimal::new(69500, 0)),
PriceState::Available(Decimal::one()),
PriceState::Unsupported,
];
assert_eq!(res, expected_res);
Expand All @@ -358,7 +403,15 @@ mod tests {
)];
let processor = Processor::Median(MedianProcessor::new(1));
let signal = Signal::new(source_queries, processor, vec![]);
let workers = mock_workers(vec!["test-source".to_string()]);

let mut test_source_worker = MockWorker::default();
test_source_worker.add_expected_query(
"testusd".to_string(),
AssetState::Available(AssetInfo::new("testusd".to_string(), Decimal::default(), 0)),
);

let workers = mock_workers(vec![("test-source", test_source_worker)]);

let cache = PriceCache::new();
let stale_cutoff = 0;
let mut record = PriceSignalComputationRecord::default();
Expand Down Expand Up @@ -390,7 +443,15 @@ mod tests {
)];
let processor = Processor::Median(MedianProcessor::new(1));
let signal = Signal::new(source_queries, processor, vec![]);
let workers = mock_workers(vec!["test-source".to_string()]);

let mut test_source_worker = MockWorker::default();
test_source_worker.add_expected_query(
"testusd".to_string(),
AssetState::Available(AssetInfo::new("testusd".to_string(), Decimal::default(), 0)),
);

let workers = mock_workers(vec![("test-source", test_source_worker)]);

let cache = PriceCache::new();
let stale_cutoff = 0;
let mut record = PriceSignalComputationRecord::default();
Expand All @@ -406,31 +467,37 @@ mod tests {

#[tokio::test]
async fn test_process_source_query() {
let worker = MockWorker::default();
let source_query =
SourceQuery::new("test-source".to_string(), "testusd".to_string(), vec![]);
let stale_cutoff = 0;
let mut worker = MockWorker::default();
let id = "testusd".to_string();
let asset_info = AssetInfo::new(id.clone(), Decimal::new(1000, 0), 10);
worker.add_expected_query("testusd".to_string(), AssetState::Available(asset_info));

let source_query = SourceQuery::new("test-source".to_string(), id.clone(), vec![]);
let stale_cutoff = 5;
let cache = PriceCache::new();
let source_records = &mut vec![];

let res =
process_source_query(&worker, &source_query, stale_cutoff, &cache, source_records)
.await;

let expected_res = Ok(Some(("test-source".to_string(), Decimal::default())));
let expected_res = Ok(Some(("test-source".to_string(), Decimal::new(1000, 0))));
let expected_source_records = vec![(
"test-source".to_string(),
SourceRecord::new("testusd".to_string(), Decimal::default(), vec![], None),
SourceRecord::new("testusd".to_string(), Decimal::new(1000, 0), vec![], None),
)];
assert_eq!(res, expected_res);
assert_eq!(source_records, &expected_source_records);
}

#[tokio::test]
async fn test_process_source_query_with_timeout() {
let worker = MockWorker::default();
let source_query =
SourceQuery::new("test-source".to_string(), "testusd".to_string(), vec![]);
let mut worker = MockWorker::default();
let id = "testusd".to_string();
let asset_info = AssetInfo::new(id.clone(), Decimal::default(), 0);
worker.add_expected_query("testusd".to_string(), AssetState::Available(asset_info));

let source_query = SourceQuery::new("test-source".to_string(), id.clone(), vec![]);
let stale_cutoff = 1000;
let cache = PriceCache::new();
let source_records = &mut vec![];
Expand All @@ -443,8 +510,10 @@ mod tests {
}

#[tokio::test]
async fn test_process_source_query_with_unavailable_asset_state() {
let worker = MockUnavailableWorker::default();
async fn test_process_source_query_with_unsupported_asset_state() {
let mut worker = MockWorker::default();
worker.add_expected_query("testusd".to_string(), AssetState::Unsupported);

let source_query =
SourceQuery::new("test-source".to_string(), "testusd".to_string(), vec![]);
let stale_cutoff = 1000;
Expand Down

0 comments on commit 19af41d

Please sign in to comment.