From 3f95012ce0fddc647ada226b6d0e7291a694c89b Mon Sep 17 00:00:00 2001 From: Terkwood <38859656+Terkwood@users.noreply.github.com> Date: Mon, 26 Nov 2018 00:03:50 -0500 Subject: [PATCH] Implement google pub/sub push route (#61) --- cloud_images/pond/Cargo.toml | 2 +- cloud_images/pond/src/config.rs | 1 + cloud_images/pond/src/lib.rs | 1 + cloud_images/pond/src/push.rs | 104 ++++++++++++++++++++++++++++++++ cloud_images/pond/src/tanks.rs | 4 +- cloud_images/pond/src/web.rs | 44 +++++++++++++- 6 files changed, 152 insertions(+), 4 deletions(-) create mode 100644 cloud_images/pond/src/push.rs diff --git a/cloud_images/pond/Cargo.toml b/cloud_images/pond/Cargo.toml index 48e3c852..7104952c 100644 --- a/cloud_images/pond/Cargo.toml +++ b/cloud_images/pond/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pond" -version = "0.2.1" +version = "0.3.0" authors = ["Terkwood "] edition = "2018" diff --git a/cloud_images/pond/src/config.rs b/cloud_images/pond/src/config.rs index 6395fce4..ca060019 100644 --- a/cloud_images/pond/src/config.rs +++ b/cloud_images/pond/src/config.rs @@ -8,6 +8,7 @@ pub struct Config { pub redis_namespace: String, rocket_databases: String, pub cors_allow_origin: Option, + pub push_secret: String, } impl Config { diff --git a/cloud_images/pond/src/lib.rs b/cloud_images/pond/src/lib.rs index fa45e23d..2b4aa7c6 100644 --- a/cloud_images/pond/src/lib.rs +++ b/cloud_images/pond/src/lib.rs @@ -19,6 +19,7 @@ mod authorization; pub mod claims; pub mod config; pub mod key_pairs; +mod push; mod redis_conn; mod tanks; pub mod web; diff --git a/cloud_images/pond/src/push.rs b/cloud_images/pond/src/push.rs new file mode 100644 index 00000000..9b1e57e9 --- /dev/null +++ b/cloud_images/pond/src/push.rs @@ -0,0 +1,104 @@ +use base64; +use crate::redis_conn::RedisDbConn; +use redis_delta::RDelta; +use rocket_contrib::databases::redis::Commands; +use std::collections::HashMap; + +/// Push data structure which adheres to Google Cloud Pub/Sub +/// specification. Each of these is an individual Redis Delta +/// containing various payloads depending on the Redis type +/// that it represents. +/// See https://cloud.google.com/pubsub/docs/push +#[derive(Debug, Deserialize)] +pub struct PushData { + pub message: Message, + pub subscription: String, +} + +impl PushData { + // FIXME deal with out-of-order messaging + pub fn ingest(&self, conn: RedisDbConn) -> Result<(), PushDataError> { + let rdelta = self.message.deserialize()?; + let result = match rdelta { + RDelta::UpdateHash { + key, + mut fields, + time: _, + } => { + let mut name_vals: Vec<(String, String)> = vec![]; + for rf in fields.drain(..) { + name_vals.push((rf.name, rf.val)); + } + Ok(conn.0.hset_multiple(key, &name_vals)?) + } + RDelta::UpdateSet { key, vals, time: _ } => + // FIXME this method can leave some members in place + // FIXME who no longer exist in the source database + // FIXME instead of using SADD, we should... ? + // FIXME ...pop everything atomically and re-insert? + // FIXME ...or only signal the addition of members from the source? + // FIXME ...or maybe this is decent behavior after all? + { + Ok(conn.0.sadd(key, vals)?) + } + RDelta::UpdateString { key, val, time: _ } => Ok(conn.0.set(key, val)?), + }; + + if let Err(e) = &result { + eprintln!("Error on ingest! {:?}", e) + } + + result + } +} + +#[derive(Debug, Deserialize)] +pub struct Message { + pub attributes: Option>, + pub data: Base64, + pub message_id: String, +} +impl Message { + pub fn deserialize(&self) -> Result { + let json_r: Result = serde_json::from_slice(&self.data.decode()?[..]); + Ok(json_r?) + } +} + +#[derive(Debug, Deserialize)] +pub struct Base64(pub String); +impl Base64 { + /// You can consume this with `serde_json::from_slice` + pub fn decode(&self) -> Result, base64::DecodeError> { + base64::decode(&self.0) + } +} + +#[derive(Debug)] +pub enum PushDataError { + Base64, + Json, + Utf8, + Redis, +} +impl From for PushDataError { + fn from(_e: rocket_contrib::databases::redis::RedisError) -> PushDataError { + PushDataError::Redis + } +} +impl From for PushDataError { + fn from(_e: std::str::Utf8Error) -> PushDataError { + PushDataError::Utf8 + } +} +impl From for PushDataError { + fn from(_e: serde_json::Error) -> PushDataError { + PushDataError::Json + } +} + +impl From for PushDataError { + fn from(_e: base64::DecodeError) -> PushDataError { + PushDataError::Base64 + } +} diff --git a/cloud_images/pond/src/tanks.rs b/cloud_images/pond/src/tanks.rs index 0827c6e3..2c6566b0 100644 --- a/cloud_images/pond/src/tanks.rs +++ b/cloud_images/pond/src/tanks.rs @@ -74,7 +74,7 @@ pub fn fetch_all(conn: RedisDbConn, namespace: &str) -> Result, redis: fn fetch_num_tanks(conn: &RedisDbConn, namespace: &str) -> Result { let key = Key::AllTanks { - ns: Namespace(namespace), + ns: Namespace(namespace.to_owned()), } .to_string(); conn.0.get(key) @@ -98,7 +98,7 @@ fn fetch_tank_status( namespace: &str, ) -> Result, redis::RedisError> { let key = Key::Tank { - ns: Namespace(namespace), + ns: Namespace(namespace.to_owned()), id, } .to_string(); diff --git a/cloud_images/pond/src/web.rs b/cloud_images/pond/src/web.rs index dc19f2bc..459d4938 100644 --- a/cloud_images/pond/src/web.rs +++ b/cloud_images/pond/src/web.rs @@ -2,9 +2,11 @@ use crate::authentication::{authenticate, AuthenticationResult}; use crate::authorization::authorize; use crate::config::Config; use crate::key_pairs; +use crate::push::{PushData, PushDataError}; use crate::redis_conn::*; use crate::tanks; use rocket::http::hyper::header::{AccessControlAllowOrigin, AccessControlMaxAge}; +use rocket::http::RawStr; use rocket::http::Status; use rocket::request::{self, FromRequest, Request}; use rocket::{Outcome, State}; @@ -129,11 +131,51 @@ fn token_from_bearer_string(bearer_string: &str) -> Result { } } +/// An endpoint which receives push messages from Google pub/sub platform. +/// These messages summarize changes to the Redis database hosted in +/// in local proximity to the temp & ph sensors. +/// See https://cloud.google.com/pubsub/docs/push +/// +/// Here is an example of sending a base64 encoded payload to the endpoint. +/// +/// ```sh +/// curl -k -d '{ "message": { "attributes": { "key": "value" }, "data": "eyJ1cGRhdGVfaGFzaCI6eyJrZXkiOiJwcmF3bmJhYnkvc2Vuc29ycy90ZW1wL2FhYWFhYWFhLWVlZWUtYWFhYS1hYWFhLWFhYWFhYWFhYWFhYSIsImZpZWxkcyI6W3sibmFtZSI6InRlbXBfdXBkYXRlX2NvdW50IiwidmFsIjoiNDEwOTY2In0seyJuYW1lIjoidGVtcF91cGRhdGVfdGltZSIsInZhbCI6IjE1NDI3NTI3MTAifSx7Im5hbWUiOiJ0ZW1wX2MiLCJ2YWwiOiIyNC42MiJ9LHsibmFtZSI6InRlbXBfZiIsInZhbCI6Ijc2LjMyIn1dLCJ0aW1lIjoxNTQyNzUyNzE1fX0=", "message_id": "136969346945" },"subscription": "projects/myproject/subscriptions/mysubscription"}' -H "Content-Type: application/json" -X POST https://localhost:8000/push_redis\?token\=fancy_shared_sekrit +/// ``` +/// +/// In this case, the base64 "data" attribute decodes as follows: +/// ```json +/// {"update_hash":{"key":"prawnbaby/sensors/temp/aaaaaaaa-eeee-aaaa-aaaa-aaaaaaaaaaaa","fields":[{"name":"temp_update_count","val":"410966"},{"name":"temp_update_time","val":"1542752710"},{"name":"temp_c","val":"24.62"},{"name":"temp_f","val":"76.32"}],"time":1542752715}} +/// ``` +#[post( + "/push_redis?", + format = "application/json", + data = "" +)] +pub fn push_redis( + data: Json, + token: &RawStr, + conn: RedisDbConn, + config: State, +) -> Status { + let push_secret: String = config.push_secret.to_string(); + // This can be improved. + // See https://github.com/Terkwood/prawnalith/issues/60 + if token.as_str() == push_secret { + match data.ingest(conn) { + Ok(_) => Status::NoContent, + Err(PushDataError::Redis) => Status::InternalServerError, + Err(_) => Status::UnprocessableEntity, + } + } else { + Status::Unauthorized + } +} + pub fn startup(config: Config) { rocket::ignite() .manage(config) .attach(RedisDbConn::fairing()) - .mount("/", routes![tanks, tanks_options]) + .mount("/", routes![tanks, tanks_options, push_redis]) .launch(); }