Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: migrate web server to Actix Web #229

Merged
merged 1 commit into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
556 changes: 504 additions & 52 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ edition = "2021"
[dependencies]
badge = { path = "./libs/badge" }

actix-web = "4"
actix-web-lab = "0.20"
anyhow = "1"
cadence = "1"
crates-index = { version = "3", default-features = false, features = ["git"] }
Expand All @@ -22,23 +24,22 @@ dotenvy = "0.15"
either = "1.12"
font-awesome-as-a-crate = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std"] }
hyper = { version = "0.14.10", features = ["full"] }
error_reporter = "1"
indexmap = { version = "2", features = ["serde"] }
lru_time_cache = "0.11"
maud = "0.26"
mime = "0.3"
once_cell = "1"
parking_lot = "0.12"
pulldown-cmark = "0.12"
relative-path = { version = "1", features = ["serde"] }
reqwest = { version = "0.12", features = ["json"] }
route-recognizer = "0.3"
rustsec = "0.29"
semver = { version = "1", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde_urlencoded = "0.7"
serde_with = "3"
tokio = { version = "1.24.2", features = ["rt-multi-thread", "macros", "sync", "time"] }
tokio = { version = "1.24.2", features = ["rt", "macros", "sync", "time"] }
toml = "0.8"
tracing = "0.1.30"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand Down
13 changes: 8 additions & 5 deletions src/engine/fut/crawl.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use anyhow::Error;
use futures_util::{future::BoxFuture, stream::FuturesOrdered, FutureExt as _, StreamExt as _};
use futures_util::{
future::LocalBoxFuture, stream::FuturesOrdered, FutureExt as _, StreamExt as _,
};
use relative_path::RelativePathBuf;

use crate::{
Expand All @@ -16,8 +18,9 @@ pub async fn crawl_manifest(
entry_point: RelativePathBuf,
) -> anyhow::Result<ManifestCrawlerOutput> {
let mut crawler = ManifestCrawler::new();
let mut futures: FuturesOrdered<BoxFuture<'static, Result<(RelativePathBuf, String), Error>>> =
FuturesOrdered::new();
let mut futures: FuturesOrdered<
LocalBoxFuture<'static, Result<(RelativePathBuf, String), Error>>,
> = FuturesOrdered::new();

let engine2 = engine.clone();
let repo_path2 = repo_path.clone();
Expand All @@ -28,7 +31,7 @@ pub async fn crawl_manifest(
.await?;
Ok((entry_point, contents))
}
.boxed();
.boxed_local();

futures.push_back(fut);

Expand All @@ -47,7 +50,7 @@ pub async fn crawl_manifest(
let contents = engine.retrieve_manifest_at_path(&repo_path, &path).await?;
Ok((path, contents))
}
.boxed();
.boxed_local();

futures.push_back(fut);
}
Expand Down
13 changes: 8 additions & 5 deletions src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use std::{
time::{Duration, Instant},
};

use actix_web::dev::Service;
use anyhow::{anyhow, Error};
use cadence::{MetricSink, NopMetricSink, StatsdClient};
use futures_util::{
future::try_join_all,
stream::{self, BoxStream},
stream::{self, LocalBoxStream},
StreamExt as _,
};
use hyper::service::Service;
use once_cell::sync::Lazy;
use relative_path::{RelativePath, RelativePathBuf};
use rustsec::database::Database;
Expand All @@ -38,7 +38,7 @@ mod machines;

use self::fut::{analyze_dependencies, crawl_manifest};

#[derive(Clone, Debug)]
#[derive(Debug, Clone)]
pub struct Engine {
metrics: Arc<StatsdClient>,
query_crate: Cache<QueryCrate, CrateName>,
Expand Down Expand Up @@ -255,7 +255,10 @@ impl Engine {
Ok(latest)
}

fn fetch_releases<'a, I>(&'a self, names: I) -> BoxStream<'a, anyhow::Result<Vec<CrateRelease>>>
fn fetch_releases<'a, I>(
&'a self,
names: I,
) -> LocalBoxStream<'a, anyhow::Result<Vec<CrateRelease>>>
where
I: IntoIterator<Item = CrateName>,
<I as IntoIterator>::IntoIter: Send + 'a,
Expand All @@ -277,7 +280,7 @@ impl Engine {
) -> Result<String, Error> {
let manifest_path = path.join(RelativePath::new("Cargo.toml"));

let mut service = self.retrieve_file_at_path.clone();
let service = self.retrieve_file_at_path.clone();
service.call((repo_path.clone(), manifest_path)).await
}

Expand Down
27 changes: 10 additions & 17 deletions src/interactors/crates.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
use std::{
fmt, str,
task::{Context, Poll},
};
use std::{fmt, str};

use actix_web::dev::Service;
use anyhow::{anyhow, Error};
use crates_index::{Crate, DependencyKind};
use futures_util::FutureExt as _;
use hyper::service::Service;
use futures_util::{future::LocalBoxFuture, FutureExt as _};
use semver::{Version, VersionReq};
use serde::Deserialize;
use tokio::task::spawn_blocking;

use crate::{
models::crates::{CrateDep, CrateDeps, CrateName, CratePath, CrateRelease},
BoxFuture, ManagedIndex,
ManagedIndex,
};

const CRATES_API_BASE_URI: &str = "https://crates.io/api/v1";
Expand Down Expand Up @@ -86,13 +83,11 @@ impl fmt::Debug for QueryCrate {
impl Service<CrateName> for QueryCrate {
type Response = QueryCrateResponse;
type Error = Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_web::dev::always_ready!();

fn call(&mut self, crate_name: CrateName) -> Self::Future {
fn call(&self, crate_name: CrateName) -> Self::Future {
let index = self.index.clone();
Self::query(index, crate_name).boxed()
}
Expand Down Expand Up @@ -150,13 +145,11 @@ impl fmt::Debug for GetPopularCrates {
impl Service<()> for GetPopularCrates {
type Response = Vec<CratePath>;
type Error = Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_web::dev::always_ready!();

fn call(&mut self, _req: ()) -> Self::Future {
fn call(&self, _req: ()) -> Self::Future {
let client = self.client.clone();
Self::query(client).boxed()
}
Expand Down
22 changes: 7 additions & 15 deletions src/interactors/github.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
use std::{
fmt,
task::{Context, Poll},
};
use std::fmt;

use actix_web::dev::Service;
use anyhow::Error;
use futures_util::FutureExt as _;
use hyper::service::Service;
use futures_util::{future::LocalBoxFuture, FutureExt as _};
use serde::Deserialize;

use crate::{
models::repo::{RepoPath, Repository},
BoxFuture,
};
use crate::models::repo::{RepoPath, Repository};

const GITHUB_API_BASE_URI: &str = "https://api.github.com";

Expand Down Expand Up @@ -72,13 +66,11 @@ impl fmt::Debug for GetPopularRepos {
impl Service<()> for GetPopularRepos {
type Response = Vec<Repository>;
type Error = Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_web::dev::always_ready!();

fn call(&mut self, _req: ()) -> Self::Future {
fn call(&self, _req: ()) -> Self::Future {
let client = self.client.clone();
Self::query(client).boxed()
}
Expand Down
19 changes: 7 additions & 12 deletions src/interactors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use std::{
fmt,
task::{Context, Poll},
};
use std::fmt;

use actix_web::dev::Service;
use anyhow::{anyhow, Error};
use futures_util::FutureExt as _;
use hyper::service::Service;
use futures_util::{future::LocalBoxFuture, FutureExt as _};
use relative_path::RelativePathBuf;

use crate::{models::repo::RepoPath, BoxFuture};
use crate::models::repo::RepoPath;

pub mod crates;
pub mod github;
Expand Down Expand Up @@ -43,13 +40,11 @@ impl RetrieveFileAtPath {
impl Service<(RepoPath, RelativePathBuf)> for RetrieveFileAtPath {
type Response = String;
type Error = Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_web::dev::always_ready!();

fn call(&mut self, (repo_path, path): (RepoPath, RelativePathBuf)) -> Self::Future {
fn call(&self, (repo_path, path): (RepoPath, RelativePathBuf)) -> Self::Future {
let client = self.client.clone();
Self::query(client, repo_path, path).boxed()
}
Expand Down
25 changes: 9 additions & 16 deletions src/interactors/rustsec.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
use std::{
fmt,
sync::Arc,
task::{Context, Poll},
};
use std::{fmt, sync::Arc};

use actix_web::dev::Service;
use anyhow::Error;
use futures_util::FutureExt as _;
use hyper::service::Service;
use futures_util::{future::LocalBoxFuture, FutureExt as _};
use rustsec::database::Database;

use crate::BoxFuture;

#[derive(Clone)]
pub struct FetchAdvisoryDatabase {
client: reqwest::Client,
Expand All @@ -30,20 +24,19 @@ impl FetchAdvisoryDatabase {
impl Service<()> for FetchAdvisoryDatabase {
type Response = Arc<Database>;
type Error = Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_web::dev::always_ready!();

fn call(&mut self, _req: ()) -> Self::Future {
fn call(&self, _req: ()) -> Self::Future {
let client = self.client.clone();
Self::fetch(client).boxed()
Self::fetch(client).boxed_local()
}
}

impl fmt::Debug for FetchAdvisoryDatabase {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("FetchAdvisoryDatabase")
f.debug_struct("FetchAdvisoryDatabase")
.finish_non_exhaustive()
}
}
58 changes: 23 additions & 35 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,14 @@

use std::{
env,
future::Future,
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
pin::Pin,
net::{Ipv4Addr, UdpSocket},
time::Duration,
};

use actix_web::{middleware::Logger, web};
use actix_web_lab::{extract::ThinData, middleware::NormalizePath};
use cadence::{QueuingMetricSink, UdpMetricSink};
use hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
Server,
};
use reqwest::redirect::Policy as RedirectPolicy;
use tracing::Instrument as _;

mod engine;
mod interactors;
Expand All @@ -25,10 +19,7 @@ mod parsers;
mod server;
mod utils;

use self::{engine::Engine, server::App, utils::index::ManagedIndex};

/// Future crate's BoxFuture without the explicit lifetime parameter.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
use self::{engine::Engine, utils::index::ManagedIndex};

const DEPS_RS_UA: &str = "deps.rs";

Expand Down Expand Up @@ -59,7 +50,7 @@ fn init_tracing_subscriber() {
.init();
}

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() {
dotenvy::dotenv().ok();
init_tracing_subscriber();
Expand All @@ -77,8 +68,6 @@ async fn main() {
.parse()
.expect("could not read port");

let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);

let index = ManagedIndex::new();

{
Expand All @@ -92,25 +81,24 @@ async fn main() {
let mut engine = Engine::new(client.clone(), index);
engine.set_metrics(metrics);

let make_svc = make_service_fn(move |_socket: &AddrStream| {
let engine = engine.clone();

async move {
let server = App::new(engine.clone());
Ok::<_, hyper::Error>(service_fn(move |req| {
let server = server.clone();
async move {
let path = req.uri().path().to_owned();

server
.handle(req)
.instrument(tracing::info_span!("@", %path))
.await
}
}))
}
});
let server = Server::bind(&addr).serve(make_svc);
let server = actix_web::HttpServer::new(move || {
actix_web::App::new()
.app_data(ThinData(engine.clone()))
.service(server::index)
.service(server::crate_redirect)
.service(server::crate_latest_status_svg)
.service(server::crate_status_svg)
.service(server::crate_status_html)
.service(server::repo_status_svg)
.service(server::repo_status_html)
.configure(server::static_files)
.default_service(web::to(server::not_found))
.wrap(NormalizePath::trim())
.wrap(Logger::default())
})
.bind_auto_h2c((Ipv4Addr::UNSPECIFIED, port))
.unwrap()
.run();

tracing::info!("Server running on port {port}");

Expand Down
Loading
Loading