From dac2dd675e89712336038e30d7dbe38b3a447aed Mon Sep 17 00:00:00 2001 From: Ihor M Date: Fri, 7 Jan 2022 13:16:25 +0100 Subject: [PATCH 1/2] updated lib - dropped gatekeeper --- legacy/lib.rs | 27 ---- legacy/parse_request.rs | 138 ------------------- legacy/problem.rs | 268 ------------------------------------ legacy/retry.rs | 123 ----------------- legacy/retry_tests.rs | 116 ---------------- legacy/service_requester.rs | 148 -------------------- legacy/ws_try.rs | 168 ---------------------- src/gatekeeper.rs | 91 ------------ src/lib.rs | 1 - src/service_requester.rs | 28 ++-- 10 files changed, 10 insertions(+), 1098 deletions(-) delete mode 100644 legacy/lib.rs delete mode 100644 legacy/parse_request.rs delete mode 100644 legacy/problem.rs delete mode 100644 legacy/retry.rs delete mode 100644 legacy/retry_tests.rs delete mode 100644 legacy/service_requester.rs delete mode 100644 legacy/ws_try.rs delete mode 100644 src/gatekeeper.rs diff --git a/legacy/lib.rs b/legacy/lib.rs deleted file mode 100644 index b332593..0000000 --- a/legacy/lib.rs +++ /dev/null @@ -1,27 +0,0 @@ -pub mod auth_middleware; -pub mod business_result; -pub mod elasticsearch; -#[cfg(test)] -pub mod elasticsearch_test; -pub mod gatekeeper; -#[cfg(feature = "with-slog")] -pub mod logging_slog; -pub mod metrics; -pub mod parse_request; -mod problem; -pub mod retry; -#[cfg(test)] -mod retry_tests; -pub mod serde_field_value; -pub mod service_requester; -#[cfg(test)] -mod service_requester_test; -pub mod status; -pub mod subject; -pub mod types; -pub mod ws_try; - -pub use crate::business_result::{AsyncBusinessResult, BusinessResult, BusinessResultExt}; -pub use crate::parse_request::*; -pub use crate::problem::*; -pub use crate::service_requester::*; diff --git a/legacy/parse_request.rs b/legacy/parse_request.rs deleted file mode 100644 index 192d717..0000000 --- a/legacy/parse_request.rs +++ /dev/null @@ -1,138 +0,0 @@ -use super::AsyncBusinessResult; -use crate::problem::Problem; -use actix_web::dev::{JsonBody, MessageBody}; -use actix_web::{FromRequest, HttpMessage, HttpRequest}; -use bytes::Bytes; -use futures::{Future, IntoFuture}; -use serde::de::DeserializeOwned; -use std::fmt; -use std::ops::Deref; - -pub struct ValidatedJsonConfig { - pub limit: usize, -} - -impl Default for ValidatedJsonConfig { - fn default() -> Self { - ValidatedJsonConfig { limit: 1024 * 1024 } - } -} - -pub struct ValidatedJson(pub T); - -impl ValidatedJson { - /// Deconstruct to an inner value - pub fn into_inner(self) -> T { - self.0 - } -} - -impl Deref for ValidatedJson { - type Target = T; - - fn deref(&self) -> &T { - &self.0 - } -} - -impl fmt::Debug for ValidatedJson -where - T: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Json: {:?}", self.0) - } -} - -impl fmt::Display for ValidatedJson -where - T: fmt::Display, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Display::fmt(&self.0, f) - } -} - -impl FromRequest for ValidatedJson -where - T: DeserializeOwned + 'static, - S: 'static, -{ - type Config = ValidatedJsonConfig; - type Result = Result, Problem>; - - #[inline] - fn from_request(req: &HttpRequest, cfg: &Self::Config) -> Self::Result { - Ok(Box::new( - JsonBody::new::(req, None) - .limit(cfg.limit) - .map_err(|error| Problem::bad_request().with_details(format!("Invalid json: {}", error))) - .map(ValidatedJson), - )) - } -} - -pub fn validate_json(http_message: &R, f: F) -> AsyncBusinessResult -where - R: HttpMessage + 'static, - B: DeserializeOwned + 'static, - F: FnOnce(B) -> U + 'static, - U: IntoFuture + 'static, -{ - Box::new( - http_message - .json::() - .map_err(|error| Problem::bad_request().with_details(format!("Invalid json: {}", error))) - .and_then(f), - ) -} - -#[macro_export] -macro_rules! request_parameter { - ($req:expr, $name:expr) => { - match $req.match_info().query($name) { - Ok(value) => value, - Err(error) => { - return business_result::failure(Problem::bad_request().with_details(format!("Missing {}: {}", $name, error))); - } - } - }; -} - -pub struct LimitedRawConfig { - pub limit: usize, -} - -impl Default for LimitedRawConfig { - fn default() -> Self { - LimitedRawConfig { limit: 1024 * 1024 } - } -} - -pub struct LimitedRaw(pub Bytes); - -impl Deref for LimitedRaw { - type Target = Bytes; - - fn deref(&self) -> &Bytes { - &self.0 - } -} - -impl FromRequest for LimitedRaw -where - S: 'static, -{ - type Config = LimitedRawConfig; - type Result = Result, Problem>; - - #[inline] - fn from_request(req: &HttpRequest, cfg: &Self::Config) -> Self::Result { - Ok(Box::new( - MessageBody::new(req) - .limit(cfg.limit) - .map_err(|error| Problem::bad_request().with_details(format!("Invalid body: {}", error))) - .map(LimitedRaw), - )) - } -} diff --git a/legacy/problem.rs b/legacy/problem.rs deleted file mode 100644 index 984b51e..0000000 --- a/legacy/problem.rs +++ /dev/null @@ -1,268 +0,0 @@ -use actix; -use actix_web; -use log::error; -use serde_derive::{Deserialize, Serialize}; -use serde_json; -use std; - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct Problem { - pub code: u16, - #[serde(rename = "type")] - pub problem_type: String, - pub reason: String, - pub details: Option, -} - -impl Problem { - pub fn for_status>(code: u16, reason: S) -> Problem { - Problem { - code, - problem_type: format!("https://httpstatus.es/{}", code), - reason: reason.into(), - details: None, - } - } - - pub fn bad_request() -> Problem { - Self::for_status(400, "Bad request") - } - - pub fn unauthorized() -> Problem { - Self::for_status(401, "Unauthorized") - } - - pub fn forbidden() -> Problem { - Self::for_status(403, "Forbidden") - } - - pub fn conflict() -> Problem { - Self::for_status(409, "Conflict") - } - - pub fn internal_server_error() -> Problem { - Self::for_status(500, "Internal server error") - } - - pub fn not_found() -> Problem { - Self::for_status(404, "Not found") - } - - pub fn method_not_allowed() -> Problem { - Self::for_status(405, "Method not allowed") - } - - pub fn failed_dependency() -> Problem { - Self::for_status(424, "Failed dependency") - } - - pub fn with_details(mut self, details: T) -> Problem { - self.details = match self.details { - Some(existing) => Some(format!("{}: {}", existing, details)), - None => Some(format!("{}", details)), - }; - self - } -} - -impl std::fmt::Display for Problem { - fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { - match self.details { - Some(ref details) => write!( - f, - "Problem(code={}, reason={}, details={})", - self.code, self.reason, details - )?, - None => write!(f, "Problem(code={}, reason={})", self.code, self.reason)?, - }; - Ok(()) - } -} - -impl std::error::Error for Problem {} - -impl From for Problem { - fn from(error: std::env::VarError) -> Problem { - use std::env::VarError::*; - - match error { - NotPresent => Problem::internal_server_error().with_details("Environment variable missing"), - NotUnicode(_) => Problem::internal_server_error().with_details("Environment variable not unicode"), - } - } -} - -impl From for Problem { - fn from(error: std::io::Error) -> Problem { - error!("IO: {}", error); - - Problem::internal_server_error().with_details(format!("IO: {}", error)) - } -} - -impl From> for Problem { - fn from(error: std::sync::PoisonError) -> Problem { - error!("Sync poison: {}", error); - - Problem::internal_server_error().with_details(format!("Sync poison: {}", error)) - } -} - -impl From for Problem { - fn from(error: std::time::SystemTimeError) -> Problem { - error!("SystemTime error: {}", error); - - Problem::internal_server_error().with_details(format!("SystemTime error: {}", error)) - } -} - -impl From for Problem { - fn from(error: std::str::Utf8Error) -> Problem { - error!("UTF-8 error: {}", error); - - Problem::bad_request().with_details(format!("UTF-8 error: {}", error)) - } -} - -impl From for Problem { - fn from(error: actix_web::Error) -> Problem { - error!("Actix: {}", error); - - Problem::internal_server_error().with_details(format!("Actix: {}", error)) - } -} - -impl actix_web::error::ResponseError for Problem { - fn error_response(&self) -> actix_web::HttpResponse { - actix_web::HttpResponse::build( - actix_web::http::StatusCode::from_u16(self.code).unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR), - ) - .json(self) - } -} - -impl actix_web::Responder for Problem { - type Item = actix_web::HttpResponse; - type Error = Problem; - - fn respond_to(self, _req: &actix_web::HttpRequest) -> Result { - Ok( - actix_web::HttpResponse::build( - actix_web::http::StatusCode::from_u16(self.code).unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR), - ) - .json(self), - ) - } -} - -impl From for Problem { - fn from(error: actix_web::client::SendRequestError) -> Problem { - use actix_web::client::SendRequestError::*; - - error!("Http client: {}", error); - - match error { - Timeout => Problem::internal_server_error().with_details("Request timeout"), - Connector(err) => Problem::internal_server_error().with_details(format!("HTTP connection error: {}", err)), - ParseError(err) => Problem::internal_server_error().with_details(format!("Invalid HTTP response: {}", err)), - Io(err) => Problem::from(err), - } - } -} - -impl From for Problem { - fn from(error: actix_web::error::PayloadError) -> Self { - error!("Http payload: {}", error); - Problem::internal_server_error().with_details(format!("Http payload: {}", error)) - } -} - -impl From for Problem { - fn from(error: actix_web::error::ReadlinesError) -> Self { - use actix_web::error::ReadlinesError::*; - - match error { - EncodingError => Problem::internal_server_error().with_details("Readline: Invalid encoding"), - PayloadError(error) => Problem::from(error), - LimitOverflow => Problem::internal_server_error().with_details("Readline: Limit exeeded"), - ContentTypeError(error) => Problem::from(error), - } - } -} - -impl From for Problem { - fn from(error: actix_web::error::ContentTypeError) -> Self { - error!("Http content type: {}", error); - - Problem::internal_server_error().with_details(format!("Http content type: {}", error)) - } -} - -impl From for Problem { - fn from(error: actix_web::error::JsonPayloadError) -> Self { - error!("Http json type: {}", error); - - Problem::internal_server_error().with_details(format!("Http json payload: {}", error)) - } -} - -impl From for Problem { - fn from(error: actix::MailboxError) -> Self { - error!("Actix mailbox error: {}", error); - - Problem::internal_server_error().with_details(format!("Actix mailbox error: {}", error)) - } -} - -#[cfg(feature = "with-toml")] -impl From<::toml::de::Error> for Problem { - fn from(error: ::toml::de::Error) -> Self { - error!("Toml: {}", error); - - Problem::internal_server_error().with_details(format!("Toml: {}", error)) - } -} - -impl From for Problem { - fn from(error: serde_json::Error) -> Self { - error!("Json: {}", error); - - Problem::internal_server_error().with_details(format!("Json: {}", error)) - } -} - -#[cfg(feature = "with-diesel")] -impl From<::r2d2::Error> for Problem { - fn from(error: ::r2d2::Error) -> Self { - error!("R2D2: {}", error); - - Problem::internal_server_error().with_details(format!("R2D2: {}", error)) - } -} - -#[cfg(feature = "with-diesel")] -impl From<::diesel::result::Error> for Problem { - fn from(error: ::diesel::result::Error) -> Self { - error!("Diesel result: {}", error); - - Problem::internal_server_error().with_details(format!("Diesel result: {}", error)) - } -} - -#[cfg(feature = "with-reqwest")] -impl From<::reqwest::Error> for Problem { - fn from(error: ::reqwest::Error) -> Self { - error!("Reqwest error: {}", error); - - Problem::internal_server_error().with_details(format!("Request result: {}", error)) - } -} - -#[cfg(feature = "with-config")] -impl From<::config::ConfigError> for Problem { - fn from(error: ::config::ConfigError) -> Self { - error!("Config error: {}", error); - - Problem::internal_server_error().with_details(format!("Config: {}", error)) - } -} diff --git a/legacy/retry.rs b/legacy/retry.rs deleted file mode 100644 index 3ebd905..0000000 --- a/legacy/retry.rs +++ /dev/null @@ -1,123 +0,0 @@ -use super::{AsyncBusinessResult, BusinessResult}; -use crate::problem::Problem; -use actix::{fut, Actor, ActorFuture, AsyncContext, Context, Handler, Message, WrapFuture}; -use futures::sync::oneshot; -use futures::Future; -use log::error; -use std::time::Duration; - -pub struct RetryActor { - delay: Duration, - context: C, - factory: F, - sender: Option>>, -} - -pub struct Try(u16); - -impl Message for Try { - type Result = (); -} - -impl Handler for RetryActor -where - F: Fn(&C) -> FU + 'static, - FU: Future + 'static, - U: 'static + Send, - C: 'static, -{ - type Result = (); - - fn handle(&mut self, msg: Try, ctx: &mut Self::Context) -> Self::Result { - ctx.spawn( - (self.factory)(&self.context) - .into_actor(self) - .then(move |result, actor, inner_ctx| { - match result { - Ok(value) => { - if let Some(sender) = actor.sender.take() { - match sender.send(Ok(value)) { - Ok(_) => (), - Err(_) => { - error!("Sender failed"); - } - } - } - } - Err(_) if msg.0 > 1 => { - inner_ctx.notify_later(Try(msg.0 - 1), actor.delay); - } - Err(problem) => { - if let Some(sender) = actor.sender.take() { - match sender.send(Err(problem)) { - Ok(_) => (), - Err(_) => { - error!("Sender failed"); - } - } - } - } - }; - fut::ok(()) - }), - ); - } -} - -impl Actor for RetryActor -where - F: Fn(&C) -> FU + 'static, - FU: Future + 'static, - U: 'static + Send, - C: 'static, -{ - type Context = Context; -} - -pub struct Retrier { - tries: u16, - delay: Duration, -} - -impl Default for Retrier { - fn default() -> Self { - Retrier { - tries: 5, - delay: Duration::from_millis(500), - } - } -} - -impl Retrier { - pub fn new(tries: u16, delay: Duration) -> Retrier { - Retrier { tries, delay } - } - - pub fn retry(&self, context: C, factory: F) -> AsyncBusinessResult - where - F: Fn(&C) -> FU + 'static, - FU: Future + 'static, - U: 'static + Send, - C: 'static, - { - let (sender, receiver) = oneshot::channel::>(); - let retrier = RetryActor { - delay: self.delay, - context, - factory, - sender: Some(sender), - } - .start(); - - Box::new( - retrier - .send(Try(self.tries)) - .map_err(|err| Problem::internal_server_error().with_details(format!("{}", err))) - .and_then(|_| { - receiver - .map_err(|err| Problem::internal_server_error().with_details(format!("{}", err))) - .flatten() - }), - ) - } -} diff --git a/legacy/retry_tests.rs b/legacy/retry_tests.rs deleted file mode 100644 index f64eb99..0000000 --- a/legacy/retry_tests.rs +++ /dev/null @@ -1,116 +0,0 @@ -use super::retry::Retrier; -use super::{business_result, Problem}; -use actix::{Arbiter, System}; -use futures::Future; -use spectral::prelude::*; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, SystemTime}; - -#[test] -fn test_retry_ok() { - let system = System::new("test"); - let retry: Retrier = Default::default(); - let tries = Arc::new(Mutex::new(0)); - - Arbiter::spawn( - retry - .retry(tries.clone(), |count_tries| { - let mut t = count_tries.lock().unwrap(); - *t += 1; - business_result::success(42) - }) - .then(move |result| { - assert_that(&result).is_equal_to(Ok(42)); - System::current().stop(); - Ok(()) - }), - ); - - let exit = system.run(); - - assert_that(&exit).is_equal_to(0); - let t = *tries.lock().unwrap(); - assert_that(&t).is_equal_to(1); -} - -#[test] -fn test_retry_all_fail() { - let system = System::new("test"); - let retry: Retrier = Retrier::new(5, Duration::from_millis(100)); - let tries = Arc::new(Mutex::new(0)); - let times = Arc::new(Mutex::new(Vec::::new())); - - Arbiter::spawn( - retry - .retry((tries.clone(), times.clone()), |(count_tries, times_collect)| { - let mut t = count_tries.lock().unwrap(); - *t += 1; - let mut times = times_collect.lock().unwrap(); - times.push(SystemTime::now()); - business_result::failure::(Problem::not_found().with_details("Always fail")) - }) - .then(move |result| { - assert_that(&result).is_equal_to(Err(Problem::not_found().with_details("Always fail"))); - System::current().stop(); - Ok(()) - }), - ); - - let exit = system.run(); - - assert_that(&exit).is_equal_to(0); - let t = *tries.lock().unwrap(); - assert_that(&t).is_equal_to(5); - let call_times: &Vec = ×.lock().unwrap(); - assert_that(call_times).has_length(5); - let diffs: Vec = call_times - .iter() - .zip(call_times.iter().skip(1)) - .map(|(a, b)| b.duration_since(*a).unwrap()) - .collect(); - assert_that(&diffs).has_length(4); - assert!(diffs.iter().all(|d| *d > Duration::from_millis(90))); -} - -#[test] -fn test_retry_nfail() { - let system = System::new("test"); - let retry: Retrier = Retrier::new(5, Duration::from_millis(500)); - let tries = Arc::new(Mutex::new(0)); - let times = Arc::new(Mutex::new(Vec::::new())); - - Arbiter::spawn( - retry - .retry((tries.clone(), times.clone()), |(count_tries, times_collect)| { - let mut t = count_tries.lock().unwrap(); - *t += 1; - let mut times = times_collect.lock().unwrap(); - times.push(SystemTime::now()); - if *t > 2 { - business_result::success(42) - } else { - business_result::failure::(Problem::not_found().with_details("Fail")) - } - }) - .then(move |result| { - assert_that(&result).is_equal_to(Ok(42)); - System::current().stop(); - Ok(()) - }), - ); - - let exit = system.run(); - - assert_that(&exit).is_equal_to(0); - let t = *tries.lock().unwrap(); - assert_that(&t).is_equal_to(3); - let call_times: &Vec = ×.lock().unwrap(); - assert_that(call_times).has_length(3); - let diffs: Vec = call_times - .iter() - .zip(call_times.iter().skip(1)) - .map(|(a, b)| b.duration_since(*a).unwrap()) - .collect(); - assert_that(&diffs).has_length(2); - assert!(diffs.iter().all(|d| *d > Duration::from_millis(490))); -} diff --git a/legacy/service_requester.rs b/legacy/service_requester.rs deleted file mode 100644 index 0579fc2..0000000 --- a/legacy/service_requester.rs +++ /dev/null @@ -1,148 +0,0 @@ -use super::types; -use crate::gatekeeper; -use crate::problem::Problem; -use crate::ws_try; -use actix::{Actor, Addr}; -use actix_web::{client, http}; -use futures::Future; -use serde::Serialize; -use url::form_urlencoded::byte_serialize; - -pub fn encode_url_component>(value: S) -> String { - byte_serialize(value.as_ref()).collect::() -} - -#[derive(Clone)] -pub struct ServiceRequester { - token_creator: Addr, - error_handler: &'static (dyn Fn(client::ClientResponse) -> Box> + Sync), -} - -pub trait IntoClientRequest { - fn apply_body(self, builder: &mut client::ClientRequestBuilder) -> Result; -} - -impl ServiceRequester { - pub fn with_service_auth(service_name: &str, scopes: &[(&str, &[&str])]) -> ServiceRequester { - ServiceRequester { - token_creator: gatekeeper::TokenCreator::for_service(service_name, scopes).start(), - error_handler: &ws_try::default_error_handler, - } - } - - pub fn with_error_handler( - &self, - error_handler: &'static (dyn Fn(client::ClientResponse) -> Box> + Sync), - ) -> Self { - ServiceRequester { - token_creator: self.token_creator.clone(), - error_handler, - } - } - - #[inline] - pub fn get(&self, url: U) -> impl Future - where - U: AsRef, - O: ws_try::FromClientResponse, - F: Future, - { - self.without_body(http::Method::GET, url) - } - - #[inline] - pub fn post(&self, url: U, body: I) -> impl Future - where - U: AsRef, - I: IntoClientRequest, - O: ws_try::FromClientResponse, - F: Future, - { - self.with_body(http::Method::POST, url, body) - } - - #[inline] - pub fn put(&self, url: U, body: I) -> impl Future - where - U: AsRef, - I: IntoClientRequest, - O: ws_try::FromClientResponse, - F: Future, - { - self.with_body(http::Method::PUT, url, body) - } - - #[inline] - pub fn patch(&self, url: U, body: I) -> impl Future - where - U: AsRef, - I: IntoClientRequest, - O: ws_try::FromClientResponse, - F: Future, - { - self.with_body(http::Method::PATCH, url, body) - } - - #[inline] - pub fn delete(&self, url: U) -> impl Future - where - U: AsRef, - O: ws_try::FromClientResponse, - F: Future, - { - self.without_body(http::Method::DELETE, url) - } - - pub fn with_body(&self, method: http::Method, url: U, body: I) -> impl Future - where - U: AsRef, - I: IntoClientRequest, - O: ws_try::FromClientResponse, - F: Future, - { - let error_handler_ref = self.error_handler; - gatekeeper::get_token(&self.token_creator).and_then(move |token| { - let request = body.apply_body( - client::ClientRequest::build() - .method(method) - .uri(url) - .header("Authorization", format!("Bearer {}", token.raw)), - ); - - ws_try::expect_success_with_error::<_, F, O, _>(request, error_handler_ref) - }) - } - - pub fn without_body(&self, method: http::Method, url: U) -> impl Future - where - U: AsRef, - O: ws_try::FromClientResponse, - F: Future, - { - let error_handler_ref = self.error_handler; - gatekeeper::get_token(&self.token_creator).and_then(move |token| { - let request = client::ClientRequest::build() - .method(method) - .uri(url) - .header("Authorization", format!("Bearer {}", token.raw)) - .finish(); - - ws_try::expect_success_with_error::<_, F, O, _>(request, error_handler_ref) - }) - } -} - -impl IntoClientRequest for S -where - S: Serialize, -{ - fn apply_body(self, builder: &mut client::ClientRequestBuilder) -> Result { - builder.json(self).map_err(Problem::from) - } -} - -impl IntoClientRequest for types::Done { - fn apply_body(self, builder: &mut client::ClientRequestBuilder) -> Result { - builder.finish().map_err(Problem::from) - } -} diff --git a/legacy/ws_try.rs b/legacy/ws_try.rs deleted file mode 100644 index e289b8c..0000000 --- a/legacy/ws_try.rs +++ /dev/null @@ -1,168 +0,0 @@ -use crate::business_result::AsyncBusinessResult; -use crate::problem::Problem; -use crate::types::{Done, Lines}; -use actix_web::client; -use actix_web::HttpMessage; -use futures::{future, Async, Future, Poll, Stream}; -use log::error; -use serde::de::DeserializeOwned; -use std::time::Duration; - -const JSON_RESPONSE_LIMIT: usize = 100 * 1024 * 1024; - -pub trait IntoClientRequest { - fn into_request(self) -> Result; -} - -pub trait FromClientResponse { - type Result; - type FutureResult: Future; - - fn from_response(response: client::ClientResponse) -> Self::FutureResult; -} - -pub enum WSTry { - MayBeSuccess(F), - Failure(Problem), - FutureFailure(Box>), -} - -impl Future for WSTry -where - F: Future, -{ - type Item = T; - type Error = Problem; - - fn poll(&mut self) -> Poll { - match self { - WSTry::MayBeSuccess(f) => f.poll(), - WSTry::Failure(problem) => Err(problem.clone()), - WSTry::FutureFailure(future_problem) => match future_problem.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(problem)) => Err(problem), - Err(problem) => Err(problem), - }, - } - } -} - -pub fn r#try(request: R) -> impl Future -where - R: IntoClientRequest, -{ - let client_request = match request.into_request() { - Ok(request) => request, - Err(problem) => return WSTry::Failure(problem), - }; - let url = client_request.uri().to_string(); - let method = client_request.method().to_string(); - - WSTry::MayBeSuccess( - client_request - .send() - .timeout(Duration::from_secs(60)) - .conn_timeout(Duration::from_secs(20)) - .wait_timeout(Duration::from_secs(60)) - .map_err(move |err| { - error!("Request {} {} failed: {}", method, url, err); - Problem::from(err) - }), - ) -} - -pub fn expect_success(request: R) -> impl Future -where - R: IntoClientRequest, - T: FromClientResponse, - F: Future, -{ - r#try(request).and_then(move |resp| { - if resp.status().is_success() { - WSTry::MayBeSuccess(T::from_response(resp)) - } else { - WSTry::Failure(Problem::for_status( - resp.status().as_u16(), - format!("Service request failed: {}", resp.status()), - )) - } - }) -} - -pub fn expect_success_with_error(request: R, error_handler: E) -> impl Future -where - R: IntoClientRequest, - T: FromClientResponse, - F: Future, - E: Fn(client::ClientResponse) -> Box>, -{ - r#try(request).and_then(move |resp| { - if resp.status().is_success() { - WSTry::MayBeSuccess(T::from_response(resp)) - } else { - WSTry::FutureFailure(error_handler(resp)) - } - }) -} - -#[allow(clippy::needless_pass_by_value)] -pub fn default_error_handler(response: client::ClientResponse) -> Box> { - Box::new(future::ok(Problem::for_status( - response.status().as_u16(), - format!("Service request failed: {}", response.status()), - ))) -} - -impl IntoClientRequest for client::ClientRequest { - fn into_request(self) -> Result { - Ok(self) - } -} - -impl IntoClientRequest for client::ClientRequestBuilder { - fn into_request(mut self) -> Result { - self.finish().map_err(Problem::from) - } -} - -impl IntoClientRequest for Result -where - E: Into, -{ - fn into_request(self) -> Result { - self.map_err(E::into) - } -} - -impl FromClientResponse for T -where - T: DeserializeOwned + 'static, -{ - type Result = T; - type FutureResult = AsyncBusinessResult; - - fn from_response(response: client::ClientResponse) -> Self::FutureResult { - Box::new(response.json().limit(JSON_RESPONSE_LIMIT).map_err(Problem::from)) - } -} - -impl FromClientResponse for Done { - type Result = Done; - type FutureResult = AsyncBusinessResult; - - fn from_response(response: client::ClientResponse) -> Self::FutureResult { - Box::new(response.payload().from_err().for_each(|_| Ok(())).map(|_| Done)) - } -} - -impl FromClientResponse for Lines { - type Result = Lines; - type FutureResult = AsyncBusinessResult; - - fn from_response(response: client::ClientResponse) -> Self::FutureResult { - Box::new(response.readlines().collect().then(|result| match result { - Ok(lines) => Ok(Lines::new(lines)), - Err(error) => Err(Problem::from(error)), - })) - } -} diff --git a/src/gatekeeper.rs b/src/gatekeeper.rs deleted file mode 100644 index 0d27d13..0000000 --- a/src/gatekeeper.rs +++ /dev/null @@ -1,91 +0,0 @@ -use crate::business_result::BusinessResult; -use crate::problem::Problem; -use crate::ws_try::SendClientRequestExt; -use actix::{Actor, ActorFuture, ActorResponse, Context, Handler, Message, WrapFuture}; -use reqwest::Client; -use serde_derive::{Deserialize, Serialize}; -use serde_json::{Map, Value}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Token { - #[serde(rename = "keyId")] - pub key_id: String, - #[serde(rename = "keyType")] - pub key_type: String, - pub raw: String, - pub crypted: String, - pub expires: u64, - pub claims: Map, -} - -#[derive(Message)] -#[rtype(result = "BusinessResult")] -struct GetToken; - -pub struct TokenCreator { - claims: Map, - current: Option, - client: Client, -} - -pub async fn get_token(actor: &actix::Addr) -> BusinessResult { - actor.send(GetToken).await? -} - -impl TokenCreator { - pub fn for_service(service_name: &str, scopes: &[(&str, &[&str])]) -> TokenCreator { - let mut claims: Map = Map::new(); - - claims.insert("sub".to_string(), Value::String(format!("service/{}", service_name))); - - let mut scopes_map: Map = Map::new(); - - for (key, values_raw) in scopes { - let values = values_raw.iter().map(|&v| Value::String((*v).to_string())).collect(); - scopes_map.insert((*key).to_string(), Value::Array(values)); - } - - claims.insert("scopes".to_string(), Value::Object(scopes_map)); - - TokenCreator { - claims, - current: None, - client: Client::new(), - } - } -} - -impl Handler for TokenCreator { - type Result = ActorResponse; - - fn handle(&mut self, _msg: GetToken, _ctx: &mut actix::Context) -> Self::Result { - let now_plus_grace = SystemTime::now() + Duration::from_secs(60); - let unixtime = match now_plus_grace.duration_since(UNIX_EPOCH) { - Ok(duration) => duration.as_secs(), - Err(error) => return ActorResponse::reply(Err(Problem::from(error))), - }; - match self.current { - Some(ref token) if token.expires > unixtime => ActorResponse::reply(Ok(token.clone())), - _ => { - let token_response = self - .client - .post("http://localhost:12345/v1/tokens") - .timeout(Duration::from_secs(30)) - .json(&self.claims) - .expect_success::(); - - ActorResponse::r#async(token_response.into_actor(self).map(|maybe_token, actor, _| { - if let Ok(ref token) = maybe_token { - actor.current = Some(token.clone()); - } - maybe_token - })) - } - } - } -} - -impl Actor for TokenCreator { - type Context = Context; -} diff --git a/src/lib.rs b/src/lib.rs index 194a1cf..1178a33 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,6 @@ pub mod business_result; pub mod elasticsearch; #[cfg(test)] pub mod elasticsearch_test; -pub mod gatekeeper; #[cfg(feature = "with-slog")] pub mod logging_slog; pub mod metrics; diff --git a/src/service_requester.rs b/src/service_requester.rs index d7eee42..5cb204f 100644 --- a/src/service_requester.rs +++ b/src/service_requester.rs @@ -1,9 +1,7 @@ use crate::{ - gatekeeper::{get_token, TokenCreator}, ws_try::{default_error_handler, FromClientResponse, SendClientRequestExt}, BusinessResult, Problem, }; -use actix::{Actor, Addr}; use bytes::Bytes; use reqwest::{redirect::Policy, Client, IntoUrl, Method, RequestBuilder, StatusCode}; use serde::Serialize; @@ -30,27 +28,23 @@ where #[derive(Clone)] pub struct ServiceRequester { client: Client, - token_creator: Addr, + service_name: &'static str, error_handler: &'static (dyn Fn(StatusCode, Result) -> Problem + Sync), } impl ServiceRequester { - pub fn with_service_auth(service_name: &str, scopes: &[(&str, &[&str])]) -> BusinessResult { - ServiceRequester::with_service_auth_with_timeout(service_name, scopes, 120) + pub fn with_service_auth(service_name: &'static str) -> BusinessResult { + ServiceRequester::with_service_auth_with_timeout(service_name, 120) } - pub fn with_service_auth_with_timeout( - service_name: &str, - scopes: &[(&str, &[&str])], - timeout_seconds: u16, - ) -> BusinessResult { + pub fn with_service_auth_with_timeout(service_name: &'static str, timeout_seconds: u16) -> BusinessResult { Ok(ServiceRequester { client: Client::builder() .connect_timeout(Duration::from_secs(timeout_seconds as u64)) .timeout(Duration::from_secs(timeout_seconds as u64)) .redirect(Policy::none()) .build()?, - token_creator: TokenCreator::for_service(service_name, scopes).start(), + service_name, error_handler: &default_error_handler, }) } @@ -61,7 +55,7 @@ impl ServiceRequester { ) -> Self { ServiceRequester { client: self.client, - token_creator: self.token_creator, + service_name: self.service_name, error_handler, } } @@ -120,14 +114,13 @@ impl ServiceRequester { I: IntoClientRequest, O: FromClientResponse + 'static, { - let token = get_token(&self.token_creator).await?; - body .apply_body( self .client .request(method, url) - .header("Authorization", format!("Bearer {}", token.raw)), + .header("X-Auth-Sub", format!("service/{}", self.service_name)) + .header("X-Auth-Token", "internal-token"), ) .expect_success_with_error(self.error_handler) .await @@ -138,12 +131,11 @@ impl ServiceRequester { U: IntoUrl, O: FromClientResponse + 'static, { - let token = get_token(&self.token_creator).await?; - self .client .request(method, url) - .header("Authorization", format!("Bearer {}", token.raw)) + .header("X-Auth-Sub", format!("service/{}", self.service_name)) + .header("X-Auth-Token", "internal-token") .expect_success_with_error(self.error_handler) .await } From fc45265eeb01e575371348efc9a2482c2bc1d8f4 Mon Sep 17 00:00:00 2001 From: Ihor M Date: Fri, 7 Jan 2022 13:16:43 +0100 Subject: [PATCH 2/2] updated lib - dropped gatekeeper --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 85ed6c2..dbb0f1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1398,7 +1398,7 @@ dependencies = [ [[package]] name = "microtools" -version = "0.3.4" +version = "0.4.1" dependencies = [ "actix", "actix-web", diff --git a/Cargo.toml b/Cargo.toml index d24aa71..fa70f76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "microtools" -version = "0.3.4" +version = "0.4.1" authors = ["Bodo Junglas ", "Ihor Mordashev