Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
add envconfig to parse envvars (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello authored Oct 26, 2023
1 parent 9ceabe7 commit c9e835e
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 14 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions capture-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ tokio = { workspace = true }
tracing-subscriber = { workspace = true }
tracing = { workspace = true }
time = { workspace = true }
envconfig = "0.10.0"
33 changes: 19 additions & 14 deletions capture-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
use std::env;
use envconfig::Envconfig;
use std::net::SocketAddr;
use std::sync::Arc;

use capture::{billing_limits::BillingLimiter, redis::RedisClient, router, sink};
use time::Duration;
use tokio::signal;

#[derive(Envconfig)]
struct Config {
#[envconfig(default = "false")]
print_sink: bool,
#[envconfig(default = "127.0.0.1:3000")]
address: SocketAddr,
redis_url: String,
kafka_hosts: String,
kafka_topic: String,
}

async fn shutdown() {
let mut term = signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to register SIGTERM handler");
Expand All @@ -23,18 +34,15 @@ async fn shutdown() {

#[tokio::main]
async fn main() {
let use_print_sink = env::var("PRINT_SINK").is_ok();
let address = env::var("ADDRESS").unwrap_or(String::from("127.0.0.1:3000"));
let redis_addr =
env::var("REDIS_URL").expect("redis required; please set the REDIS_URL env var");
let config = Config::init_from_env().expect("Invalid configuration:");

let redis_client =
Arc::new(RedisClient::new(redis_addr).expect("failed to create redis client"));
Arc::new(RedisClient::new(config.redis_url).expect("failed to create redis client"));

let billing = BillingLimiter::new(Duration::seconds(5), redis_client.clone())
.expect("failed to create billing limiter");

let app = if use_print_sink {
let app = if config.print_sink {
router::router(
capture::time::SystemTime {},
sink::PrintSink {},
Expand All @@ -43,10 +51,7 @@ async fn main() {
true,
)
} else {
let brokers = env::var("KAFKA_HOSTS").expect("Expected KAFKA_HOSTS");
let topic = env::var("KAFKA_TOPIC").expect("Expected KAFKA_TOPIC");

let sink = sink::KafkaSink::new(topic, brokers).unwrap();
let sink = sink::KafkaSink::new(config.kafka_topic, config.kafka_hosts).unwrap();

router::router(
capture::time::SystemTime {},
Expand All @@ -58,14 +63,14 @@ async fn main() {
};

// initialize tracing

tracing_subscriber::fmt::init();

// run our app with hyper
// `axum::Server` is a re-export of `hyper::Server`

tracing::info!("listening on {}", address);
tracing::info!("listening on {}", config.address);

axum::Server::bind(&address.parse().unwrap())
axum::Server::bind(&config.address)
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.with_graceful_shutdown(shutdown())
.await
Expand Down

0 comments on commit c9e835e

Please sign in to comment.