Skip to content

Commit

Permalink
chore(cubestore): Track size for payloads in rate limiter (#7531)
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr authored Dec 14, 2023
1 parent 6447f93 commit cc4092e
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 6 deletions.
3 changes: 2 additions & 1 deletion rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::cachestore::listener::RocksCacheStoreListener;
use crate::table::{Row, TableValue};
use chrono::{DateTime, Utc};
use datafusion::cube_ext;
use deepsize::DeepSizeOf;
use itertools::Itertools;
use log::{trace, warn};
use serde_derive::{Deserialize, Serialize};
Expand Down Expand Up @@ -592,7 +593,7 @@ impl RocksCacheStore {
}
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, DeepSizeOf)]
pub enum QueueKey {
ById(u64),
ByPath(String),
Expand Down
14 changes: 9 additions & 5 deletions rust/cubestore/cubestore/src/sql/cachestore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,9 @@ impl CacheStoreSqlService {
nx,
} => {
let value_size = key.value.deep_size_of() + value.deep_size_of();
let key = key.value;

let success = self
.cachestore
.cache_set(CacheItem::new(key, ttl, value), nx)
.cache_set(CacheItem::new(key.value, ttl, value), nx)
.await?;

(
Expand Down Expand Up @@ -388,19 +386,25 @@ impl CacheStoreSqlService {
(Arc::new(DataFrame::new(vec![], vec![])), None, true)
}
QueueCommand::MergeExtra { key, payload } => {
let payload_size = payload.deep_size_of();
self.cachestore.queue_merge_extra(key, payload).await?;

(Arc::new(DataFrame::new(vec![], vec![])), None, true)
(
Arc::new(DataFrame::new(vec![], vec![])),
Some(payload_size),
true,
)
}
QueueCommand::Ack { key, result } => {
let result_size = result.as_ref().map(|r| r.deep_size_of());
let success = self.cachestore.queue_ack(key, result).await?;

(
Arc::new(DataFrame::new(
vec![Column::new("success".to_string(), ColumnType::Boolean, 0)],
vec![Row::new(vec![TableValue::Boolean(success)])],
)),
None,
result_size,
true,
)
}
Expand Down
13 changes: 13 additions & 0 deletions rust/cubestore/cubestore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,19 @@ mod tests {
use std::fs;
use std::path::{Path, PathBuf};

#[test]
fn dataframe_deep_size_of() {
for (v, expected_size) in [(
DataFrame::new(
vec![Column::new("payload".to_string(), ColumnType::String, 0)],
vec![Row::new(vec![TableValue::String("foo".to_string())])],
),
162_usize,
)] {
assert_eq!(v.deep_size_of(), expected_size, "size for {:?}", v);
}
}

#[tokio::test]
async fn create_wal_test() {
let config = Config::test("create_chunk_test");
Expand Down
1 change: 1 addition & 0 deletions rust/cubestore/cubestore/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ mod tests {
(TableValue::Int(1), 32_usize),
(TableValue::Decimal(Decimal::new(1)), 32_usize),
(TableValue::String("foo".into()), 35_usize),
(TableValue::String("foofoo".into()), 38_usize),
] {
assert_eq!(v.deep_size_of(), expected_size, "size for {:?}", v);
}
Expand Down

0 comments on commit cc4092e

Please sign in to comment.