Skip to content

Commit

Permalink
Support for DEBUG DIGEST module data type callback
Browse files Browse the repository at this point in the history
Signed-off-by: Nihal Mehta <[email protected]>
  • Loading branch information
nnmehta committed Nov 15, 2024
1 parent 9d61cad commit b8754a6
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 3 deletions.
3 changes: 2 additions & 1 deletion src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::bloom::utils::BloomFilterType;
use crate::configs::{
FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B,
};
use crate::digest::Digest;
use crate::metrics::BLOOM_NUM_OBJECTS;
use crate::metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES;
use crate::wrapper::bloom_callback;
Expand All @@ -26,10 +27,10 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
rdb_load: Some(bloom_callback::bloom_rdb_load),
rdb_save: Some(bloom_callback::bloom_rdb_save),
aof_rewrite: Some(bloom_callback::bloom_aof_rewrite),
digest: Some(bloom_callback::bloom_digest),

mem_usage: Some(bloom_callback::bloom_mem_usage),
// TODO
digest: None,
free: Some(bloom_callback::bloom_free),

aux_load: Some(bloom_callback::bloom_aux_load),
Expand Down
80 changes: 80 additions & 0 deletions src/digest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::os::raw::c_char;
use valkey_module::raw;
use valkey_module::ValkeyString;

/// `Digest` is a high-level rust interface to the Valkey module C API
/// abstracting away the raw C ffi calls.
pub struct Digest {
pub dig: *mut raw::RedisModuleDigest,
}

impl Digest {
pub const fn new(dig: *mut raw::RedisModuleDigest) -> Self {
Self { dig }
}

/// Returns the key name of this [`Digest`].
///
/// # Panics
///
/// Will panic if `RedisModule_GetKeyNameFromDigest` is missing in redismodule.h
pub fn get_key_name(&self) -> ValkeyString {
ValkeyString::from_redis_module_string(std::ptr::null_mut(), unsafe {
raw::RedisModule_GetKeyNameFromDigest
.expect("RedisModule_GetKeyNameFromDigest is not available.")(self.dig)
.cast_mut()
})
}

/// Returns the database ID of this [`Digest`].
///
/// # Panics
///
/// Will panic if `RedisModule_GetDbIdFromDigest` is missing in redismodule.h
pub fn get_db_id(&self) -> i32 {
unsafe {
raw::RedisModule_GetDbIdFromDigest
.expect("RedisModule_GetDbIdFromDigest is not available.")(self.dig)
}
}

/// Adds a new element to this [`Digest`].
///
/// # Panics
///
/// Will panic if `RedisModule_DigestAddStringBuffer` is missing in redismodule.h
pub fn add_string_buffer(&mut self, ele: &[u8]) {
unsafe {
raw::RedisModule_DigestAddStringBuffer
.expect("RedisModule_DigestAddStringBuffer is not available.")(
self.dig,
ele.as_ptr().cast::<c_char>(),
ele.len(),
)
}
}

/// Similar to [`Digest::add_string_buffer`], but takes [`i64`].
///
/// # Panics
///
/// Will panic if `RedisModule_DigestAddLongLong` is missing in redismodule.h
pub fn add_long_long(&mut self, ll: i64) {
unsafe {
raw::RedisModule_DigestAddLongLong
.expect("RedisModule_DigestAddLongLong is not available.")(self.dig, ll)
}
}

/// Ends the current sequence in this [`Digest`].
///
/// # Panics
///
/// Will panic if `RedisModule_DigestEndSequence` is missing in redismodule.h
pub fn end_sequence(&mut self) {
unsafe {
raw::RedisModule_DigestEndSequence
.expect("RedisModule_DigestEndSequence is not available.")(self.dig)
}
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use valkey_module::configuration::ConfigurationFlags;
use valkey_module::{valkey_module, Context, InfoContext, Status, ValkeyResult, ValkeyString};
pub mod bloom;
pub mod configs;
pub mod digest;
pub mod metrics;
pub mod wrapper;
use crate::bloom::command_handler;
Expand Down
12 changes: 12 additions & 0 deletions src/wrapper/bloom_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::bloom;
use crate::bloom::data_type::ValkeyDataType;
use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::digest::Digest;
use std::ffi::CString;
use std::os::raw::{c_char, c_int, c_void};
use std::ptr::null_mut;
Expand Down Expand Up @@ -118,6 +119,17 @@ pub unsafe extern "C" fn bloom_copy(
Box::into_raw(bb).cast::<libc::c_void>()
}

/// # Safety
/// Raw handler for the Bloom digest callback.
pub unsafe extern "C" fn bloom_digest(md: *mut raw::RedisModuleDigest, value: *mut c_void) {
let mut dig = Digest::new(md);
let val = &*(value.cast::<BloomFilterType>());
dig.add_long_long(val.expansion.into());
dig.add_long_long(val.capacity());
dig.add_string_buffer(&val.fp_rate.to_le_bytes());
dig.end_sequence();
}

/// # Safety
/// Raw handler for the Bloom object's free_effort callback.
pub unsafe extern "C" fn bloom_free_effort(
Expand Down
5 changes: 5 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ def test_copy_and_exists_cmd(self):
assert client.execute_command('EXISTS filter') == 1
mexists_result = client.execute_command('BF.MEXISTS filter item1 item2 item3 item4')
assert len(madd_result) == 4 and len(mexists_result) == 4
# cmd debug digest
client.execute_command('DEBUG', 'DIGEST')
debug_filter = client.execute_command('DEBUG DIGEST-VALUE filter')
assert client.execute_command('COPY filter new_filter') == 1
debug_new_filter = client.execute_command('DEBUG DIGEST-VALUE new_filter')
assert debug_filter == debug_new_filter
assert client.execute_command('EXISTS new_filter') == 1
copy_mexists_result = client.execute_command('BF.MEXISTS new_filter item1 item2 item3 item4')
assert mexists_result == copy_mexists_result
Expand Down
5 changes: 4 additions & 1 deletion tests/test_save_and_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ def test_basic_save_and_restore(self):
bf_info_result_1 = client.execute_command('BF.INFO testSave')
assert(len(bf_info_result_1)) != 0
curr_item_count_1 = client.info_obj().num_keys()

debug_save_1 = client.execute_command('DEBUG DIGEST-VALUE testSave')

# save rdb, restart sever
client.bgsave()
self.server.wait_for_save_done()
Expand All @@ -33,7 +34,9 @@ def test_basic_save_and_restore(self):
bf_exists_result_2 = client.execute_command('BF.EXISTS testSave item')
assert bf_exists_result_2 == 1
bf_info_result_2 = client.execute_command('BF.INFO testSave')
debug_save_2 = client.execute_command('DEBUG DIGEST-VALUE testSave')
assert bf_info_result_2 == bf_info_result_1
assert debug_save_2 == debug_save_1

def test_restore_failed_large_bloom_filter(self):
client = self.server.get_new_client()
Expand Down
2 changes: 1 addition & 1 deletion tests/valkeytests/valkey_test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ def port_tracker_fixture(self, resource_port_tracker):
self.port_tracker = resource_port_tracker

def _get_valkey_args(self):
self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": ""})
self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": "", "enable-debug-command":"yes"})
self.args.update(self.get_custom_args())
return self.args

Expand Down

0 comments on commit b8754a6

Please sign in to comment.