Skip to content

Commit

Permalink
refactor: Abstract IPFS persistance (#169)
Browse files Browse the repository at this point in the history
* refactor: Run tests without kubo, abstract out IpfsDatabase

* chore: Update to fixed rs-ucan version

* refactor: Introduce a `trait ServerSetup`

* refactor: Abstract server traits using `ServerSetup`
  • Loading branch information
matheus23 authored Sep 28, 2023
1 parent 85fd8df commit e35366e
Show file tree
Hide file tree
Showing 21 changed files with 625 additions and 422 deletions.
537 changes: 287 additions & 250 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ ed25519-zebra = "3.1"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
ucan = { git = "https://github.com/blaine/rs-ucan", branch = "temporary-unscoped-fix" }
ucan-key-support = { git = "https://github.com/blaine/rs-ucan", branch = "temporary-unscoped-fix" }
test-log = { version = "0.2", default-features = false, features = ["trace"] }
testresult = "0.3"
ucan = { git = "https://github.com/blaine/rs-ucan", branch = "temporary-unscoped-fix", commit = "24f00ed8055a15ccfe1ffd7f8c5c81ae70d290e5" }
ucan-key-support = { git = "https://github.com/blaine/rs-ucan", branch = "temporary-unscoped-fix", commit = "24f00ed8055a15ccfe1ffd7f8c5c81ae70d290e5" }
url = "2.3"

# Speedup build on macOS
Expand Down
3 changes: 3 additions & 0 deletions fission-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,11 @@ validator = { version = "0.16.0", features = ["derive"] }

[dev-dependencies]
assert-json-diff = "2.0"
blake3 = "1.4.1"
rsa = { version = "0.9" }
stringreader = "0.1.1"
test-log = { workspace = true }
testresult = { workspace = true }
tokio-test = "0.4"
uuid = "1.4.1"
wiremock = "0.5"
Expand Down
99 changes: 59 additions & 40 deletions fission-server/src/app_state.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
//! The Axum Application State
use anyhow::Result;
use async_trait::async_trait;
use anyhow::{anyhow, Result};
use axum::extract::ws;
use dashmap::DashMap;
use dyn_clone::DynClone;
use futures::channel::mpsc::Sender;
use std::{fmt, net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, sync::Arc};

use crate::db::Pool;
use crate::{db::Pool, traits::ServerSetup};

/// A channel for transmitting messages to a websocket peer
pub type WsPeer = Sender<ws::Message>;
Expand All @@ -18,41 +16,55 @@ pub type WsPeerMap = Arc<DashMap<String, DashMap<SocketAddr, WsPeer>>>;

#[derive(Clone)]
/// Global application route state.
pub struct AppState {
pub struct AppState<S: ServerSetup> {
/// The database pool
pub db_pool: Pool,
/// The ipfs peers to be rendered in the ipfs/peers endpoint
pub ipfs_peers: Vec<String>,
/// Connection to what stores the IPFS blocks
pub ipfs_db: S::IpfsDatabase,
/// The service that sends account verification codes
pub verification_code_sender: Box<dyn VerificationCodeSender>,
pub verification_code_sender: S::VerificationCodeSender,
/// The currently connected websocket peers
pub ws_peer_map: WsPeerMap,
}

#[derive(Default)]
/// Builder for [`AppState`]
pub struct AppStateBuilder {
pub struct AppStateBuilder<S: ServerSetup> {
db_pool: Option<Pool>,
ipfs_peers: Vec<String>,
verification_code_sender: Option<Box<dyn VerificationCodeSender>>,
ipfs_db: Option<S::IpfsDatabase>,
verification_code_sender: Option<S::VerificationCodeSender>,
}

impl AppStateBuilder {
impl<S: ServerSetup> Default for AppStateBuilder<S> {
fn default() -> Self {
Self {
db_pool: None,
ipfs_peers: Default::default(),
ipfs_db: None,
verification_code_sender: None,
}
}
}

impl<S: ServerSetup> AppStateBuilder<S> {
/// Finalize the builder and return the [`AppState`]
pub fn finalize(self) -> Result<AppState> {
let db_pool = self
.db_pool
.ok_or_else(|| anyhow::anyhow!("db_pool is required"))?;
pub fn finalize(self) -> Result<AppState<S>> {
let db_pool = self.db_pool.ok_or_else(|| anyhow!("db_pool is required"))?;

let ipfs_peers = self.ipfs_peers;

let ipfs_db = self.ipfs_db.ok_or_else(|| anyhow!("ipfs_db is required"))?;

let verification_code_sender = self
.verification_code_sender
.ok_or_else(|| anyhow::anyhow!("verification_code_sender is required"))?;
.ok_or_else(|| anyhow!("verification_code_sender is required"))?;

Ok(AppState {
db_pool,
ipfs_peers,
ipfs_db,
verification_code_sender,
ws_peer_map: Default::default(),
})
Expand All @@ -70,44 +82,51 @@ impl AppStateBuilder {
self
}

/// Set the service that sends account verification codes
pub fn with_verification_code_sender<T>(mut self, verification_code_sender: T) -> Self
where
T: VerificationCodeSender + 'static,
{
self.verification_code_sender = Some(Box::new(verification_code_sender));
/// Set the ipfs database
pub fn with_ipfs_db(mut self, ipfs_db: S::IpfsDatabase) -> Self {
self.ipfs_db = Some(ipfs_db);
self
}
}

/// The service that sends account verification codes
#[async_trait]
pub trait VerificationCodeSender: DynClone + Send + Sync {
/// Send the code associated with the email
async fn send_code(&self, email: &str, code: &str) -> Result<()>;
}

dyn_clone::clone_trait_object!(VerificationCodeSender);

#[async_trait]
impl VerificationCodeSender for Box<dyn VerificationCodeSender> {
async fn send_code(&self, email: &str, code: &str) -> Result<()> {
self.as_ref().send_code(email, code).await
/// Set the service that sends account verification codes
pub fn with_verification_code_sender(
mut self,
verification_code_sender: S::VerificationCodeSender,
) -> Self {
self.verification_code_sender = Some(verification_code_sender);
self
}
}

impl fmt::Debug for AppState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
impl<S> std::fmt::Debug for AppState<S>
where
S: ServerSetup,
S::IpfsDatabase: std::fmt::Debug,
S::VerificationCodeSender: std::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AppState")
.field("db_pool", &self.db_pool)
.field("ipfs_peers", &self.ipfs_peers)
.field("ipfs_db", &self.ipfs_db)
.field("ws_peer_map", &self.ws_peer_map)
.field("verification_code_sender", &self.verification_code_sender)
.finish()
}
}

impl fmt::Debug for AppStateBuilder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
impl<S> std::fmt::Debug for AppStateBuilder<S>
where
S: ServerSetup,
S::IpfsDatabase: std::fmt::Debug,
S::VerificationCodeSender: std::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AppStateBuilder")
.field("db_pool", &self.db_pool)
.field("ipfs_peers", &self.ipfs_peers)
.field("ipfs_db", &self.ipfs_db)
.field("verification_code_sender", &self.verification_code_sender)
.finish()
}
}
1 change: 1 addition & 0 deletions fission-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod routes;
pub mod settings;
pub mod tracer;
pub mod tracing_layers;
pub mod traits;

#[cfg(test)]
mod test_utils;
12 changes: 11 additions & 1 deletion fission-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use fission_server::{
metrics_layer::{MetricsLayer, METRIC_META_PREFIX},
storage_layer::StorageLayer,
},
traits::{IpfsHttpApiDatabase, ServerSetup},
};
use http::header;
use metrics_exporter_prometheus::PrometheusHandle;
Expand Down Expand Up @@ -60,6 +61,14 @@ use utoipa_swagger_ui::SwaggerUi;
/// Request identifier field.
const REQUEST_ID: &str = "request_id";

#[derive(Clone, Debug, Default)]
pub struct ProdSetup;

impl ServerSetup for ProdSetup {
type IpfsDatabase = IpfsHttpApiDatabase;
type VerificationCodeSender = EmailVerificationCodeSender;
}

#[tokio::main]
async fn main() -> Result<()> {
let (stdout_writer, _stdout_guard) = tracing_appender::non_blocking(io::stdout());
Expand Down Expand Up @@ -151,10 +160,11 @@ async fn serve_metrics(
async fn serve_app(settings: Settings, db_pool: Pool, token: CancellationToken) -> Result<()> {
let req_id = HeaderName::from_static(REQUEST_ID);

let app_state = AppStateBuilder::default()
let app_state = AppStateBuilder::<ProdSetup>::default()
.with_db_pool(db_pool)
.with_ipfs_peers(settings.ipfs().peers.clone())
.with_verification_code_sender(EmailVerificationCodeSender::new(settings.mailgun().clone()))
.with_ipfs_db(IpfsHttpApiDatabase::default())
.finalize()?;

let router = router::setup_app_router(app_state)
Expand Down
23 changes: 9 additions & 14 deletions fission-server/src/models/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use std::str::FromStr;

use anyhow::bail;
use anyhow::{bail, Result};
use did_key::{generate, Ed25519KeyPair};

use chrono::NaiveDateTime;
Expand All @@ -13,12 +13,12 @@ use ucan::{builder::UcanBuilder, capability::CapabilitySemantics};
use utoipa::ToSchema;

use diesel_async::RunQueryDsl;
use ipfs_api::IpfsApi;

use crate::{
crypto::patchedkey::PatchedKeyPair,
db::{schema::accounts, Conn},
models::volume::{NewVolumeRecord, Volume},
traits::IpfsDatabase,
};

/// New Account Struct (for creating new accounts)
Expand Down Expand Up @@ -125,10 +125,7 @@ impl Account {
//
// Note: this doesn't use a join, but rather a separate query to the volumes table.
// Possibly not ideal, but it's simple and works.
pub async fn get_volume(
&self,
conn: &mut Conn<'_>,
) -> Result<Option<NewVolumeRecord>, diesel::result::Error> {
pub async fn get_volume(&self, conn: &mut Conn<'_>) -> Result<Option<NewVolumeRecord>> {
if let Some(volume_id) = self.volume_id {
let volume = Volume::find_by_id(conn, volume_id).await?;
Ok(Some(volume.into()))
Expand All @@ -142,12 +139,9 @@ impl Account {
&self,
conn: &mut Conn<'_>,
cid: &str,
) -> Result<NewVolumeRecord, anyhow::Error> {
let ipfs = ipfs_api::IpfsClient::default();

if let Err(e) = ipfs.pin_add(cid, true).await {
return Err(anyhow::anyhow!("Failed to pin CID: {}", e));
}
ipfs_db: &impl IpfsDatabase,
) -> Result<NewVolumeRecord> {
ipfs_db.pin_add(cid, true).await?;

let volume = Volume::new(conn, cid).await?;

Expand All @@ -172,11 +166,12 @@ impl Account {
&self,
conn: &mut Conn<'_>,
cid: &str,
) -> Result<NewVolumeRecord, anyhow::Error> {
ipfs_db: &impl IpfsDatabase,
) -> Result<NewVolumeRecord> {
if let Some(volume_id) = self.volume_id {
let volume = Volume::find_by_id(conn, volume_id)
.await?
.update_cid(conn, cid)
.update_cid(conn, cid, ipfs_db)
.await?;
Ok(volume.into())
} else {
Expand Down
2 changes: 1 addition & 1 deletion fission-server/src/models/email_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ use utoipa::ToSchema;
use validator::{Validate, ValidationError};

use crate::{
app_state::VerificationCodeSender,
db::{schema::email_verifications, Conn},
settings,
traits::VerificationCodeSender,
};

#[derive(Debug, Clone)]
Expand Down
38 changes: 16 additions & 22 deletions fission-server/src/models/volume.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
//! Volume model
use anyhow::Result;
use serde::{Deserialize, Serialize};

use chrono::NaiveDateTime;
use diesel::prelude::*;
use tracing::log;
use utoipa::ToSchema;

use diesel_async::RunQueryDsl;

use ipfs_api::IpfsApi;

use crate::db::{schema::volumes, Conn};
use crate::{
db::{schema::volumes, Conn},
traits::IpfsDatabase,
};

#[derive(Debug, Queryable, Insertable, Clone, Identifiable, Selectable, ToSchema)]
#[diesel(table_name = volumes)]
Expand Down Expand Up @@ -54,45 +55,38 @@ impl From<Volume> for NewVolumeRecord {

impl Volume {
/// Create a new Volume. Inserts the volume into the database.
pub async fn new(conn: &mut Conn<'_>, cid: &str) -> Result<Self, diesel::result::Error> {
pub async fn new(conn: &mut Conn<'_>, cid: &str) -> Result<Self> {
let new_volume = NewVolumeRecord {
cid: cid.to_string(),
};

diesel::insert_into(volumes::table)
Ok(diesel::insert_into(volumes::table)
.values(new_volume)
.get_result(conn)
.await
.await?)
}

/// Find a volume by its primary key
pub async fn find_by_id(conn: &mut Conn<'_>, id: i32) -> Result<Self, diesel::result::Error> {
volumes::table
pub async fn find_by_id(conn: &mut Conn<'_>, id: i32) -> Result<Self> {
Ok(volumes::table
.filter(volumes::id.eq(id))
.get_result(conn)
.await
.await?)
}

/// Update a volume by its CID
pub async fn update_cid(
&self,
conn: &mut Conn<'_>,
cid: &str,
) -> Result<Self, diesel::result::Error> {
let ipfs = ipfs_api::IpfsClient::default();

let result = ipfs.pin_add(cid, true).await;

if result.is_err() {
log::debug!("Error communicating with IPFS: {:?}", result);
// FIXME: Use better error
return Err(diesel::result::Error::NotFound);
}
ipfs_db: &impl IpfsDatabase,
) -> Result<Self> {
ipfs_db.pin_add(cid, true).await?;

diesel::update(volumes::table)
Ok(diesel::update(volumes::table)
.filter(volumes::id.eq(self.id))
.set(volumes::cid.eq(cid))
.get_result(conn)
.await
.await?)
}
}
Loading

0 comments on commit e35366e

Please sign in to comment.