Skip to content

Commit

Permalink
refactor: move bolt12 invoice fetching to sidecar
Browse files Browse the repository at this point in the history
To have the entire /v2/lightning route in the sidecar;
makes it way easier to handle the Nginx config
  • Loading branch information
michael1011 committed Jan 27, 2025
1 parent 343e961 commit 985599e
Show file tree
Hide file tree
Showing 20 changed files with 308 additions and 768 deletions.
11 changes: 0 additions & 11 deletions boltzr/protos/boltzr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ service BoltzR {
rpc SignEvmRefund (SignEvmRefundRequest) returns (SignEvmRefundResponse);

rpc DecodeInvoiceOrOffer (DecodeInvoiceOrOfferRequest) returns (DecodeInvoiceOrOfferResponse);
rpc FetchInvoice (FetchInvoiceRequest) returns (FetchInvoiceResponse);

rpc IsMarked (IsMarkedRequest) returns (IsMarkedResponse);

Expand Down Expand Up @@ -217,16 +216,6 @@ message DecodeInvoiceOrOfferResponse {
}
}

message FetchInvoiceRequest {
string currency = 1;
string offer = 2;
uint64 amount_msat = 3;
}

message FetchInvoiceResponse {
string invoice = 1;
}

message IsMarkedRequest {
string ip = 1;
}
Expand Down
2 changes: 1 addition & 1 deletion boltzr/src/api/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use axum::response::{IntoResponse, Response};
use axum::Json;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct ApiError {
pub error: String,
}
Expand Down
166 changes: 161 additions & 5 deletions boltzr/src/api/lightning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,70 @@ use crate::api::errors::{ApiError, AxumError};
use crate::api::ws::status::SwapInfos;
use crate::api::ServerState;
use crate::service::InfoFetchError;
use crate::swap::manager::SwapManager;
use alloy::hex;
use anyhow::Result;
use axum::extract::Path;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::{Extension, Json};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Deserialize, Serialize)]
pub struct Bolt12FetchRequest {
offer: String,
// In satoshis
amount: u64,
}

#[derive(Deserialize, Serialize)]
pub struct Bolt12FetchResponse {
invoice: String,
}

#[derive(Deserialize)]
pub struct LightningInfoParams {
currency: String,
node: String,
}

