Skip to content

Commit

Permalink
Add additional methods for getting raw messages from the Stream
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Jan 8, 2025
1 parent 9a507db commit 18c6f7b
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 41 deletions.
3 changes: 3 additions & 0 deletions .config/nats.dic
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,6 @@ lifecycle
AtomicU64
with_deleted
StreamInfoBuilder
direct_get
direct_get_next_for_subject
direct_get_last_for_subject
3 changes: 3 additions & 0 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,9 @@ impl Store {
}
Err(err) => match err.kind() {
crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None,
crate::jetstream::stream::LastRawMessageErrorKind::InvalidSubject => {
return Err(EntryError::new(EntryErrorKind::InvalidKey))
}
crate::jetstream::stream::LastRawMessageErrorKind::Other => {
return Err(EntryError::with_source(EntryErrorKind::Other, err))
}
Expand Down
160 changes: 119 additions & 41 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,9 @@ impl<I> Stream<I> {
}
StreamMessage::try_from(response).map_err(Into::into)
}
/// Get a raw message from the stream.
/// Get a raw message from the stream for a given stream sequence.
/// This low-level API always reaches stream leader.
/// This should be discouraged in favor of using [Stream::direct_get].
///
/// # Examples
///
Expand Down Expand Up @@ -588,14 +590,102 @@ impl<I> Stream<I> {
/// # }
/// ```
pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
self.raw_message(StreamGetMessage {
sequence: Some(sequence),
last_by_subject: None,
next_by_subject: None,
})
.await
}

/// Get a fist message from the stream for a given subject starting from provided sequence..
/// This low-level API always reaches stream leader.
/// This should be discouraged in favor of using [Stream::direct_get_next_for_subject].
///
/// # Examples
///
/// ```no_run
/// #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::StreamExt;
/// use futures::TryStreamExt;
///
/// let client = async_nats::connect("localhost:4222").await?;
/// let context = async_nats::jetstream::new(client);
/// let stream = context.get_stream_no_info("events").await?;
///
/// let raw_message = stream
/// .get_first_raw_message_by_subject("events.created", 10)
/// .await?;
/// println!("Retrieved raw message {:?}", raw_message);
/// # Ok(())
/// # }
/// ```
pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
&self,
subject: T,
sequence: u64,
) -> Result<StreamMessage, RawMessageError> {
self.raw_message(StreamGetMessage {
sequence: Some(sequence),
last_by_subject: None,
next_by_subject: Some(subject.as_ref().to_string()),
})
.await
}

/// Get a next message from the stream for a given subject.
/// This low-level API always reaches stream leader.
/// This should be discouraged in favor of using [Stream::direct_get_next_for_subject].
///
/// # Examples
///
/// ```no_run
/// #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::StreamExt;
/// use futures::TryStreamExt;
///
/// let client = async_nats::connect("localhost:4222").await?;
/// let context = async_nats::jetstream::new(client);
/// let stream = context.get_stream_no_info("events").await?;
///
/// let raw_message = stream
/// .get_next_raw_message_by_subject("events.created")
/// .await?;
/// println!("Retrieved raw message {:?}", raw_message);
/// # Ok(())
/// # }
/// ```
pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
&self,
subject: T,
) -> Result<StreamMessage, RawMessageError> {
self.raw_message(StreamGetMessage {
sequence: None,
last_by_subject: None,
next_by_subject: Some(subject.as_ref().to_string()),
})
.await
}

async fn raw_message(
&self,
request: StreamGetMessage,
) -> Result<StreamMessage, RawMessageError> {
for subject in [&request.last_by_subject, &request.next_by_subject]
.into_iter()
.flatten()
{
if !is_valid_subject(subject) {
return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
}
}
let subject = format!("STREAM.MSG.GET.{}", &self.name);
let payload = json!({
"seq": sequence,
});

let response: Response<GetRawMessage> = self
.context
.request(subject, &payload)
.request(subject, &request)
.map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err))
.await?;

Expand All @@ -616,7 +706,9 @@ impl<I> Stream<I> {
}
}

