Skip to content

Commit

Permalink
Added API to read and trim a stream. (#266)
Browse files Browse the repository at this point in the history
* Added API to read and trim a stream.

* fix command flags

* Apply suggestions from code review

Co-authored-by: Guy Korland <[email protected]>

* Format fixes

* Fix complition.

* Format fixes

* Added reverse option.

Co-authored-by: Guy Korland <[email protected]>
  • Loading branch information
MeirShpilraien and gkorland authored Jan 25, 2023
1 parent dbc7883 commit cc05327
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 2 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ required-features = []
name = "scan_keys"
crate-type = ["cdylib"]

[[example]]
name = "stream"
crate-type = ["cdylib"]

[dependencies]
bitflags = "1.2"
libc = "0.2"
Expand Down
46 changes: 46 additions & 0 deletions examples/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#[macro_use]
extern crate redis_module;

use redis_module::raw::{KeyType, RedisModuleStreamID};
use redis_module::{Context, NextArg, RedisError, RedisResult, RedisString, RedisValue};

fn stream_read_from(ctx: &Context, args: Vec<RedisString>) -> RedisResult {
let mut args = args.into_iter().skip(1);

let stream_key = args.next_arg()?;

let stream = ctx.open_key(&stream_key);
let key_type = stream.key_type();

if key_type != KeyType::Stream {
return Err(RedisError::WrongType);
}

let mut iter = stream.get_stream_iterator(false)?;
let element = iter.next();
let id_to_keep = iter.next().as_ref().map_or_else(
|| RedisModuleStreamID {
ms: u64::MAX,
seq: u64::MAX,
},
|e| e.id,
);

let stream = ctx.open_key_writable(&stream_key);
stream.trim_stream_by_id(id_to_keep, false)?;
Ok(match element {
Some(e) => RedisValue::BulkString(format!("{}-{}", e.id.ms, e.id.seq)),
None => RedisValue::Null,
})
}

//////////////////////////////////////////////////////

redis_module! {
name: "stream",
version: 1,
data_types: [],
commands: [
["STREAM_POP", stream_read_from, "write", 1, 1, 1],
],
}
39 changes: 37 additions & 2 deletions src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use raw::KeyType;
use crate::native_types::RedisType;
use crate::raw;
use crate::redismodule::REDIS_OK;
use crate::stream::StreamIterator;
use crate::RedisError;
use crate::RedisResult;
use crate::RedisString;
Expand All @@ -32,8 +33,8 @@ pub enum KeyMode {

#[derive(Debug)]
pub struct RedisKey {
ctx: *mut raw::RedisModuleCtx,
key_inner: *mut raw::RedisModuleKey,
pub(crate) ctx: *mut raw::RedisModuleCtx,
pub(crate) key_inner: *mut raw::RedisModuleKey,
}

impl RedisKey {
Expand Down Expand Up @@ -136,6 +137,20 @@ impl RedisKey {
};
Ok(val)
}

pub fn get_stream_iterator(&self, reverse: bool) -> Result<StreamIterator, RedisError> {
StreamIterator::new(self, None, None, false, reverse)
}

pub fn get_stream_range_iterator(
&self,
from: Option<raw::RedisModuleStreamID>,
to: Option<raw::RedisModuleStreamID>,
exclusive: bool,
reverse: bool,
) -> Result<StreamIterator, RedisError> {
StreamIterator::new(self, from, to, exclusive, reverse)
}
}

impl Drop for RedisKey {
Expand Down Expand Up @@ -351,6 +366,26 @@ impl RedisKeyWritable {

status.into()
}

pub fn trim_stream_by_id(
&self,
mut id: raw::RedisModuleStreamID,
approx: bool,
) -> Result<usize, RedisError> {
let flags = if approx {
raw::REDISMODULE_STREAM_TRIM_APPROX
} else {
0
};
let res = unsafe {
raw::RedisModule_StreamTrimByID.unwrap()(self.key_inner, flags as i32, &mut id)
};
if res <= 0 {
Err(RedisError::Str("Failed trimming the stream"))
} else {
Ok(res as usize)
}
}
}

/// Opaque type used to hold multi-get results. Use the provided methods to convert
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod rediserror;
mod redismodule;
pub mod redisraw;
pub mod redisvalue;
pub mod stream;

mod context;
pub mod key;
Expand Down
98 changes: 98 additions & 0 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use crate::key::RedisKey;
use crate::raw;
use crate::RedisError;
use crate::RedisString;
use crate::Status;
use std::os::raw::c_long;
use std::ptr;

pub struct StreamRecord {
pub id: raw::RedisModuleStreamID,
pub fields: Vec<(RedisString, RedisString)>,
}

pub struct StreamIterator<'key> {
key: &'key RedisKey,
}

