Skip to content

Commit

Permalink
feat: Add admin api to send targeted push notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
holzeis committed Mar 1, 2024
1 parent e48d93e commit 1c59581
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 19 deletions.
1 change: 1 addition & 0 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ async fn main() -> Result<()> {
tx_price_feed,
tx_user_feed,
auth_users_notifier.clone(),
notification_service.get_sender(),
user_backup,
);

Expand Down
79 changes: 79 additions & 0 deletions coordinator/src/campaign.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use crate::db;
use crate::notifications::FcmToken;
use crate::notifications::Notification;
use crate::notifications::NotificationKind;
use crate::routes::AppState;
use crate::AppError;
use axum::extract::State;
use axum::Json;
use bitcoin::secp256k1::PublicKey;
use serde::Deserialize;
use serde::Serialize;
use std::sync::Arc;
use tracing::instrument;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PushCampaignParams {
pub node_ids: Vec<PublicKey>,
pub title: String,
pub message: String,
pub dry_run: Option<bool>,
}

#[instrument(skip_all, err(Debug))]
pub async fn post_push_campaign(
State(state): State<Arc<AppState>>,
params: Json<PushCampaignParams>,
) -> Result<String, AppError> {
let params = params.0;
tracing::info!(?params, "Sending campaign with push notifications");

let mut conn = state
.pool
.get()
.map_err(|e| AppError::InternalServerError(format!("Could not get connection: {e:#}")))?;

let users = db::user::get_users(&mut conn, params.node_ids)
.map_err(|e| AppError::InternalServerError(format!("Failed to get users: {e:#}")))?;

let fcm_tokens = users
.iter()
.map(|user| user.fcm_token.clone())
.filter(|token| !token.is_empty() && token != "unavailable")
.map(|token| FcmToken::new(token).expect("token to not be empty"))
.collect::<Vec<_>>();

let notification_kind = NotificationKind::Campaign {
title: params.title.clone(),
message: params.message.clone(),
};

tracing::info!(
"Sending push notification campaign (title: {}, message: {} to {} users",
params.title,
params.message,
fcm_tokens.len(),
);

if params.dry_run.unwrap_or(true) {
tracing::debug!("Not sending push notification campaign because of dry run flag.");
} else {
state
.notification_sender
.send(Notification::new_batch(
fcm_tokens.clone(),
notification_kind,
))
.await
.map_err(|e| {
AppError::InternalServerError(format!("Failed to send push notifications: {e:#}"))
})?;
}

Ok(format!(
"Sending push notification campaign (title: {}, message: {} to {} users",
params.title,
params.message,
fcm_tokens.len(),
))
}
8 changes: 8 additions & 0 deletions coordinator/src/db/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,11 @@ pub fn get_user(conn: &mut PgConnection, trader_id: PublicKey) -> Result<Option<

Ok(maybe_user)
}

pub fn get_users(conn: &mut PgConnection, trader_ids: Vec<PublicKey>) -> Result<Vec<User>> {
let users = users::table
.filter(users::pubkey.eq_any(trader_ids.iter().map(|id| id.to_string())))
.load(conn)?;

Ok(users)
}
1 change: 1 addition & 0 deletions coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod payout_curve;

