From d82b32c83748bf5196ef95509eb8a450658d9cfb Mon Sep 17 00:00:00 2001 From: Kyle Espinola Date: Tue, 26 Sep 2023 15:06:00 +0200 Subject: [PATCH] refactor: update all mutations that upload metadata json to use metadata json upload job queue. also modify these mutations to use transactions and re-organize structure of mutations to make more readable. --- .env | 1 + Cargo.lock | 452 ++--------- api/Cargo.toml | 2 +- api/src/background_worker/job.rs | 58 +- api/src/background_worker/job_queue.rs | 114 ++- api/src/background_worker/mod.rs | 8 +- .../tasks/metadata_json_upload_task.rs | 717 +++++++++++++++++- api/src/background_worker/tasks/mod.rs | 50 +- api/src/background_worker/worker.rs | 127 ++-- api/src/collection.rs | 58 -- api/src/entities/collections.rs | 6 + api/src/entities/drops.rs | 8 + api/src/entities/job_trackings.rs | 37 +- api/src/entities/metadata_jsons.rs | 14 +- api/src/entities/mod.rs | 2 +- api/src/entities/sea_orm_active_enums.rs | 14 +- api/src/events.rs | 26 +- api/src/handlers.rs | 4 +- api/src/lib.rs | 15 +- api/src/main.rs | 34 +- api/src/metadata_json.rs | 170 ----- api/src/mutations/collection.rs | 253 +++--- api/src/mutations/drop.rs | 364 +++++---- api/src/mutations/mint.rs | 524 +++++++------ api/src/nft_storage.rs | 2 +- api/src/objects/metadata_json.rs | 178 +++-- migration/src/lib.rs | 10 +- ...lable_metadata_jsons_identifier_and_uri.rs | 39 + 28 files changed, 1863 insertions(+), 1424 deletions(-) delete mode 100644 api/src/collection.rs delete mode 100644 api/src/metadata_json.rs create mode 100644 migration/src/m20230922_150621_nullable_metadata_jsons_identifier_and_uri.rs diff --git a/.env b/.env index fd6cf4f..ba460bd 100644 --- a/.env +++ b/.env @@ -10,3 +10,4 @@ IPFS_ENDPOINT=https://nftstorage.link/ipfs CREDIT_SHEET=credits.toml ASSET_CDN=https://assets.holaplex.tools NFT_STORAGE_AUTH_TOKEN="" +REDIS_URL=redis://localhost:6379 \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index a9850dd..46b0024 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -239,7 +239,7 @@ dependencies = [ "futures-util", "poem", "serde_json", - "tokio-util 0.7.8", + "tokio-util", ] [[package]] @@ -287,15 +287,6 @@ dependencies = [ "syn 2.0.28", ] -[[package]] -name = "atoi" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e" -dependencies = [ - "num-traits", -] - [[package]] name = "atoi" version = "2.0.0" @@ -338,19 +329,6 @@ dependencies = [ "rustc-demangle", ] -[[package]] -name = "bae" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33b8de67cc41132507eeece2584804efcb15f85ba516e34c944b7667f480397a" -dependencies = [ - "heck 0.3.3", - "proc-macro-error", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "base-x" version = "0.2.11" @@ -741,7 +719,7 @@ version = "4.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro2", "quote", "syn 2.0.28", @@ -770,7 +748,7 @@ dependencies = [ "memchr", "pin-project-lite", "tokio", - "tokio-util 0.7.8", + "tokio-util", ] [[package]] @@ -1088,12 +1066,6 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" -[[package]] -name = "dtoa" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0" - [[package]] name = "duct" version = "0.13.6" @@ -1294,17 +1266,6 @@ dependencies = [ "futures-util", ] -[[package]] -name = "futures-intrusive" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5" -dependencies = [ - "futures-core", - "lock_api", - "parking_lot 0.11.2", -] - [[package]] name = "futures-intrusive" version = "0.5.0" @@ -1313,7 +1274,7 @@ checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" dependencies = [ "futures-core", "lock_api", - "parking_lot 0.12.1", + "parking_lot", ] [[package]] @@ -1443,7 +1404,7 @@ dependencies = [ "indexmap 1.9.3", "slab", "tokio", - "tokio-util 0.7.8", + "tokio-util", "tracing", ] @@ -1520,7 +1481,7 @@ dependencies = [ "http", "httpdate", "mime", - "sha1 0.10.5", + "sha1", ] [[package]] @@ -1532,15 +1493,6 @@ dependencies = [ "http", ] -[[package]] -name = "heck" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" -dependencies = [ - "unicode-segmentation", -] - [[package]] name = "heck" version = "0.4.1" @@ -1632,7 +1584,7 @@ dependencies = [ "rand 0.8.5", "rdkafka", "reqwest", - "sea-orm 0.10.7", + "sea-orm", "serde_json", "serde_with", "strum 0.24.1", @@ -1699,7 +1651,7 @@ dependencies = [ "prost-types", "redis", "reqwest", - "sea-orm 0.12.2", + "sea-orm", "serde", "serde_json", "solana-program", @@ -1734,7 +1686,7 @@ checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" dependencies = [ "bytes 1.4.0", "fnv", - "itoa 1.0.9", + "itoa", ] [[package]] @@ -1775,7 +1727,7 @@ dependencies = [ "http-body", "httparse", "httpdate", - "itoa 1.0.9", + "itoa", "pin-project-lite", "socket2 0.4.9", "tokio", @@ -1903,12 +1855,6 @@ dependencies = [ "either", ] -[[package]] -name = "itoa" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" - [[package]] name = "itoa" version = "1.0.9" @@ -2504,16 +2450,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "ouroboros" -version = "0.15.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1358bd1558bd2a083fed428ffeda486fbfb323e698cdda7794259d592ca72db" -dependencies = [ - "aliasable", - "ouroboros_macro 0.15.6", -] - [[package]] name = "ouroboros" version = "0.17.2" @@ -2521,30 +2457,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2ba07320d39dfea882faa70554b4bd342a5f273ed59ba7c1c6b4c840492c954" dependencies = [ "aliasable", - "ouroboros_macro 0.17.2", + "ouroboros_macro", "static_assertions", ] -[[package]] -name = "ouroboros_macro" -version = "0.15.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f7d21ccd03305a674437ee1248f3ab5d4b1db095cf1caf49f1713ddf61956b7" -dependencies = [ - "Inflector", - "proc-macro-error", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "ouroboros_macro" version = "0.17.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec4c6225c69b4ca778c0aea097321a64c421cf4577b331c61b229267edabb6f8" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro-error", "proc-macro2", "quote", @@ -2557,17 +2480,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" -[[package]] -name = "parking_lot" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" -dependencies = [ - "instant", - "lock_api", - "parking_lot_core 0.8.6", -] - [[package]] name = "parking_lot" version = "0.12.1" @@ -2575,21 +2487,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.8", -] - -[[package]] -name = "parking_lot_core" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" -dependencies = [ - "cfg-if", - "instant", - "libc", - "redox_syscall 0.2.16", - "smallvec", - "winapi", + "parking_lot_core", ] [[package]] @@ -2600,7 +2498,7 @@ checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.3.5", + "redox_syscall", "smallvec", "windows-targets", ] @@ -2774,7 +2672,7 @@ dependencies = [ "http", "hyper", "mime", - "parking_lot 0.12.1", + "parking_lot", "percent-encoding", "pin-project-lite", "poem-derive", @@ -2789,7 +2687,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-tungstenite", - "tokio-util 0.7.8", + "tokio-util", "tracing", ] @@ -2883,7 +2781,7 @@ dependencies = [ "fnv", "lazy_static", "memchr", - "parking_lot 0.12.1", + "parking_lot", "protobuf", "thiserror", ] @@ -2905,7 +2803,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" dependencies = [ "bytes 1.4.0", - "heck 0.4.1", + "heck", "itertools", "lazy_static", "log", @@ -3089,33 +2987,25 @@ dependencies = [ [[package]] name = "redis" -version = "0.20.2" +version = "0.23.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4f0ceb2ec0dd769483ecd283f6615aa83dcd0be556d5294c6e659caefe7cc54" +checksum = "4f49cdc0bb3f412bf8e7d1bd90fe1d9eb10bc5c399ba90973c14662a27b3f8ba" dependencies = [ "async-trait", "bytes 1.4.0", "combine", - "dtoa", "futures-util", - "itoa 0.4.8", + "itoa", "percent-encoding", "pin-project-lite", - "sha1 0.6.1", + "ryu", + "sha1_smol", + "socket2 0.4.9", "tokio", - "tokio-util 0.6.10", + "tokio-util", "url", ] -[[package]] -name = "redox_syscall" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "redox_syscall" version = "0.3.5" @@ -3207,7 +3097,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-native-tls", - "tokio-util 0.7.8", + "tokio-util", "tower-service", "url", "wasm-bindgen", @@ -3421,41 +3311,13 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3bd3534a9978d0aa7edd2808dc1f8f31c4d0ecd31ddf71d997b3c98e9f3c9114" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro-error", "proc-macro2", "quote", "syn 2.0.28", ] -[[package]] -name = "sea-orm" -version = "0.10.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88694d01b528a94f90ad87f8d2f546d060d070eee180315c67d158cb69476034" -dependencies = [ - "async-stream", - "async-trait", - "chrono", - "futures", - "futures-util", - "log", - "ouroboros 0.15.6", - "rust_decimal", - "sea-orm-macros 0.10.7", - "sea-query 0.27.2", - "sea-query-binder 0.2.2", - "sea-strum", - "serde", - "serde_json", - "sqlx 0.6.3", - "thiserror", - "time 0.3.25", - "tracing", - "url", - "uuid", -] - [[package]] name = "sea-orm" version = "0.12.2" @@ -3468,14 +3330,14 @@ dependencies = [ "chrono", "futures", "log", - "ouroboros 0.17.2", + "ouroboros", "rust_decimal", - "sea-orm-macros 0.12.2", - "sea-query 0.30.1", - "sea-query-binder 0.5.0", + "sea-orm-macros", + "sea-query", + "sea-query-binder", "serde", "serde_json", - "sqlx 0.7.1", + "sqlx", "strum 0.25.0", "thiserror", "time 0.3.25", @@ -3501,26 +3363,13 @@ dependencies = [ "url", ] -[[package]] -name = "sea-orm-macros" -version = "0.10.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7216195de9c6b2474fd0efab486173dccd0eff21f28cc54aa4c0205d52fb3af0" -dependencies = [ - "bae", - "heck 0.3.3", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "sea-orm-macros" version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd90e73d5f5b184bad525767da29fbfec132b4e62ebd6f60d2f2737ec6468f62" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro2", "quote", "sea-bae", @@ -3538,27 +3387,13 @@ dependencies = [ "clap", "dotenvy", "futures", - "sea-orm 0.12.2", + "sea-orm", "sea-orm-cli", "sea-schema", "tracing", "tracing-subscriber", ] -[[package]] -name = "sea-query" -version = "0.27.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4f0fc4d8e44e1d51c739a68d336252a18bc59553778075d5e32649be6ec92ed" -dependencies = [ - "chrono", - "rust_decimal", - "sea-query-derive 0.2.0", - "serde_json", - "time 0.3.25", - "uuid", -] - [[package]] name = "sea-query" version = "0.30.1" @@ -3571,23 +3406,8 @@ dependencies = [ "inherent", "ordered-float", "rust_decimal", - "sea-query-derive 0.4.0", - "serde_json", - "time 0.3.25", - "uuid", -] - -[[package]] -name = "sea-query-binder" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c2585b89c985cfacfe0ec9fc9e7bb055b776c1a2581c4e3c6185af2b8bf8865" -dependencies = [ - "chrono", - "rust_decimal", - "sea-query 0.27.2", + "sea-query-derive", "serde_json", - "sqlx 0.6.3", "time 0.3.25", "uuid", ] @@ -3601,33 +3421,20 @@ dependencies = [ "bigdecimal", "chrono", "rust_decimal", - "sea-query 0.30.1", + "sea-query", "serde_json", - "sqlx 0.7.1", + "sqlx", "time 0.3.25", "uuid", ] -[[package]] -name = "sea-query-derive" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34cdc022b4f606353fe5dc85b09713a04e433323b70163e81513b141c6ae6eb5" -dependencies = [ - "heck 0.3.3", - "proc-macro2", - "quote", - "syn 1.0.109", - "thiserror", -] - [[package]] name = "sea-query-derive" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd78f2e0ee8e537e9195d1049b752e0433e2cac125426bccb7b5c3e508096117" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro2", "quote", "syn 1.0.109", @@ -3641,7 +3448,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cd9561232bd1b82ea748b581f15909d11de0db6563ddcf28c5d908aee8282f1" dependencies = [ "futures", - "sea-query 0.30.1", + "sea-query", "sea-schema-derive", ] @@ -3651,34 +3458,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6f686050f76bffc4f635cda8aea6df5548666b830b52387e8bc7de11056d11e" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro2", "quote", "syn 1.0.109", ] -[[package]] -name = "sea-strum" -version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "391d06a6007842cfe79ac6f7f53911b76dfd69fc9a6769f1cf6569d12ce20e1b" -dependencies = [ - "sea-strum_macros", -] - -[[package]] -name = "sea-strum_macros" -version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69b4397b825df6ccf1e98bcdabef3bbcfc47ff5853983467850eeab878384f21" -dependencies = [ - "heck 0.3.3", - "proc-macro2", - "quote", - "rustversion", - "syn 1.0.109", -] - [[package]] name = "seahash" version = "4.1.0" @@ -3749,7 +3534,7 @@ version = "1.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "076066c5f1078eac5b722a31827a8832fe108bed65dfa75e233c89f8206e976c" dependencies = [ - "itoa 1.0.9", + "itoa", "ryu", "serde", ] @@ -3770,7 +3555,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" dependencies = [ "form_urlencoded", - "itoa 1.0.9", + "itoa", "ryu", "serde", ] @@ -3803,15 +3588,6 @@ dependencies = [ "syn 2.0.28", ] -[[package]] -name = "sha1" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1da05c97445caa12d05e848c4a4fcbbea29e748ac28f7e80e9b010392063770" -dependencies = [ - "sha1_smol", -] - [[package]] name = "sha1" version = "0.10.5" @@ -3996,7 +3772,7 @@ dependencies = [ "log", "num-derive", "num-traits", - "parking_lot 0.12.1", + "parking_lot", "rand 0.7.3", "rustc_version", "rustversion", @@ -4061,75 +3837,19 @@ dependencies = [ "unicode_categories", ] -[[package]] -name = "sqlx" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8de3b03a925878ed54a954f621e64bf55a3c1bd29652d0d1a17830405350188" -dependencies = [ - "sqlx-core 0.6.3", - "sqlx-macros 0.6.3", -] - [[package]] name = "sqlx" version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e58421b6bc416714d5115a2ca953718f6c621a51b68e4f4922aea5a4391a721" dependencies = [ - "sqlx-core 0.7.1", - "sqlx-macros 0.7.1", + "sqlx-core", + "sqlx-macros", "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", ] -[[package]] -name = "sqlx-core" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa8241483a83a3f33aa5fff7e7d9def398ff9990b2752b6c6112b83c6d246029" -dependencies = [ - "ahash 0.7.6", - "atoi 1.0.0", - "bitflags 1.3.2", - "byteorder", - "bytes 1.4.0", - "chrono", - "crc", - "crossbeam-queue", - "dotenvy", - "either", - "event-listener", - "futures-channel", - "futures-core", - "futures-intrusive 0.4.2", - "futures-util", - "hashlink", - "hex", - "indexmap 1.9.3", - "itoa 1.0.9", - "libc", - "log", - "memchr", - "num-bigint", - "once_cell", - "paste", - "percent-encoding", - "rust_decimal", - "serde", - "serde_json", - "sha2 0.10.7", - "smallvec", - "sqlformat", - "sqlx-rt", - "stringprep", - "thiserror", - "time 0.3.25", - "url", - "uuid", -] - [[package]] name = "sqlx-core" version = "0.7.1" @@ -4137,7 +3857,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd4cef4251aabbae751a3710927945901ee1d97ee96d757f6880ebb9a79bfd53" dependencies = [ "ahash 0.8.3", - "atoi 2.0.0", + "atoi", "bigdecimal", "byteorder", "bytes 1.4.0", @@ -4149,7 +3869,7 @@ dependencies = [ "event-listener", "futures-channel", "futures-core", - "futures-intrusive 0.5.0", + "futures-intrusive", "futures-io", "futures-util", "hashlink", @@ -4178,26 +3898,6 @@ dependencies = [ "webpki-roots", ] -[[package]] -name = "sqlx-macros" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9966e64ae989e7e575b19d7265cb79d7fc3cbbdf179835cb0d716f294c2049c9" -dependencies = [ - "dotenvy", - "either", - "heck 0.4.1", - "once_cell", - "proc-macro2", - "quote", - "serde_json", - "sha2 0.10.7", - "sqlx-core 0.6.3", - "sqlx-rt", - "syn 1.0.109", - "url", -] - [[package]] name = "sqlx-macros" version = "0.7.1" @@ -4206,7 +3906,7 @@ checksum = "208e3165167afd7f3881b16c1ef3f2af69fa75980897aac8874a0696516d12c2" dependencies = [ "proc-macro2", "quote", - "sqlx-core 0.7.1", + "sqlx-core", "sqlx-macros-core", "syn 1.0.109", ] @@ -4219,7 +3919,7 @@ checksum = "8a4a8336d278c62231d87f24e8a7a74898156e34c1c18942857be2acb29c7dfc" dependencies = [ "dotenvy", "either", - "heck 0.4.1", + "heck", "hex", "once_cell", "proc-macro2", @@ -4227,7 +3927,7 @@ dependencies = [ "serde", "serde_json", "sha2 0.10.7", - "sqlx-core 0.7.1", + "sqlx-core", "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", @@ -4243,7 +3943,7 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ca69bf415b93b60b80dc8fda3cb4ef52b2336614d8da2de5456cc942a110482" dependencies = [ - "atoi 2.0.0", + "atoi", "base64 0.21.2", "bigdecimal", "bitflags 2.3.3", @@ -4262,7 +3962,7 @@ dependencies = [ "hex", "hkdf", "hmac 0.12.1", - "itoa 1.0.9", + "itoa", "log", "md-5", "memchr", @@ -4272,10 +3972,10 @@ dependencies = [ "rsa", "rust_decimal", "serde", - "sha1 0.10.5", + "sha1", "sha2 0.10.7", "smallvec", - "sqlx-core 0.7.1", + "sqlx-core", "stringprep", "thiserror", "time 0.3.25", @@ -4290,7 +3990,7 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0db2df1b8731c3651e204629dd55e52adbae0462fa1bdcbed56a2302c18181e" dependencies = [ - "atoi 2.0.0", + "atoi", "base64 0.21.2", "bigdecimal", "bitflags 2.3.3", @@ -4307,7 +4007,7 @@ dependencies = [ "hkdf", "hmac 0.12.1", "home", - "itoa 1.0.9", + "itoa", "log", "md-5", "memchr", @@ -4317,10 +4017,10 @@ dependencies = [ "rust_decimal", "serde", "serde_json", - "sha1 0.10.5", + "sha1", "sha2 0.10.7", "smallvec", - "sqlx-core 0.7.1", + "sqlx-core", "stringprep", "thiserror", "time 0.3.25", @@ -4329,31 +4029,25 @@ dependencies = [ "whoami", ] -[[package]] -name = "sqlx-rt" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "804d3f245f894e61b1e6263c84b23ca675d96753b5abfd5cc8597d86806e8024" - [[package]] name = "sqlx-sqlite" version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4c21bf34c7cae5b283efb3ac1bcc7670df7561124dc2f8bdc0b59be40f79a2" dependencies = [ - "atoi 2.0.0", + "atoi", "chrono", "flume", "futures-channel", "futures-core", "futures-executor", - "futures-intrusive 0.5.0", + "futures-intrusive", "futures-util", "libsqlite3-sys", "log", "percent-encoding", "serde", - "sqlx-core 0.7.1", + "sqlx-core", "time 0.3.25", "tracing", "url", @@ -4415,7 +4109,7 @@ version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" dependencies = [ - "heck 0.4.1", + "heck", "proc-macro2", "quote", "rustversion", @@ -4476,7 +4170,7 @@ checksum = "dc02fddf48964c42031a0b3fe0428320ecf3a73c401040fc0096f97794310651" dependencies = [ "cfg-if", "fastrand 2.0.0", - "redox_syscall 0.3.5", + "redox_syscall", "rustix", "windows-sys", ] @@ -4529,7 +4223,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0fdd63d58b18d663fbdf70e049f00a22c8e42be082203be7f26589213cd75ea" dependencies = [ "deranged", - "itoa 1.0.9", + "itoa", "serde", "time-core", "time-macros", @@ -4576,7 +4270,7 @@ dependencies = [ "libc", "mio", "num_cpus", - "parking_lot 0.12.1", + "parking_lot", "pin-project-lite", "socket2 0.5.3", "tokio-macros", @@ -4627,20 +4321,6 @@ dependencies = [ "tungstenite", ] -[[package]] -name = "tokio-util" -version = "0.6.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" -dependencies = [ - "bytes 1.4.0", - "futures-core", - "futures-sink", - "log", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.8" @@ -4819,7 +4499,7 @@ dependencies = [ "httparse", "log", "rand 0.8.5", - "sha1 0.10.5", + "sha1", "thiserror", "url", "utf-8", diff --git a/api/Cargo.toml b/api/Cargo.toml index 68d7bf1..421db1c 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -27,7 +27,7 @@ async-graphql = { version = "5.0.4", features = [ "dataloader", "apollo_tracing", ] } -redis = { version = "0.20.0", features = ["aio"] } +redis = { version = "0.23.3", features = ["tokio-comp"] } serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.93" solana-program = "1" diff --git a/api/src/background_worker/job.rs b/api/src/background_worker/job.rs index 5be17d8..32d6b21 100644 --- a/api/src/background_worker/job.rs +++ b/api/src/background_worker/job.rs @@ -1,45 +1,43 @@ +use serde::{ + de::{Deserialize, Deserializer, Error as DeError}, + Serialize, +}; + use super::tasks::BackgroundTask; -use serde::{Deserialize, Serialize}; -#[derive(Debug)] -pub struct Job { +#[derive(Serialize, Debug)] +pub struct Job> { pub id: i64, pub task: T, + _context_marker: std::marker::PhantomData, } -impl Serialize for Job { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let payload = self.task.payload(); - let payload_str = serde_json::to_string(&payload).map_err(serde::ser::Error::custom)?; - - let mut state = serializer.serialize_struct("Job", 3)?; - state.serialize_field("id", &self.id)?; - state.serialize_field("task", &payload_str)?; - state.end() +impl Job +where + T: Serialize + Send + Sync + BackgroundTask, +{ + #[must_use] + pub fn new(id: i64, task: T) -> Self { + Self { + id, + task, + _context_marker: std::marker::PhantomData, + } } } -impl<'de, T: BackgroundTask + for<'a> Deserialize<'a>> Deserialize<'de> for Job { +impl<'de, C, T> Deserialize<'de> for Job +where + C: Clone, + T: Serialize + Send + Sync + BackgroundTask, + T: for<'a> Deserialize<'a>, +{ fn deserialize(deserializer: D) -> Result where - D: serde::Deserializer<'de>, + D: Deserializer<'de>, { - #[derive(Deserialize)] - struct JobHelper { - id: i64, - task: String, - } - - let helper = JobHelper::deserialize(deserializer)?; + let (id, task) = Deserialize::deserialize(deserializer).map_err(DeError::custom)?; - let task: T = serde_json::from_str(&helper.task).map_err(serde::de::Error::custom)?; - - Ok(Job { - id: helper.id, - task, - }) + Ok(Job::new(id, task)) } } diff --git a/api/src/background_worker/job_queue.rs b/api/src/background_worker/job_queue.rs index 95f2ba4..d0aef49 100644 --- a/api/src/background_worker/job_queue.rs +++ b/api/src/background_worker/job_queue.rs @@ -1,10 +1,13 @@ +use std::{error::Error as StdError, fmt, sync::Arc}; + +use hub_core::{prelude::*, thiserror, tokio::sync::Mutex}; +use redis::{Client, RedisError}; +use sea_orm::{error::DbErr, ActiveModelTrait}; +use serde::{Deserialize, Serialize}; +use serde_json::Error as SerdeJsonError; + use super::{job::Job, tasks::BackgroundTask}; -use crate::db::Connection; -use redis::AsyncCommands; -use redis::Client; -use std::error::Error; -use std::fmt; -use std::sync::{Arc, Mutex}; +use crate::{db::Connection, entities::job_trackings}; #[derive(Debug)] struct LockError(String); @@ -15,65 +18,94 @@ impl fmt::Display for LockError { } } -impl Error for LockError {} +impl StdError for LockError {} +// Job queue errors +#[derive(thiserror::Error, Debug)] +pub enum JobQueueError { + #[error("Redis error: {0}")] + RedisConnection(#[from] RedisError), + #[error("Database error: {0}")] + Database(#[from] DbErr), + #[error("Serialization error: {0}")] + Serde(#[from] SerdeJsonError), + #[error("Background task error: {0}")] + BackgroundTask(#[from] Error), +} +#[derive(Clone)] pub struct JobQueue { client: Arc>, - db_pool: Arc, + db_pool: Connection, } impl JobQueue { - pub async fn new(redis_url: &str, db_pool: Connection) -> Self { - let client = Client::open(redis_url).expect("Failed to create Redis client"); - Self { - client: Arc::new(Mutex::new(client)), - db_pool: Arc::new(db_pool), - } + pub fn new(client: Arc>, db_pool: Connection) -> Self { + Self { client, db_pool } } - pub async fn enqueue( - &self, - job: &Job, - ) -> Result<(), Box> { - let client_guard = self - .client - .lock() - .map_err(|e| Box::new(LockError(e.to_string())) as Box)?; + pub async fn enqueue(&self, task: T) -> Result<(), JobQueueError> + where + T: Serialize + for<'de> Deserialize<'de> + Send + Sync + BackgroundTask, + C: Clone, + { + let client_guard = self.client.lock().await; let mut conn = client_guard.get_async_connection().await?; + let db_conn = self.db_pool.get(); - let payload = serde_json::to_string(&job.task.payload())?; + let payload = task.payload()?; + let new_job_tracking = job_trackings::Entity::create(task.name(), payload, "queued"); + + let new_job_tracking = new_job_tracking.insert(db_conn).await?; + + let job_to_enqueue = Job::new(new_job_tracking.id, task); + + let payload = serde_json::to_string(&job_to_enqueue)?; redis::cmd("LPUSH") .arg("job_queue") .arg(payload) .query_async(&mut conn) .await?; + Ok(()) } - pub async fn dequeue( - &self, - ) -> Result>, Box> { - let client_guard = self - .client - .lock() - .map_err(|e| Box::new(LockError(e.to_string())) as Box)?; - let mut conn = client_guard.get_async_connection()?; + pub async fn dequeue(&self) -> Result>, JobQueueError> + where + T: Serialize + for<'de> Deserialize<'de> + Send + Sync + BackgroundTask, + C: Clone, + { + let client_guard = self.client.lock().await; + let mut conn = client_guard.get_async_connection().await?; + let db_conn = self.db_pool.get(); - let payload: Option = redis::cmd("RPOP") + let res: Option = redis::cmd("BRPOP") .arg("job_queue") + .arg(0) .query_async(&mut conn) .await?; - if let Some(payload) = payload { - let task: Box = serde_json::from_str(&payload)?; - let job = Job { - id: generate_unique_id(), // You would need to implement this function - task, - }; - Ok(Some(job)) - } else { - Ok(None) + if let Some(job_data) = res { + let job: Job = serde_json::from_str(&job_data)?; + + let job_tracking = job_trackings::Entity::find_by_id(job.id) + .one(db_conn) + .await?; + + if let Some(job_tracking) = job_tracking { + if job_tracking.status == "completed" || job_tracking.status == "processing" { + return Ok(None); + } + + let job_tracking_am = + job_trackings::Entity::update_status(job_tracking, "processing"); + + job_tracking_am.save(db_conn).await?; + } + + return Ok(Some(job)); } + + Ok(None) } } diff --git a/api/src/background_worker/mod.rs b/api/src/background_worker/mod.rs index b5d8123..4d75334 100644 --- a/api/src/background_worker/mod.rs +++ b/api/src/background_worker/mod.rs @@ -1,4 +1,4 @@ -mod job_queue; -mod worker; -mod job; -mod tasks; \ No newline at end of file +pub mod job; +pub mod job_queue; +pub mod tasks; +pub mod worker; diff --git a/api/src/background_worker/tasks/metadata_json_upload_task.rs b/api/src/background_worker/tasks/metadata_json_upload_task.rs index a33c3e6..836e7ec 100644 --- a/api/src/background_worker/tasks/metadata_json_upload_task.rs +++ b/api/src/background_worker/tasks/metadata_json_upload_task.rs @@ -1 +1,716 @@ -use super::BackgroundTask; +use hub_core::{anyhow::Result, producer::Producer}; +use sea_orm::{prelude::*, Set}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use super::{BackgroundTask, BackgroundTaskError}; +use crate::{ + blockchains::{polygon::Polygon, solana::Solana, CollectionEvent, DropEvent}, + db::Connection, + entities::{ + collection_creators, collection_mints, collections, drops, metadata_jsons, mint_creators, + sea_orm_active_enums::Blockchain as BlockchainEnum, update_histories, + }, + mutations::collection::fetch_owner, + nft_storage::NftStorageClient, + objects::MetadataJsonInput, + proto::{ + CreateEditionTransaction, EditionInfo, MasterEdition, MetaplexMasterEditionTransaction, + MetaplexMetadata, MintMetaplexMetadataTransaction, NftEventKey, NftEvents, + UpdateEdtionTransaction, UpdateSolanaMintPayload, + }, +}; + +#[async_trait::async_trait] +trait After { + async fn after( + &self, + db: Connection, + context: Context, + identifier: String, + ) -> Result<(), BackgroundTaskError>; +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct CreateDrop { + pub drop_id: Uuid, +} + +#[async_trait::async_trait] +impl After for CreateDrop { + async fn after( + &self, + db: Connection, + context: Context, + identifier: String, + ) -> Result<(), BackgroundTaskError> { + let metadata_uri = context + .nft_storage + .ipfs_endpoint + .join(&identifier)? + .to_string(); + + let conn = db.get(); + let (drop, collection) = drops::Entity::find_by_id_with_collection(self.drop_id) + .one(conn) + .await? + .ok_or(BackgroundTaskError::RecordNotFound)?; + + let collection = collection.ok_or(BackgroundTaskError::RecordNotFound)?; + let supply = collection.supply; + let seller_fee_basis_points = collection.seller_fee_basis_points; + + let metadata_json = metadata_jsons::Entity::find_by_id(collection.id) + .one(conn) + .await? + .ok_or(BackgroundTaskError::RecordNotFound)?; + let creators = collection_creators::Entity::find() + .filter(collection_creators::Column::CollectionId.eq(collection.id)) + .all(conn) + .await?; + + let owner_address = fetch_owner(conn, collection.project_id, collection.blockchain) + .await + .map_err(|_| BackgroundTaskError::NoProjectWallet)?; + + let mut metadata_json_am: metadata_jsons::ActiveModel = metadata_json.clone().into(); + + metadata_json_am.uri = Set(Some(metadata_uri.clone())); + metadata_json_am.identifier = Set(Some(identifier.clone())); + + metadata_json_am.update(conn).await?; + + let event_key = NftEventKey { + id: collection.id.to_string(), + user_id: collection.created_by.to_string(), + project_id: collection.project_id.to_string(), + }; + + match collection.blockchain { + BlockchainEnum::Solana => { + context + .solana + .event() + .create_drop( + drop.drop_type, + event_key, + MetaplexMasterEditionTransaction { + master_edition: Some(MasterEdition { + owner_address, + supply, + name: metadata_json.name, + symbol: metadata_json.symbol, + metadata_uri, + seller_fee_basis_points: seller_fee_basis_points.into(), + creators: creators.into_iter().map(Into::into).collect(), + }), + }, + ) + .await?; + }, + BlockchainEnum::Polygon => { + context + .polygon + .create_drop(drop.drop_type, event_key, CreateEditionTransaction { + amount: supply.ok_or(BackgroundTaskError::NoSupply)?, + edition_info: Some(EditionInfo { + creator: creators + .get(0) + .ok_or(BackgroundTaskError::NoCreator)? + .address + .clone(), + collection: metadata_json.name, + uri: metadata_uri, + description: metadata_json.description, + image_uri: metadata_json.image, + }), + fee_receiver: owner_address, + fee_numerator: seller_fee_basis_points.into(), + }) + .await?; + }, + BlockchainEnum::Ethereum => { + return Err(BackgroundTaskError::BlockchainNotSupported); + }, + }; + + Ok(()) + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MintToCollection { + pub collection_mint_id: Uuid, +} + +#[async_trait::async_trait] +impl After for MintToCollection { + async fn after( + &self, + db: Connection, + context: Context, + identifier: String, + ) -> Result<(), BackgroundTaskError> { + let metadata_uri = context + .nft_storage + .ipfs_endpoint + .join(&identifier)? + .to_string(); + + let conn = db.get(); + let (collection_mint, collection) = + collection_mints::Entity::find_by_id_with_collection(self.collection_mint_id) + .one(conn) + .await? + .ok_or(BackgroundTaskError::RecordNotFound)?; + + let collection = collection.ok_or(BackgroundTaskError::RecordNotFound)?; + let seller_fee_basis_points = collection.seller_fee_basis_points; + + let metadata_json = metadata_jsons::Entity::find_by_id(collection_mint.id) + .one(conn) + .await? + .ok_or(BackgroundTaskError::RecordNotFound)?; + let creators = mint_creators::Entity::find() + .filter(mint_creators::Column::CollectionMintId.eq(collection_mint.id)) + .all(conn) + .await?; + + let owner_address = fetch_owner(conn, collection.project_id, collection.blockchain) + .await + .map_err(|_| BackgroundTaskError::NoProjectWallet)?; + + let mut metadata_json_am: metadata_jsons::ActiveModel = metadata_json.clone().into(); + + metadata_json_am.uri = Set(Some(metadata_uri.clone())); + metadata_json_am.identifier = Set(Some(identifier.clone())); + + metadata_json_am.update(conn).await?; + + let event_key = NftEventKey { + id: collection_mint.id.to_string(), + user_id: collection_mint.created_by.to_string(), + project_id: collection.project_id.to_string(), + }; + + let recipient_address = collection_mint.owner.ok_or(BackgroundTaskError::NoOwner)?; + let compressed = collection_mint.compressed.unwrap_or_default(); + + match collection.blockchain { + BlockchainEnum::Solana => { + context + .solana + .event() + .mint_to_collection(event_key, MintMetaplexMetadataTransaction { + metadata: Some(MetaplexMetadata { + owner_address, + name: metadata_json.name, + symbol: metadata_json.symbol, + metadata_uri, + seller_fee_basis_points: seller_fee_basis_points.into(), + creators: creators.into_iter().map(Into::into).collect(), + }), + recipient_address, + compressed, + collection_id: collection.id.to_string(), + }) + .await?; + }, + BlockchainEnum::Ethereum | BlockchainEnum::Polygon => { + return Err(BackgroundTaskError::BlockchainNotSupported); + }, + }; + + Ok(()) + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct CreateCollection { + pub collection_id: Uuid, +} + +#[async_trait::async_trait] +impl After for CreateCollection { + async fn after( + &self, + db: Connection, + context: Context, + identifier: String, + ) -> Result<(), BackgroundTaskError> { + let metadata_uri = context + .nft_storage + .ipfs_endpoint + .join(&identifier)? + .to_string(); + + let conn = db.get(); + + let collection = collections::Entity::find_by_id(self.collection_id) + .one(conn) + .await? + .ok_or(BackgroundTaskError::RecordNotFound)?; + + let seller_fee_basis_points = collection.seller_fee_basis_points; + + let metadata_json = metadata_jsons::Entity::find_by_id(collection.id) + .one(conn) + .await? + .ok_or(BackgroundTaskError::RecordNotFound)?; + let creators = collection_creators::Entity::find() + .filter(collection_creators::Column::CollectionId.eq(collection.id)) + .all(conn) + .await?; + + let owner_address = fetch_owner(conn, collection.project_id, collection.blockchain) + .await + .map_err(|_| BackgroundTaskError::NoProjectWallet)?; + + let mut metadata_json_am: metadata_jsons::ActiveModel = metadata_json.clone().into(); + + metadata_json_am.uri = Set(Some(metadata_uri.clone())); + metadata_json_am.identifier = Set(Some(identifier.clone())); + + metadata_json_am.update(conn).await?; + + let event_key = NftEventKey { + id: collection.id.to_string(), + user_id: collection.created_by.to_string(), + project_id: collection.project_id.to_string(), + }; + + match collection.blockchain { + BlockchainEnum::Solana => { + context + .solana + .event() + .create_collection(event_key, MetaplexMasterEditionTransaction { + master_edition: Some(MasterEdition { + owner_address, + supply: Some(0), + name: metadata_json.name, + symbol: metadata_json.symbol, + metadata_uri, + seller_fee_basis_points: seller_fee_basis_points.into(), + creators: creators.into_iter().map(Into::into).collect(), + }), + }) + .await?; + }, + BlockchainEnum::Ethereum | BlockchainEnum::Polygon => { + return Err(BackgroundTaskError::BlockchainNotSupported); + }, + }; + + Ok(()) + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct QueueMintToDrop { + pub drop_id: Uuid, + pub collection_mint_id: Uuid, +} + +#[async_trait::async_trait] +impl After for QueueMintToDrop { + async fn after( + &self, + db: Connection, + context: Context, + identifier: String, + ) -> Result<(), BackgroundTaskError> { + let metadata_uri = context + .nft_storage + .ipfs_endpoint + .join(&identifier)? + .to_string(); + + let conn = db.get(); + + let metadata_json = metadata_jsons::Entity::find_by_id(self.collection_mint_id) + .one(conn) + .await? + .ok_or(BackgroundTaskError::RecordNotFound)?; + + let mut metadata_json_am: metadata_jsons::ActiveModel = metadata_json.clone().into(); + + metadata_json_am.uri = Set(Some(metadata_uri.clone())); + metadata_json_am.identifier = Set(Some(identifier.clone())); + + metadata_json_am.update(conn).await?; + + Ok(()) + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct UpdateMint { + pub update_history_id: Uuid, +} + +#[async_trait::async_trait] +impl After for UpdateMint { + async fn after( + &self, + db: Connection, + context: Context, + identifier: String, + ) -> Result<(), BackgroundTaskError> { + let metadata_uri = context + .nft_storage + .ipfs_endpoint + .join(&identifier)? + .to_string(); + + let conn = db.get(); + + let update_history = update_histories::Entity::find() + .filter(update_histories::Column::Id.eq(self.update_history_id)) + .one(conn) + .await? + .ok_or(BackgroundTaskError::RecordNotFound)?; + + let (collection_mint, collection) = + collection_mints::Entity::find_by_id_with_collection(update_history.mint_id) + .one(conn) + .await? + .ok_or(BackgroundTaskError::RecordNotFound)?; + + let metadata_json = metadata_jsons::Entity::find_by_id(collection_mint.id) + .one(conn) + .await? + .ok_or(BackgroundTaskError::RecordNotFound)?; + let creators = mint_creators::Entity::find() + .filter(mint_creators::Column::CollectionMintId.eq(collection_mint.id)) + .all(conn) + .await?; + + let collection = collection.ok_or(BackgroundTaskError::RecordNotFound)?; + + let owner_address = fetch_owner(conn, collection.project_id, collection.blockchain) + .await + .map_err(|_| BackgroundTaskError::NoProjectWallet)?; + + let mut metadata_json_am: metadata_jsons::ActiveModel = metadata_json.clone().into(); + + metadata_json_am.uri = Set(Some(metadata_uri.clone())); + metadata_json_am.identifier = Set(Some(identifier.clone())); + + metadata_json_am.update(conn).await?; + + match collection.blockchain { + BlockchainEnum::Solana => { + context + .solana + .event() + .update_collection_mint( + NftEventKey { + id: update_history.id.to_string(), + project_id: collection.project_id.to_string(), + user_id: update_history.created_by.to_string(), + }, + UpdateSolanaMintPayload { + metadata: Some(MetaplexMetadata { + owner_address, + name: metadata_json.name, + symbol: metadata_json.symbol, + metadata_uri, + seller_fee_basis_points: collection.seller_fee_basis_points.into(), + creators: creators.into_iter().map(Into::into).collect(), + }), + collection_id: collection.id.to_string(), + mint_id: update_history.mint_id.to_string(), + }, + ) + .await?; + }, + BlockchainEnum::Ethereum | BlockchainEnum::Polygon => { + return Err(BackgroundTaskError::BlockchainNotSupported); + }, + }; + + Ok(()) + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct PatchCollection { + pub collection_id: Uuid, + pub updated_by_id: Uuid, +} + +#[async_trait::async_trait] +impl After for PatchCollection { + async fn after( + &self, + db: Connection, + context: Context, + identifier: String, + ) -> Result<(), BackgroundTaskError> { + let metadata_uri = context + .nft_storage + .ipfs_endpoint + .join(&identifier)? + .to_string(); + + let conn = db.get(); + + let collection = collections::Entity::find_by_id(self.collection_id) + .one(conn) + .await? + .ok_or(BackgroundTaskError::RecordNotFound)?; + + let seller_fee_basis_points = collection.seller_fee_basis_points; + + let metadata_json = metadata_jsons::Entity::find_by_id(collection.id) + .one(conn) + .await? + .ok_or(BackgroundTaskError::RecordNotFound)?; + let creators = collection_creators::Entity::find() + .filter(collection_creators::Column::CollectionId.eq(collection.id)) + .all(conn) + .await?; + + let owner_address = fetch_owner(conn, collection.project_id, collection.blockchain) + .await + .map_err(|_| BackgroundTaskError::NoProjectWallet)?; + + let mut metadata_json_am: metadata_jsons::ActiveModel = metadata_json.clone().into(); + + metadata_json_am.uri = Set(Some(metadata_uri.clone())); + metadata_json_am.identifier = Set(Some(identifier.clone())); + + metadata_json_am.update(conn).await?; + + let event_key = NftEventKey { + id: collection.id.to_string(), + user_id: self.updated_by_id.to_string(), + project_id: collection.project_id.to_string(), + }; + + match collection.blockchain { + BlockchainEnum::Solana => { + context + .solana + .event() + .update_collection(event_key, MetaplexMasterEditionTransaction { + master_edition: Some(MasterEdition { + owner_address, + supply: Some(0), + name: metadata_json.name, + symbol: metadata_json.symbol, + metadata_uri, + seller_fee_basis_points: seller_fee_basis_points.into(), + creators: creators.into_iter().map(Into::into).collect(), + }), + }) + .await?; + }, + BlockchainEnum::Polygon | BlockchainEnum::Ethereum => { + return Err(BackgroundTaskError::BlockchainNotSupported); + }, + }; + + Ok(()) + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct PatchDrop { + pub drop_id: Uuid, + pub updated_by_id: Uuid, +} + +#[async_trait::async_trait] +impl After for PatchDrop { + async fn after( + &self, + db: Connection, + context: Context, + identifier: String, + ) -> Result<(), BackgroundTaskError> { + let metadata_uri = context + .nft_storage + .ipfs_endpoint + .join(&identifier)? + .to_string(); + + let conn = db.get(); + + let (drop, collection) = drops::Entity::find_by_id_with_collection(self.drop_id) + .one(conn) + .await? + .ok_or(BackgroundTaskError::RecordNotFound)?; + + let collection = collection.ok_or(BackgroundTaskError::RecordNotFound)?; + + let seller_fee_basis_points = collection.seller_fee_basis_points; + + let metadata_json = metadata_jsons::Entity::find_by_id(collection.id) + .one(conn) + .await? + .ok_or(BackgroundTaskError::RecordNotFound)?; + let creators = collection_creators::Entity::find() + .filter(collection_creators::Column::CollectionId.eq(collection.id)) + .all(conn) + .await?; + + let owner_address = fetch_owner(conn, collection.project_id, collection.blockchain) + .await + .map_err(|_| BackgroundTaskError::NoProjectWallet)?; + + let mut metadata_json_am: metadata_jsons::ActiveModel = metadata_json.clone().into(); + + metadata_json_am.uri = Set(Some(metadata_uri.clone())); + metadata_json_am.identifier = Set(Some(identifier.clone())); + + metadata_json_am.update(conn).await?; + + let event_key = NftEventKey { + id: collection.id.to_string(), + user_id: self.updated_by_id.to_string(), + project_id: collection.project_id.to_string(), + }; + + match collection.blockchain { + BlockchainEnum::Solana => { + context + .solana + .event() + .update_drop( + drop.drop_type, + event_key, + MetaplexMasterEditionTransaction { + master_edition: Some(MasterEdition { + owner_address, + supply: collection.supply.map(TryInto::try_into).transpose()?, + name: metadata_json.name, + symbol: metadata_json.symbol, + metadata_uri, + seller_fee_basis_points: seller_fee_basis_points.into(), + creators: creators.into_iter().map(Into::into).collect(), + }), + }, + ) + .await?; + }, + BlockchainEnum::Polygon => { + context + .polygon + .event() + .update_drop(drop.drop_type, event_key, UpdateEdtionTransaction { + edition_info: Some(EditionInfo { + description: metadata_json.description, + image_uri: metadata_json.image, + collection: metadata_json.name, + uri: metadata_uri, + creator: creators + .get(0) + .ok_or(BackgroundTaskError::NoCreator)? + .address + .clone(), + }), + }) + .await?; + }, + BlockchainEnum::Ethereum => { + return Err(BackgroundTaskError::BlockchainNotSupported); + }, + }; + + Ok(()) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Caller { + CreateDrop(CreateDrop), + PatchDrop(PatchDrop), + MintToCollection(MintToCollection), + CreateCollection(CreateCollection), + PatchCollection(PatchCollection), + QueueMintToDrop(QueueMintToDrop), + UpdateMint(UpdateMint), +} + +#[async_trait::async_trait] +impl After for Caller { + async fn after( + &self, + db: Connection, + context: Context, + identifier: String, + ) -> Result<(), BackgroundTaskError> { + match self { + Self::CreateDrop(inner) => inner.after(db, context, identifier).await, + Self::MintToCollection(inner) => inner.after(db, context, identifier).await, + Self::CreateCollection(inner) => inner.after(db, context, identifier).await, + Self::QueueMintToDrop(inner) => inner.after(db, context, identifier).await, + Self::UpdateMint(inner) => inner.after(db, context, identifier).await, + Self::PatchCollection(inner) => inner.after(db, context, identifier).await, + Self::PatchDrop(inner) => inner.after(db, context, identifier).await, + } + } +} + +#[derive(Clone, Serialize, Debug, Deserialize)] +pub struct MetadataJsonUploadTask { + pub metadata_json: MetadataJsonInput, + pub caller: Caller, +} + +impl MetadataJsonUploadTask { + #[must_use] + pub fn new(metadata_json: MetadataJsonInput, caller: Caller) -> Self { + Self { + metadata_json, + caller, + } + } +} + +#[derive(Clone)] +pub struct Context { + nft_storage: NftStorageClient, + solana: Solana, + polygon: Polygon, + producer: Producer, +} + +impl Context { + #[must_use] + pub fn new( + nft_storage: NftStorageClient, + solana: Solana, + polygon: Polygon, + producer: Producer, + ) -> Self { + Self { + nft_storage, + solana, + polygon, + producer, + } + } +} + +#[async_trait::async_trait] +impl BackgroundTask for MetadataJsonUploadTask { + fn name(&self) -> &'static str { + "MetadataJsonUploadTask" + } + + fn payload(&self) -> Result { + serde_json::to_value(self).map_err(Into::into) + } + + async fn process(&self, db: Connection, context: Context) -> Result<(), BackgroundTaskError> { + let response = context.nft_storage.upload(&self.metadata_json).await?; + let cid = response.value.cid; + + self.caller.after(db, context.clone(), cid).await?; + + Ok(()) + } +} diff --git a/api/src/background_worker/tasks/mod.rs b/api/src/background_worker/tasks/mod.rs index 54eb749..8f94882 100644 --- a/api/src/background_worker/tasks/mod.rs +++ b/api/src/background_worker/tasks/mod.rs @@ -1,8 +1,50 @@ -use serde::{Deserialize, Serialize}; +use hub_core::{anyhow::Result, prelude::*, producer::SendError, thiserror, url}; +use sea_orm::error::DbErr; use serde_json::Value as Json; -pub trait BackgroundTask: Send + Sync + std::fmt::Debug { - fn process(&self) -> Result<(), Box>; - fn payload(&self) -> Json; +use crate::db::Connection; + +mod metadata_json_upload_task; + +#[derive(thiserror::Error, Debug)] +pub enum BackgroundTaskError { + #[error("Uri string parse error: {0}")] + UrlParse(#[from] url::ParseError), + #[error("Hub core error: {0}")] + HubCore(#[from] Error), + #[error("Database error: {0}")] + Database(#[from] DbErr), + #[error("Blockchain not supported")] + BlockchainNotSupported, + #[error("Db record not found")] + RecordNotFound, + #[error("No supply")] + NoSupply, + #[error("No owner")] + NoOwner, + #[error("No project wallet")] + NoProjectWallet, + #[error("Unable to send nft event")] + ProducerSend(#[from] SendError), + #[error("Unable to convert value: {0}")] + Conversion(#[from] std::convert::Infallible), + #[error("No creator")] + NoCreator, +} + +#[async_trait::async_trait] +pub trait BackgroundTask: Send + Sync + std::fmt::Debug { + async fn process(&self, db: Connection, context: C) -> Result<(), BackgroundTaskError>; + fn payload(&self) -> Result; + fn name(&self) -> &'static str; } +pub use metadata_json_upload_task::{ + Caller as MetadataJsonUploadCaller, Context as MetadataJsonUploadContext, + CreateCollection as MetadataJsonUploadCreateCollection, + CreateDrop as MetadataJsonUploadCreateDrop, MetadataJsonUploadTask, + MintToCollection as MetadataJsonUploadMintToCollection, + PatchCollection as MetadataJsonUploadPatchCollection, PatchDrop as MetadataJsonUploadPatchDrop, + QueueMintToDrop as MetadataJsonUploadQueueMintToDrop, + UpdateMint as MetadataJsonUploadUpdateMint, +}; diff --git a/api/src/background_worker/worker.rs b/api/src/background_worker/worker.rs index 1a2f7f1..e79dee7 100644 --- a/api/src/background_worker/worker.rs +++ b/api/src/background_worker/worker.rs @@ -1,79 +1,88 @@ -use super::job_queue::JobQueue; -use crate::db::Connection; use hub_core::{ - tokio, + thiserror, tokio, tracing::{error, info}, }; -use std::sync::Arc; +use sea_orm::{error::DbErr, ActiveModelTrait}; +use serde::{Deserialize, Serialize}; -pub struct Worker { - job_queue: Arc, - db_pool: Arc, +use super::{ + job_queue::{JobQueue, JobQueueError}, + tasks::BackgroundTask, +}; +use crate::{db::Connection, entities::job_trackings}; + +#[derive(thiserror::Error, Debug)] +pub enum WorkerError { + #[error("Job queue error: {0}")] + JobQueue(#[from] JobQueueError), + #[error("Database error: {0}")] + Database(#[from] DbErr), +} +pub struct Worker> { + job_queue: JobQueue, + db_pool: Connection, + context: C, + _task_marker: std::marker::PhantomData, } -impl Worker { - pub fn new(job_queue: Arc, db_pool: Connection) -> Self { +impl Worker +where + T: Serialize + for<'de> Deserialize<'de> + Send + Sync + BackgroundTask, + C: Clone, +{ + pub fn new(job_queue: JobQueue, db_pool: Connection, context: C) -> Self { Self { job_queue, - db_pool: Arc::new(db_pool), + db_pool, + context, + _task_marker: std::marker::PhantomData, } } - pub async fn start(&self) { + pub async fn start(&self) -> Result<(), WorkerError> { loop { - if let Ok(Some(mut job)) = self.job_queue.dequeue().await { - let job_queue_clone = self.job_queue.clone(); - let job_id = job.id; - let db_pool_clone = Arc::clone(&self.db_pool); - tokio::spawn(async move { - if JobTracking::find_by_id(job_id, &db_pool_clone) - .await? - .is_none() + // Dequeue the next job to process + let job_option = self.job_queue.dequeue::().await?; + let db_conn = self.db_pool.get(); + + if let Some(job) = job_option { + // Process the job + let model = job_trackings::Entity::find_by_id(job.id) + .one(db_conn) + .await?; + + if let Some(model) = model { + match job + .task + .process(self.db_pool.clone(), self.context.clone()) + .await { + Ok(_) => { + // If successful, update the status in the job_trackings table to "completed" + let job_tracking_am = + job_trackings::Entity::update_status(model, "completed"); - // Create a new record in the database - JobTracking::create( - job_id, - "JobType", - job.task.payload(), - "processing", - &db_pool_clone, - ) - .await?; + job_tracking_am.update(db_conn).await?; - // sora elle espinola - // Process the job using the trait method - match job.task.process() { - Ok(_) => { - // Update the job status in the database to "completed" - JobTracking::update_status(&job, "completed", &db_pool_clone) - .await - .unwrap(); - }, - Err(e) => { - println!("Job processing failed: {}", e); + info!("Successfully processed job {}", job.id); + }, + Err(e) => { + // If an error occurs, update the status in the job_trackings table to "failed" + let job_tracking_am = + job_trackings::Entity::update_status(model, "failed"); - // Re-queue the job and update the job status in the database to "queued" - job_queue_clone - .enqueue(&job) - .await - .expect("Failed to re-queue job"); - JobTracking::update_status(&job, "queued", &db_pool_clone) - .await - .unwrap(); - }, - } - } else { - info!("Duplicate job detected, skipping: {}", job_id); - } - Ok::<(), sea_orm::DbErr>(()) - }) - .await - .unwrap_or_else(|e| { - error!("An error occurred: {}", e); + job_tracking_am.update(db_conn).await?; - Ok::<(), sea_orm::DbErr>(()) - }); + // Log the error (or handle it in some other way) + error!("Error processing job {}: {}", job.id, e); + }, + } + } else { + error!("Job tracking record not found for job {}", job.id); + } + } else { + // If no job was dequeued, you might want to add a delay here to avoid busy-waiting + tokio::time::sleep(std::time::Duration::from_secs(1)).await; } } } diff --git a/api/src/collection.rs b/api/src/collection.rs deleted file mode 100644 index 0bb22d6..0000000 --- a/api/src/collection.rs +++ /dev/null @@ -1,58 +0,0 @@ -use hub_core::anyhow::Result; -use sea_orm::{prelude::*, Set}; - -use crate::{ - db::Connection, - entities::{ - collection_creators::ActiveModel as CollectionCreatorActiveModel, - collections::{ActiveModel, Model}, - }, - objects::Creator, -}; - -#[derive(Debug, Clone)] -pub struct Collection { - collection: ActiveModel, - creators: Option>, -} - -impl Collection { - #[must_use] - pub fn new(collection: ActiveModel) -> Self { - Self { - collection, - creators: None, - } - } - - pub fn creators(&mut self, creators: Vec) -> &Collection { - self.creators = Some(creators); - - self - } - - /// Res - /// - /// # Errors - /// This function fails if unable to save `collection` or `collection_creators` to the db - pub async fn save(&self, db: &Connection) -> Result { - let conn = db.get(); - - let collection = self.collection.clone().insert(conn).await?; - - let creators = self.creators.clone().unwrap_or_default(); - - for creator in creators { - let am = CollectionCreatorActiveModel { - collection_id: Set(collection.id), - address: Set(creator.address), - verified: Set(creator.verified.unwrap_or_default()), - share: Set(creator.share.try_into()?), - }; - - am.insert(conn).await?; - } - - Ok(collection) - } -} diff --git a/api/src/entities/collections.rs b/api/src/entities/collections.rs index a414d18..2c87027 100644 --- a/api/src/entities/collections.rs +++ b/api/src/entities/collections.rs @@ -64,3 +64,9 @@ impl Related for Entity { Relation::MintHistories.def() } } + +impl Entity { + pub fn find_by_id(id: Uuid) -> Select { + Self::find().filter(Column::Id.eq(id)) + } +} diff --git a/api/src/entities/drops.rs b/api/src/entities/drops.rs index ed5a48e..0310732 100644 --- a/api/src/entities/drops.rs +++ b/api/src/entities/drops.rs @@ -42,3 +42,11 @@ impl Related for Entity { } impl ActiveModelBehavior for ActiveModel {} + +impl Entity { + pub fn find_by_id_with_collection( + id: Uuid, + ) -> sea_orm::SelectTwo { + Self::find_by_id(id).select_also(super::collections::Entity) + } +} diff --git a/api/src/entities/job_trackings.rs b/api/src/entities/job_trackings.rs index dfc1b15..b07532b 100644 --- a/api/src/entities/job_trackings.rs +++ b/api/src/entities/job_trackings.rs @@ -1,4 +1,5 @@ -use sea_orm::entity::prelude::*; +use hub_core::chrono; +use sea_orm::{entity::prelude::*, Set}; use serde_json::Value as Json; #[derive(Clone, Debug, PartialEq, DeriveEntityModel)] @@ -16,4 +17,36 @@ pub struct Model { #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation {} -impl ActiveModelBehavior for ActiveModel {} \ No newline at end of file +impl ActiveModelBehavior for ActiveModel {} + +impl Entity { + // Find a job tracking record by its ID + pub fn find_by_id(id: i64) -> Select { + Self::find().filter(Column::Id.eq(id)) + } + + // Create a new job tracking record + pub fn create(job_type: &str, payload: Json, status: &str) -> ActiveModel { + let now: DateTimeWithTimeZone = chrono::Utc::now().into(); + + let active_model = ActiveModel { + job_type: Set(job_type.to_string()), + payload: Set(payload), + status: Set(status.to_string()), + created_at: Set(now), + updated_at: Set(now), + ..Default::default() + }; + + active_model + } + + // Update the status of an existing job tracking record + pub fn update_status(model: Model, new_status: &str) -> ActiveModel { + let mut active_model: ActiveModel = model.into(); + + active_model.status = Set(new_status.to_string()); + + active_model + } +} diff --git a/api/src/entities/metadata_jsons.rs b/api/src/entities/metadata_jsons.rs index 3afb243..01e7c48 100644 --- a/api/src/entities/metadata_jsons.rs +++ b/api/src/entities/metadata_jsons.rs @@ -11,11 +11,13 @@ use sea_orm::entity::prelude::*; pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub id: Uuid, - pub identifier: String, + #[sea_orm(nullable)] + pub identifier: Option, /// The assigned name of the NFT. pub name: String, /// The URI for the complete metadata JSON. - pub uri: String, + #[sea_orm(nullable)] + pub uri: Option, /// The symbol of the NFT. pub symbol: String, /// The description of the NFT. @@ -23,8 +25,10 @@ pub struct Model { /// The image URI for the NFT. pub image: String, /// An optional animated version of the NFT art. + #[sea_orm(nullable)] pub animation_url: Option, /// An optional URL where viewers can find more information on the NFT, such as the collection's homepage or Twitter page. + #[sea_orm(nullable)] pub external_url: Option, } @@ -49,3 +53,9 @@ impl Related for Entity { } impl ActiveModelBehavior for ActiveModel {} + +impl Entity { + pub fn find_by_id(id: Uuid) -> Select { + Self::find().filter(Column::Id.eq(id)) + } +} diff --git a/api/src/entities/mod.rs b/api/src/entities/mod.rs index 4815af4..7ac87a3 100644 --- a/api/src/entities/mod.rs +++ b/api/src/entities/mod.rs @@ -8,6 +8,7 @@ pub mod collection_mints; pub mod collections; pub mod customer_wallets; pub mod drops; +pub mod job_trackings; pub mod metadata_json_attributes; pub mod metadata_json_files; pub mod metadata_jsons; @@ -19,4 +20,3 @@ pub mod sea_orm_active_enums; pub mod switch_collection_histories; pub mod transfer_charges; pub mod update_histories; -pub mod job_trackings; diff --git a/api/src/entities/sea_orm_active_enums.rs b/api/src/entities/sea_orm_active_enums.rs index 69989a4..20e827f 100644 --- a/api/src/entities/sea_orm_active_enums.rs +++ b/api/src/entities/sea_orm_active_enums.rs @@ -47,7 +47,19 @@ pub enum CreationStatus { Queued, } -#[derive(Debug, Clone, Default, PartialEq, Eq, EnumIter, DeriveActiveEnum, Enum, Copy)] +#[derive( + Debug, + Clone, + Default, + PartialEq, + Eq, + EnumIter, + DeriveActiveEnum, + Enum, + Copy, + Serialize, + Deserialize, +)] #[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "drop_type")] pub enum DropType { #[default] diff --git a/api/src/events.rs b/api/src/events.rs index bd73902..9cacdb5 100644 --- a/api/src/events.rs +++ b/api/src/events.rs @@ -1,5 +1,5 @@ use hub_core::{ - chrono::{DateTime, NaiveDateTime, Offset, Utc}, + chrono::{NaiveDateTime, TimeZone, Utc}, credits::{CreditsClient, TransactionId}, metrics::KeyValue, prelude::*, @@ -365,13 +365,13 @@ impl Processor { let metadata_json = metadata_jsons::ActiveModel { id: Set(id.parse()?), name: Set(name), - uri: Set(uri), + uri: Set(Some(uri)), symbol: Set(symbol), description: Set(description.unwrap_or_default()), image: Set(image), animation_url: Set(None), external_url: Set(None), - identifier: Set(String::new()), + identifier: Set(Some(String::new())), }; let json_model = metadata_json.insert(self.db.get()).await?; @@ -420,8 +420,10 @@ impl Processor { image, } = metadata.ok_or(ProcessorErrorKind::MissingCollectionMetadata)?; + let id = id.parse()?; + let mint_am = collection_mints::ActiveModel { - id: Set(id.parse()?), + id: Set(id), collection_id: Set(collection_id.parse()?), address: Set(Some(mint_address)), owner: Set(Some(owner)), @@ -440,15 +442,15 @@ impl Processor { let mint_model = mint_am.insert(self.db.get()).await?; let metadata_json = metadata_jsons::ActiveModel { - id: Set(id.parse()?), + id: Set(id), name: Set(name), - uri: Set(uri), + uri: Set(Some(uri)), symbol: Set(symbol), description: Set(description.unwrap_or_default()), image: Set(image), animation_url: Set(None), external_url: Set(None), - identifier: Set(String::new()), + identifier: Set(Some(String::new())), }; let json_model = metadata_json.insert(self.db.get()).await?; @@ -526,10 +528,10 @@ impl Processor { let created_at = timestamp .and_then(|t| { - Some(DateTime::from_naive_utc_and_offset( - NaiveDateTime::from_timestamp_opt(t.seconds, t.nanos.try_into().ok()?)?, - Utc.fix(), - )) + let naive_datetime = + NaiveDateTime::from_timestamp_opt(t.seconds, t.nanos.try_into().ok()?)?; + + Some(Utc.from_utc_datetime(&naive_datetime)) }) .ok_or(ProcessorErrorKind::InvalidTimestamp)?; @@ -556,7 +558,7 @@ impl Processor { collection_mint_id: Set(mint.id), sender: Set(mint.owner.ok_or(ProcessorErrorKind::RecordMissingOwner)?), recipient: Set(new_owner.clone()), - created_at: Set(created_at), + created_at: Set(created_at.into()), ..Default::default() }; diff --git a/api/src/handlers.rs b/api/src/handlers.rs index 5c3aa37..f79bba5 100644 --- a/api/src/handlers.rs +++ b/api/src/handlers.rs @@ -42,8 +42,8 @@ pub async fn graphql_handler( .data(state.credits.clone()) .data(state.solana.clone()) .data(state.polygon.clone()) - .data(state.nft_storage.clone()) - .data(state.asset_proxy.clone()), + .data(state.asset_proxy.clone()) + .data(state.metadata_json_upload_job_queue.clone()), ) .await .into()) diff --git a/api/src/lib.rs b/api/src/lib.rs index a365c0a..7d823ea 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -2,26 +2,25 @@ #![warn(clippy::pedantic, clippy::cargo)] #![allow(clippy::module_name_repetitions)] +pub mod background_worker; pub mod blockchains; -pub mod collection; pub mod dataloaders; pub mod db; pub mod entities; pub mod events; pub mod handlers; -pub mod metadata_json; pub mod metrics; pub mod mutations; pub mod nft_storage; pub mod objects; pub mod queries; -pub mod background_worker; use async_graphql::{ dataloader::DataLoader, extensions::{ApolloTracing, Logger}, EmptySubscription, Schema, }; +use background_worker::job_queue::JobQueue; use blockchains::{polygon::Polygon, solana::Solana}; use dataloaders::{ CollectionDropLoader, CollectionLoader, CollectionMintHistoriesLoader, CollectionMintLoader, @@ -45,7 +44,6 @@ use hub_core::{ }; use metrics::Metrics; use mutations::Mutation; -use nft_storage::NftStorageClient; use poem::{async_trait, FromRequest, Request, RequestBody}; use queries::Query; @@ -117,6 +115,9 @@ pub struct Args { #[command(flatten)] pub nft_storage: nft_storage::NftStorageArgs, + + #[arg(long, env)] + pub redis_url: String, } #[derive(Debug, Clone, Copy)] @@ -236,8 +237,8 @@ pub struct AppState { pub credits: CreditsClient, pub solana: Solana, pub polygon: Polygon, - pub nft_storage: NftStorageClient, pub asset_proxy: AssetProxy, + pub metadata_json_upload_job_queue: JobQueue, } impl AppState { @@ -250,8 +251,8 @@ impl AppState { credits: CreditsClient, solana: Solana, polygon: Polygon, - nft_storage: NftStorageClient, asset_proxy: AssetProxy, + metadata_json_upload_job_queue: JobQueue, ) -> Self { Self { schema, @@ -260,8 +261,8 @@ impl AppState { credits, solana, polygon, - nft_storage, asset_proxy, + metadata_json_upload_job_queue, } } } diff --git a/api/src/main.rs b/api/src/main.rs index b7c3369..7f67077 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -1,6 +1,13 @@ //! +use std::sync::Arc; + use holaplex_hub_nfts::{ + background_worker::{ + job_queue::JobQueue, + tasks::{MetadataJsonUploadContext, MetadataJsonUploadTask}, + worker::Worker, + }, blockchains::{polygon::Polygon, solana::Solana}, build_schema, db::Connection, @@ -12,6 +19,7 @@ use holaplex_hub_nfts::{ }; use hub_core::{prelude::*, tokio}; use poem::{get, listener::TcpListener, middleware::AddData, post, EndpointExt, Route, Server}; +use redis::Client as RedisClient; pub fn main() { let opts = hub_core::StartConfig { @@ -23,6 +31,7 @@ pub fn main() { port, db, nft_storage, + redis_url, } = args; common.rt.block_on(async move { @@ -52,19 +61,38 @@ pub fn main() { let solana = Solana::new(producer.clone()); let polygon = Polygon::new(producer.clone()); + let redis_client = RedisClient::open(redis_url)?; + let redis_client = Arc::new(tokio::sync::Mutex::new(redis_client)); + + let metadata_json_upload_task_context = MetadataJsonUploadContext::new( + nft_storage, + solana.clone(), + polygon.clone(), + producer.clone(), + ); + + let job_queue = JobQueue::new(redis_client, connection.clone()); + let worker = Worker::::new( + job_queue.clone(), + connection.clone(), + metadata_json_upload_task_context, + ); + let state = AppState::new( schema, connection.clone(), producer.clone(), credits.clone(), - solana, - polygon, - nft_storage, + solana.clone(), + polygon.clone(), common.asset_proxy, + job_queue.clone(), ); let cons = common.consumer_cfg.build::().await?; + tokio::spawn(async move { worker.start().await }); + tokio::spawn(async move { cons.consume( |b| { diff --git a/api/src/metadata_json.rs b/api/src/metadata_json.rs deleted file mode 100644 index 9c95854..0000000 --- a/api/src/metadata_json.rs +++ /dev/null @@ -1,170 +0,0 @@ -use hub_core::{anyhow::Result, prelude::anyhow, uuid::Uuid}; -use metadata_jsons::Column as MetadataJsonColumn; -use sea_orm::{prelude::*, sea_query::OnConflict, Set, TransactionTrait}; - -use crate::{ - db::Connection, - entities::{ - metadata_json_attributes, metadata_json_files, metadata_jsons, - prelude::{MetadataJsonAttributes, MetadataJsonFiles, MetadataJsons}, - }, - nft_storage::NftStorageClient, - objects::MetadataJsonInput, -}; -#[derive(Clone, Debug)] -pub struct MetadataJson { - pub metadata_json: MetadataJsonInput, - pub uri: Option, - pub identifier: Option, -} - -impl MetadataJson { - #[must_use] - pub fn new(metadata_json: MetadataJsonInput) -> Self { - Self { - metadata_json, - uri: None, - identifier: None, - } - } - - /// Fetches metadata from the database and constructs a `Self` instance. - /// - /// # Arguments - /// - /// * `id` - The ID of the metadata to fetch. - /// * `db` - The database connection to use. - /// - /// # Errors - /// - /// This function fails if there is no matching `metadata_json` entry in the database - /// or if it is unable to fetch related data from the database - pub async fn fetch(id: Uuid, db: &Connection) -> Result { - let (metadata_json_model, attributes) = metadata_jsons::Entity::find_by_id(id) - .find_with_related(MetadataJsonAttributes) - .all(db.get()) - .await? - .first() - .map(ToOwned::to_owned) - .ok_or(anyhow!("no metadata_json entry found in db"))?; - - let files = metadata_json_files::Entity::find() - .filter(metadata_json_files::Column::MetadataJsonId.eq(id)) - .all(db.get()) - .await?; - - let metadata_json = (metadata_json_model.clone(), attributes, Some(files)).into(); - - Ok(Self { - metadata_json, - uri: Some(metadata_json_model.uri.clone()), - identifier: Some(metadata_json_model.identifier), - }) - } - - /// Res - /// - /// # Errors - /// This function fails if unable to upload `metadata_json` to nft.storage - pub async fn upload(&mut self, nft_storage: &NftStorageClient) -> Result<&Self> { - let response = nft_storage.upload(self.metadata_json.clone()).await?; - let cid = response.value.cid; - - let uri = nft_storage.ipfs_endpoint.join(&cid)?.to_string(); - - self.uri = Some(uri); - self.identifier = Some(cid); - - Ok(self) - } - - /// Res - /// - /// # Errors - /// This function fails if unable to save `metadata_json` to the db - pub async fn save(&self, id: Uuid, db: &Connection) -> Result { - let payload = self.metadata_json.clone(); - let identifier = self - .identifier - .clone() - .ok_or_else(|| anyhow!("no identifier. call #upload before #save"))?; - let uri = self - .uri - .clone() - .ok_or_else(|| anyhow!("no uri. call #upload before #save"))?; - - let metadata_json_active_model = metadata_jsons::ActiveModel { - id: Set(id), - identifier: Set(identifier), - name: Set(payload.name), - uri: Set(uri), - symbol: Set(payload.symbol), - description: Set(payload.description), - image: Set(payload.image), - animation_url: Set(payload.animation_url), - external_url: Set(payload.external_url), - }; - - let metadata_json = MetadataJsons::insert(metadata_json_active_model) - .on_conflict( - OnConflict::column(MetadataJsonColumn::Id) - .update_columns([ - MetadataJsonColumn::Identifier, - MetadataJsonColumn::Name, - MetadataJsonColumn::Uri, - MetadataJsonColumn::Symbol, - MetadataJsonColumn::Description, - MetadataJsonColumn::Image, - MetadataJsonColumn::AnimationUrl, - MetadataJsonColumn::ExternalUrl, - ]) - .clone(), - ) - .exec_with_returning(db.get()) - .await?; - - let tx = db.get().clone().begin().await?; - - MetadataJsonAttributes::delete_many() - .filter(metadata_json_attributes::Column::MetadataJsonId.eq(metadata_json.id)) - .exec(&tx) - .await?; - - for attribute in payload.attributes { - let am = metadata_json_attributes::ActiveModel { - metadata_json_id: Set(metadata_json.id), - trait_type: Set(attribute.trait_type), - value: Set(attribute.value), - ..Default::default() - }; - - am.insert(&tx).await?; - } - - tx.commit().await?; - - if let Some(files) = payload.properties.unwrap_or_default().files { - let tx = db.get().clone().begin().await?; - - MetadataJsonFiles::delete_many() - .filter(metadata_json_files::Column::MetadataJsonId.eq(metadata_json.id)) - .exec(&tx) - .await?; - - for file in files { - let metadata_json_file_am = metadata_json_files::ActiveModel { - metadata_json_id: Set(metadata_json.id), - uri: Set(file.uri), - file_type: Set(file.file_type), - ..Default::default() - }; - - metadata_json_file_am.insert(&tx).await?; - } - - tx.commit().await?; - } - - Ok(metadata_json) - } -} diff --git a/api/src/mutations/collection.rs b/api/src/mutations/collection.rs index d1c0476..ab23c2f 100644 --- a/api/src/mutations/collection.rs +++ b/api/src/mutations/collection.rs @@ -12,8 +12,14 @@ use sea_orm::{prelude::*, ModelTrait, Set, TransactionTrait}; use serde::{Deserialize, Serialize}; use crate::{ - blockchains::{polygon::Polygon, solana::Solana, CollectionEvent}, - collection::Collection, + background_worker::{ + job_queue::JobQueue, + tasks::{ + MetadataJsonUploadCaller, MetadataJsonUploadCreateCollection, + MetadataJsonUploadPatchCollection, MetadataJsonUploadTask, + }, + }, + blockchains::{solana::Solana, CollectionEvent}, entities::{ collection_creators, collection_mints, collections, metadata_jsons, prelude::{CollectionCreators, CollectionMints, Collections, Drops, MetadataJsons}, @@ -21,14 +27,13 @@ use crate::{ sea_orm_active_enums::{Blockchain, Blockchain as BlockchainEnum, CreationStatus}, switch_collection_histories, }, - metadata_json::MetadataJson, objects::{Collection as CollectionObject, CollectionMint, Creator, MetadataJsonInput}, proto::{ nft_events::Event as NftEvent, CollectionCreation, CollectionImport, CreationStatus as NftCreationStatus, Creator as ProtoCreator, MasterEdition, MetaplexMasterEditionTransaction, NftEventKey, NftEvents, }, - Actions, AppContext, NftStorageClient, + Actions, AppContext, }; #[derive(Default)] @@ -61,9 +66,8 @@ impl Mutation { let conn = db.get(); let credits = ctx.data::>()?; - let solana = ctx.data::()?; - let nft_storage = ctx.data::()?; let nfts_producer = ctx.data::>()?; + let metadata_json_upload_job_queue = ctx.data::()?; let owner_address = fetch_owner(conn, input.project, input.blockchain).await?; @@ -83,6 +87,8 @@ impl Mutation { ) .await?; + let tx = conn.begin().await?; + let collection_am = collections::ActiveModel { blockchain: Set(input.blockchain), supply: Set(Some(0)), @@ -94,48 +100,33 @@ impl Mutation { ..Default::default() }; - let collection = Collection::new(collection_am) - .creators(input.creators.clone()) - .save(db) - .await?; + let collection = collection_am.insert(&tx).await?; - let metadata_json = MetadataJson::new(input.metadata_json) - .upload(nft_storage) - .await? - .save(collection.id, db) - .await?; + for creator in input.creators { + let am = collection_creators::ActiveModel { + collection_id: Set(collection.id), + address: Set(creator.address), + verified: Set(creator.verified.unwrap_or_default()), + share: Set(creator.share.try_into()?), + }; - let event_key = NftEventKey { - id: collection.id.to_string(), - user_id: user_id.to_string(), - project_id: input.project.to_string(), - }; + am.insert(&tx).await?; + } - match input.blockchain { - BlockchainEnum::Solana => { - solana - .event() - .create_collection(event_key, MetaplexMasterEditionTransaction { - master_edition: Some(MasterEdition { - owner_address, - supply: Some(0), - name: metadata_json.name, - symbol: metadata_json.symbol, - metadata_uri: metadata_json.uri, - seller_fee_basis_points: 0, - creators: input - .creators - .into_iter() - .map(TryFrom::try_from) - .collect::>()?, - }), - }) - .await?; - }, - BlockchainEnum::Ethereum | BlockchainEnum::Polygon => { - return Err(Error::new("blockchain not supported as this time")); - }, - }; + input.metadata_json.save(collection.id, &tx).await?; + + tx.commit().await?; + + metadata_json_upload_job_queue + .enqueue(MetadataJsonUploadTask { + metadata_json: input.metadata_json, + caller: MetadataJsonUploadCaller::CreateCollection( + MetadataJsonUploadCreateCollection { + collection_id: collection.id, + }, + ), + }) + .await?; nfts_producer .send( @@ -185,7 +176,7 @@ impl Mutation { let collection = Collections::find() .filter(collections::Column::Id.eq(input.id)) - .one(db.get()) + .one(conn) .await? .ok_or(Error::new("collection not found"))?; @@ -197,6 +188,10 @@ impl Mutation { .one(conn) .await? .ok_or(Error::new("metadata json not found"))?; + let metadata_uri = metadata_json + .uri + .ok_or(Error::new("metadata uri not found"))?; + let creators = CollectionCreators::find() .filter(collection_creators::Column::CollectionId.eq(collection.id)) .all(conn) @@ -229,7 +224,7 @@ impl Mutation { owner_address, name: metadata_json.name, symbol: metadata_json.symbol, - metadata_uri: metadata_json.uri, + metadata_uri, seller_fee_basis_points: 0, supply: Some(0), creators: creators @@ -264,7 +259,7 @@ impl Mutation { let AppContext { db, user_id, .. } = ctx.data::()?; let user_id = user_id.0.ok_or(Error::new("X-USER-ID header not found"))?; - let txn = db.get().begin().await?; + let conn = db.get(); validate_solana_address(&input.collection)?; @@ -274,9 +269,11 @@ impl Mutation { .eq(input.collection.clone()) .and(collections::Column::ProjectId.eq(input.project)), ) - .one(db.get()) + .one(conn) .await?; + let txn = conn.begin().await?; + if let Some(collection) = collection.clone() { let mints = CollectionMints::find() .filter(collection_mints::Column::CollectionId.eq(collection.id)) @@ -334,22 +331,22 @@ impl Mutation { input: PatchCollectionInput, ) -> Result { let PatchCollectionInput { - id: _, metadata_json, creators, + .. } = input; - let AppContext { db, user_id, .. } = ctx.data::()?; - let conn = db.get(); - let nft_storage = ctx.data::()?; + let solana = ctx.data::()?; - let _polygon = ctx.data::()?; + let metadata_json_upload_job_queue = ctx.data::()?; let user_id = user_id.0.ok_or(Error::new("X-USER-ID header not found"))?; + let conn = db.get(); + let collection = Collections::find() .filter(collections::Column::Id.eq(input.id)) - .one(db.get()) + .one(conn) .await? .ok_or(Error::new("collection not found"))?; @@ -366,13 +363,21 @@ impl Mutation { validate_json(collection.blockchain, metadata_json)?; } + let metadata_json_model = metadata_jsons::Entity::find() + .filter(metadata_jsons::Column::Id.eq(collection.id)) + .one(conn) + .await? + .ok_or(Error::new("metadata json not found"))?; + let current_creators = collection_creators::Entity::find() .filter(collection_creators::Column::CollectionId.eq(collection.id)) .all(conn) .await?; - if let Some(creators) = creators.clone() { - let creators = creators + let tx = conn.begin().await?; + + let creators: Vec = if let Some(creators) = creators { + let creator_ams = creators .clone() .into_iter() .map(|creator| { @@ -385,77 +390,74 @@ impl Mutation { }) .collect::>>()?; - conn.transaction::<_, (), DbErr>(|txn| { - Box::pin(async move { - collection_creators::Entity::delete_many() - .filter(collection_creators::Column::CollectionId.eq(collection.id)) - .exec(txn) - .await?; - - collection_creators::Entity::insert_many(creators) - .exec(txn) - .await?; - - Ok(()) - }) - }) - .await?; - } - - let metadata_json_model = metadata_jsons::Entity::find() - .filter(metadata_jsons::Column::Id.eq(collection.id)) - .one(conn) - .await? - .ok_or(Error::new("metadata json not found"))?; + collection_creators::Entity::delete_many() + .filter(collection_creators::Column::CollectionId.eq(collection.id)) + .exec(&tx) + .await?; - let metadata_json_model = if let Some(metadata_json) = metadata_json { - metadata_json_model.clone().delete(conn).await?; + collection_creators::Entity::insert_many(creator_ams) + .exec(&tx) + .await?; - MetadataJson::new(metadata_json.clone()) - .upload(nft_storage) - .await? - .save(collection.id, db) - .await? + creators + .into_iter() + .map(TryFrom::try_from) + .collect::>()? } else { - metadata_json_model + current_creators.into_iter().map(Into::into).collect() }; - let event_key = NftEventKey { - id: collection.id.to_string(), - user_id: user_id.to_string(), - project_id: collection.project_id.to_string(), - }; + if let Some(metadata_json) = metadata_json { + metadata_json_model.delete(&tx).await?; - match collection.blockchain { - BlockchainEnum::Solana => { - let creators = if let Some(creators) = creators.clone() { - creators - .into_iter() - .map(TryFrom::try_from) - .collect::>()? - } else { - current_creators.into_iter().map(Into::into).collect() - }; + metadata_json.save(collection.id, &tx).await?; - solana - .event() - .update_collection(event_key, MetaplexMasterEditionTransaction { - master_edition: Some(MasterEdition { - owner_address, - supply: Some(0), - name: metadata_json_model.name, - symbol: metadata_json_model.symbol, - metadata_uri: metadata_json_model.uri, - seller_fee_basis_points: 0, - creators, - }), - }) - .await?; - }, - BlockchainEnum::Polygon | BlockchainEnum::Ethereum => { - return Err(Error::new("blockchain not supported yet")); - }, - }; + metadata_json_upload_job_queue + .enqueue(MetadataJsonUploadTask { + metadata_json, + caller: MetadataJsonUploadCaller::PatchCollection( + MetadataJsonUploadPatchCollection { + collection_id: collection.id, + updated_by_id: user_id, + }, + ), + }) + .await?; + } else { + let event_key = NftEventKey { + id: collection.id.to_string(), + user_id: user_id.to_string(), + project_id: collection.project_id.to_string(), + }; + + let metadata_uri = metadata_json_model + .uri + .ok_or(Error::new("metadata uri not found"))?; + + match collection.blockchain { + BlockchainEnum::Solana => { + solana + .event() + .update_collection(event_key, MetaplexMasterEditionTransaction { + master_edition: Some(MasterEdition { + owner_address, + supply: Some(0), + name: metadata_json_model.name, + symbol: metadata_json_model.symbol, + metadata_uri, + seller_fee_basis_points: 0, + creators, + }), + }) + .await?; + }, + BlockchainEnum::Polygon | BlockchainEnum::Ethereum => { + return Err(Error::new("blockchain not supported as this time")); + }, + }; + } + + tx.commit().await?; Ok(PatchCollectionPayload { collection: collection.into(), @@ -485,6 +487,7 @@ impl Mutation { } = ctx.data::()?; let credits = ctx.data::>()?; let solana = ctx.data::()?; + let conn = db.get(); let user_id = user_id.0.ok_or(Error::new("X-USER-ID header not found"))?; let org_id = organization_id @@ -492,7 +495,7 @@ impl Mutation { .ok_or(Error::new("X-ORG-ID header not found"))?; let balance = balance.0.ok_or(Error::new("X-BALANCE header not found"))?; let (mint, collection) = CollectionMints::find_by_id_with_collection(mint) - .one(db.get()) + .one(conn) .await? .ok_or(Error::new("Mint not found"))?; @@ -500,7 +503,7 @@ impl Mutation { let new_collection = Collections::find() .filter(collections::Column::Address.eq(collection_address.to_string())) - .one(db.get()) + .one(conn) .await? .ok_or(Error::new("Collection not found"))?; @@ -514,7 +517,7 @@ impl Mutation { if new_collection .find_related(Drops) - .one(db.get()) + .one(conn) .await? .is_some() { @@ -551,7 +554,7 @@ impl Mutation { ..Default::default() }; - let history = history_am.insert(db.get()).await?; + let history = history_am.insert(conn).await?; solana .event() diff --git a/api/src/mutations/drop.rs b/api/src/mutations/drop.rs index 63882aa..c8eb56f 100644 --- a/api/src/mutations/drop.rs +++ b/api/src/mutations/drop.rs @@ -9,21 +9,26 @@ use serde::{Deserialize, Serialize}; use super::collection::{validate_creators, validate_json, validate_solana_creator_verification}; use crate::{ + background_worker::{ + job_queue::JobQueue, + tasks::{ + MetadataJsonUploadCaller, MetadataJsonUploadCreateDrop, MetadataJsonUploadPatchDrop, + MetadataJsonUploadTask, + }, + }, blockchains::{polygon::Polygon, solana::Solana, DropEvent}, - collection::Collection, entities::{ collection_creators, collections, drops, metadata_jsons, prelude::{CollectionCreators, Collections, Drops, MetadataJsons}, project_wallets, sea_orm_active_enums::{Blockchain as BlockchainEnum, CreationStatus, DropType}, }, - metadata_json::MetadataJson, objects::{Creator, Drop, MetadataJsonInput}, proto::{ self, nft_events::Event as NftEvent, CreationStatus as NftCreationStatus, EditionInfo, NftEventKey, NftEvents, }, - Actions, AppContext, NftStorageClient, + Actions, AppContext, }; #[derive(Default)] @@ -57,10 +62,8 @@ impl Mutation { let conn = db.get(); let credits = ctx.data::>()?; - let solana = ctx.data::()?; - let polygon = ctx.data::()?; - let nft_storage = ctx.data::()?; let nfts_producer = ctx.data::>()?; + let metadata_json_upload_job_queue = ctx.data::()?; let owner_address = fetch_owner(conn, input.project, input.blockchain).await?; let supply = if input.drop_type == DropType::Open { @@ -86,6 +89,8 @@ impl Mutation { let seller_fee_basis_points = input.seller_fee_basis_points.unwrap_or_default(); + let tx = conn.begin().await?; + let collection_am = collections::ActiveModel { blockchain: Set(input.blockchain), supply: Set(supply), @@ -97,23 +102,20 @@ impl Mutation { ..Default::default() }; - let collection = Collection::new(collection_am) - .creators(input.creators.clone()) - .save(db) - .await?; + let collection = collection_am.insert(&tx).await?; - let metadata_jsons::Model { - name, - symbol, - uri, - description, - image, - .. - } = MetadataJson::new(input.metadata_json) - .upload(nft_storage) - .await? - .save(collection.id, db) - .await?; + for creator in input.creators { + let am = collection_creators::ActiveModel { + collection_id: Set(collection.id), + address: Set(creator.address), + verified: Set(creator.verified.unwrap_or_default()), + share: Set(creator.share.try_into()?), + }; + + am.insert(&tx).await?; + } + + input.metadata_json.save(collection.id, &tx).await?; let drop = drops::ActiveModel { project_id: Set(input.project), @@ -129,67 +131,18 @@ impl Mutation { ..Default::default() }; - let drop_model = drop.insert(conn).await?; - let event_key = NftEventKey { - id: collection.id.to_string(), - user_id: user_id.to_string(), - project_id: input.project.to_string(), - }; + let drop_model = drop.insert(&tx).await?; - let payload = proto::MetaplexMasterEditionTransaction { - master_edition: Some(proto::MasterEdition { - owner_address: owner_address.clone(), - supply, - name: name.clone(), - symbol, - metadata_uri: uri.clone(), - seller_fee_basis_points: seller_fee_basis_points.into(), - creators: input - .creators - .clone() - .into_iter() - .map(TryFrom::try_from) - .collect::>()?, - }), - }; + tx.commit().await?; - match input.blockchain { - BlockchainEnum::Solana => { - solana - .event() - .create_drop(input.drop_type, event_key, payload) - .await?; - }, - BlockchainEnum::Polygon => { - let amount = input.supply.ok_or(Error::new("supply is required"))?; - polygon - .create_drop( - input.drop_type, - event_key, - proto::CreateEditionTransaction { - amount: amount.try_into()?, - edition_info: Some(proto::EditionInfo { - creator: input - .creators - .get(0) - .ok_or(Error::new("creator is required"))? - .clone() - .address, - collection: name, - uri, - description, - image_uri: image, - }), - fee_receiver: owner_address, - fee_numerator: seller_fee_basis_points.into(), - }, - ) - .await?; - }, - BlockchainEnum::Ethereum => { - return Err(Error::new("blockchain not supported as this time")); - }, - }; + metadata_json_upload_job_queue + .enqueue(MetadataJsonUploadTask::new( + input.metadata_json, + MetadataJsonUploadCaller::CreateDrop(MetadataJsonUploadCreateDrop { + drop_id: drop_model.id, + }), + )) + .await?; nfts_producer .send( @@ -261,6 +214,10 @@ impl Mutation { .one(conn) .await? .ok_or(Error::new("metadata json not found"))?; + + let metadata_uri = metadata_json + .uri + .ok_or(Error::new("metadata uri not found"))?; let creators = CollectionCreators::find() .filter(collection_creators::Column::CollectionId.eq(collection.id)) .all(conn) @@ -297,7 +254,7 @@ impl Mutation { supply: collection.supply.map(TryInto::try_into).transpose()?, name: metadata_json.name, symbol: metadata_json.symbol, - metadata_uri: metadata_json.uri, + metadata_uri, seller_fee_basis_points: collection.seller_fee_basis_points.into(), creators: creators .into_iter() @@ -462,16 +419,13 @@ impl Mutation { let AppContext { db, user_id, .. } = ctx.data::()?; let conn = db.get(); - let nft_storage = ctx.data::()?; + let metadata_json_upload_job_queue = ctx.data::()?; let solana = ctx.data::()?; let polygon = ctx.data::()?; let user_id = user_id.0.ok_or(Error::new("X-USER-ID header not found"))?; - let (drop_model, collection_model) = Drops::find() - .join(JoinType::InnerJoin, drops::Relation::Collections.def()) - .select_also(Collections) - .filter(drops::Column::Id.eq(id)) + let (drop_model, collection_model) = drops::Entity::find_by_id_with_collection(id) .one(conn) .await? .ok_or(Error::new("drop not found"))?; @@ -493,17 +447,25 @@ impl Mutation { validate_json(collection.blockchain, metadata_json)?; } + let current_creators = collection_creators::Entity::find() + .filter(collection_creators::Column::CollectionId.eq(collection.id)) + .all(conn) + .await?; + + let metadata_json_model = metadata_jsons::Entity::find() + .filter(metadata_jsons::Column::Id.eq(collection.id)) + .one(conn) + .await? + .ok_or(Error::new("metadata json not found"))?; + + let tx = conn.begin().await?; + let mut collection_am: collections::ActiveModel = collection.into(); if let Some(seller_fee_basis_points) = seller_fee_basis_points { collection_am.seller_fee_basis_points = Set(seller_fee_basis_points.try_into()?); } - let collection = collection_am.update(conn).await?; - - let current_creators = collection_creators::Entity::find() - .filter(collection_creators::Column::CollectionId.eq(collection.id)) - .all(conn) - .await?; + let collection = collection_am.update(&tx).await?; let mut drop_am = drops::ActiveModel::from(drop_model.clone()); @@ -522,8 +484,8 @@ impl Mutation { }) .transpose()?); - if let Some(creators) = creators.clone() { - let creators = creators + let creators = if let Some(creators) = creators { + let creator_ams = creators .clone() .into_iter() .map(|creator| { @@ -536,111 +498,102 @@ impl Mutation { }) .collect::>>()?; - conn.transaction::<_, (), DbErr>(|txn| { - Box::pin(async move { - collection_creators::Entity::delete_many() - .filter(collection_creators::Column::CollectionId.eq(collection.id)) - .exec(txn) - .await?; - - collection_creators::Entity::insert_many(creators) - .exec(txn) - .await?; + collection_creators::Entity::delete_many() + .filter(collection_creators::Column::CollectionId.eq(collection.id)) + .exec(&tx) + .await?; - Ok(()) - }) - }) - .await?; - } + collection_creators::Entity::insert_many(creator_ams) + .exec(&tx) + .await?; - let drop_model = drop_am.update(conn).await?; - - let metadata_json_model = metadata_jsons::Entity::find() - .filter(metadata_jsons::Column::Id.eq(collection.id)) - .one(conn) - .await? - .ok_or(Error::new("metadata json not found"))?; - - let metadata_json_model = if let Some(metadata_json) = metadata_json { - metadata_json_model.clone().delete(conn).await?; - - MetadataJson::new(metadata_json.clone()) - .upload(nft_storage) - .await? - .save(collection.id, db) - .await? + creators + .into_iter() + .map(TryFrom::try_from) + .collect::>()? } else { - metadata_json_model + current_creators.into_iter().map(Into::into).collect() }; - let event_key = NftEventKey { - id: collection.id.to_string(), - user_id: user_id.to_string(), - project_id: drop_model.project_id.to_string(), - }; + let drop_model = drop_am.update(&tx).await?; - match collection.blockchain { - BlockchainEnum::Solana => { - let creators = if let Some(creators) = creators.clone() { - creators - .into_iter() - .map(TryFrom::try_from) - .collect::>()? - } else { - current_creators.into_iter().map(Into::into).collect() - }; + if let Some(metadata_json) = metadata_json { + metadata_json_model.delete(&tx).await?; - solana - .event() - .update_drop( - drop_model.drop_type, - event_key, - proto::MetaplexMasterEditionTransaction { - master_edition: Some(proto::MasterEdition { - owner_address, - supply: collection.supply.map(TryInto::try_into).transpose()?, - name: metadata_json_model.name, - symbol: metadata_json_model.symbol, - metadata_uri: metadata_json_model.uri, - seller_fee_basis_points: collection.seller_fee_basis_points.into(), - creators, - }), - }, - ) - .await?; - }, - BlockchainEnum::Polygon => { - let creator = if let Some(creators) = creators { - creators[0].address.clone() - } else { - current_creators - .get(0) - .ok_or(Error::new("No current creator found in db"))? - .address - .clone() - }; + metadata_json.save(collection.id, &tx).await?; - polygon - .event() - .update_drop( - drop_model.drop_type, - event_key, - proto::UpdateEdtionTransaction { - edition_info: Some(EditionInfo { - description: metadata_json_model.description, - image_uri: metadata_json_model.image, - collection: metadata_json_model.name, - uri: metadata_json_model.uri, - creator, - }), - }, - ) - .await?; - }, - BlockchainEnum::Ethereum => { - return Err(Error::new("blockchain not supported yet")); - }, - }; + metadata_json_upload_job_queue + .enqueue(MetadataJsonUploadTask { + metadata_json, + caller: MetadataJsonUploadCaller::PatchDrop(MetadataJsonUploadPatchDrop { + drop_id: drop_model.id, + updated_by_id: user_id, + }), + }) + .await?; + } else { + let event_key = NftEventKey { + id: collection.id.to_string(), + user_id: user_id.to_string(), + project_id: drop_model.project_id.to_string(), + }; + + let metadata_uri = metadata_json_model + .uri + .ok_or(Error::new("metadata uri not found"))?; + + match collection.blockchain { + BlockchainEnum::Solana => { + solana + .event() + .update_drop( + drop_model.drop_type, + event_key, + proto::MetaplexMasterEditionTransaction { + master_edition: Some(proto::MasterEdition { + owner_address, + supply: collection.supply.map(TryInto::try_into).transpose()?, + name: metadata_json_model.name, + symbol: metadata_json_model.symbol, + metadata_uri, + seller_fee_basis_points: collection + .seller_fee_basis_points + .into(), + creators, + }), + }, + ) + .await?; + }, + BlockchainEnum::Polygon => { + polygon + .event() + .update_drop( + drop_model.drop_type, + event_key, + proto::UpdateEdtionTransaction { + edition_info: Some(EditionInfo { + description: metadata_json_model.description, + image_uri: metadata_json_model.image, + collection: metadata_json_model.name, + uri: metadata_uri, + creator: creators + .get(0) + .ok_or(Error::new("no creator found"))? + .address + .clone(), + }), + }, + ) + .await?; + }, + BlockchainEnum::Ethereum => { + return Err(Error::new("blockchain not supported yet")); + }, + }; + } + + tx.commit().await?; Ok(PatchDropPayload { drop: Drop::new(drop_model, collection), @@ -710,11 +663,38 @@ impl CreateDropInput { validate_end_time(&self.end_time)?; validate_creators(self.blockchain, &self.creators)?; validate_json(self.blockchain, &self.metadata_json)?; + validate_polygon_supply(self.blockchain, self.supply)?; + validate_polygon_creator(self.blockchain, &self.creators)?; Ok(()) } } +/// Validates the creators for polygon drops. +/// # Returns +/// - Ok(()) if the creators are provided for polygon drops. +/// # Errors +/// - Err with an appropriate error message if the creators are not provided for polygon drops. +fn validate_polygon_creator(blockchain: BlockchainEnum, creators: &[Creator]) -> Result<()> { + if blockchain == BlockchainEnum::Polygon && creators.len() > 1 { + return Err(Error::new("Only one creator is allowed for polygon drops")); + } + + Ok(()) +} + +/// Validates the supply for polygon drops. +/// # Returns +/// - Ok(()) if the supply is provided for polygon drops. +/// # Errors +/// - Err with an appropriate error message if the supply is not provided for polygon drops. +fn validate_polygon_supply(blockchain: BlockchainEnum, supply: Option) -> Result<()> { + if blockchain == BlockchainEnum::Polygon && supply.is_none() { + return Err(Error::new("Supply is required for polygon drops")); + } + + Ok(()) +} /// Validates the end time of the NFT drop. /// # Returns /// - Ok(()) if the end time is in the future or if it's not provided. diff --git a/api/src/mutations/mint.rs b/api/src/mutations/mint.rs index 5bcb89c..f6a527c 100644 --- a/api/src/mutations/mint.rs +++ b/api/src/mutations/mint.rs @@ -16,25 +16,33 @@ use super::collection::{ fetch_owner, validate_creators, validate_json, validate_solana_creator_verification, }; use crate::{ + background_worker::{ + job_queue::JobQueue, + tasks::{ + MetadataJsonUploadCaller, MetadataJsonUploadMintToCollection, + MetadataJsonUploadQueueMintToDrop, MetadataJsonUploadTask, + MetadataJsonUploadUpdateMint, + }, + }, blockchains::{ polygon::Polygon, solana::{MintDropTransaction, Solana}, CollectionEvent, DropEvent, }, entities::{ - collection_creators, collection_mints, collections, drops, mint_creators, mint_histories, - prelude::{CollectionCreators, CollectionMints, Collections, Drops}, + collection_creators, collection_mints, collections, drops, metadata_json_attributes, + metadata_json_files, metadata_jsons, mint_creators, mint_histories, + prelude::{CollectionCreators, CollectionMints, Collections}, project_wallets, sea_orm_active_enums::{Blockchain as BlockchainEnum, CreationStatus}, update_histories, }, - metadata_json::MetadataJson, objects::{CollectionMint, Creator, MetadataJsonInput}, proto::{ self, nft_events::Event as NftEvent, CreationStatus as NftCreationStatus, MetaplexMetadata, MintCollectionCreation, MintCreation, NftEventKey, NftEvents, RetryUpdateSolanaMintPayload, }, - Actions, AppContext, NftStorageClient, OrganizationId, UserID, + Actions, AppContext, OrganizationId, UserID, }; #[derive(Default)] @@ -72,42 +80,23 @@ impl Mutation { .0 .ok_or(Error::new("X-CREDIT-BALANCE header not found"))?; - let drop_model = Drops::find() - .join(JoinType::InnerJoin, drops::Relation::Collections.def()) - .select_also(Collections) - .filter(drops::Column::Id.eq(input.drop)) + let (drop_model, collection) = drops::Entity::find_by_id_with_collection(input.drop) .one(conn) - .await?; + .await? + .ok_or(Error::new("drop not found"))?; - let (drop_model, collection_model) = drop_model.ok_or(Error::new("drop not found"))?; + let collection = collection.ok_or(Error::new("collection not found"))?; // Call check_drop_status to check that drop is currently running check_drop_status(&drop_model)?; - let collection = collection_model.ok_or(Error::new("collection not found"))?; - if collection.supply == Some(collection.total_mints) { return Err(Error::new("Collection is sold out")); } let edition = collection.total_mints.add(1); - // Fetch the project wallet address which will sign the transaction by hub-treasuries - let wallet = project_wallets::Entity::find() - .filter( - project_wallets::Column::ProjectId - .eq(drop_model.project_id) - .and(project_wallets::Column::Blockchain.eq(collection.blockchain)), - ) - .one(conn) - .await?; - - let owner_address = wallet - .ok_or(Error::new(format!( - "no project wallet found for {:?} blockchain", - collection.blockchain - )))? - .wallet_address; + let owner_address = fetch_owner(conn, collection.project_id, collection.blockchain).await?; let TransactionId(credits_deduction_id) = credits .submit_pending_deduction( @@ -119,6 +108,8 @@ impl Mutation { ) .await?; + let tx = conn.begin().await?; + // insert a collection mint record into database let collection_mint_active_model = collection_mints::ActiveModel { collection_id: Set(collection.id), @@ -132,6 +123,100 @@ impl Mutation { }; let collection_mint_model = collection_mint_active_model.insert(conn).await?; + + let mut collection_am = collections::ActiveModel::from(collection.clone()); + collection_am.total_mints = Set(edition); + collection_am.update(&tx).await?; + + // inserts a mint histories record in the database + let mint_history_am = mint_histories::ActiveModel { + mint_id: Set(collection_mint_model.id), + wallet: Set(input.recipient.clone()), + collection_id: Set(collection.id), + tx_signature: Set(None), + status: Set(CreationStatus::Pending), + created_at: Set(Utc::now().into()), + ..Default::default() + }; + + mint_history_am.insert(&tx).await?; + + if collection.blockchain == BlockchainEnum::Solana { + let collection_metadata_json = + metadata_jsons::Entity::find_by_id(collection_mint_model.id) + .one(conn) + .await? + .ok_or(Error::new("metadata json not found"))?; + + let creators = mint_creators::Entity::find() + .filter(mint_creators::Column::CollectionMintId.eq(collection_mint_model.id)) + .all(conn) + .await?; + + let files = metadata_json_files::Entity::find() + .filter(metadata_json_files::Column::MetadataJsonId.eq(collection_metadata_json.id)) + .all(conn) + .await?; + let attributes = metadata_json_attributes::Entity::find() + .filter( + metadata_json_attributes::Column::MetadataJsonId + .eq(collection_metadata_json.id), + ) + .all(conn) + .await?; + + let mut metadata_json_am = metadata_jsons::ActiveModel::from(collection_metadata_json); + + metadata_json_am.id = Set(collection_mint_model.id); + + let collection_mint_metadata_json = metadata_json_am.insert(&tx).await?; + + // TODO: Look into bug regarding creators not being saved to the database when minting editions + let creators: Vec = creators + .into_iter() + .map(|creator| mint_creators::ActiveModel { + collection_mint_id: Set(collection_mint_model.id), + address: Set(creator.address), + verified: Set(creator.verified), + share: Set(creator.share), + }) + .collect(); + + mint_creators::Entity::insert_many(creators) + .exec(&tx) + .await?; + + let files: Vec = files + .into_iter() + .map(|file| metadata_json_files::ActiveModel { + metadata_json_id: Set(collection_mint_metadata_json.id), + uri: Set(file.uri), + file_type: Set(file.file_type), + ..Default::default() + }) + .collect(); + + metadata_json_files::Entity::insert_many(files) + .exec(&tx) + .await?; + + let attributes: Vec = attributes + .into_iter() + .map(|attribute| metadata_json_attributes::ActiveModel { + metadata_json_id: Set(collection_mint_metadata_json.id), + trait_type: Set(attribute.trait_type), + value: Set(attribute.value), + ..Default::default() + }) + .collect(); + + metadata_json_attributes::Entity::insert_many(attributes) + .exec(&tx) + .await?; + } + + tx.commit().await?; + let event_key = NftEventKey { id: collection_mint_model.id.to_string(), user_id: user_id.to_string(), @@ -140,11 +225,6 @@ impl Mutation { match collection.blockchain { BlockchainEnum::Solana => { - MetadataJson::fetch(collection.id, db) - .await? - .save(collection_mint_model.id, db) - .await?; - solana .event() .mint_drop( @@ -173,23 +253,6 @@ impl Mutation { }, }; - let mut collection_am = collections::ActiveModel::from(collection.clone()); - collection_am.total_mints = Set(edition); - collection_am.update(conn).await?; - - // inserts a mint histories record in the database - let purchase_am = mint_histories::ActiveModel { - mint_id: Set(collection_mint_model.id), - wallet: Set(input.recipient), - collection_id: Set(collection.id), - tx_signature: Set(None), - status: Set(CreationStatus::Pending), - created_at: Set(Utc::now().into()), - ..Default::default() - }; - - purchase_am.insert(conn).await?; - nfts_producer .send( Some(&NftEvents { @@ -360,9 +423,8 @@ impl Mutation { } = ctx.data::()?; let credits = ctx.data::>()?; let conn = db.get(); - let solana = ctx.data::()?; let nfts_producer = ctx.data::>()?; - let nft_storage = ctx.data::()?; + let metadata_json_upload_job_queue = ctx.data::()?; let UserID(id) = user_id; let OrganizationId(org) = organization_id; @@ -413,27 +475,28 @@ impl Mutation { ) .await?; + let tx = conn.begin().await?; + // insert a collection mint record into database let collection_mint_active_model = collection_mints::ActiveModel { collection_id: Set(collection.id), owner: Set(Some(input.recipient.clone())), creation_status: Set(CreationStatus::Pending), - seller_fee_basis_points: Set(collection.seller_fee_basis_points), + seller_fee_basis_points: Set(seller_fee_basis_points.try_into()?), created_by: Set(user_id), compressed: Set(Some(compressed)), credits_deduction_id: Set(Some(credits_deduction_id)), ..Default::default() }; - let collection_mint_model = collection_mint_active_model.insert(conn).await?; + let collection_mint_model = collection_mint_active_model.insert(&tx).await?; - let metadata_json = MetadataJson::new(input.metadata_json) - .upload(nft_storage) - .await? - .save(collection_mint_model.id, db) + input + .metadata_json + .save(collection_mint_model.id, &tx) .await?; - for creator in creators.clone() { + for creator in creators { let am = mint_creators::ActiveModel { collection_mint_id: Set(collection_mint_model.id), address: Set(creator.address), @@ -441,45 +504,12 @@ impl Mutation { share: Set(creator.share.try_into()?), }; - am.insert(conn).await?; + am.insert(&tx).await?; } - let event_key = NftEventKey { - id: collection_mint_model.id.to_string(), - user_id: user_id.to_string(), - project_id: collection.project_id.to_string(), - }; - - match collection.blockchain { - BlockchainEnum::Solana => { - solana - .event() - .mint_to_collection(event_key, proto::MintMetaplexMetadataTransaction { - metadata: Some(MetaplexMetadata { - owner_address, - name: metadata_json.name, - symbol: metadata_json.symbol, - metadata_uri: metadata_json.uri, - seller_fee_basis_points: seller_fee_basis_points.into(), - creators: creators - .into_iter() - .map(TryFrom::try_from) - .collect::>()?, - }), - recipient_address: input.recipient.to_string(), - compressed, - collection_id: collection.id.to_string(), - }) - .await?; - }, - BlockchainEnum::Ethereum | BlockchainEnum::Polygon => { - return Err(Error::new("blockchain not supported as this time")); - }, - }; - let mut collection_am = collections::ActiveModel::from(collection.clone()); collection_am.total_mints = Set(collection.total_mints.add(1)); - collection_am.update(conn).await?; + collection_am.update(&tx).await?; let mint_history_am = mint_histories::ActiveModel { mint_id: Set(collection_mint_model.id), @@ -491,7 +521,20 @@ impl Mutation { ..Default::default() }; - mint_history_am.insert(conn).await?; + mint_history_am.insert(&tx).await?; + + tx.commit().await?; + + metadata_json_upload_job_queue + .enqueue(MetadataJsonUploadTask { + caller: MetadataJsonUploadCaller::MintToCollection( + MetadataJsonUploadMintToCollection { + collection_mint_id: collection_mint_model.id, + }, + ), + metadata_json: input.metadata_json, + }) + .await?; nfts_producer .send( @@ -531,8 +574,7 @@ impl Mutation { } = ctx.data::()?; let credits = ctx.data::>()?; let conn = db.get(); - let solana = ctx.data::()?; - let nft_storage = ctx.data::()?; + let metadata_json_upload_job_queue = ctx.data::()?; let UserID(id) = user_id; let OrganizationId(org) = organization_id; @@ -545,9 +587,7 @@ impl Mutation { let creators = input.creators; - let (mint, collection) = CollectionMints::find() - .find_also_related(Collections) - .filter(collection_mints::Column::Id.eq(input.id)) + let (mint, collection) = CollectionMints::find_by_id_with_collection(input.id) .one(conn) .await? .ok_or(Error::new("Mint not found"))?; @@ -589,7 +629,7 @@ impl Mutation { }) .collect::>>()?; - let deduction_id = credits + let TransactionId(deduction_id) = credits .submit_pending_deduction( org_id, user_id, @@ -599,81 +639,47 @@ impl Mutation { ) .await?; + let tx = conn.begin().await?; + let mut mint_am: collection_mints::ActiveModel = mint.clone().into(); let update_history_am = update_histories::ActiveModel { mint_id: Set(mint.id), txn_signature: Set(None), - credit_deduction_id: Set(deduction_id.0), + credit_deduction_id: Set(deduction_id), created_by: Set(user_id), status: Set(CreationStatus::Pending), ..Default::default() }; - let update_history = update_history_am.insert(db.get()).await?; - let sfbp = mint.seller_fee_basis_points.try_into()?; - conn.transaction::<_, (), DbErr>(|txn| { - Box::pin(async move { - mint_creators::Entity::delete_many() - .filter(mint_creators::Column::CollectionMintId.eq(mint.id)) - .exec(txn) - .await?; + let update_history = update_history_am.insert(&tx).await?; - mint_creators::Entity::insert_many(creators_am) - .exec(txn) - .await?; + mint_creators::Entity::delete_many() + .filter(mint_creators::Column::CollectionMintId.eq(mint.id)) + .exec(&tx) + .await?; - if let Some(sfbp) = input.seller_fee_basis_points { - mint_am.seller_fee_basis_points = Set(sfbp.try_into().unwrap_or_default()); - mint_am.update(txn).await?; - } + mint_creators::Entity::insert_many(creators_am) + .exec(&tx) + .await?; - Ok(()) - }) - }) - .await?; + if let Some(sfbp) = input.seller_fee_basis_points { + mint_am.seller_fee_basis_points = Set(sfbp.try_into().unwrap_or_default()); + mint_am.update(&tx).await?; + } - let metadata_json = MetadataJson::new(input.metadata_json) - .upload(nft_storage) - .await? - .save(mint.id, db) - .await?; + input.metadata_json.save(mint.id, &tx).await?; - match collection.blockchain { - BlockchainEnum::Solana => { - solana - .event() - .update_collection_mint( - NftEventKey { - id: update_history.id.to_string(), - project_id: collection.project_id.to_string(), - user_id: user_id.to_string(), - }, - proto::UpdateSolanaMintPayload { - metadata: Some(MetaplexMetadata { - owner_address, - name: metadata_json.name, - symbol: metadata_json.symbol, - metadata_uri: metadata_json.uri, - seller_fee_basis_points: input - .seller_fee_basis_points - .unwrap_or(sfbp) - .try_into()?, - creators: creators - .into_iter() - .map(TryFrom::try_from) - .collect::>()?, - }), - collection_id: collection.id.to_string(), - mint_id: update_history.mint_id.to_string(), - }, - ) - .await?; - }, - BlockchainEnum::Ethereum | BlockchainEnum::Polygon => { - return Err(Error::new("blockchain not supported as this time")); - }, - }; + tx.commit().await?; + + metadata_json_upload_job_queue + .enqueue(MetadataJsonUploadTask { + caller: MetadataJsonUploadCaller::UpdateMint(MetadataJsonUploadUpdateMint { + update_history_id: update_history.id, + }), + metadata_json: input.metadata_json, + }) + .await?; Ok(UpdateMintPayload { collection_mint: mint.into(), @@ -707,15 +713,15 @@ impl Mutation { .await? .ok_or(Error::new("Update history not found"))?; + let collection = collection.ok_or(Error::new("Collection not found"))?; + if update_history.status == CreationStatus::Created { return Err(Error::new("Mint already updated")); } let mut update_history_am = update_histories::ActiveModel::from(update_history.clone()); update_history_am.status = Set(CreationStatus::Pending); - update_history_am.update(db.get()).await?; - - let collection = collection.ok_or(Error::new("Collection not found"))?; + update_history_am.update(conn).await?; match collection.blockchain { BlockchainEnum::Solana => { @@ -796,15 +802,14 @@ impl Mutation { let owner_address = fetch_owner(conn, project_id, blockchain).await?; - let MetadataJson { - metadata_json, uri, .. - } = MetadataJson::fetch(collection_mint_model.id, db).await?; + let metadata_json = metadata_jsons::Entity::find_by_id(collection_mint_model.id) + .one(conn) + .await? + .ok_or(Error::new("metadata json not found"))?; - let event_key = NftEventKey { - id: collection_mint_model.id.to_string(), - user_id: user_id.to_string(), - project_id: project_id.to_string(), - }; + let metadata_uri = metadata_json + .uri + .ok_or(Error::new("metadata uri not found"))?; let creators = mint_creators::Entity::find_by_collection_mint_id(collection_mint_model.id) .all(conn) @@ -820,6 +825,12 @@ impl Mutation { ) .await?; + let event_key = NftEventKey { + id: collection_mint_model.id.to_string(), + user_id: user_id.to_string(), + project_id: project_id.to_string(), + }; + match collection.blockchain { BlockchainEnum::Solana => { solana @@ -829,7 +840,7 @@ impl Mutation { owner_address, name: metadata_json.name, symbol: metadata_json.symbol, - metadata_uri: uri.ok_or(Error::new("metadata uri not found"))?, + metadata_uri, seller_fee_basis_points: collection_mint_model .seller_fee_basis_points .into(), @@ -862,25 +873,32 @@ impl Mutation { let AppContext { db, user_id, .. } = ctx.data::()?; let conn = db.get(); - let nft_storage = ctx.data::()?; + + let metadata_json_upload_job_queue = ctx.data::()?; let nfts_producer = ctx.data::>()?; let UserID(id) = user_id; let user_id = id.ok_or(Error::new("X-USER-ID header not found"))?; - let (drop, collection) = drops::Entity::find_by_id(input.drop) - .find_also_related(Collections) + let (drop, collection) = drops::Entity::find_by_id_with_collection(input.drop) .one(conn) .await? .ok_or(Error::new("drop not found"))?; let collection_model = collection.ok_or(Error::new("collection not found"))?; + let creators = CollectionCreators::find() + .filter(collection_creators::Column::CollectionId.eq(collection_model.id)) + .all(conn) + .await?; + + let tx = conn.begin().await?; + let mut collection_am: collections::ActiveModel = collection_model.clone().into(); collection_am.supply = Set(collection_model.supply.map(|supply| supply.add(1))); - collection_am.update(conn).await?; + collection_am.update(&tx).await?; let mint = collection_mints::ActiveModel { collection_id: Set(drop.collection_id), @@ -892,18 +910,9 @@ impl Mutation { ..Default::default() }; - let mint_model = mint.insert(conn).await?; - - MetadataJson::new(input.metadata_json) - .upload(nft_storage) - .await? - .save(mint_model.id, db) - .await?; + let mint_model = mint.insert(&tx).await?; - let creators = CollectionCreators::find() - .filter(collection_creators::Column::CollectionId.eq(collection_model.id)) - .all(conn) - .await?; + input.metadata_json.save(mint_model.id, &tx).await?; let mint_creators: Vec<_> = creators .iter() @@ -916,7 +925,21 @@ impl Mutation { .collect(); mint_creators::Entity::insert_many(mint_creators) - .exec(conn) + .exec(&tx) + .await?; + + tx.commit().await?; + + metadata_json_upload_job_queue + .enqueue(MetadataJsonUploadTask { + caller: MetadataJsonUploadCaller::QueueMintToDrop( + MetadataJsonUploadQueueMintToDrop { + drop_id: drop.id, + collection_mint_id: mint_model.id, + }, + ), + metadata_json: input.metadata_json, + }) .await?; nfts_producer @@ -930,7 +953,7 @@ impl Mutation { Some(&NftEventKey { id: mint_model.id.to_string(), project_id: drop.project_id.to_string(), - user_id: user_id.to_string(), + user_id: mint_model.created_by.to_string(), }), ) .await?; @@ -953,10 +976,12 @@ impl Mutation { balance, .. } = ctx.data::()?; + let credits = ctx.data::>()?; - let conn = db.get(); - let solana = ctx.data::()?; let nfts_producer = ctx.data::>()?; + let solana = ctx.data::()?; + + let conn = db.get(); let UserID(id) = user_id; let OrganizationId(org) = organization_id; @@ -978,14 +1003,25 @@ impl Mutation { let collection = collection.ok_or(Error::new("collection not found"))?; + let drop = drops::Entity::find() + .filter(drops::Column::CollectionId.eq(collection.id)) + .one(conn) + .await? + .ok_or(Error::new("drop not found"))?; + let project_id = collection.project_id; let blockchain = collection.blockchain; let owner_address = fetch_owner(conn, project_id, blockchain).await?; - let MetadataJson { - metadata_json, uri, .. - } = MetadataJson::fetch(mint.id, db).await?; + let metadata_json = metadata_jsons::Entity::find_by_id(mint.id) + .one(conn) + .await? + .ok_or(Error::new("metadata json not found"))?; + + let metadata_uri = metadata_json + .uri + .ok_or(Error::new("No metadata json uri found"))?; let event_key = NftEventKey { id: mint.id.to_string(), @@ -1013,9 +1049,13 @@ impl Mutation { ) .await?; + let tx = conn.begin().await?; + let mut collection_am = collections::ActiveModel::from(collection.clone()); + collection_am.total_mints = Set(collection.total_mints.add(1)); - collection_am.update(conn).await?; + + collection_am.update(&tx).await?; let mut mint_am: collection_mints::ActiveModel = mint.into(); @@ -1025,7 +1065,21 @@ impl Mutation { mint_am.owner = Set(Some(input.recipient.clone())); mint_am.seller_fee_basis_points = Set(collection.seller_fee_basis_points); - let mint = mint_am.update(conn).await?; + let mint = mint_am.update(&tx).await?; + + let mint_history_am = mint_histories::ActiveModel { + mint_id: Set(mint.id), + wallet: Set(input.recipient.clone()), + collection_id: Set(collection.id), + tx_signature: Set(None), + status: Set(CreationStatus::Pending), + created_at: Set(Utc::now().into()), + ..Default::default() + }; + + mint_history_am.insert(&tx).await?; + + tx.commit().await?; match collection.blockchain { BlockchainEnum::Solana => { @@ -1038,8 +1092,7 @@ impl Mutation { owner_address, name: metadata_json.name, symbol: metadata_json.symbol, - metadata_uri: uri - .ok_or(Error::new("No metadata json uri found"))?, + metadata_uri, seller_fee_basis_points: mint.seller_fee_basis_points.into(), creators: creators.into_iter().map(Into::into).collect(), }), @@ -1055,12 +1108,6 @@ impl Mutation { }, }; - let drop = drops::Entity::find() - .filter(drops::Column::CollectionId.eq(collection.id)) - .one(conn) - .await? - .ok_or(Error::new("drop not found"))?; - nfts_producer .send( Some(&NftEvents { @@ -1077,18 +1124,6 @@ impl Mutation { ) .await?; - let mint_history_am = mint_histories::ActiveModel { - mint_id: Set(mint.id), - wallet: Set(input.recipient), - collection_id: Set(collection.id), - tx_signature: Set(None), - status: Set(CreationStatus::Pending), - created_at: Set(Utc::now().into()), - ..Default::default() - }; - - mint_history_am.insert(conn).await?; - Ok(MintQueuedPayload { collection_mint: mint.into(), }) @@ -1146,15 +1181,14 @@ impl Mutation { let owner_address = fetch_owner(conn, project_id, blockchain).await?; - let MetadataJson { - metadata_json, uri, .. - } = MetadataJson::fetch(mint.id, db).await?; + let metadata_json = metadata_jsons::Entity::find_by_id(mint.id) + .one(conn) + .await? + .ok_or(Error::new("metadata json not found"))?; - let event_key = NftEventKey { - id: mint.id.to_string(), - user_id: user_id.to_string(), - project_id: project_id.to_string(), - }; + let metadata_uri = metadata_json + .uri + .ok_or(Error::new("No metadata json uri found"))?; let creators = mint_creators::Entity::find_by_collection_mint_id(mint.id) .all(conn) @@ -1176,9 +1210,11 @@ impl Mutation { ) .await?; + let tx = conn.begin().await?; + let mut collection_am = collections::ActiveModel::from(collection.clone()); collection_am.total_mints = Set(collection.total_mints.add(1)); - collection_am.update(conn).await?; + collection_am.update(&tx).await?; let mut mint_am: collection_mints::ActiveModel = mint.into(); @@ -1188,7 +1224,27 @@ impl Mutation { mint_am.owner = Set(Some(input.recipient.clone())); mint_am.seller_fee_basis_points = Set(collection.seller_fee_basis_points); - let mint = mint_am.update(conn).await?; + let mint = mint_am.update(&tx).await?; + + let mint_history_am = mint_histories::ActiveModel { + mint_id: Set(mint.id), + wallet: Set(input.recipient.clone()), + collection_id: Set(collection.id), + tx_signature: Set(None), + status: Set(CreationStatus::Pending), + created_at: Set(Utc::now().into()), + ..Default::default() + }; + + mint_history_am.insert(&tx).await?; + + tx.commit().await?; + + let event_key = NftEventKey { + id: mint.id.to_string(), + user_id: user_id.to_string(), + project_id: project_id.to_string(), + }; match collection.blockchain { BlockchainEnum::Solana => { @@ -1201,7 +1257,7 @@ impl Mutation { owner_address, name: metadata_json.name, symbol: metadata_json.symbol, - metadata_uri: uri.ok_or(Error::new("No metadata json uri"))?, + metadata_uri, seller_fee_basis_points: mint.seller_fee_basis_points.into(), creators: creators.into_iter().map(Into::into).collect(), }), @@ -1217,18 +1273,6 @@ impl Mutation { }, }; - let mint_history_am = mint_histories::ActiveModel { - mint_id: Set(mint.id), - wallet: Set(input.recipient), - collection_id: Set(collection.id), - tx_signature: Set(None), - status: Set(CreationStatus::Pending), - created_at: Set(Utc::now().into()), - ..Default::default() - }; - - mint_history_am.insert(conn).await?; - nfts_producer .send( Some(&NftEvents { diff --git a/api/src/nft_storage.rs b/api/src/nft_storage.rs index 0807762..c534050 100644 --- a/api/src/nft_storage.rs +++ b/api/src/nft_storage.rs @@ -68,7 +68,7 @@ impl NftStorageClient { /// /// # Errors /// If the upload fails - pub async fn upload(&self, data: impl Serialize) -> Result { + pub async fn upload(&self, data: &impl Serialize) -> Result { self.post("/upload".to_string(), data) .await? .json() diff --git a/api/src/objects/metadata_json.rs b/api/src/objects/metadata_json.rs index 75ca582..24000b0 100644 --- a/api/src/objects/metadata_json.rs +++ b/api/src/objects/metadata_json.rs @@ -1,14 +1,11 @@ use async_graphql::{ComplexObject, Context, InputObject, Result, SimpleObject}; use hub_core::{assets::AssetProxy, uuid::Uuid}; use reqwest::Url; +use sea_orm::{prelude::*, sea_query::OnConflict, DatabaseTransaction, Set}; use serde::{Deserialize, Serialize}; use crate::{ - entities::{ - metadata_json_attributes::{self, Model as MetadataJsonAttributeModel}, - metadata_json_files::Model as MetadataJsonFileModel, - metadata_jsons::{self, Model as MetadataJsonModel}, - }, + entities::{metadata_json_attributes, metadata_json_files, metadata_jsons}, AppContext, }; @@ -18,12 +15,14 @@ use crate::{ #[derive(Clone, Debug, PartialEq, Eq, SimpleObject)] #[graphql(complex, concrete(name = "MetadataJson", params()))] pub struct MetadataJson { + // The id of the metadata json. pub id: Uuid, - pub identifier: String, + // The assigned identifier of the metadata json uri. + pub identifier: Option, /// The assigned name of the NFT. pub name: String, /// The URI for the complete metadata JSON. - pub uri: String, + pub uri: Option, /// The symbol of the NFT. pub symbol: String, /// The description of the NFT. @@ -62,34 +61,6 @@ impl MetadataJson { } } -impl From for MetadataJson { - fn from( - metadata_jsons::Model { - id, - identifier, - name, - uri, - symbol, - description, - image, - animation_url, - external_url, - }: metadata_jsons::Model, - ) -> Self { - Self { - id, - identifier, - name, - uri, - symbol, - description, - image_original: image, - animation_url, - external_url, - } - } -} - #[derive(Clone, Debug, Serialize, Deserialize, InputObject)] pub struct MetadataJsonInput { pub name: String, @@ -136,52 +107,109 @@ pub struct Collection { pub family: Option, } -impl - From<( - MetadataJsonModel, - Vec, - Option>, - )> for MetadataJsonInput -{ - fn from( - (metadata_json, attributes, files): ( - MetadataJsonModel, - Vec, - Option>, - ), - ) -> Self { - let input = MetadataJsonInput { - name: metadata_json.name, - symbol: metadata_json.symbol, - description: metadata_json.description, - image: metadata_json.image, - animation_url: metadata_json.animation_url, - collection: None, - attributes: attributes.iter().map(|a| a.clone().into()).collect(), - external_url: metadata_json.external_url, - properties: Some(Property { - files: files.map(|files| files.iter().map(|f| f.clone().into()).collect()), - category: None, - }), +impl MetadataJsonInput { + /// Saves the metadata json to the database. If the metadata json already exists, it will update the existing record. + /// # Arguments + /// * `id` - The id of the metadata json + /// * `tx` - The database transaction to use + /// # Returns + /// Returns Ok with () + /// # Errors + /// Returns Err if unable to save records to the database + pub async fn save(&self, id: Uuid, tx: &DatabaseTransaction) -> Result<()> { + let metadata_json = self.clone(); + let metadata_json_active_model = metadata_jsons::ActiveModel { + id: Set(id), + identifier: Set(None), + name: Set(metadata_json.name), + uri: Set(None), + symbol: Set(metadata_json.symbol), + description: Set(metadata_json.description), + image: Set(metadata_json.image), + animation_url: Set(metadata_json.animation_url), + external_url: Set(metadata_json.external_url), }; - input - } -} -impl From for Attribute { - fn from(attribute: MetadataJsonAttributeModel) -> Self { - Attribute { - trait_type: attribute.trait_type, - value: attribute.value, + let metadata_json_model = metadata_jsons::Entity::insert(metadata_json_active_model) + .on_conflict( + OnConflict::column(metadata_jsons::Column::Id) + .update_columns([ + metadata_jsons::Column::Identifier, + metadata_jsons::Column::Name, + metadata_jsons::Column::Uri, + metadata_jsons::Column::Symbol, + metadata_jsons::Column::Description, + metadata_jsons::Column::Image, + metadata_jsons::Column::AnimationUrl, + metadata_jsons::Column::ExternalUrl, + ]) + .to_owned(), + ) + .exec_with_returning(tx) + .await?; + + metadata_json_attributes::Entity::delete_many() + .filter(metadata_json_attributes::Column::MetadataJsonId.eq(metadata_json_model.id)) + .exec(tx) + .await?; + + for attribute in metadata_json.attributes { + let am = metadata_json_attributes::ActiveModel { + metadata_json_id: Set(metadata_json_model.id), + trait_type: Set(attribute.trait_type), + value: Set(attribute.value), + ..Default::default() + }; + + am.insert(tx).await?; + } + + if let Some(files) = metadata_json.properties.unwrap_or_default().files { + metadata_json_files::Entity::delete_many() + .filter(metadata_json_files::Column::MetadataJsonId.eq(metadata_json_model.id)) + .exec(tx) + .await?; + + for file in files { + let metadata_json_file_am = metadata_json_files::ActiveModel { + metadata_json_id: Set(metadata_json_model.id), + uri: Set(file.uri), + file_type: Set(file.file_type), + ..Default::default() + }; + + metadata_json_file_am.insert(tx).await?; + } } + + Ok(()) } } -impl From for File { - fn from(file: MetadataJsonFileModel) -> Self { - File { - uri: file.uri, - file_type: file.file_type, +impl From for MetadataJson { + fn from( + metadata_jsons::Model { + id, + identifier, + name, + uri, + symbol, + description, + image, + animation_url, + external_url, + }: metadata_jsons::Model, + ) -> Self { + Self { + id, + identifier, + name, + uri, + symbol, + description, + image_original: image, + animation_url, + external_url, } } } diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 52ecf9e..a778326 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -53,15 +53,13 @@ mod m20230725_144506_drop_solana_collections_table; mod m20230807_090847_create_histories_table; mod m20230818_163948_downcase_polygon_addresses; mod m20230821_131630_create_switch_collection_histories_table; -<<<<<<< HEAD mod m20230905_100852_add_type_to_drop; mod m20230910_204731_add_queued_variant_to_mints_status; mod m20230910_212742_make_owner_address_optional_for_mint; mod m20230911_144938_make_compressed_column_optional; -mod m20230915_111128_create_mints_creation_status_idx; -======= mod m20230914_154759_add_job_trackings_table; ->>>>>>> 502f5ad (feat: create worker queue using redis for uploading metadata json in the background) +mod m20230915_111128_create_mints_creation_status_idx; +mod m20230922_150621_nullable_metadata_jsons_identifier_and_uri; pub struct Migrator; @@ -122,15 +120,13 @@ impl MigratorTrait for Migrator { Box::new(m20230807_090847_create_histories_table::Migration), Box::new(m20230818_163948_downcase_polygon_addresses::Migration), Box::new(m20230821_131630_create_switch_collection_histories_table::Migration), -<<<<<<< HEAD Box::new(m20230905_100852_add_type_to_drop::Migration), Box::new(m20230910_204731_add_queued_variant_to_mints_status::Migration), Box::new(m20230910_212742_make_owner_address_optional_for_mint::Migration), Box::new(m20230911_144938_make_compressed_column_optional::Migration), Box::new(m20230915_111128_create_mints_creation_status_idx::Migration), -======= Box::new(m20230914_154759_add_job_trackings_table::Migration), ->>>>>>> 502f5ad (feat: create worker queue using redis for uploading metadata json in the background) + Box::new(m20230922_150621_nullable_metadata_jsons_identifier_and_uri::Migration), ] } } diff --git a/migration/src/m20230922_150621_nullable_metadata_jsons_identifier_and_uri.rs b/migration/src/m20230922_150621_nullable_metadata_jsons_identifier_and_uri.rs new file mode 100644 index 0000000..58669d4 --- /dev/null +++ b/migration/src/m20230922_150621_nullable_metadata_jsons_identifier_and_uri.rs @@ -0,0 +1,39 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(MetadataJsons::Table) + .modify_column(ColumnDef::new(MetadataJsons::Uri).null()) + .modify_column(ColumnDef::new(MetadataJsons::Identifier).null()) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(MetadataJsons::Table) + .modify_column(ColumnDef::new(MetadataJsons::Uri).not_null()) + .modify_column(ColumnDef::new(MetadataJsons::Identifier).not_null()) + .to_owned(), + ) + .await + } +} + +/// Learn more at https://docs.rs/sea-query#iden +#[derive(Iden)] +enum MetadataJsons { + Table, + Uri, + Identifier, +}