impl<'key> StreamIterator<'key> {
pub(crate) fn new(
key: &RedisKey,
mut from: Option<raw::RedisModuleStreamID>,
mut to: Option<raw::RedisModuleStreamID>,
exclusive: bool,
reverse: bool,
) -> Result<StreamIterator, RedisError> {
let mut flags = if exclusive {
raw::REDISMODULE_STREAM_ITERATOR_EXCLUSIVE as i32
} else {
0
};

flags |= if reverse {
raw::REDISMODULE_STREAM_ITERATOR_REVERSE as i32
} else {
0
};

let res = unsafe {
raw::RedisModule_StreamIteratorStart.unwrap()(
key.key_inner,
flags,
from.as_mut().map_or(ptr::null_mut(), |v| v),
to.as_mut().map_or(ptr::null_mut(), |v| v),
)
};
if Status::Ok == res.into() {
Ok(StreamIterator { key })
} else {
Err(RedisError::Str("Failed creating stream iterator"))
}
}
}

impl<'key> Iterator for StreamIterator<'key> {
type Item = StreamRecord;

fn next(&mut self) -> Option<Self::Item> {
let mut id = raw::RedisModuleStreamID { ms: 0, seq: 0 };
let mut num_fields: c_long = 0;
let mut field_name: *mut raw::RedisModuleString = ptr::null_mut();
let mut field_val: *mut raw::RedisModuleString = ptr::null_mut();
if Status::Ok
!= unsafe {
raw::RedisModule_StreamIteratorNextID.unwrap()(
self.key.key_inner,
&mut id,
&mut num_fields,
)
}
.into()
{
return None;
}
let mut fields = Vec::new();
while Status::Ok
== unsafe {
raw::RedisModule_StreamIteratorNextField.unwrap()(
self.key.key_inner,
&mut field_name,
&mut field_val,
)
.into()
}
{
fields.push((
RedisString::from_redis_module_string(self.key.ctx, field_name),
RedisString::from_redis_module_string(self.key.ctx, field_val),
));
}
Some(StreamRecord { id, fields })
}
}

impl<'key> Drop for StreamIterator<'key> {
fn drop(&mut self) {
unsafe { raw::RedisModule_StreamIteratorDelete.unwrap()(self.key.key_inner) };
}
}
40 changes: 40 additions & 0 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,43 @@ fn test_scan() -> Result<()> {

Ok(())
}

#[test]
fn test_stream_reader() -> Result<()> {
let port: u16 = 6487;
let _guards = vec![start_redis_server_with_module("stream", port)
.with_context(|| "failed to start redis server")?];
let mut con =
get_redis_connection(port).with_context(|| "failed to connect to redis server")?;

let _: String = redis::cmd("XADD")
.arg(&["s", "1-1", "foo", "bar"])
.query(&mut con)
.with_context(|| "failed to add data to the stream")?;

let _: String = redis::cmd("XADD")
.arg(&["s", "1-2", "foo", "bar"])
.query(&mut con)
.with_context(|| "failed to add data to the stream")?;

let res: String = redis::cmd("STREAM_POP")
.arg(&["s"])
.query(&mut con)
.with_context(|| "failed to run keys_pos")?;
assert_eq!(res, "1-1");

let res: String = redis::cmd("STREAM_POP")
.arg(&["s"])
.query(&mut con)
.with_context(|| "failed to run keys_pos")?;
assert_eq!(res, "1-2");

let res: usize = redis::cmd("XLEN")
.arg(&["s"])
.query(&mut con)
.with_context(|| "failed to add data to the stream")?;

assert_eq!(res, 0);

Ok(())
}

0 comments on commit cc05327

Please sign in to comment.