Skip to content

Commit

Permalink
Add Google PubSub support
Browse files Browse the repository at this point in the history
In this commit I add support for sending FTP event notifications (e.g. login events) to Google Pub/Sub:

These command line arguments were added:

```
       --ntf-pubsub-base-url <URL>
            The base url of the Google Pub/Sub API [env: UNFTP_NTF_PUBSUB_BASE_URL=]  [default:
            https://pubsub.googleapis.com]
        --ntf-pubsub-project <PROJECT_ID>
            The ID of the GCP project where the topic exists [env: UNFTP_NTF_PUBSUB_PROJECT=]

        --ntf-pubsub-topic <TOPIC_NAME>
            The name of the Google PubSub topic to publish to [env: UNFTP_NTF_PUBSUB_TOPIC=]
```

that can be used to enable this.

Currently, workload identity is the only supported authentication mechanism.

#102
  • Loading branch information
hannesdejager authored Oct 28, 2021
1 parent c4e72c7 commit 210712d
Show file tree
Hide file tree
Showing 9 changed files with 752 additions and 33 deletions.
11 changes: 7 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ version="0.1.2"

[dependencies]
async-trait = "0.1.51"
base64 = "0.13.0"
bitflags = "1.3.2"
clap = "2.33.3"
futures = "0.3.17"
http = "0.2.5"
hyper = { version = "0.14.13", features = ["server", "http1"] }
hyper-rustls = "^0.22"
lazy_static = "1.4.0"
libunftp = "0.18.1"
prometheus = { version = "0.12.0", features = ["process"] }
Expand All @@ -41,6 +43,7 @@ serde_json = { version = "1.0.68" }
slog = { version = "2.7.0", features = ["max_level_trace", "release_max_level_info"] }
slog-async = "2.7.0"
slog-term = "2.8.0"
thiserror = "1.0.30"
tokio = { version = "1.12.0", features = ["full"] }
unftp-sbe-fs = "0.2.0"
unftp-sbe-gcs = { version = "0.2.0", optional = true }
Expand Down
28 changes: 28 additions & 0 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ pub const LOG_LEVEL: &str = "log-level";
pub const PASSIVE_HOST: &str = "passive-host";
pub const PASSIVE_PORTS: &str = "passive-ports";
pub const PROXY_EXTERNAL_CONTROL_PORT: &str = "proxy-external-control-port";
// Google Pub/Sub event-notification arguments (the `--ntf-pubsub-*` flags).
pub const PUBSUB_BASE_URL: &str = "ntf-pubsub-base-url";
pub const PUBSUB_TOPIC: &str = "ntf-pubsub-topic";
pub const PUBSUB_PROJECT: &str = "ntf-pubsub-project";
pub const REDIS_HOST: &str = "log-redis-host";
pub const REDIS_KEY: &str = "log-redis-key";
pub const REDIS_PORT: &str = "log-redis-port";
Expand Down Expand Up @@ -414,4 +417,29 @@ pub(crate) fn clap_app(tmp_dir: &str) -> clap::App {
.env("UNFTP_USR_JSON_PATH")
.takes_value(true),
)
.arg(
Arg::with_name(PUBSUB_BASE_URL)
.long("ntf-pubsub-base-url")
.value_name("URL")
.help("The base url of the Google Pub/Sub API")
.env("UNFTP_NTF_PUBSUB_BASE_URL")
.default_value("https://pubsub.googleapis.com")
.takes_value(true),
)
.arg(
Arg::with_name(PUBSUB_TOPIC)
.long("ntf-pubsub-topic")
.value_name("TOPIC_NAME")
.help("The name of the Google Pub/Sub topic to publish to")
.env("UNFTP_NTF_PUBSUB_TOPIC")
.takes_value(true),
)
.arg(
Arg::with_name(PUBSUB_PROJECT)
.long("ntf-pubsub-project")
.value_name("PROJECT_ID")
.help("The ID of the GCP project where the Google Pub/Sub topic exists")
.env("UNFTP_NTF_PUBSUB_PROJECT")
.takes_value(true),
)
}
59 changes: 59 additions & 0 deletions src/domain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
// `Debug` was previously imported from `serde::__private::fmt` — serde's
// private module (an IDE auto-import accident). It is the same `std::fmt::Debug`
// trait; import it from its canonical location so serde upgrades can't break us.
use std::fmt::Debug;