pub mod admin;
pub mod backup;
pub mod campaign;
pub mod cli;
pub mod db;
pub mod dlc_handler;
Expand Down
5 changes: 1 addition & 4 deletions coordinator/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,7 @@ async fn process_orderbook_message(
let fcm_token = FcmToken::new(user.fcm_token)?;

notification_sender
.send(Notification {
user_fcm_token: fcm_token,
notification_kind,
})
.send(Notification::new(fcm_token, notification_kind))
.await
.with_context(|| {
format!("Failed to send push notification to trader {trader_id}")
Expand Down
49 changes: 34 additions & 15 deletions coordinator/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub enum NotificationKind {
PositionSoonToExpire,
PositionExpired,
CollaborativeRevert,
Campaign { title: String, message: String },
}

impl Display for NotificationKind {
Expand All @@ -21,21 +22,29 @@ impl Display for NotificationKind {
NotificationKind::PositionExpired => write!(f, "PositionExpired"),
NotificationKind::RolloverWindowOpen => write!(f, "RolloverWindowOpen"),
NotificationKind::CollaborativeRevert => write!(f, "CollaborativeRevertPending"),
NotificationKind::Campaign { .. } => write!(f, "Campaign"),
}
}
}

#[derive(Debug, Clone)]
pub struct Notification {
pub user_fcm_token: FcmToken,
pub notification_kind: NotificationKind,
fcm_tokens: Vec<FcmToken>,
notification_kind: NotificationKind,
}

impl Notification {
pub fn new(user_fcm_token: FcmToken, notification_kind: NotificationKind) -> Self {
Self {
notification_kind,
user_fcm_token,
fcm_tokens: vec![user_fcm_token],
}
}

pub fn new_batch(fcm_tokens: Vec<FcmToken>, notification_kind: NotificationKind) -> Self {
Self {
notification_kind,
fcm_tokens,
}
}
}
Expand Down Expand Up @@ -63,19 +72,25 @@ impl NotificationService {
let client = fcm::Client::new();
async move {
while let Some(Notification {
user_fcm_token,
fcm_tokens,
notification_kind,
}) = notification_receiver.recv().await
{
tracing::info!(%notification_kind, %user_fcm_token, "Sending notification");

if !fcm_api_key.is_empty() {
let notification = build_notification(notification_kind);
if let Err(e) =
send_notification(&client, &fcm_api_key, &user_fcm_token, notification)
.await
{
tracing::error!("Could not send notification to FCM: {:?}", e);
for user_fcm_token in fcm_tokens {
tracing::info!(%notification_kind, %user_fcm_token, "Sending notification");

if !fcm_api_key.is_empty() {
let notification = build_notification(&notification_kind);
if let Err(e) = send_notification(
&client,
&fcm_api_key,
&user_fcm_token,
notification,
)
.await
{
tracing::error!("Could not send notification to FCM: {:?}", e);
}
}
}
}
Expand All @@ -94,7 +109,7 @@ impl NotificationService {
}

/// Prepares the notification text
fn build_notification<'a>(kind: NotificationKind) -> fcm::Notification<'a> {
fn build_notification(kind: &NotificationKind) -> fcm::Notification<'_> {
let mut notification_builder = fcm::NotificationBuilder::new();
match kind {
NotificationKind::PositionSoonToExpire => {
Expand All @@ -113,6 +128,10 @@ fn build_notification<'a>(kind: NotificationKind) -> fcm::Notification<'a> {
notification_builder.title("Error detected");
notification_builder.body("Please open your app to recover your funds.");
}
NotificationKind::Campaign { title, message } => {
notification_builder.title(title);
notification_builder.body(message);
}
}
notification_builder.finalize()
}
Expand Down Expand Up @@ -151,7 +170,7 @@ async fn send_notification<'a>(
let response = client
.send(message)
.await
.context("could not send FCM notification")?;
.context("Could not send FCM notification")?;
tracing::debug!("Sent notification. Response: {:?}", response);
Ok(())
}
6 changes: 6 additions & 0 deletions coordinator/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::admin::list_on_chain_transactions;
use crate::admin::list_peers;
use crate::admin::sign_message;
use crate::backup::SledBackup;
use crate::campaign::post_push_campaign;
use crate::collaborative_revert::confirm_collaborative_revert;
use crate::db;
use crate::db::user;
Expand All @@ -22,6 +23,7 @@ use crate::leaderboard::LeaderBoardQueryParams;
use crate::message::NewUserMessage;
use crate::message::OrderbookMessage;
use crate::node::Node;
use crate::notifications::Notification;
use crate::orderbook::routes::get_order;
use crate::orderbook::routes::get_orders;
use crate::orderbook::routes::post_order;
Expand Down Expand Up @@ -97,6 +99,7 @@ pub struct AppState {
pub announcement_addresses: Vec<SocketAddress>,
pub node_alias: String,
pub auth_users_notifier: mpsc::Sender<OrderbookMessage>,
pub notification_sender: mpsc::Sender<Notification>,
pub user_backup: SledBackup,
pub secp: Secp256k1<VerifyOnly>,
}
Expand All @@ -113,6 +116,7 @@ pub fn router(
tx_price_feed: broadcast::Sender<Message>,
tx_user_feed: broadcast::Sender<NewUserMessage>,
auth_users_notifier: mpsc::Sender<OrderbookMessage>,
notification_sender: mpsc::Sender<Notification>,
user_backup: SledBackup,
) -> Router {
let secp = Secp256k1::verification_only();
Expand All @@ -128,6 +132,7 @@ pub fn router(
announcement_addresses,
node_alias: node_alias.to_string(),
auth_users_notifier,
notification_sender,
user_backup,
secp,
});
Expand Down Expand Up @@ -181,6 +186,7 @@ pub fn router(
get(get_settings).put(update_settings),
)
.route("/api/admin/sync", post(post_sync))
.route("/api/admin/campaign/push", post(post_push_campaign))
.route("/metrics", get(get_metrics))
.route("/health", get(get_health))
.route("/api/leaderboard", get(get_leaderboard))
Expand Down

0 comments on commit 1c59581

Please sign in to comment.