Commit
feat: create worker queue using redis for uploading metadata json in the background
kespinola committed Sep 21, 2023
1 parent 642eb9c commit f2b0a67
Showing 14 changed files with 1,030 additions and 338 deletions.
1,053 changes: 718 additions & 335 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions api/Cargo.toml
@@ -2,9 +2,7 @@
name = "holaplex-hub-nfts"
version = "0.1.1"
publish = false
authors = [
"Holaplex <[email protected]>",
]
authors = ["Holaplex <[email protected]>"]
edition = "2021"
description = "Holaplex Hub nfts service"
readme = "./README.md"
@@ -29,6 +27,7 @@ async-graphql = { version = "5.0.4", features = [
"dataloader",
"apollo_tracing",
] }
redis = { version = "0.20.0", features = ["aio", "tokio-comp"] } # tokio-comp is required for get_async_connection on the tokio runtime
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.93"
solana-program = "1"
45 changes: 45 additions & 0 deletions api/src/background_worker/job.rs
@@ -0,0 +1,45 @@
use serde::{ser::SerializeStruct, Deserialize, Serialize};

use super::tasks::BackgroundTask;

#[derive(Debug)]
pub struct Job<T: BackgroundTask> {
pub id: i64,
pub task: T,
}

impl<T: BackgroundTask> Serialize for Job<T> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let payload = self.task.payload();
let payload_str = serde_json::to_string(&payload).map_err(serde::ser::Error::custom)?;

let mut state = serializer.serialize_struct("Job", 2)?;
state.serialize_field("id", &self.id)?;
state.serialize_field("task", &payload_str)?;
state.end()
}
}

impl<'de, T: BackgroundTask + for<'a> Deserialize<'a>> Deserialize<'de> for Job<T> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
struct JobHelper {
id: i64,
task: String,
}

let helper = JobHelper::deserialize(deserializer)?;

let task: T = serde_json::from_str(&helper.task).map_err(serde::de::Error::custom)?;

Ok(Job {
id: helper.id,
task,
})
}
}
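
A minimal round-trip sketch of this custom (de)serialization: the task's payload is embedded as a JSON string under the "task" field. NoopTask is a hypothetical stand-in defined here for illustration; it is not a type from this commit.

#[derive(Debug, Serialize, Deserialize)]
struct NoopTask;

impl BackgroundTask for NoopTask {
    fn process(&self) -> Result<(), Box<dyn std::error::Error>> {
        Ok(())
    }

    fn payload(&self) -> serde_json::Value {
        // A unit struct serializes to `null`, which deserializes back cleanly
        serde_json::json!(null)
    }
}

fn round_trip() -> Result<(), serde_json::Error> {
    let job = Job { id: 1, task: NoopTask };
    let encoded = serde_json::to_string(&job)?;
    let decoded: Job<NoopTask> = serde_json::from_str(&encoded)?;
    assert_eq!(decoded.id, 1);
    Ok(())
}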
79 changes: 79 additions & 0 deletions api/src/background_worker/job_queue.rs
@@ -0,0 +1,79 @@
use super::{job::Job, tasks::BackgroundTask};
use crate::db::Connection;
use std::{
    error::Error,
    fmt,
    sync::{Arc, Mutex},
};

use redis::Client;

#[derive(Debug)]
struct LockError(String);

impl fmt::Display for LockError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

impl Error for LockError {}

pub struct JobQueue {
client: Arc<Mutex<Client>>,
db_pool: Arc<Connection>,
}

impl JobQueue {
    pub fn new(redis_url: &str, db_pool: Connection) -> Self {
let client = Client::open(redis_url).expect("Failed to create Redis client");
Self {
client: Arc::new(Mutex::new(client)),
db_pool: Arc::new(db_pool),
}
}

pub async fn enqueue<T: BackgroundTask>(
&self,
job: &Job<T>,
) -> Result<(), Box<dyn std::error::Error>> {
        let client = self
            .client
            .lock()
            .map_err(|e| Box::new(LockError(e.to_string())) as Box<dyn Error>)?
            .clone(); // clone the Client so the mutex guard drops before the await
        let mut conn = client.get_async_connection().await?;

let payload = serde_json::to_string(&job.task.payload())?;

redis::cmd("LPUSH")
.arg("job_queue")
.arg(payload)
.query_async(&mut conn)
.await?;
Ok(())
}

    pub async fn dequeue<T: BackgroundTask + for<'de> serde::Deserialize<'de>>(
        &self,
    ) -> Result<Option<Job<T>>, Box<dyn std::error::Error>> {
        let client = self
            .client
            .lock()
            .map_err(|e| Box::new(LockError(e.to_string())) as Box<dyn Error>)?
            .clone(); // clone the Client so the mutex guard drops before the await
        let mut conn = client.get_async_connection().await?;

let payload: Option<String> = redis::cmd("RPOP")
.arg("job_queue")
.query_async(&mut conn)
.await?;

if let Some(payload) = payload {
            let task: T = serde_json::from_str(&payload)?;
            let job = Job {
                id: generate_unique_id(), // placeholder from this commit; not implemented anywhere in the diff
task,
};
Ok(Some(job))
} else {
Ok(None)
}
}
}
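
A usage sketch for the queue, assuming the hypothetical NoopTask from the job.rs sketch above and an already-constructed database Connection:

async fn demo(db: Connection) -> Result<(), Box<dyn std::error::Error>> {
    let queue = JobQueue::new("redis://127.0.0.1:6379", db);

    queue.enqueue(&Job { id: 1, task: NoopTask }).await?;

    // A worker's polling loop would normally do the dequeue
    if let Some(job) = queue.dequeue::<NoopTask>().await? {
        job.task.process()?;
    }
    Ok(())
}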
4 changes: 4 additions & 0 deletions api/src/background_worker/mod.rs
@@ -0,0 +1,4 @@
pub mod job;
pub mod job_queue;
pub mod tasks;
pub mod worker;
@@ -0,0 +1 @@
use super::BackgroundTask;
8 changes: 8 additions & 0 deletions api/src/background_worker/tasks/mod.rs
@@ -0,0 +1,8 @@
use serde::{Deserialize, Serialize};
use serde_json::Value as Json;

pub trait BackgroundTask: Send + Sync + std::fmt::Debug {
fn process(&self) -> Result<(), Box<dyn std::error::Error>>;
fn payload(&self) -> Json;
}
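
No concrete task ships in this diff; below is one hypothetical implementor sketching how the metadata-JSON upload from the commit title could plug into the trait. The type name and fields are illustrative only.

use serde::{Deserialize, Serialize};
use serde_json::{json, Value as Json};

use super::BackgroundTask;

#[derive(Debug, Serialize, Deserialize)]
pub struct MetadataJsonUploadTask {
    pub mint_id: String,
    pub metadata_json: Json,
}

impl BackgroundTask for MetadataJsonUploadTask {
    fn process(&self) -> Result<(), Box<dyn std::error::Error>> {
        // The real implementation would push self.metadata_json to storage
        // (e.g. nft.storage); elided here.
        Ok(())
    }

    fn payload(&self) -> Json {
        // Field names match the struct so dequeue can deserialize this back into the task
        json!({
            "mint_id": self.mint_id,
            "metadata_json": self.metadata_json,
        })
    }
}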

80 changes: 80 additions & 0 deletions api/src/background_worker/worker.rs
@@ -0,0 +1,80 @@
use std::sync::Arc;

use hub_core::{
    tokio,
    tracing::{error, info},
};
use sea_orm::DbErr;
use serde::Deserialize;

use super::{job_queue::JobQueue, tasks::BackgroundTask};
// `JobTracking` is assumed to be a helper over the `job_trackings` entity;
// it is called below but never defined in this diff.
use crate::{db::Connection, entities::job_trackings::JobTracking};

pub struct Worker {
job_queue: Arc<JobQueue>,
db_pool: Arc<Connection>,
}

impl Worker {
pub fn new(job_queue: Arc<JobQueue>, db_pool: Connection) -> Self {
Self {
job_queue,
db_pool: Arc::new(db_pool),
}
}

    pub async fn start<T>(&self)
    where
        T: BackgroundTask + for<'de> Deserialize<'de> + 'static,
    {
        loop {
            if let Ok(Some(job)) = self.job_queue.dequeue::<T>().await {
let job_queue_clone = self.job_queue.clone();
let job_id = job.id;
let db_pool_clone = Arc::clone(&self.db_pool);
                let handle = tokio::spawn(async move {
if JobTracking::find_by_id(job_id, &db_pool_clone)
.await?
.is_none()
{

// Create a new record in the database
JobTracking::create(
job_id,
"JobType",
job.task.payload(),
"processing",
&db_pool_clone,
)
.await?;

                        // Process the job using the trait method
match job.task.process() {
                            Ok(()) => {
                                // Update the job status in the database to "completed"
                                JobTracking::update_status(&job, "completed", &db_pool_clone).await?;
                            },
Err(e) => {
                                error!("Job processing failed: {e}");

// Re-queue the job and update the job status in the database to "queued"
job_queue_clone
.enqueue(&job)
.await
.expect("Failed to re-queue job");
                                JobTracking::update_status(&job, "queued", &db_pool_clone).await?;
},
}
} else {
info!("Duplicate job detected, skipping: {}", job_id);
}
                    Ok::<(), DbErr>(())
                });

                // Awaiting here processes one job at a time, matching the original flow;
                // surface both tracking errors and panics from the spawned task.
                match handle.await {
                    Ok(Err(e)) => error!("An error occurred: {e}"),
                    Err(e) => error!("Worker task panicked: {e}"),
                    Ok(Ok(())) => (),
                }
}
}
}
}
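
Nothing in this diff actually constructs or spawns the Worker. A wiring sketch, under the assumptions that Connection is cheaply cloneable and that a task type like the hypothetical MetadataJsonUploadTask above exists:

use std::sync::Arc;

use crate::background_worker::{job_queue::JobQueue, worker::Worker};
use crate::background_worker::tasks::MetadataJsonUploadTask; // hypothetical
use crate::db::Connection;

async fn spawn_background_worker(db: Connection, redis_url: &str) {
    let queue = Arc::new(JobQueue::new(redis_url, db.clone())); // assumes Connection: Clone
    let worker = Worker::new(queue, db);

    // Run the polling loop on its own task so startup can continue
    hub_core::tokio::spawn(async move {
        worker.start::<MetadataJsonUploadTask>().await;
    });
}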
19 changes: 19 additions & 0 deletions api/src/entities/job_trackings.rs
@@ -0,0 +1,19 @@
use sea_orm::entity::prelude::*;
use serde_json::Value as Json;

#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "job_trackings")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i64,
pub job_type: String,
pub payload: Json,
pub status: String,
pub created_at: DateTimeWithTimeZone,
pub updated_at: DateTimeWithTimeZone,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}
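
worker.rs above calls JobTracking::find_by_id, JobTracking::create, and JobTracking::update_status, but no such helper appears anywhere in this diff. A sketch of what it might look like over this entity, assuming the crate's Connection exposes its sea-orm handle via a get() method and that chrono is available:

use sea_orm::{ActiveValue::Set, DbErr};

use crate::{
    background_worker::{job::Job, tasks::BackgroundTask},
    db::Connection,
};

pub struct JobTracking;

impl JobTracking {
    pub async fn find_by_id(id: i64, db: &Connection) -> Result<Option<Model>, DbErr> {
        // Assumption: Connection::get() -> &DatabaseConnection
        Entity::find_by_id(id).one(db.get()).await
    }

    pub async fn create(
        id: i64,
        job_type: &str,
        payload: Json,
        status: &str,
        db: &Connection,
    ) -> Result<Model, DbErr> {
        let now: DateTimeWithTimeZone = chrono::Utc::now().into();
        ActiveModel {
            id: Set(id),
            job_type: Set(job_type.to_string()),
            payload: Set(payload),
            status: Set(status.to_string()),
            created_at: Set(now),
            updated_at: Set(now),
        }
        .insert(db.get())
        .await
    }

    pub async fn update_status<T: BackgroundTask>(
        job: &Job<T>,
        status: &str,
        db: &Connection,
    ) -> Result<Model, DbErr> {
        let found = Entity::find_by_id(job.id)
            .one(db.get())
            .await?
            .ok_or_else(|| DbErr::RecordNotFound(format!("job {}", job.id)))?;
        let mut active: ActiveModel = found.into();
        active.status = Set(status.to_string());
        active.updated_at = Set(chrono::Utc::now().into());
        active.update(db.get()).await
    }
}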
1 change: 1 addition & 0 deletions api/src/entities/mod.rs
@@ -19,3 +19,4 @@ pub mod sea_orm_active_enums;
pub mod switch_collection_histories;
pub mod transfer_charges;
pub mod update_histories;
pub mod job_trackings;
1 change: 1 addition & 0 deletions api/src/lib.rs
@@ -15,6 +15,7 @@ pub mod mutations;
pub mod nft_storage;
pub mod objects;
pub mod queries;
pub mod background_worker;

use async_graphql::{
dataloader::DataLoader,
8 changes: 8 additions & 0 deletions docker-compose.yaml
@@ -1,5 +1,11 @@
version: '3.8'
services:
redis:
image: redis:7.2.1
ports:
- 6379:6379
volumes:
- holaplex_hub_nfts_redis:/data
db:
image: postgres:15.1
container_name: db
@@ -28,4 +34,6 @@ services:
- 29092:29092
volumes:
holaplex_hub_nfts:
driver: local
holaplex_hub_nfts_redis:
driver: local
8 changes: 8 additions & 0 deletions migration/src/lib.rs
@@ -53,11 +53,15 @@ mod m20230725_144506_drop_solana_collections_table;
mod m20230807_090847_create_histories_table;
mod m20230818_163948_downcase_polygon_addresses;
mod m20230821_131630_create_switch_collection_histories_table;
mod m20230905_100852_add_type_to_drop;
mod m20230910_204731_add_queued_variant_to_mints_status;
mod m20230910_212742_make_owner_address_optional_for_mint;
mod m20230911_144938_make_compressed_column_optional;
mod m20230914_154759_add_job_trackings_table;
mod m20230915_111128_create_mints_creation_status_idx;

pub struct Migrator;

@@ -118,11 +122,15 @@ impl MigratorTrait for Migrator {
Box::new(m20230807_090847_create_histories_table::Migration),
Box::new(m20230818_163948_downcase_polygon_addresses::Migration),
Box::new(m20230821_131630_create_switch_collection_histories_table::Migration),
            Box::new(m20230905_100852_add_type_to_drop::Migration),
            Box::new(m20230910_204731_add_queued_variant_to_mints_status::Migration),
            Box::new(m20230910_212742_make_owner_address_optional_for_mint::Migration),
            Box::new(m20230911_144938_make_compressed_column_optional::Migration),
            Box::new(m20230914_154759_add_job_trackings_table::Migration),
            Box::new(m20230915_111128_create_mints_creation_status_idx::Migration),
]
}
}
56 changes: 56 additions & 0 deletions migration/src/m20230914_154759_add_job_trackings_table.rs
@@ -0,0 +1,56 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(JobTrackings::Table)
.if_not_exists()
.col(
ColumnDef::new(JobTrackings::Id)
                            .big_integer() // the entity model uses i64, which maps to BIGINT
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(JobTrackings::JobType).string().not_null())
.col(ColumnDef::new(JobTrackings::Status).string().not_null())
.col(ColumnDef::new(JobTrackings::Payload).json().not_null())
.col(
ColumnDef::new(JobTrackings::CreatedAt)
.timestamp_with_time_zone()
.not_null(),
)
.col(
ColumnDef::new(JobTrackings::UpdatedAt)
.timestamp_with_time_zone()
.not_null(),
)
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(JobTrackings::Table).to_owned())
.await
}
}

/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum JobTrackings {
Table,
Id,
Status,
JobType,
Payload,
CreatedAt,
UpdatedAt,
}
