From e3b41c9b392cf1db05f415758719bd5b41cedba1 Mon Sep 17 00:00:00 2001 From: Asuna Date: Mon, 12 Aug 2024 01:52:49 +0800 Subject: [PATCH] Always update stored status incrementally --- src/platform.rs | 2 +- src/source/bilibili/space.rs | 135 +--------------- src/source/mod.rs | 288 +++++++++++++++++++++++++++++++---- src/task/subscription.rs | 5 +- 4 files changed, 268 insertions(+), 162 deletions(-) diff --git a/src/platform.rs b/src/platform.rs index c5c38b1..ef7f25a 100644 --- a/src/platform.rs +++ b/src/platform.rs @@ -2,7 +2,7 @@ pub trait PlatformTrait: Send + Sync { fn metadata(&self) -> PlatformMetadata; } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct PlatformMetadata { pub display_name: &'static str, } diff --git a/src/source/bilibili/space.rs b/src/source/bilibili/space.rs index d947f71..a30b2de 100644 --- a/src/source/bilibili/space.rs +++ b/src/source/bilibili/space.rs @@ -1,4 +1,4 @@ -use std::{borrow::Cow, collections::HashSet, fmt, future::Future, pin::Pin}; +use std::{borrow::Cow, fmt, future::Future, pin::Pin}; use anyhow::{anyhow, bail, Ok}; use once_cell::sync::Lazy; @@ -6,16 +6,15 @@ use reqwest::header::{self, HeaderValue}; use serde::Deserialize; use serde_json::{self as json}; use spdlog::prelude::*; -use tokio::sync::{Mutex, OnceCell}; +use tokio::sync::Mutex; use super::{upgrade_to_https, Response}; use crate::{ helper, platform::{PlatformMetadata, PlatformTrait}, source::{ - FetcherTrait, Notification, NotificationKind, Post, PostAttachment, PostAttachmentImage, - PostPlatformUniqueId, PostUrl, PostUrls, Posts, PostsRef, RepostFrom, Status, StatusKind, - StatusSource, User, + FetcherTrait, Post, PostAttachment, PostAttachmentImage, PostUrl, PostUrls, Posts, + RepostFrom, Status, StatusKind, StatusSource, User, }, }; @@ -273,8 +272,6 @@ mod data { pub struct Fetcher { params: ConfigParams, - first_fetch: OnceCell<()>, - fetched_cache: Mutex>, } impl PlatformTrait for Fetcher { @@ -289,13 +286,6 @@ impl FetcherTrait for Fetcher { fn fetch_status(&self) -> Pin> + Send + '_>> { Box::pin(self.fetch_status_impl()) } - - fn post_filter<'a>( - &'a self, - notification: Notification<'a>, - ) -> Pin>> + Send + '_>> { - Box::pin(self.post_filter_impl(notification)) - } } impl fmt::Display for Fetcher { @@ -306,26 +296,12 @@ impl fmt::Display for Fetcher { impl Fetcher { pub fn new(params: ConfigParams) -> Self { - Self { - params, - first_fetch: OnceCell::new(), - fetched_cache: Mutex::new(HashSet::new()), - } + Self { params } } async fn fetch_status_impl(&self) -> anyhow::Result { let posts = fetch_space_history(self.params.user_id).await?; - // The initial full cache for `post_filter` - self.first_fetch - .get_or_init(|| async { - let mut published_cache = self.fetched_cache.lock().await; - posts.0.iter().for_each(|post| { - assert!(published_cache.insert(post.platform_unique_id())); - }) - }) - .await; - Ok(Status::new( StatusKind::Posts(posts), StatusSource { @@ -336,35 +312,6 @@ impl Fetcher { }, )) } - - async fn post_filter_impl<'a>( - &self, - mut notification: Notification<'a>, - ) -> Option> { - // Sometimes the API returns posts without all "published video" posts, it - // causes the problem that the next update will treat the missing posts as new - // posts and notify them again. So we do some hacky filter here. - - if let NotificationKind::Posts(posts) = notification.kind { - let mut fetched_cache = self.fetched_cache.lock().await; - let remaining_posts = posts - .0 - .into_iter() - .filter(|post| !fetched_cache.contains(&post.platform_unique_id())) - .collect::>(); - - remaining_posts.iter().for_each(|post| { - assert!(fetched_cache.insert(post.platform_unique_id())); - }); - drop(fetched_cache); - - if remaining_posts.is_empty() { - return None; - } - notification.kind = NotificationKind::Posts(PostsRef(remaining_posts)); - } - Some(notification) - } } #[allow(clippy::type_complexity)] // No, I don't think it's complex XD @@ -683,76 +630,4 @@ mod tests { .is_empty())); assert!(history.0.iter().all(|post| !post.content.is_empty())); } - - #[tokio::test] - async fn dedup_published_videos() { - let fetcher = Fetcher::new(ConfigParams { user_id: 1 }); - - let source = StatusSource { - platform: PlatformMetadata { - display_name: "test.platform", - }, - user: None, - }; - let mut posts = vec![]; - - macro_rules! make_notification { - ( $posts:expr ) => { - Notification { - kind: NotificationKind::Posts(PostsRef($posts)), - source: &source, - } - }; - () => { - make_notification!(posts.iter().collect()) - }; - } - - assert!(fetcher.post_filter(make_notification!()).await.is_none()); - - posts.push(Post { - user: Some(User { - nickname: "test display name".into(), - profile_url: "https://test.profile".into(), - avatar_url: "https://test.avatar".into(), - }), - content: "test1".into(), - urls: PostUrls::from_iter([PostUrl::new_clickable("https://test1", "View")]).unwrap(), - repost_from: None, - attachments: vec![], - }); - - let filtered = fetcher.post_filter(make_notification!()).await; - assert!(matches!( - filtered.unwrap().kind, - NotificationKind::Posts(posts) if posts.0.len() == 1 && posts.0[0].content == "test1" - )); - - let filtered = fetcher.post_filter(make_notification!()).await; - assert!(filtered.is_none()); - - let filtered = fetcher.post_filter(make_notification!(vec![])).await; - assert!(filtered.is_none()); - - posts.push(Post { - user: Some(User { - nickname: "test display name".into(), - profile_url: "https://test.profile".into(), - avatar_url: "https://test.avatar".into(), - }), - content: "test2".into(), - urls: PostUrls::from_iter([PostUrl::new_clickable("https://test2", "View")]).unwrap(), - repost_from: None, - attachments: vec![], - }); - - let filtered = fetcher.post_filter(make_notification!()).await; - assert!(matches!( - filtered.unwrap().kind, - NotificationKind::Posts(posts) if posts.0.len() == 1 && posts.0[0].content == "test2" - )); - - let filtered = fetcher.post_filter(make_notification!(vec![])).await; - assert!(filtered.is_none()); - } } diff --git a/src/source/mod.rs b/src/source/mod.rs index e5468a7..b0edb31 100644 --- a/src/source/mod.rs +++ b/src/source/mod.rs @@ -2,6 +2,7 @@ pub mod bilibili; pub mod twitter; use std::{ + borrow::Borrow, fmt::{self, Display}, future::Future, pin::Pin, @@ -50,22 +51,22 @@ impl fmt::Display for ConfigSourcePlatform { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct StatusSource { pub platform: PlatformMetadata, pub user: Option, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct StatusSourceUser { pub display_name: String, pub profile_url: String, } -#[derive(Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct Status(Option); -#[derive(Debug)] +#[derive(Clone, Debug, PartialEq)] struct StatusInner { kind: StatusKind, source: StatusSource, @@ -80,10 +81,6 @@ impl Status { Self(Some(StatusInner { kind, source })) } - pub fn is_empty(&self) -> bool { - self.0.is_none() - } - pub fn generate_notifications<'a>(&'a self, last_status: &'a Status) -> Vec> { self.0 .as_ref() @@ -109,7 +106,7 @@ impl Status { notifications } (StatusKind::Posts(posts), Some(StatusKind::Posts(last_posts))) => { - let new_posts = vec_diff_by(&posts.0, &last_posts.0, |l, r| { + let new_posts = vec_diff_by(&last_posts.0, &posts.0, |l, r| { l.platform_unique_id() == r.platform_unique_id() }) .collect::>(); @@ -128,9 +125,40 @@ impl Status { ) .unwrap_or_default() } + + // Sometimes the data source API glitches and returns empty items without + // producing any errors. If we simply replace the stored value of `Status`, when + // the API comes back to normal, we will incorrectly generate notifications with + // all the items as a new update. To solve this issue, call this function, which + // will always incrementally store the new items and never delete the old items. + pub fn update_incrementally(&mut self, new: Status) { + match (&mut self.0, new.0) { + (None, None) => {} + (Some(_), None) => {} + (inner @ None, Some(new)) => *inner = Some(new), + (Some(stored), Some(new)) => { + match (&mut stored.kind, new.kind) { + (StatusKind::Live(stored), StatusKind::Live(new)) => *stored = new, + (StatusKind::Posts(stored), StatusKind::Posts(new)) => { + let mut new = vec_diff_by(&stored.0, new.0, |l, r| { + l.platform_unique_id() == r.platform_unique_id() + }) + .collect::>(); + // TODO: Do we need to care about the order? + stored.0.append(&mut new); + } + _ => unreachable!("the stored and the new status kinds are mismatch"), + } + stored.source.platform = new.source.platform; + if let Some(user) = new.source.user { + stored.source.user = Some(user); + } + } + } + } } -#[derive(Debug)] +#[derive(Clone, Debug, PartialEq)] pub enum StatusKind { Live(LiveStatus), Posts(Posts), @@ -145,7 +173,7 @@ impl fmt::Display for StatusKind { } } -#[derive(Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct User { pub nickname: String, pub profile_url: String, @@ -155,7 +183,7 @@ pub struct User { #[derive(Debug, Eq, PartialEq, Hash)] pub struct PostPlatformUniqueId(String); -#[derive(Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct Post { pub user: Option, pub content: String, @@ -170,7 +198,7 @@ impl Post { } } -#[derive(Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct PostUrls(Vec); impl PostUrls { @@ -216,7 +244,7 @@ impl<'a> PostUrlsRef<'a> { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum PostUrl { Clickable(PostUrlClickable), // For some cases. a post doesn't have a URL (e.g. deleted post), but we still need something @@ -249,13 +277,19 @@ impl PostUrl { } } -#[derive(Debug)] +impl PartialEq for PostUrl { + fn eq(&self, other: &Self) -> bool { + self.unique_id().eq(other.unique_id()) + } +} + +#[derive(Clone, Debug)] pub struct PostUrlClickable { pub url: String, pub display: String, } -#[derive(Debug)] +#[derive(Clone, Debug, PartialEq)] pub enum RepostFrom { // TODO: Remove this in the future Legacy { is_repost: bool, is_quote: bool }, @@ -319,7 +353,7 @@ impl PartialEq for PostAttachmentVideo { } } -#[derive(Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub enum LiveStatusKind { Online, Offline, @@ -336,7 +370,7 @@ impl fmt::Display for LiveStatusKind { } } -#[derive(Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct LiveStatus { pub kind: LiveStatusKind, pub title: String, @@ -355,7 +389,7 @@ impl fmt::Display for LiveStatus { } } -#[derive(Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct Posts(Vec); impl fmt::Display for Posts { @@ -439,20 +473,23 @@ pub fn fetcher(platform: &ConfigSourcePlatform) -> Box { } } -fn vec_diff_by<'a, T, F>(lhs: &'a [T], rhs: &'a [T], predicate: F) -> impl Iterator +fn vec_diff_by<'a, T, R, I, F>(prev: &'a [T], new: I, predicate: F) -> impl Iterator + 'a where - F: Fn(&T, &T) -> bool + 'a, + I: IntoIterator + 'a, + F: Fn(&R, &T) -> bool + 'a, { - lhs.iter() - .filter(move |l| !rhs.iter().any(|r| predicate(l, r))) + new.into_iter() + .filter(move |n| !prev.iter().any(|p| predicate(n, p))) } #[allow(dead_code)] -fn vec_diff<'a, T>(lhs: &'a [T], rhs: &'a [T]) -> impl Iterator +fn vec_diff<'a, T, R, I>(prev: &'a [T], new: I) -> impl Iterator + 'a where + I: IntoIterator + 'a, T: PartialEq, + R: Borrow, { - vec_diff_by(lhs, rhs, |l, r| l == r) + vec_diff_by(prev, new, |n, p| p == n.borrow()) } #[cfg(test)] @@ -462,8 +499,205 @@ mod tests { #[test] fn vec_diff_valid() { assert_eq!( - vec_diff(&[1, 2, 3], &[4, 2, 3, 4]).collect::>(), + vec_diff(&[4, 2, 3, 4], &[1, 2, 3]).collect::>(), [&1] - ) + ); + assert_eq!(vec_diff(&[4, 2, 3, 4], [1, 2, 3]).collect::>(), [1]); + } + + #[test] + fn status_incremental_update_live() { + let mut status = Status::empty(); + assert!(status.0.is_none()); + + let new = Status::new( + StatusKind::Live(LiveStatus { + kind: LiveStatusKind::Online, + title: "title1".into(), + streamer_name: "streamer1".into(), + cover_image_url: "cover1".into(), + live_url: "live1".into(), + }), + StatusSource { + platform: PlatformMetadata { + display_name: "test", + }, + user: Some(StatusSourceUser { + display_name: "user1".into(), + profile_url: "profile1".into(), + }), + }, + ); + status.update_incrementally(new.clone()); + assert_eq!(status, new); + + let new = Status::new( + StatusKind::Live(LiveStatus { + kind: LiveStatusKind::Online, + title: "title2".into(), + streamer_name: "streamer2".into(), + cover_image_url: "cover2".into(), + live_url: "live2".into(), + }), + StatusSource { + platform: PlatformMetadata { + display_name: "test", + }, + user: Some(StatusSourceUser { + display_name: "user1".into(), + profile_url: "profile1".into(), + }), + }, + ); + status.update_incrementally(new.clone()); + assert_eq!(status, new); + } + + #[test] + fn status_incremental_update_posts() { + let mut status = Status::empty(); + assert!(status.0.is_none()); + + let new = Status::new( + StatusKind::Posts(Posts(vec![Post { + user: None, + content: "content1".into(), + urls: PostUrls::new(PostUrl::Identity("id1".into())), + repost_from: None, + attachments: vec![], + }])), + StatusSource { + platform: PlatformMetadata { + display_name: "test", + }, + user: Some(StatusSourceUser { + display_name: "user1".into(), + profile_url: "profile1".into(), + }), + }, + ); + status.update_incrementally(new.clone()); + assert_eq!(status, new); + + status.update_incrementally(Status::new( + StatusKind::Posts(Posts(vec![ + Post { + user: None, + content: "content1".into(), + urls: PostUrls::new(PostUrl::Identity("id1".into())), + repost_from: None, + attachments: vec![], + }, + Post { + user: None, + content: "content2".into(), + urls: PostUrls::new(PostUrl::Identity("id2".into())), + repost_from: None, + attachments: vec![], + }, + ])), + StatusSource { + platform: PlatformMetadata { + display_name: "test", + }, + user: None, + }, + )); + let last = Status::new( + StatusKind::Posts(Posts(vec![ + Post { + user: None, + content: "content1".into(), + urls: PostUrls::new(PostUrl::Identity("id1".into())), + repost_from: None, + attachments: vec![], + }, + Post { + user: None, + content: "content2".into(), + urls: PostUrls::new(PostUrl::Identity("id2".into())), + repost_from: None, + attachments: vec![], + }, + ])), + StatusSource { + platform: PlatformMetadata { + display_name: "test", + }, + user: Some(StatusSourceUser { + display_name: "user1".into(), + profile_url: "profile1".into(), + }), + }, + ); + assert_eq!(status, last); + + status.update_incrementally(Status::new( + StatusKind::Posts(Posts(vec![])), + StatusSource { + platform: PlatformMetadata { + display_name: "test", + }, + user: None, + }, + )); + assert_eq!(status, last); + + status.update_incrementally(Status::new( + StatusKind::Posts(Posts(vec![Post { + user: None, + content: "content3".into(), + urls: PostUrls::new(PostUrl::Identity("id3".into())), + repost_from: None, + attachments: vec![], + }])), + StatusSource { + platform: PlatformMetadata { + display_name: "test", + }, + user: Some(StatusSourceUser { + display_name: "user2".into(), + profile_url: "profile1".into(), + }), + }, + )); + let last = Status::new( + StatusKind::Posts(Posts(vec![ + Post { + user: None, + content: "content1".into(), + urls: PostUrls::new(PostUrl::Identity("id1".into())), + repost_from: None, + attachments: vec![], + }, + Post { + user: None, + content: "content2".into(), + urls: PostUrls::new(PostUrl::Identity("id2".into())), + repost_from: None, + attachments: vec![], + }, + Post { + user: None, + content: "content3".into(), + urls: PostUrls::new(PostUrl::Identity("id3".into())), + repost_from: None, + attachments: vec![], + }, + ])), + StatusSource { + platform: PlatformMetadata { + display_name: "test", + }, + user: Some(StatusSourceUser { + display_name: "user2".into(), + profile_url: "profile1".into(), + }), + }, + ); + assert_eq!(status, last); + + status.update_incrementally(Status::empty()); + assert_eq!(status, last); } } diff --git a/src/task/subscription.rs b/src/task/subscription.rs index 99d972e..9180781 100644 --- a/src/task/subscription.rs +++ b/src/task/subscription.rs @@ -78,10 +78,7 @@ impl TaskSubscription { } } } - if !status.is_empty() { - self.last_status = status; - } - + self.last_status.update_incrementally(status); Ok(()) } }