Skip to content

Commit

Permalink
refactor: migrate web server to Actix Web
Browse files Browse the repository at this point in the history
  • Loading branch information
robjtede committed Sep 19, 2024
1 parent ff6d9e8 commit 7c50788
Show file tree
Hide file tree
Showing 19 changed files with 944 additions and 577 deletions.
556 changes: 504 additions & 52 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ edition = "2021"
[dependencies]
badge = { path = "./libs/badge" }

actix-web = "4"
actix-web-lab = "0.20"
anyhow = "1"
cadence = "1"
crates-index = { version = "3", default-features = false, features = ["git"] }
Expand All @@ -22,23 +24,22 @@ dotenvy = "0.15"
either = "1.12"
font-awesome-as-a-crate = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std"] }
hyper = { version = "0.14.10", features = ["full"] }
error_reporter = "1"
indexmap = { version = "2", features = ["serde"] }
lru_time_cache = "0.11"
maud = "0.26"
mime = "0.3"
once_cell = "1"
parking_lot = "0.12"
pulldown-cmark = "0.12"
relative-path = { version = "1", features = ["serde"] }
reqwest = { version = "0.12", features = ["json"] }
route-recognizer = "0.3"
rustsec = "0.29"
semver = { version = "1", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde_urlencoded = "0.7"
serde_with = "3"
tokio = { version = "1.24.2", features = ["rt-multi-thread", "macros", "sync", "time"] }
tokio = { version = "1.24.2", features = ["rt", "macros", "sync", "time"] }
toml = "0.8"
tracing = "0.1.30"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand Down
13 changes: 8 additions & 5 deletions src/engine/fut/crawl.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use anyhow::Error;
use futures_util::{future::BoxFuture, stream::FuturesOrdered, FutureExt as _, StreamExt as _};
use futures_util::{
future::LocalBoxFuture, stream::FuturesOrdered, FutureExt as _, StreamExt as _,
};
use relative_path::RelativePathBuf;

use crate::{
Expand All @@ -16,8 +18,9 @@ pub async fn crawl_manifest(
entry_point: RelativePathBuf,
) -> anyhow::Result<ManifestCrawlerOutput> {
let mut crawler = ManifestCrawler::new();
let mut futures: FuturesOrdered<BoxFuture<'static, Result<(RelativePathBuf, String), Error>>> =
FuturesOrdered::new();
let mut futures: FuturesOrdered<
LocalBoxFuture<'static, Result<(RelativePathBuf, String), Error>>,
> = FuturesOrdered::new();

let engine2 = engine.clone();
let repo_path2 = repo_path.clone();
Expand All @@ -28,7 +31,7 @@ pub async fn crawl_manifest(
.await?;
Ok((entry_point, contents))
}
.boxed();
.boxed_local();

futures.push_back(fut);

Expand All @@ -47,7 +50,7 @@ pub async fn crawl_manifest(
let contents = engine.retrieve_manifest_at_path(&repo_path, &path).await?;
Ok((path, contents))
}
.boxed();
.boxed_local();

futures.push_back(fut);
}
Expand Down
13 changes: 8 additions & 5 deletions src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use std::{
time::{Duration, Instant},
};

use actix_web::dev::Service;
use anyhow::{anyhow, Error};
use cadence::{MetricSink, NopMetricSink, StatsdClient};
use futures_util::{
future::try_join_all,
stream::{self, BoxStream},
stream::{self, LocalBoxStream},
StreamExt as _,
};
use hyper::service::Service;
use once_cell::sync::Lazy;
use relative_path::{RelativePath, RelativePathBuf};
use rustsec::database::Database;
Expand All @@ -38,7 +38,7 @@ mod machines;

use self::fut::{analyze_dependencies, crawl_manifest};

#[derive(Clone, Debug)]
#[derive(Debug, Clone)]
pub struct Engine {
metrics: Arc<StatsdClient>,
query_crate: Cache<QueryCrate, CrateName>,
Expand Down Expand Up @@ -255,7 +255,10 @@ impl Engine {
Ok(latest)
}

fn fetch_releases<'a, I>(&'a self, names: I) -> BoxStream<'a, anyhow::Result<Vec<CrateRelease>>>
fn fetch_releases<'a, I>(
&'a self,
names: I,
) -> LocalBoxStream<'a, anyhow::Result<Vec<CrateRelease>>>
where
I: IntoIterator<Item = CrateName>,
<I as IntoIterator>::IntoIter: Send + 'a,
Expand All @@ -277,7 +280,7 @@ impl Engine {
) -> Result<String, Error> {
let manifest_path = path.join(RelativePath::new("Cargo.toml"));

let mut service = self.retrieve_file_at_path.clone();
let service = self.retrieve_file_at_path.clone();
service.call((repo_path.clone(), manifest_path)).await
}

Expand Down
27 changes: 10 additions & 17 deletions src/interactors/crates.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
use std::{
fmt, str,
task::{Context, Poll},
};
use std::{fmt, str};

use actix_web::dev::Service;
use anyhow::{anyhow, Error};
use crates_index::{Crate, DependencyKind};
use futures_util::FutureExt as _;
use hyper::service::Service;
use futures_util::{future::LocalBoxFuture, FutureExt as _};
use semver::{Version, VersionReq};
use serde::Deserialize;
use tokio::task::spawn_blocking;

use crate::{
models::crates::{CrateDep, CrateDeps, CrateName, CratePath, CrateRelease},
BoxFuture, ManagedIndex,
ManagedIndex,
};

const CRATES_API_BASE_URI: &str = "https://crates.io/api/v1";
Expand Down Expand Up @@ -86,13 +83,11 @@ impl fmt::Debug for QueryCrate {
impl Service<CrateName> for QueryCrate {
type Response = QueryCrateResponse;
type Error = Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_web::dev::always_ready!();

fn call(&mut self, crate_name: CrateName) -> Self::Future {
fn call(&self, crate_name: CrateName) -> Self::Future {
let index = self.index.clone();
Self::query(index, crate_name).boxed()
}
Expand Down Expand Up @@ -150,13 +145,11 @@ impl fmt::Debug for GetPopularCrates {
impl Service<()> for GetPopularCrates {
type Response = Vec<CratePath>;
type Error = Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_web::dev::always_ready!();

fn call(&mut self, _req: ()) -> Self::Future {
fn call(&self, _req: ()) -> Self::Future {
let client = self.client.clone();
Self::query(client).boxed()
}
Expand Down
22 changes: 7 additions & 15 deletions src/interactors/github.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
use std::{
fmt,
task::{Context, Poll},
};
use std::fmt;

use actix_web::dev::Service;
use anyhow::Error;
use futures_util::FutureExt as _;
use hyper::service::Service;
use futures_util::{future::LocalBoxFuture, FutureExt as _};
use serde::Deserialize;

use crate::{
models::repo::{RepoPath, Repository},
BoxFuture,
};
use crate::models::repo::{RepoPath, Repository};

const GITHUB_API_BASE_URI: &str = "https://api.github.com";

Expand Down Expand Up @@ -72,13 +66,11 @@ impl fmt::Debug for GetPopularRepos {
impl Service<()> for GetPopularRepos {
type Response = Vec<Repository>;
type Error = Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_web::dev::always_ready!();

fn call(&mut self, _req: ()) -> Self::Future {
fn call(&self, _req: ()) -> Self::Future {
let client = self.client.clone();
Self::query(client).boxed()
}
Expand Down
19 changes: 7 additions & 12 deletions src/interactors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use std::{
fmt,
task::{Context, Poll},
};
use std::fmt;

use actix_web::dev::Service;
use anyhow::{anyhow, Error};
use futures_util::FutureExt as _;
use hyper::service::Service;
use futures_util::{future::LocalBoxFuture, FutureExt as _};
use relative_path::RelativePathBuf;

use crate::{models::repo::RepoPath, BoxFuture};
use crate::models::repo::RepoPath;

pub mod crates;
pub mod github;
Expand Down Expand Up @@ -43,13 +40,11 @@ impl RetrieveFileAtPath {
impl Service<(RepoPath, RelativePathBuf)> for RetrieveFileAtPath {
type Response = String;
type Error = Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_web::dev::always_ready!();

fn call(&mut self, (repo_path, path): (RepoPath, RelativePathBuf)) -> Self::Future {
fn call(&self, (repo_path, path): (RepoPath, RelativePathBuf)) -> Self::Future {
let client = self.client.clone();
Self::query(client, repo_path, path).boxed()
}
Expand Down
25 changes: 9 additions & 16 deletions src/interactors/rustsec.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
use std::{
fmt,
sync::Arc,
task::{Context, Poll},
};
use std::{fmt, sync::Arc};

use actix_web::dev::Service;
use anyhow::Error;
use futures_util::FutureExt as _;
use hyper::service::Service;
use futures_util::{future::LocalBoxFuture, FutureExt as _};
use rustsec::database::Database;

use crate::BoxFuture;

#[derive(Clone)]
pub struct FetchAdvisoryDatabase {
client: reqwest::Client,
Expand All @@ -30,20 +24,19 @@ impl FetchAdvisoryDatabase {
impl Service<()> for FetchAdvisoryDatabase {
type Response = Arc<Database>;
type Error = Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_web::dev::always_ready!();

fn call(&mut self, _req: ()) -> Self::Future {
fn call(&self, _req: ()) -> Self::Future {
let client = self.client.clone();
Self::fetch(client).boxed()
Self::fetch(client).boxed_local()
}
}

impl fmt::Debug for FetchAdvisoryDatabase {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("FetchAdvisoryDatabase")
f.debug_struct("FetchAdvisoryDatabase")
.finish_non_exhaustive()
}
}
58 changes: 23 additions & 35 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,14 @@

use std::{
env,
future::Future,
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
pin::Pin,
net::{Ipv4Addr, UdpSocket},
time::Duration,
};

use actix_web::{middleware::Logger, web};
use actix_web_lab::{extract::ThinData, middleware::NormalizePath};
use cadence::{QueuingMetricSink, UdpMetricSink};
use hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
Server,
};
use reqwest::redirect::Policy as RedirectPolicy;
use tracing::Instrument as _;

mod engine;
mod interactors;
Expand All @@ -25,10 +19,7 @@ mod parsers;
mod server;
mod utils;

use self::{engine::Engine, server::App, utils::index::ManagedIndex};

/// Future crate's BoxFuture without the explicit lifetime parameter.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
use self::{engine::Engine, utils::index::ManagedIndex};

const DEPS_RS_UA: &str = "deps.rs";

Expand Down Expand Up @@ -59,7 +50,7 @@ fn init_tracing_subscriber() {
.init();
}

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() {
dotenvy::dotenv().ok();
init_tracing_subscriber();
Expand All @@ -77,8 +68,6 @@ async fn main() {
.parse()
.expect("could not read port");

let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);

let index = ManagedIndex::new();

{
Expand All @@ -92,25 +81,24 @@ async fn main() {
let mut engine = Engine::new(client.clone(), index);
engine.set_metrics(metrics);

let make_svc = make_service_fn(move |_socket: &AddrStream| {
let engine = engine.clone();

async move {
let server = App::new(engine.clone());
Ok::<_, hyper::Error>(service_fn(move |req| {
let server = server.clone();
async move {
let path = req.uri().path().to_owned();

server
.handle(req)
.instrument(tracing::info_span!("@", %path))
.await
}
}))
}
});
let server = Server::bind(&addr).serve(make_svc);
let server = actix_web::HttpServer::new(move || {
actix_web::App::new()
.app_data(ThinData(engine.clone()))
.service(server::index)
.service(server::crate_redirect)
.service(server::crate_latest_status_svg)
.service(server::crate_status_svg)
.service(server::crate_status_html)
.service(server::repo_status_svg)
.service(server::repo_status_html)
.configure(server::static_files)
.default_service(web::to(server::not_found))
.wrap(NormalizePath::trim())
.wrap(Logger::default())
})
.bind_auto_h2c((Ipv4Addr::UNSPECIFIED, port))
.unwrap()
.run();

tracing::info!("Server running on port {port}");

Expand Down
Loading

0 comments on commit 7c50788

Please sign in to comment.