Skip to content

Commit

Permalink
user-ingest: Push matchdata_salts to rmq stream
Browse files Browse the repository at this point in the history
  • Loading branch information
raimannma committed Oct 10, 2024
1 parent 267d477 commit 8672650
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 8 deletions.
1 change: 1 addition & 0 deletions user-ingest/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions user-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ reqwest = "0.12.8"
tempfile = "3.13.0"
futures = "0.3.31"
lapin = "2.5.0"
serde_json = "1.0.128"
2 changes: 1 addition & 1 deletion user-ingest/src/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub async fn process_data(salts: &Salts, data_type: DataType) -> Result<(), Proc

download_to_file(salts, data_type, &local_path).await?;
s3::upload_to_s3(&local_path, &s3_path).await?;
rmq::add_to_queue(&s3_path).await?;
rmq::add_to_queue("db_ingest_queue", &s3_path).await?;
info!("Uploaded {}", s3_path);

local_file.close().map_err(ProcessError::Io)
Expand Down
7 changes: 7 additions & 0 deletions user-ingest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ async fn main() -> Result<(), io::Error> {
let semaphore = Arc::new(semaphore);
let mut tasks = FuturesUnordered::new();
while let Some(salts) = salts_channel_receiver.recv().await {
let serialized_salts = serde_json::to_string(&salts);
if let Ok(serialized_salts) = serialized_salts {
match rmq::add_to_queue("matchdata_salts", &serialized_salts).await {
Ok(_) => debug!("Sent salts to queue"),
Err(e) => error!("Failed to send salts to queue: {:?}", e),
}
}
let permit = semaphore.clone().acquire_owned().await.unwrap();

debug!("Received metadata download task: {:?}", salts);
Expand Down
4 changes: 2 additions & 2 deletions user-ingest/src/models.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::io;

Expand All @@ -10,7 +10,7 @@ pub enum ProcessError {
RmqError(lapin::Error),
}

#[derive(Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Salts {
pub cluster_id: u32,
pub match_id: u64,
Expand Down
8 changes: 3 additions & 5 deletions user-ingest/src/rmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,16 @@ static RABBITMQ_PASS: LazyLock<String> =
static RABBITMQ_HOST: LazyLock<String> = LazyLock::new(|| std::env::var("RABBITMQ_HOST").unwrap());
static RABBITMQ_PORT: LazyLock<String> = LazyLock::new(|| std::env::var("RABBITMQ_PORT").unwrap());

const RABBITMQ_QUEUE: &str = "db_ingest_queue";

static RABBITMQ_CONNECTION: OnceCell<Connection> = OnceCell::const_new();
static RABBITMQ_CHANNEL: OnceCell<Channel> = OnceCell::const_new();

pub async fn add_to_queue(body: &str) -> Result<(), ProcessError> {
pub async fn add_to_queue(routing_key: &str, body: &str) -> Result<(), ProcessError> {
let rmq_channel = get_rmq_channel().await?;
info!("Sending message {} to queue: {}", body, RABBITMQ_QUEUE);
info!("Sending message {} to queue: {}", body, routing_key);
rmq_channel
.basic_publish(
"",
RABBITMQ_QUEUE,
routing_key,
BasicPublishOptions::default(),
body.as_bytes(),
BasicProperties::default(),
Expand Down

0 comments on commit 8672650

Please sign in to comment.