pub async fn lightning_node_info<S>(
Extension(state): Extension<Arc<ServerState<S>>>,
pub async fn bolt12_fetch<S, M>(
Extension(state): Extension<Arc<ServerState<S, M>>>,
Path(currency): Path<String>,
Json(body): Json<Bolt12FetchRequest>,
) -> Result<impl IntoResponse, AxumError>
where
S: SwapInfos + Send + Sync + Clone + 'static,
M: SwapManager + Send + Sync + 'static,
{
let cln = state
.manager
.get_currency(&currency)
.and_then(|currency| currency.cln);

Ok(match cln {
Some(mut cln) => {
let invoice = cln.fetch_invoice(body.offer, body.amount * 1_000).await?;
(StatusCode::CREATED, Json(Bolt12FetchResponse { invoice })).into_response()
}
None => (
StatusCode::NOT_FOUND,
Json(ApiError {
error: "no BOLT12 support".to_string(),
}),
)
.into_response(),
})
}

pub async fn node_info<S, M>(
Extension(state): Extension<Arc<ServerState<S, M>>>,
Path(LightningInfoParams { node, currency }): Path<LightningInfoParams>,
) -> Result<impl IntoResponse, AxumError>
where
S: SwapInfos + Send + Sync + Clone + 'static,
M: SwapManager + Send + Sync + 'static,
{
let node = match decode_node(&node) {
Ok(node) => node,
Expand All @@ -42,12 +85,13 @@ where
)
}

pub async fn lightning_channels<S>(
Extension(state): Extension<Arc<ServerState<S>>>,
pub async fn channels<S, M>(
Extension(state): Extension<Arc<ServerState<S, M>>>,
Path(LightningInfoParams { node, currency }): Path<LightningInfoParams>,
) -> Result<impl IntoResponse, AxumError>
where
S: SwapInfos + Send + Sync + Clone + 'static,
M: SwapManager + Send + Sync + 'static,
{
let node = match decode_node(&node) {
Ok(node) => node,
Expand Down Expand Up @@ -111,8 +155,120 @@ fn handle_info_fetch_error(err: InfoFetchError) -> axum::http::Response<axum::bo
#[cfg(test)]
mod test {
use super::*;
use crate::api::test::Fetcher;
use crate::api::ws::types::SwapStatus;
use crate::api::Server;
use crate::currencies::Currency;
use crate::lightning::invoice::Invoice;
use crate::service::Service;
use crate::swap::manager::test::MockManager;
use crate::wallet::Network;
use axum::body::Body;
use axum::extract::Request;
use axum::Router;
use http_body_util::BodyExt;
use rstest::*;
use tower::ServiceExt;

fn setup_router(manager: MockManager) -> Router {
let (status_tx, _) = tokio::sync::broadcast::channel::<Vec<SwapStatus>>(1);

Server::<Fetcher, MockManager>::add_routes(Router::new()).layer(Extension(Arc::new(
ServerState {
manager: Arc::new(manager),
service: Arc::new(Service::new_mocked_prometheus(false)),
swap_status_update_tx: status_tx.clone(),
swap_infos: Fetcher { status_tx },
},
)))
}

#[tokio::test]
async fn test_bolt12_fetch() {
let mut cln = crate::lightning::cln::test::cln_client().await;
let offer = cln.offer().await.unwrap();

let mut manager = MockManager::new();
{
let cln = cln.clone();
manager.expect_get_currency().returning(move |_| {
Some(Currency {
network: Network::Regtest,
wallet: Arc::new(crate::wallet::Bitcoin::new(Network::Regtest)),
cln: Some(cln.clone()),
lnd: None,
chain: None,
})
});
}

let amount = 21;

let res = setup_router(manager)
.oneshot(
Request::builder()
.method(axum::http::Method::POST)
.uri("/v2/lightning/BTC/bolt12/fetch")
.header(axum::http::header::CONTENT_TYPE, "application/json")
.body(Body::from(
serde_json::to_vec(&Bolt12FetchRequest {
amount,
offer: offer.bolt12,
})
.unwrap(),
))
.unwrap(),
)
.await
.unwrap();

assert_eq!(res.status(), StatusCode::CREATED);

let body = res.into_body().collect().await.unwrap().to_bytes();
let invoice = serde_json::from_slice::<Bolt12FetchResponse>(&body)
.unwrap()
.invoice;

let decoded = crate::lightning::invoice::decode(Network::Regtest, &invoice).unwrap();
match decoded {
Invoice::Bolt12(invoice) => {
assert_eq!(invoice.amount_msats(), amount * 1_000);
}
_ => unreachable!(),
};
}

#[tokio::test]
async fn test_bolt12_fetch_no_cln() {
let mut manager = MockManager::new();
manager.expect_get_currency().return_const(None);

let res = setup_router(manager)
.oneshot(
Request::builder()
.method(axum::http::Method::POST)
.uri("/v2/lightning/BTC/bolt12/fetch")
.header(axum::http::header::CONTENT_TYPE, "application/json")
.body(Body::from(
serde_json::to_vec(&Bolt12FetchRequest {
offer: "".to_string(),
amount: 0,
})
.unwrap(),
))
.unwrap(),
)
.await
.unwrap();

assert_eq!(res.status(), StatusCode::NOT_FOUND);

let body = res.into_body().collect().await.unwrap().to_bytes();
assert_eq!(
serde_json::from_slice::<ApiError>(&body).unwrap().error,
"no BOLT12 support"
);
}

#[rstest]
#[case("03a7ee82c3c7fc4c796d26e513676d445d49b9c62004a47f2e813695a439a8fd01")]
Expand Down
30 changes: 21 additions & 9 deletions boltzr/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::api::errors::error_middleware;
use crate::api::lightning::{lightning_channels, lightning_node_info};
use crate::api::sse::sse_handler;
use crate::api::stats::get_stats;
#[cfg(feature = "metrics")]
use crate::metrics::server::MetricsLayer;
use crate::service::Service;
use axum::routing::get;
use crate::swap::manager::SwapManager;
use axum::routing::{get, post};
use axum::{Extension, Router};
use serde::{Deserialize, Serialize};
use std::error::Error;
Expand All @@ -28,36 +28,41 @@ pub struct Config {
pub port: u16,
}

pub struct Server<S> {
pub struct Server<S, M> {
config: Config,
cancellation_token: CancellationToken,

manager: Arc<M>,
service: Arc<Service>,

swap_infos: S,
swap_status_update_tx: tokio::sync::broadcast::Sender<Vec<SwapStatus>>,
}

struct ServerState<S> {
struct ServerState<S, M> {
manager: Arc<M>,
service: Arc<Service>,

swap_infos: S,
swap_status_update_tx: tokio::sync::broadcast::Sender<Vec<SwapStatus>>,
}

impl<S> Server<S>
impl<S, M> Server<S, M>
where
S: SwapInfos + Clone + Send + Sync + 'static,
M: SwapManager + Send + Sync + 'static,
{
pub fn new(
config: Config,
cancellation_token: CancellationToken,
manager: Arc<M>,
service: Arc<Service>,
swap_infos: S,
swap_status_update_tx: tokio::sync::broadcast::Sender<Vec<SwapStatus>>,
) -> Self {
Server {
config,
manager,
service,
swap_infos,
cancellation_token,
Expand Down Expand Up @@ -93,6 +98,7 @@ where
axum::serve(
listener,
router.layer(Extension(Arc::new(ServerState {
manager: self.manager.clone(),
service: self.service.clone(),
swap_infos: self.swap_infos.clone(),
swap_status_update_tx: self.swap_status_update_tx.clone(),
Expand All @@ -111,18 +117,22 @@ where

fn add_routes(router: Router) -> Router {
router
.route("/streamswapstatus", get(sse_handler::<S>))
.route("/streamswapstatus", get(sse_handler::<S, M>))
.route(
"/v2/swap/{swap_type}/stats/{from}/{to}",
get(get_stats::<S>),
get(get_stats::<S, M>),
)
.route(
"/v2/lightning/{currency}/bolt12/fetch",
post(lightning::bolt12_fetch::<S, M>),
)
.route(
"/v2/lightning/{currency}/node/{node}",
get(lightning_node_info::<S>),
get(lightning::node_info::<S, M>),
)
.route(
"/v2/lightning/{currency}/channels/{node}",
get(lightning_channels::<S>),
get(lightning::channels::<S, M>),
)
.layer(axum::middleware::from_fn(error_middleware))
}
Expand All @@ -135,6 +145,7 @@ pub mod test {
use crate::api::{Config, Server};
use crate::cache::Redis;
use crate::service::Service;
use crate::swap::manager::test::MockManager;
use async_trait::async_trait;
use reqwest::StatusCode;
use std::collections::HashMap;
Expand Down Expand Up @@ -183,6 +194,7 @@ pub mod test {
host: "127.0.0.1".to_string(),
},
cancel.clone(),
Arc::new(MockManager::new()),
Arc::new(Service::new::<Redis>(
Arc::new(HashMap::new()),
None,
Expand Down
6 changes: 4 additions & 2 deletions boltzr/src/api/sse.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::api::ws::status::SwapInfos;
use crate::api::ServerState;
use crate::swap::manager::SwapManager;
use async_stream::try_stream;
use axum::response::sse::{Event, Sse};
use axum::{extract::Query, Extension};
Expand All @@ -23,12 +24,13 @@ pub struct IdParams {
pub id: String,
}

pub async fn sse_handler<S>(
Extension(state): Extension<Arc<ServerState<S>>>,
pub async fn sse_handler<S, M>(
Extension(state): Extension<Arc<ServerState<S, M>>>,
Query(params): Query<IdParams>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>>
where
S: SwapInfos + Send + Sync + Clone + 'static,
M: SwapManager + Send + Sync + 'static,
{
trace!("New SSE status stream for swap: {}", params.id);

Expand Down
Loading

0 comments on commit 985599e

Please sign in to comment.