diff --git a/Cargo.lock b/Cargo.lock index d580cc38..bf0b0046 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -303,7 +303,7 @@ dependencies = [ [[package]] name = "ai-interfaces" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "async-trait", @@ -1693,7 +1693,7 @@ dependencies = [ [[package]] name = "block-mesh-common" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "aws-config", @@ -1730,7 +1730,7 @@ dependencies = [ [[package]] name = "block-mesh-manager" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "askama", @@ -1819,7 +1819,7 @@ dependencies = [ [[package]] name = "block-mesh-manager-api" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "axum", @@ -1853,7 +1853,7 @@ dependencies = [ [[package]] name = "block-mesh-manager-database-domain" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "axum", @@ -1878,7 +1878,7 @@ dependencies = [ [[package]] name = "block-mesh-manager-worker" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "axum", @@ -1914,7 +1914,7 @@ dependencies = [ [[package]] name = "block-mesh-manager-ws" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "axum", @@ -1948,7 +1948,7 @@ dependencies = [ [[package]] name = "block-mesh-solana-client" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anchor-lang", "anyhow", @@ -1979,7 +1979,7 @@ checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" [[package]] name = "blockmesh-cli" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "block-mesh-common", @@ -2005,7 +2005,7 @@ dependencies = [ [[package]] name = "blockmesh-program" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anchor-lang", "anchor-spl", @@ -2611,7 +2611,7 @@ dependencies = [ [[package]] name = "client-node" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anchor-lang", "anyhow", @@ -2639,7 +2639,7 @@ dependencies = [ [[package]] name = "cloudflare-affiliate-plan-page" -version = "0.0.428" +version = "0.0.429" dependencies = [ "askama", "askama_axum", @@ -2655,7 +2655,7 @@ dependencies = [ [[package]] name = "cloudflare-api-token" -version = "0.0.428" +version = "0.0.429" dependencies = [ "askama", "askama_axum", @@ -2674,7 +2674,7 @@ dependencies = [ [[package]] name = "cloudflare-captcha-page" -version = "0.0.428" +version = "0.0.429" dependencies = [ "askama", "askama_axum", @@ -2690,7 +2690,7 @@ dependencies = [ [[package]] name = "cloudflare-feature-flags" -version = "0.0.428" +version = "0.0.429" dependencies = [ "askama", "askama_axum", @@ -2706,7 +2706,7 @@ dependencies = [ [[package]] name = "cloudflare-images-gallery-page" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "askama", @@ -2724,7 +2724,7 @@ dependencies = [ [[package]] name = "cloudflare-landing-page" -version = "0.0.428" +version = "0.0.429" dependencies = [ "askama", "askama_axum", @@ -2740,7 +2740,7 @@ dependencies = [ [[package]] name = "cloudflare-privacy-page" -version = "0.0.428" +version = "0.0.429" dependencies = [ "askama", "askama_axum", @@ -2756,7 +2756,7 @@ dependencies = [ [[package]] name = "cloudflare-vps-page" -version = "0.0.428" +version = "0.0.429" dependencies = [ "askama", "askama_axum", @@ -2774,7 +2774,7 @@ dependencies = [ [[package]] name = "cloudflare-wallet-helper" -version = "0.0.428" +version = "0.0.429" dependencies = [ "askama", "askama_axum", @@ -2790,7 +2790,7 @@ dependencies = [ [[package]] name = "cloudflare-worker-cron" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "block-mesh-common", @@ -2807,7 +2807,7 @@ dependencies = [ [[package]] name = "cloudflare-worker-echo-debug" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "block-mesh-common", @@ -2826,7 +2826,7 @@ dependencies = [ [[package]] name = "cloudflare-worker-ip-data" -version = "0.0.428" +version = "0.0.429" dependencies = [ "block-mesh-common", "rustc-hash 1.1.0", @@ -2841,7 +2841,7 @@ dependencies = [ [[package]] name = "cloudflare-worker-logger-proxy" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "block-mesh-common", @@ -2858,7 +2858,7 @@ dependencies = [ [[package]] name = "cloudflare-worker-logs-queue" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "chrono", @@ -2878,7 +2878,7 @@ dependencies = [ [[package]] name = "cloudflare-worker-solana-relay" -version = "0.0.428" +version = "0.0.429" dependencies = [ "rustc-hash 1.1.0", "serde", @@ -2894,7 +2894,7 @@ dependencies = [ [[package]] name = "cloudflare-worker-tauri-releases" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "reqwest 0.12.7", @@ -3459,7 +3459,7 @@ dependencies = [ [[package]] name = "dash-with-expiry" -version = "0.0.428" +version = "0.0.429" dependencies = [ "chrono", "dashmap 6.1.0", @@ -3504,7 +3504,7 @@ checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "data-sink" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "axum", @@ -3513,6 +3513,7 @@ dependencies = [ "block-mesh-common", "chrono", "clickhouse", + "dash-with-expiry", "dashmap 6.1.0", "database-utils", "flume", @@ -3543,7 +3544,7 @@ dependencies = [ [[package]] name = "database-utils" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "axum", @@ -3564,7 +3565,7 @@ dependencies = [ [[package]] name = "debug-area" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "database-utils", @@ -3929,7 +3930,7 @@ checksum = "e079f19b08ca6239f47f8ba8509c11cf3ea30095831f7fed61441475edd8c449" [[package]] name = "emails" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "axum", @@ -4077,7 +4078,7 @@ dependencies = [ [[package]] name = "extension" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "block-mesh-common", @@ -4132,7 +4133,7 @@ checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" [[package]] name = "feature-flags-server" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "axum", @@ -6253,7 +6254,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "logger-general" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "block-mesh-common", @@ -6273,7 +6274,7 @@ dependencies = [ [[package]] name = "logger-leptos" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "block-mesh-common", @@ -6294,7 +6295,7 @@ dependencies = [ [[package]] name = "logs-drain" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "axum", @@ -7235,7 +7236,7 @@ dependencies = [ [[package]] name = "pg-notify-test" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "database-utils", @@ -7599,7 +7600,7 @@ dependencies = [ [[package]] name = "proxy-endpoint" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anchor-lang", "anyhow", @@ -7628,7 +7629,7 @@ dependencies = [ [[package]] name = "proxy-master" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anchor-lang", "anyhow", @@ -8620,7 +8621,7 @@ dependencies = [ [[package]] name = "secret" -version = "0.0.428" +version = "0.0.429" dependencies = [ "serde", "sqlx", @@ -9874,7 +9875,7 @@ dependencies = [ [[package]] name = "solana-tiny-client" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "bincode", @@ -10041,7 +10042,7 @@ dependencies = [ [[package]] name = "speed-test" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "chrono", @@ -11085,7 +11086,7 @@ dependencies = [ [[package]] name = "test-proxy-endpoint" -version = "0.0.428" +version = "0.0.429" dependencies = [ "block-mesh-common", "bytes", @@ -11104,7 +11105,7 @@ dependencies = [ [[package]] name = "test-proxy-master" -version = "0.0.428" +version = "0.0.429" dependencies = [ "anyhow", "block-mesh-common", @@ -11206,7 +11207,7 @@ checksum = "23d434d3f8967a09480fb04132ebe0a3e088c173e6d0ee7897abbdf4eab0f8b9" [[package]] name = "tg-privacy-bot" -version = "0.0.428" +version = "0.0.429" dependencies = [ "ai-interfaces", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 5d1e3626..e9399251 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ dead_code = "forbid" [workspace.package] authors = ["Ohad Dahan "] -version = "0.0.428" +version = "0.0.429" [patch.crates-io] ## aes-gcm-siv 0.10.3 and curve25519-dalek 3.x pin zeroize to <1.4 diff --git a/libs/data-sink/Cargo.toml b/libs/data-sink/Cargo.toml index 8b78a9a9..ba0a3ef9 100644 --- a/libs/data-sink/Cargo.toml +++ b/libs/data-sink/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +dash-with-expiry = { path = "../dash-with-expiry" } clickhouse = { workspace = true, features = ["native-tls", "uuid", "time", "inserter"] } validator = { workspace = true, features = ["derive"] } influxdb = { workspace = true, features = ["derive"] } diff --git a/libs/data-sink/src/routes.rs b/libs/data-sink/src/routes.rs index c8d81c42..6e508506 100644 --- a/libs/data-sink/src/routes.rs +++ b/libs/data-sink/src/routes.rs @@ -8,9 +8,11 @@ use axum::response::IntoResponse; use axum::routing::{get, post}; use axum::{Json, Router}; use block_mesh_common::interfaces::server_api::DigestDataRequest; +use dash_with_expiry::dash_set_with_expiry::DashSetWithExpiry; use database_utils::utils::health_check::health_check; use database_utils::utils::instrument_wrapper::{commit_txn, create_txn}; use reqwest::StatusCode; +use tokio::sync::OnceCell; use validator::validate_email; #[tracing::instrument(name = "db_health", skip_all)] @@ -38,6 +40,8 @@ pub async fn server_health() -> Result { Ok((StatusCode::OK, "OK")) } +static CACHE: OnceCell> = OnceCell::const_new(); + pub async fn digest_data( State(state): State, Json(body): Json, @@ -56,6 +60,14 @@ pub async fn digest_data( } commit_txn(transaction).await?; if state.use_clickhouse { + let cache = CACHE + .get_or_init(|| async { DashSetWithExpiry::new() }) + .await; + let key = (body.data.origin.clone(), body.data.id.clone()); + if cache.get(&key).is_some() { + return Ok((StatusCode::ALREADY_REPORTED, "Already reported")); + } + let result = DataSink::dup_exists_clickhouse( &state.clickhouse_client, &body.data.origin, @@ -63,6 +75,7 @@ pub async fn digest_data( ) .await?; if result { + cache.insert(key, None); return Ok((StatusCode::ALREADY_REPORTED, "Already reported")); } let _ = DataSink::create_data_sink_clickhouse( @@ -71,6 +84,7 @@ pub async fn digest_data( body.data, ) .await; + cache.insert(key, None); } else { let data_sink_db_pool = &state.data_sink_db_pool; let mut transaction = create_txn(data_sink_db_pool).await?; diff --git a/scripts/init_clickhouse.sh b/scripts/init_clickhouse.sh index d984647c..326ac7ab 100755 --- a/scripts/init_clickhouse.sh +++ b/scripts/init_clickhouse.sh @@ -5,7 +5,7 @@ export _PWD="$(pwd)" export ROOT="$(git rev-parse --show-toplevel)" source "${ROOT}/scripts/setup.sh" -DOCKERS="$(docker ps -a -q --filter clickhouse/clickhouse-server --format="{{.ID}}")" +DOCKERS="$(docker ps -a -q --filter ancestor=clickhouse/clickhouse-server --format="{{.ID}}")" if [ -n "$DOCKERS" ] then ensure docker rm --force --volumes $DOCKERS