diff --git a/.config/nats.dic b/.config/nats.dic index 0a635906d..22265d8d1 100644 --- a/.config/nats.dic +++ b/.config/nats.dic @@ -160,3 +160,6 @@ lifecycle AtomicU64 with_deleted StreamInfoBuilder +direct_get +direct_get_next_for_subject +direct_get_last_for_subject diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index 439a20d91..48f331a93 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -387,6 +387,9 @@ impl Store { } Err(err) => match err.kind() { crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None, + crate::jetstream::stream::LastRawMessageErrorKind::InvalidSubject => { + return Err(EntryError::new(EntryErrorKind::InvalidKey)) + } crate::jetstream::stream::LastRawMessageErrorKind::Other => { return Err(EntryError::with_source(EntryErrorKind::Other, err)) } diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index 691c6dfd4..fc0950c7a 100755 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -560,7 +560,9 @@ impl Stream { } StreamMessage::try_from(response).map_err(Into::into) } - /// Get a raw message from the stream. + /// Get a raw message from the stream for a given stream sequence. + /// This low-level API always reaches stream leader. + /// This should be discouraged in favor of using [Stream::direct_get]. /// /// # Examples /// @@ -588,14 +590,102 @@ impl Stream { /// # } /// ``` pub async fn get_raw_message(&self, sequence: u64) -> Result { + self.raw_message(StreamGetMessage { + sequence: Some(sequence), + last_by_subject: None, + next_by_subject: None, + }) + .await + } + + /// Get a fist message from the stream for a given subject starting from provided sequence.. + /// This low-level API always reaches stream leader. + /// This should be discouraged in favor of using [Stream::direct_get_next_for_subject]. + /// + /// # Examples + /// + /// ```no_run + /// #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use futures::StreamExt; + /// use futures::TryStreamExt; + /// + /// let client = async_nats::connect("localhost:4222").await?; + /// let context = async_nats::jetstream::new(client); + /// let stream = context.get_stream_no_info("events").await?; + /// + /// let raw_message = stream + /// .get_first_raw_message_by_subject("events.created", 10) + /// .await?; + /// println!("Retrieved raw message {:?}", raw_message); + /// # Ok(()) + /// # } + /// ``` + pub async fn get_first_raw_message_by_subject>( + &self, + subject: T, + sequence: u64, + ) -> Result { + self.raw_message(StreamGetMessage { + sequence: Some(sequence), + last_by_subject: None, + next_by_subject: Some(subject.as_ref().to_string()), + }) + .await + } + + /// Get a next message from the stream for a given subject. + /// This low-level API always reaches stream leader. + /// This should be discouraged in favor of using [Stream::direct_get_next_for_subject]. + /// + /// # Examples + /// + /// ```no_run + /// #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use futures::StreamExt; + /// use futures::TryStreamExt; + /// + /// let client = async_nats::connect("localhost:4222").await?; + /// let context = async_nats::jetstream::new(client); + /// let stream = context.get_stream_no_info("events").await?; + /// + /// let raw_message = stream + /// .get_next_raw_message_by_subject("events.created") + /// .await?; + /// println!("Retrieved raw message {:?}", raw_message); + /// # Ok(()) + /// # } + /// ``` + pub async fn get_next_raw_message_by_subject>( + &self, + subject: T, + ) -> Result { + self.raw_message(StreamGetMessage { + sequence: None, + last_by_subject: None, + next_by_subject: Some(subject.as_ref().to_string()), + }) + .await + } + + async fn raw_message( + &self, + request: StreamGetMessage, + ) -> Result { + for subject in [&request.last_by_subject, &request.next_by_subject] + .into_iter() + .flatten() + { + if !is_valid_subject(subject) { + return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject)); + } + } let subject = format!("STREAM.MSG.GET.{}", &self.name); - let payload = json!({ - "seq": sequence, - }); let response: Response = self .context - .request(subject, &payload) + .request(subject, &request) .map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err)) .await?; @@ -616,7 +706,9 @@ impl Stream { } } - /// Get the last raw message from the stream by subject. + /// Get a last message from the stream for a given subject. + /// This low-level API always reaches stream leader. + /// This should be discouraged in favor of using [Stream::direct_get_last_for_subject]. /// /// # Examples /// @@ -628,17 +720,11 @@ impl Stream { /// /// let client = async_nats::connect("localhost:4222").await?; /// let context = async_nats::jetstream::new(client); + /// let stream = context.get_stream_no_info("events").await?; /// - /// let stream = context - /// .get_or_create_stream(async_nats::jetstream::stream::Config { - /// name: "events".to_string(), - /// max_messages: 10_000, - /// ..Default::default() - /// }) + /// let raw_message = stream + /// .get_last_raw_message_by_subject("events.created") /// .await?; - /// - /// let publish_ack = context.publish("events", "data".into()).await?; - /// let raw_message = stream.get_last_raw_message_by_subject("events").await?; /// println!("Retrieved raw message {:?}", raw_message); /// # Ok(()) /// # } @@ -647,32 +733,12 @@ impl Stream { &self, stream_subject: &str, ) -> Result { - let subject = format!("STREAM.MSG.GET.{}", &self.name); - let payload = json!({ - "last_by_subj": stream_subject, - }); - - let response: Response = self - .context - .request(subject, &payload) - .map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err)) - .await?; - match response { - Response::Err { error } => { - if error.error_code() == ErrorCode::NO_MESSAGE_FOUND { - Err(LastRawMessageError::new( - LastRawMessageErrorKind::NoMessageFound, - )) - } else { - Err(LastRawMessageError::new( - LastRawMessageErrorKind::JetStream(error), - )) - } - } - Response::Ok(value) => Ok(value.message.try_into().map_err(|err| { - LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err) - })?), - } + self.raw_message(StreamGetMessage { + sequence: None, + last_by_subject: Some(stream_subject.to_string()), + next_by_subject: None, + }) + .await } /// Delete a message from the stream. @@ -2121,6 +2187,7 @@ impl futures::Stream for Consumers { #[derive(Clone, Debug, PartialEq)] pub enum LastRawMessageErrorKind { NoMessageFound, + InvalidSubject, JetStream(super::errors::Error), Other, } @@ -2129,6 +2196,7 @@ impl Display for LastRawMessageErrorKind { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::NoMessageFound => write!(f, "no message found"), + Self::InvalidSubject => write!(f, "invalid subject"), Self::Other => write!(f, "failed to get last raw message"), Self::JetStream(err) => write!(f, "JetStream error: {}", err), } @@ -2331,6 +2399,16 @@ impl From for ConsumerCreateStrictError { } } +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct StreamGetMessage { + #[serde(rename = "seq", skip_serializing_if = "is_default")] + sequence: Option, + #[serde(rename = "next_by_subj", skip_serializing_if = "is_default")] + next_by_subject: Option, + #[serde(rename = "last_by_subj", skip_serializing_if = "is_default")] + last_by_subject: Option, +} + #[cfg(test)] mod tests { use super::*; diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 4b0dd9be2..b4ab39bb0 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -3799,4 +3799,67 @@ mod jetstream { println!("count: {count}"); assert!(count.eq(&220_000)); } + + #[tokio::test] + async fn raw_messages() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::ConnectOptions::new() + .connect(server.client_url()) + .await + .unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let stream = jetstream + .create_stream(stream::Config { + name: "events".to_string(), + subjects: vec!["events.>".to_string()], + ..Default::default() + }) + .await + .unwrap(); + + for i in 1..=10 { + jetstream + .publish(format!("events.{i}"), format!("{i}").into()) + .await + .unwrap() + .await + .unwrap(); + } + jetstream + .publish("events.2", "2".into()) + .await + .unwrap() + .await + .unwrap(); + + // by sequence + let message = stream.get_raw_message(5).await.unwrap(); + assert_eq!(message.sequence, 5); + assert_eq!(from_utf8(&message.payload).unwrap(), "5"); + + // next by subject + let message = stream + .get_next_raw_message_by_subject("events.2") + .await + .unwrap(); + assert_eq!(message.sequence, 2); + assert_eq!(from_utf8(&message.payload).unwrap(), "2"); + + // last by subject + let message = stream + .get_last_raw_message_by_subject("events.2") + .await + .unwrap(); + assert_eq!(message.sequence, 11); + assert_eq!(from_utf8(&message.payload).unwrap(), "2"); + + // first by subject starting from sequence + let message = stream + .get_first_raw_message_by_subject("events.2", 5) + .await + .unwrap(); + assert_eq!(message.sequence, 11); + assert_eq!(from_utf8(&message.payload).unwrap(), "2"); + } }