/// An `EventDispatcher` can send events of type `T` to the outside world
/// (e.g. a message broker). The `Send + Sync + Debug` bounds allow
/// implementations to be shared across async tasks and logged.
#[async_trait]
pub trait EventDispatcher<T>: Send + Sync + Debug {
    /// Delivers a single event. Returns nothing: implementations are
    /// expected to handle (e.g. log) their own delivery failures.
    async fn dispatch(&self, event: T);
}

/// An [`EventDispatcher`] that silently discards every event it receives —
/// the no-op choice when event notification is not configured.
#[derive(Debug)]
pub struct NullEventDispatcher {}

#[async_trait]
impl EventDispatcher<FTPEvent> for NullEventDispatcher {
    /// Intentionally does nothing with the event.
    async fn dispatch(&self, _event: FTPEvent) {}
}

/// The envelope for every notification sent through an [`EventDispatcher`];
/// serialized to JSON by the Pub/Sub dispatcher before publishing.
#[derive(Serialize, Deserialize, Debug)]
pub struct FTPEvent {
    /// Identifier of the unFTP instance that produced the event
    /// (presumably so subscribers can tell instances apart — confirm against caller).
    pub source_instance: String,
    /// Host name of the machine this instance runs on.
    pub hostname: String,
    /// The actual event data; see [`FTPEventPayload`] for the variants.
    pub payload: FTPEventPayload,
}

/// The event variant carried inside an [`FTPEvent`]. Paths are plain strings
/// as seen by the FTP layer (exact form depends on the emitting caller).
#[derive(Serialize, Deserialize, Debug)]
pub enum FTPEventPayload {
    /// The server process started up.
    Startup {
        libunftp_version: String,
        unftp_version: String,
    },
    /// A user logged in.
    Login {
        username: String,
    },
    /// A file was downloaded (RETR).
    Get {
        path: String,
    },
    /// A file was uploaded (STOR).
    Put {
        path: String,
    },
    /// A file was deleted.
    Delete {
        path: String,
    },
    /// A directory was created.
    MakeDir {
        path: String,
    },
    /// A file or directory was renamed/moved.
    Rename {
        from: String,
        to: String,
    },
    /// A directory was removed.
    RemoveDir {
        path: String,
    },
}
4 changes: 4 additions & 0 deletions src/infra/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mod pubsub;
mod workload_identity;

pub use pubsub::PubsubEventDispatcher;
137 changes: 137 additions & 0 deletions src/infra/pubsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use crate::domain::{EventDispatcher, FTPEvent};
use crate::infra::workload_identity;
use async_trait::async_trait;
use http::{header, Method, Request, StatusCode, Uri};
use hyper::client::connect::dns::GaiResolver;
use hyper::client::HttpConnector;
use hyper::{Body, Client, Response};
use hyper_rustls::HttpsConnector;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;

// Notes:
// - Emulator: https://cloud.google.com/pubsub/docs/emulator
// - virtualenv -p python3 mypython
// - API Docs for publishing: https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics/publish
//

/// An [EventDispatcher](crate::domain::EventDispatcher) that dispatches to Google Pub/sub
/// via the REST publish API, authenticating with workload identity.
#[derive(Debug)]
pub struct PubsubEventDispatcher {
    // Logger used to report delivery failures (dispatch never propagates errors).
    log: Arc<slog::Logger>,
    // Base URL of the Pub/Sub API; overridable to target an emulator.
    api_base_url: String,
    // GCP project ID owning the topic.
    project: String,
    // Pub/Sub topic name to publish to.
    topic: String,
    // Shared hyper client; also reused for the token request (see get_token).
    client: Client<HttpsConnector<HttpConnector>>,
}

const DEFAULT_SERVICE_ENDPOINT: &str = "https://pubsub.googleapis.com";

