diff --git a/LICENSE b/LICENSE index 261eeb9e..38ffdcdc 100644 --- a/LICENSE +++ b/LICENSE @@ -186,7 +186,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright 2024 Momento Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/sdk/Cargo.lock b/sdk/Cargo.lock index 096cc39b..47221e5f 100644 --- a/sdk/Cargo.lock +++ b/sdk/Cargo.lock @@ -643,7 +643,7 @@ dependencies = [ [[package]] name = "momento" -version = "0.42.0" +version = "0.43.1" dependencies = [ "anyhow", "base64", @@ -669,9 +669,9 @@ dependencies = [ [[package]] name = "momento-protos" -version = "0.113.0" +version = "0.119.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af2da0a52df61c39b7de6c68f0b6149d6373b178529a3b21c2204480bb02632f" +checksum = "b4f18f7dd8eebb93b42f27d4eba9ea27aa4715da17b618425558c78862ddcb00" dependencies = [ "prost", "tonic", @@ -679,7 +679,7 @@ dependencies = [ [[package]] name = "momento-test-util" -version = "0.42.0" +version = "0.43.1" dependencies = [ "anyhow", "momento", diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 79d7000c..84a7a9f8 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -31,7 +31,7 @@ doc = false [dependencies] -momento-protos = { version = "0.113.0" } +momento-protos = { version = "0.119.4" } log = "0.4" hyper = { version = "0.14" } h2 = { version = "0.3" } diff --git a/sdk/src/topics/messages/subscribe.rs b/sdk/src/topics/messages/subscribe.rs index b542aed5..f1530b42 100644 --- a/sdk/src/topics/messages/subscribe.rs +++ b/sdk/src/topics/messages/subscribe.rs @@ -27,8 +27,8 @@ use crate::{ /// use momento::topics::SubscribeRequest; /// # let (topic_client, cache_name) = momento_test_util::create_doctest_topic_client(); /// -/// // Subscribe to a topic and resume from sequence number 10 -/// let request = SubscribeRequest::new(cache_name, "topic", Some(10)); +/// // Subscribe to a topic and resume from sequence number 10 and page 2 +/// let request = SubscribeRequest::new(cache_name, "topic", Some(10), Some(2)); /// /// // Note: your subscription must be declared as `mut`! /// let mut subscription = topic_client.send_request(request).await?; @@ -50,6 +50,7 @@ pub struct SubscribeRequest { cache_name: String, topic: String, resume_at_topic_sequence_number: Option, + resume_at_sequence_page: Option, } impl SubscribeRequest { @@ -58,11 +59,13 @@ impl SubscribeRequest { cache_name: impl Into, topic: impl Into, resume_at_topic_sequence_number: Option, + resume_at_sequence_page: Option, ) -> Self { Self { cache_name: cache_name.into(), topic: topic.into(), resume_at_topic_sequence_number, + resume_at_sequence_page, } } } @@ -80,6 +83,7 @@ impl MomentoRequest for SubscribeRequest { resume_at_topic_sequence_number: self .resume_at_topic_sequence_number .unwrap_or_default(), + sequence_page: self.resume_at_topic_sequence_number.unwrap_or_default(), }, )?; @@ -94,6 +98,7 @@ impl MomentoRequest for SubscribeRequest { self.cache_name, self.topic, self.resume_at_topic_sequence_number.unwrap_or_default(), + self.resume_at_sequence_page.unwrap_or_default(), SubscriptionState::Subscribed(stream), )) } diff --git a/sdk/src/topics/messages/subscription.rs b/sdk/src/topics/messages/subscription.rs index 05b7320e..e0fae135 100644 --- a/sdk/src/topics/messages/subscription.rs +++ b/sdk/src/topics/messages/subscription.rs @@ -80,6 +80,7 @@ pub struct Subscription { cache_name: String, topic: String, current_sequence_number: u64, + current_sequence_page: u64, current_subscription: SubscriptionState, } @@ -113,6 +114,7 @@ impl Subscription { cache_name: String, topic: String, current_sequence_number: u64, + current_sequence_page: u64, current_subscription: SubscriptionState, ) -> Subscription { Subscription { @@ -120,6 +122,7 @@ impl Subscription { cache_name, topic, current_sequence_number, + current_sequence_page, current_subscription, } } @@ -134,10 +137,12 @@ impl Subscription { pubsub::subscription_item::Kind::Item(item) => match item.value { Some(value) => { let sequence_number = item.topic_sequence_number; + let sequence_page = item.sequence_page; match value.kind { Some(topic_value_kind) => { MapKind::RealItem(SubscriptionItem::Value(SubscriptionValue { topic_sequence_number: sequence_number, + topic_sequence_page: sequence_page, kind: match topic_value_kind { pubsub::topic_value::Kind::Text(text) => { ValueKind::Text(text) @@ -149,13 +154,14 @@ impl Subscription { publisher_id: item.publisher_id, })) } - // This is kind of a broken protocol situation - but we do have a sequence number + // This is kind of a broken protocol situation - but we do have a sequence number and page, // so communicating the discontinuity at least allows downstream consumers to // take action on a partially-unsupported stream. None => { MapKind::RealItem(SubscriptionItem::Discontinuity(Discontinuity { last_sequence_number: None, new_sequence_number: sequence_number, + new_sequence_page: sequence_page, })) } } @@ -166,6 +172,7 @@ impl Subscription { MapKind::RealItem(SubscriptionItem::Discontinuity(Discontinuity { last_sequence_number: Some(discontinuity.last_topic_sequence), new_sequence_number: discontinuity.new_topic_sequence, + new_sequence_page: discontinuity.new_sequence_page, })) } pubsub::subscription_item::Kind::Heartbeat(_) => MapKind::Heartbeat, @@ -179,12 +186,14 @@ impl Subscription { let cache_name = self.cache_name.clone(); let topic = self.topic.clone(); let resume_at_topic_sequence_number = self.current_sequence_number; + let resume_at_topic_sequence_page = self.current_sequence_page; async move { client .subscribe(SubscriptionRequest { cache_name, topic, resume_at_topic_sequence_number, + sequence_page: resume_at_topic_sequence_page, }) .await } @@ -212,12 +221,15 @@ impl futures::Stream for Subscription { SubscriptionItem::Value(v) => { self.current_sequence_number = v.topic_sequence_number; + self.current_sequence_page = v.topic_sequence_page; // We return only SubscriptionValues here break std::task::Poll::Ready(Some(v.clone())); } SubscriptionItem::Discontinuity(d) => { log::debug!("discontinuity! Updating sequence number and continuing..."); - self.current_sequence_number = d.new_sequence_number + self.current_sequence_number = + d.new_sequence_number; + self.current_sequence_page = d.new_sequence_page; } } } @@ -292,7 +304,6 @@ pub(crate) enum SubscriptionItem { /// You might not care about these, and that's okay! It's probably a good idea to /// log them though, so you can reach out for help if you notice something naughty /// that hurts your users. - /// We currently do not expose discontinuities to the end user. Discontinuity(Discontinuity), } @@ -301,9 +312,10 @@ pub(crate) enum SubscriptionItem { pub struct SubscriptionValue { /// The published value. pub kind: ValueKind, - /// Best-effort sequence number for the topic. This is not transactional, it's just - /// to help you know when things are probably working well or probably not working well. + /// The sequence number of the topic. pub topic_sequence_number: u64, + /// The page number of the topic. + pub topic_sequence_page: u64, /// Authenticated id from Publisher's disposable token pub publisher_id: String, } @@ -343,6 +355,9 @@ pub struct Discontinuity { /// This discontinuity's sequence number. The next item on the stream should /// be a value with the next sequence after this. pub new_sequence_number: u64, + + /// This discontinuity's page number. + pub new_sequence_page: u64, } /// How a value should be presented on a subscription stream diff --git a/sdk/src/topics/topic_client.rs b/sdk/src/topics/topic_client.rs index 30e00f71..05283ba2 100644 --- a/sdk/src/topics/topic_client.rs +++ b/sdk/src/topics/topic_client.rs @@ -153,7 +153,7 @@ impl TopicClient { cache_name: impl Into + Clone, topic: impl Into + Clone, ) -> Result { - let request = SubscribeRequest::new(cache_name, topic, None); + let request = SubscribeRequest::new(cache_name, topic, None, None); request.send(self).await }