/// Get the last raw message from the stream by subject.
/// Get a last message from the stream for a given subject.
/// This low-level API always reaches stream leader.
/// This should be discouraged in favor of using [Stream::direct_get_last_for_subject].
///
/// # Examples
///
Expand All @@ -628,17 +720,11 @@ impl<I> Stream<I> {
///
/// let client = async_nats::connect("localhost:4222").await?;
/// let context = async_nats::jetstream::new(client);
/// let stream = context.get_stream_no_info("events").await?;
///
/// let stream = context
/// .get_or_create_stream(async_nats::jetstream::stream::Config {
/// name: "events".to_string(),
/// max_messages: 10_000,
/// ..Default::default()
/// })
/// let raw_message = stream
/// .get_last_raw_message_by_subject("events.created")
/// .await?;
///
/// let publish_ack = context.publish("events", "data".into()).await?;
/// let raw_message = stream.get_last_raw_message_by_subject("events").await?;
/// println!("Retrieved raw message {:?}", raw_message);
/// # Ok(())
/// # }
Expand All @@ -647,32 +733,12 @@ impl<I> Stream<I> {
&self,
stream_subject: &str,
) -> Result<StreamMessage, LastRawMessageError> {
let subject = format!("STREAM.MSG.GET.{}", &self.name);
let payload = json!({
"last_by_subj": stream_subject,
});

let response: Response<GetRawMessage> = self
.context
.request(subject, &payload)
.map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err))
.await?;
match response {
Response::Err { error } => {
if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
Err(LastRawMessageError::new(
LastRawMessageErrorKind::NoMessageFound,
))
} else {
Err(LastRawMessageError::new(
LastRawMessageErrorKind::JetStream(error),
))
}
}
Response::Ok(value) => Ok(value.message.try_into().map_err(|err| {
LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err)
})?),
}
self.raw_message(StreamGetMessage {
sequence: None,
last_by_subject: Some(stream_subject.to_string()),
next_by_subject: None,
})
.await
}

/// Delete a message from the stream.
Expand Down Expand Up @@ -2121,6 +2187,7 @@ impl futures::Stream for Consumers {
#[derive(Clone, Debug, PartialEq)]
pub enum LastRawMessageErrorKind {
NoMessageFound,
InvalidSubject,
JetStream(super::errors::Error),
Other,
}
Expand All @@ -2129,6 +2196,7 @@ impl Display for LastRawMessageErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::NoMessageFound => write!(f, "no message found"),
Self::InvalidSubject => write!(f, "invalid subject"),
Self::Other => write!(f, "failed to get last raw message"),
Self::JetStream(err) => write!(f, "JetStream error: {}", err),
}
Expand Down Expand Up @@ -2331,6 +2399,16 @@ impl From<super::context::RequestError> for ConsumerCreateStrictError {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StreamGetMessage {
#[serde(rename = "seq", skip_serializing_if = "is_default")]
sequence: Option<u64>,
#[serde(rename = "next_by_subj", skip_serializing_if = "is_default")]
next_by_subject: Option<String>,
#[serde(rename = "last_by_subj", skip_serializing_if = "is_default")]
last_by_subject: Option<String>,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
63 changes: 63 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3799,4 +3799,67 @@ mod jetstream {
println!("count: {count}");
assert!(count.eq(&220_000));
}

#[tokio::test]
async fn raw_messages() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::ConnectOptions::new()
.connect(server.client_url())
.await
.unwrap();
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
.create_stream(stream::Config {
name: "events".to_string(),
subjects: vec!["events.>".to_string()],
..Default::default()
})
.await
.unwrap();

for i in 1..=10 {
jetstream
.publish(format!("events.{i}"), format!("{i}").into())
.await
.unwrap()
.await
.unwrap();
}
jetstream
.publish("events.2", "2".into())
.await
.unwrap()
.await
.unwrap();

// by sequence
let message = stream.get_raw_message(5).await.unwrap();
assert_eq!(message.sequence, 5);
assert_eq!(from_utf8(&message.payload).unwrap(), "5");

// next by subject
let message = stream
.get_next_raw_message_by_subject("events.2")
.await
.unwrap();
assert_eq!(message.sequence, 2);
assert_eq!(from_utf8(&message.payload).unwrap(), "2");

// last by subject
let message = stream
.get_last_raw_message_by_subject("events.2")
.await
.unwrap();
assert_eq!(message.sequence, 11);
assert_eq!(from_utf8(&message.payload).unwrap(), "2");

// first by subject starting from sequence
let message = stream
.get_first_raw_message_by_subject("events.2", 5)
.await
.unwrap();
assert_eq!(message.sequence, 11);
assert_eq!(from_utf8(&message.payload).unwrap(), "2");
}
}

0 comments on commit 18c6f7b

Please sign in to comment.