impl PubsubEventDispatcher {
#[allow(dead_code)]
pub fn new<Str>(log: Arc<slog::Logger>, project: Str, topic: Str) -> Self
where
Str: Into<String>,
{
Self::with_api_base(log, project.into(), topic.into(), DEFAULT_SERVICE_ENDPOINT.to_owned())
}

pub fn with_api_base<Str>(log: Arc<slog::Logger>, project: Str, topic: Str, api_base: Str) -> Self
where
Str: Into<String>,
{
let client: Client<HttpsConnector<HttpConnector<GaiResolver>>, Body> =
Client::builder().build(HttpsConnector::with_native_roots());
PubsubEventDispatcher {
log,
api_base_url: api_base.into(),
project: project.into(),
topic: topic.into(),
client,
}
}

// Gets the authentication token through workload identity mechanisms
async fn get_token(&self) -> Result<String, workload_identity::Error> {
Ok(workload_identity::request_token(None, self.client.clone())
.await?
.access_token)
}

// publishes to Google pub/sub
async fn publish(&self, event: FTPEvent) -> Result<(), String> {
let msg = base64::encode(serde_json::to_string(&event).unwrap());
let b = PubSubRequest {
messages: vec![PubSubMsg {
data: msg.to_owned(),
attributes: HashMap::new(), // TODO Set attribute based on the event type so subscribers can filter.
}],
};
let body_string = serde_json::to_string(&b).map_err(|e| format!("error marshalling message: {}", e))?;

// TODO: Implement other auth methods
// FIXME: When testing locally there won't be a token, we might want to handle this better.
let token = self.get_token().await.unwrap_or_else(|_| "".to_owned());

let request: Request<Body> = Request::builder()
.uri(
Uri::from_maybe_shared(format!(
"{}/v1/projects/{}/topics/{}:publish",
self.api_base_url, self.project, self.topic
))
.map_err(|e| format!("invalid request URI: {}", e))?,
)
.header(header::AUTHORIZATION, format!("Bearer {}", token))
.method(Method::POST)
.body(body_string.into())
.map_err(|e| format!("error with publish request: {}", e))?;

let response: Response<Body> = self.client.request(request).await.unwrap();
if response.status() != StatusCode::OK {
Err(format!("bad HTTP status code received: {}", response.status()))
} else {
Ok(())
}
}
}

#[async_trait]
impl EventDispatcher<FTPEvent> for PubsubEventDispatcher {
async fn dispatch(&self, event: FTPEvent) {
let r = self.publish(event).await;
if r.is_err() {
slog::error!(self.log, "Could not dispatch event to pub/sub: {}", r.unwrap_err());
}
}
}

// Wire format of the Pub/Sub `projects.topics:publish` request body:
// {"messages":[{"data":"<base64>","attributes":{...}}]}
#[derive(Serialize, Deserialize)]
struct PubSubRequest {
    messages: Vec<PubSubMsg>,
}

// A single Pub/Sub message: base64-encoded payload plus string attributes
// (attributes let subscribers filter without decoding the payload).
#[derive(Serialize, Deserialize)]
struct PubSubMsg {
    data: String,
    attributes: HashMap<String, String>,
}

#[cfg(test)]
mod tests {
    use crate::infra::pubsub::{PubSubMsg, PubSubRequest};
    use std::collections::HashMap;

    // Verifies the JSON wire format matches what the Pub/Sub publish API
    // expects: {"messages":[{"data":"<base64>","attributes":{}}]}.
    // (Renamed from `pubub_…` — typo; also dropped a redundant clone of the
    // payload and used a raw string for the expected JSON.)
    #[test]
    fn pubsub_request_serializes_correctly() {
        let payload = base64::encode("123"); // "MTIz"
        let r = PubSubRequest {
            messages: vec![PubSubMsg {
                data: payload,
                attributes: HashMap::new(),
            }],
        };
        let json = serde_json::to_string(&r).unwrap();
        assert_eq!(json, r#"{"messages":[{"data":"MTIz","attributes":{}}]}"#)
    }
}
Loading

0 comments on commit 210712d

Please sign in to comment.