From c32b5818a30a11dd3540b051f50fd91f6d615f75 Mon Sep 17 00:00:00 2001 From: Tim Geoghegan Date: Mon, 10 Apr 2023 14:09:44 -0700 Subject: [PATCH] Adopt `sqlx migrate` (#1221)
We now use `sqlx` ([1]) and its CLI ([2]) to manage our SQL schema and migrate it to new versions. The schema is now represented as a series of SQL scripts in `db/`, along with teardown scripts to enable rollbacks.
The objective here is to enable easy migrations, namely:
- adding new tables
- adding indices to tables
- adding columns with default values to tables
More complex migrations which transform or delete data may require more sophisticated tools than what's here, and even these "easy" migrations require some planning and operational tools that aren't present in this repository.
This commit contains these changes:
- the `publish-sql-schema` workflow now publishes all migration scripts in `db` to a Google Cloud Storage bucket to later be consumed by Divvi Up ops tools
- the interop test containers use `sqlx::migrate::Migrator` instead of `janus_cli` to write the schema into their database
- `janus_aggregator_core::datastore::test_util::ephemeral_datastore` uses `sqlx::migrate::Migrator` to apply migrations to ephemeral databases
- `db/schema.sql` is copied to a `sqlx` migration script, and a corresponding `down` script is added which deletes tables, indices, etc.
[1]: https://crates.io/crates/sqlx
[2]: https://crates.io/crates/sqlx-cli
Part of #30
--- .github/workflows/publish-sql-schema.yml | 5 +- Cargo.lock | 310 ++++++++++++++++-- Dockerfile.interop | 1 - Dockerfile.interop_aggregator | 9 +- aggregator/Cargo.toml | 2 + aggregator/src/datastore.rs | 21 +- db/20230405185602_initial-schema.down.sql | 27 ++ db/20230405185602_initial-schema.up.sql | 221 +++++++++++++ interop_binaries/Cargo.toml | 1 + interop_binaries/build.rs | 2 +- interop_binaries/config/janus_cli.yaml | 4 - .../config/janus_interop_aggregator.yaml | 1 + interop_binaries/setup.sh | 1 - .../src/bin/janus_interop_aggregator.rs | 11 + 14 files changed, 570 insertions(+), 46 deletions(-) create mode 100644 db/20230405185602_initial-schema.down.sql create mode 100644 db/20230405185602_initial-schema.up.sql delete mode 100644 interop_binaries/config/janus_cli.yaml
diff --git a/.github/workflows/publish-sql-schema.yml b/.github/workflows/publish-sql-schema.yml index 4fd915c4e..d8945c3f1 100644 --- a/.github/workflows/publish-sql-schema.yml +++ b/.github/workflows/publish-sql-schema.yml
@@ -29,4 +29,7 @@ jobs: id: get_version run: echo VERSION=${GITHUB_REF/refs\/tags\//} >> $GITHUB_OUTPUT - name: "Upload schema file(s)"
- run: gcloud alpha storage cp db/schema.sql gs://janus-artifacts-sql-schemas/${{ steps.get_version.outputs.VERSION }}/schema.sql
+ run: |-
+ gcloud alpha storage cp --recursive \
+ db \
+ gs://janus-artifacts-sql-schemas/${{ steps.get_version.outputs.VERSION }}/
diff --git a/Cargo.lock b/Cargo.lock index db4a809b1..f3a1d9c47 100644 --- a/Cargo.lock +++ b/Cargo.lock
@@ -53,6 +53,17 @@ dependencies = [ "subtle", ]
+[[package]]
+name = "ahash"
+version = "0.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
+dependencies = [
+ "getrandom",
+ "once_cell",
+ "version_check",
+]
+
[[package]] name = "aho-corasick" version = "0.7.19"
@@ -166,7 +177,7 @@ checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" dependencies = [ "proc-macro2", "quote",
- "syn 1.0.104",
+ "syn 1.0.109",
] [[package]]
@@ -180,6 +191,15 @@
dependencies = [ "syn 2.0.10", ] +[[package]] +name = "atoi" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e" +dependencies = [ + "num-traits", +] + [[package]] name = "atty" version = "0.2.14" @@ -567,6 +587,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" + [[package]] name = "crc32fast" version = "1.3.2" @@ -611,6 +646,16 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.11" @@ -693,7 +738,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.10.0", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -704,7 +749,7 @@ checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" dependencies = [ "darling_core", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -717,7 +762,7 @@ dependencies = [ "hashbrown", "lock_api", "once_cell", - "parking_lot_core", + "parking_lot_core 0.9.3", ] [[package]] @@ -780,7 +825,7 @@ checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -803,6 +848,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys", +] + [[package]] name = "dirs-next" version = "2.0.0" @@ -813,6 +867,17 @@ dependencies = [ "dirs-sys-next", ] +[[package]] +name = "dirs-sys" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -824,6 +889,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "dunce" version = "1.0.2" @@ -897,6 +968,12 @@ dependencies = [ "libc", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -1034,6 +1111,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-intrusive" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot 0.11.2", +] + [[package]] name = "futures-io" version = "0.3.28" @@ -1164,6 
+1252,18 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash", +] + +[[package]] +name = "hashlink" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa" +dependencies = [ + "hashbrown", +] [[package]] name = "hdrhistogram" @@ -1217,6 +1317,9 @@ name = "heck" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" +dependencies = [ + "unicode-segmentation", +] [[package]] name = "hermit-abi" @@ -1400,7 +1503,7 @@ dependencies = [ "once_cell", "openssl", "openssl-sys", - "parking_lot", + "parking_lot 0.12.1", "tokio", "tokio-openssl", "tower-layer", @@ -1587,6 +1690,7 @@ dependencies = [ "serde_yaml 0.9.19", "signal-hook", "signal-hook-tokio", + "sqlx", "tempfile", "testcontainers", "thiserror", @@ -1758,6 +1862,7 @@ dependencies = [ "ring", "serde", "serde_json", + "sqlx", "testcontainers", "tokio", "tracing", @@ -2169,7 +2274,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -2219,7 +2324,7 @@ checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -2401,6 +2506,17 @@ dependencies = [ "elliptic-curve", ] +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -2408,7 +2524,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.3", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -2424,6 +2554,12 @@ dependencies = [ "windows-sys 0.36.1", ] +[[package]] +name = "paste" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79" + [[package]] name = "pem" version = "1.1.0" @@ -2484,7 +2620,7 @@ checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -2545,7 +2681,7 @@ checksum = "d0c2c18e40b92144b05e6f3ae9d1ee931f0d1afa9410ac8b97486c6eaaf91201" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -2596,7 +2732,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a49e86d2c26a24059894a3afa13fd17d063419b05dfb83f06d9c3566060c3f5a" dependencies = [ "proc-macro2", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -2638,7 +2774,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", "version_check", ] @@ 
-2672,7 +2808,7 @@ dependencies = [ "fnv", "lazy_static", "memchr", - "parking_lot", + "parking_lot 0.12.1", "protobuf", "thiserror", ] @@ -2717,7 +2853,7 @@ dependencies = [ "itertools", "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -3182,7 +3318,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -3352,6 +3488,110 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "sqlformat" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e" +dependencies = [ + "itertools", + "nom", + "unicode_categories", +] + +[[package]] +name = "sqlx" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8de3b03a925878ed54a954f621e64bf55a3c1bd29652d0d1a17830405350188" +dependencies = [ + "sqlx-core", + "sqlx-macros", +] + +[[package]] +name = "sqlx-core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa8241483a83a3f33aa5fff7e7d9def398ff9990b2752b6c6112b83c6d246029" +dependencies = [ + "ahash", + "atoi", + "base64 0.13.1", + "bitflags", + "byteorder", + "bytes", + "crc", + "crossbeam-queue", + "dirs", + "dotenvy", + "either", + "event-listener", + "futures-channel", + "futures-core", + "futures-intrusive", + "futures-util", + "hashlink", + "hex", + "hkdf", + "hmac", + "indexmap", + "itoa", + "libc", + "log", + "md-5", + "memchr", + "once_cell", + "paste", + "percent-encoding", + "rand", + "rustls", + "rustls-pemfile", + "serde", + "serde_json", + "sha1", + "sha2", + "smallvec", + "sqlformat", + "sqlx-rt", + "stringprep", + "thiserror", + "tokio-stream", + "url", + "webpki-roots", + "whoami", +] + +[[package]] +name = "sqlx-macros" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9966e64ae989e7e575b19d7265cb79d7fc3cbbdf179835cb0d716f294c2049c9" +dependencies = [ + "dotenvy", + "either", + "heck 0.4.0", + "once_cell", + "proc-macro2", + "quote", + "sha2", + "sqlx-core", + "sqlx-rt", + "syn 1.0.109", + "url", +] + +[[package]] +name = "sqlx-rt" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804d3f245f894e61b1e6263c84b23ca675d96753b5abfd5cc8597d86806e8024" +dependencies = [ + "once_cell", + "tokio", + "tokio-rustls", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -3401,7 +3641,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -3412,9 +3652,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.104" +version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ae548ec36cf198c0ef7710d3c230987c2d6d7bd98ad6edc0274462724c585ce" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ "proc-macro2", "quote", @@ -3446,7 +3686,7 @@ checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", "unicode-xid", ] @@ -3604,7 +3844,7 @@ dependencies = [ "libc", "mio", "num_cpus", - "parking_lot", + "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", "socket2", @@ -3659,7 +3899,7 @@ 
dependencies = [ "futures-channel", "futures-util", "log", - "parking_lot", + "parking_lot 0.12.1", "percent-encoding", "phf", "pin-project-lite", @@ -3794,7 +4034,7 @@ dependencies = [ "proc-macro2", "prost-build", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -3871,7 +4111,7 @@ checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -4060,6 +4300,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "universal-hash" version = "0.5.0" @@ -4234,7 +4480,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", "wasm-bindgen-shared", ] @@ -4268,7 +4514,7 @@ checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4319,6 +4565,16 @@ dependencies = [ "once_cell", ] +[[package]] +name = "whoami" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c70234412ca409cc04e864e89523cb0fc37f5e1344ebed5a3ebf4192b6b9f68" +dependencies = [ + "wasm-bindgen", + "web-sys", +] + [[package]] name = "winapi" version = "0.3.9" @@ -4520,7 +4776,7 @@ checksum = "3f8f187641dad4f680d25c4bfc4225b418165984179f26ca76ec4fb6441d3a17" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", "synstructure", ] diff --git a/Dockerfile.interop b/Dockerfile.interop index 868a60ef2..2bcbfc899 100644 --- a/Dockerfile.interop +++ b/Dockerfile.interop @@ -22,7 +22,6 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry --mount=type=cache,targe FROM alpine:3.17.3 ARG BINARY RUN mkdir /logs -COPY --from=builder /src/db/schema.sql /db/schema.sql COPY --from=builder /$BINARY /$BINARY EXPOSE 8080 # Store the build argument in an environment variable so we can reference it diff --git a/Dockerfile.interop_aggregator b/Dockerfile.interop_aggregator index fff750664..6a0885bfd 100644 --- a/Dockerfile.interop_aggregator +++ b/Dockerfile.interop_aggregator @@ -25,27 +25,24 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \ cargo build --profile $PROFILE -p janus_aggregator \ --bin aggregation_job_creator \ --bin aggregation_job_driver \ - --bin collect_job_driver \ - --bin janus_cli && \ + --bin collect_job_driver && \ cp /src/target/$PROFILE/aggregation_job_creator /aggregation_job_creator && \ cp /src/target/$PROFILE/aggregation_job_driver /aggregation_job_driver && \ - cp /src/target/$PROFILE/collect_job_driver /collect_job_driver && \ - cp /src/target/$PROFILE/janus_cli /janus_cli + cp /src/target/$PROFILE/collect_job_driver /collect_job_driver FROM postgres:14-alpine RUN mkdir /logs && mkdir /etc/janus RUN apk add --update supervisor && rm -rf /tmp/* /var/cache/apk/* +COPY db /etc/janus/migrations COPY interop_binaries/setup.sh /usr/local/bin/setup.sh COPY interop_binaries/config/supervisord.conf /etc/janus/supervisord.conf COPY interop_binaries/config/janus_interop_aggregator.yaml /etc/janus/janus_interop_aggregator.yaml COPY interop_binaries/config/aggregation_job_creator.yaml /etc/janus/aggregation_job_creator.yaml 
COPY interop_binaries/config/aggregation_job_driver.yaml /etc/janus/aggregation_job_driver.yaml COPY interop_binaries/config/collect_job_driver.yaml /etc/janus/collect_job_driver.yaml -COPY interop_binaries/config/janus_cli.yaml /etc/janus/janus_cli.yaml COPY --from=builder /janus_interop_aggregator /usr/local/bin/janus_interop_aggregator COPY --from=builder /aggregation_job_creator /usr/local/bin/aggregation_job_creator COPY --from=builder /aggregation_job_driver /usr/local/bin/aggregation_job_driver COPY --from=builder /collect_job_driver /usr/local/bin/collect_job_driver -COPY --from=builder /janus_cli /usr/local/bin/janus_cli EXPOSE 8080 ENTRYPOINT ["/usr/bin/supervisord", "-c", "/etc/janus/supervisord.conf"] diff --git a/aggregator/Cargo.toml b/aggregator/Cargo.toml index 44604c151..ba9acef1d 100644 --- a/aggregator/Cargo.toml +++ b/aggregator/Cargo.toml @@ -23,6 +23,7 @@ test-util = [ "janus_core/test-util", "janus_messages/test-util", "dep:lazy_static", + "dep:sqlx", "dep:testcontainers", ] kube-rustls = ["kube/rustls-tls"] @@ -68,6 +69,7 @@ serde_json = "1.0.95" serde_yaml = "0.9.19" signal-hook = "0.3.15" signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] } +sqlx = { version = "0.6.3", optional = true, features = ["runtime-tokio-rustls", "migrate", "postgres"] } testcontainers = { version = "0.14.0", optional = true } thiserror = "1.0" tokio = { version = "1.27", features = ["full", "tracing"] } diff --git a/aggregator/src/datastore.rs b/aggregator/src/datastore.rs index 7aee8bc21..665012218 100644 --- a/aggregator/src/datastore.rs +++ b/aggregator/src/datastore.rs @@ -4900,8 +4900,10 @@ pub mod test_util { use lazy_static::lazy_static; use rand::{distributions::Standard, thread_rng, Rng}; use ring::aead::{LessSafeKey, UnboundKey, AES_128_GCM}; + use sqlx::{migrate::Migrator, Connection, PgConnection}; use std::{ env::{self, VarError}, + path::PathBuf, process::Command, str::FromStr, }; @@ -4909,9 +4911,6 @@ pub mod test_util { use tokio_postgres::{Config, NoTls}; use tracing::trace; - /// The Janus database schema. - pub const SCHEMA: &str = include_str!("../../db/schema.sql"); - lazy_static! { static ref CONTAINER_CLIENT: testcontainers::clients::Cli = testcontainers::clients::Cli::default(); @@ -5071,8 +5070,20 @@ pub mod test_util { /// Dropping the second return value causes the database to be shut down & cleaned up. pub async fn ephemeral_datastore(clock: C) -> (Datastore, DbHandle) { let db_handle = ephemeral_db_handle(); - let client = db_handle.pool().get().await.unwrap(); - client.batch_execute(SCHEMA).await.unwrap(); + + let mut connection = PgConnection::connect(&db_handle.connection_string) + .await + .unwrap(); + + // We deliberately avoid using sqlx::migrate! or other compile-time macros to ensure that + // changes to the migration scripts will be picked up by every run of the tests. 
+ let migrations_path = PathBuf::from_str(env!("CARGO_MANIFEST_DIR"))
+ .unwrap()
+ .join("..")
+ .join("db");
+ let migrator = Migrator::new(migrations_path).await.unwrap();
+ migrator.run(&mut connection).await.unwrap();
+
(db_handle.datastore(clock), db_handle) }
diff --git a/db/20230405185602_initial-schema.down.sql b/db/20230405185602_initial-schema.down.sql new file mode 100644 index 000000000..3b70bf57a --- /dev/null +++ b/db/20230405185602_initial-schema.down.sql
@@ -0,0 +1,27 @@
+DROP TABLE IF EXISTS outstanding_batches CASCADE;
+DROP INDEX IF EXISTS aggregate_share_jobs_interval_containment_index CASCADE;
+DROP TABLE IF EXISTS aggregate_share_jobs CASCADE;
+DROP INDEX IF EXISTS collect_jobs_interval_containment_index CASCADE;
+DROP INDEX IF EXISTS collect_jobs_lease_expiry CASCADE;
+DROP TABLE IF EXISTS collect_jobs CASCADE;
+DROP TYPE IF EXISTS COLLECT_JOB_STATE CASCADE;
+DROP TABLE IF EXISTS batch_aggregations CASCADE;
+DROP INDEX IF EXISTS report_aggregations_client_report_id_index CASCADE;
+DROP INDEX IF EXISTS report_aggregations_aggregation_job_id_index CASCADE;
+DROP TABLE IF EXISTS report_aggregations CASCADE;
+DROP TYPE IF EXISTS REPORT_AGGREGATION_STATE CASCADE;
+DROP INDEX IF EXISTS aggregation_jobs_task_and_batch_interval CASCADE;
+DROP INDEX IF EXISTS aggregation_jobs_task_and_batch_id CASCADE;
+DROP INDEX IF EXISTS aggregation_jobs_state_and_lease_expiry CASCADE;
+DROP TABLE IF EXISTS aggregation_jobs CASCADE;
+DROP TYPE IF EXISTS AGGREGATION_JOB_STATE CASCADE;
+DROP INDEX IF EXISTS client_reports_task_and_timestamp_index CASCADE;
+DROP TABLE IF EXISTS client_reports CASCADE;
+DROP TABLE IF EXISTS task_vdaf_verify_keys CASCADE;
+DROP TABLE IF EXISTS task_hpke_keys CASCADE;
+DROP TABLE IF EXISTS task_collector_auth_tokens CASCADE;
+DROP TABLE IF EXISTS task_aggregator_auth_tokens CASCADE;
+DROP TABLE IF EXISTS tasks CASCADE;
+DROP TYPE IF EXISTS AGGREGATOR_ROLE CASCADE;
+DROP EXTENSION IF EXISTS btree_gist CASCADE;
+DROP EXTENSION IF EXISTS pgcrypto CASCADE;
diff --git a/db/20230405185602_initial-schema.up.sql b/db/20230405185602_initial-schema.up.sql new file mode 100644 index 000000000..6add1f5c7 --- /dev/null +++ b/db/20230405185602_initial-schema.up.sql
@@ -0,0 +1,221 @@
+-- Load pgcrypto for gen_random_bytes.
+CREATE EXTENSION pgcrypto;
+-- Load an extension to allow indexing over both BIGINT and TSRANGE in a multicolumn GiST index.
+CREATE EXTENSION btree_gist;
+
+-- Identifies which aggregator role is being played for this task.
+CREATE TYPE AGGREGATOR_ROLE AS ENUM(
+ 'LEADER',
+ 'HELPER'
+);
+
+-- Corresponds to a DAP task, containing static data associated with the task.
+CREATE TABLE tasks( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + task_id BYTEA UNIQUE NOT NULL, -- 32-byte TaskID as defined by the DAP specification + aggregator_role AGGREGATOR_ROLE NOT NULL, -- the role of this aggregator for this task + aggregator_endpoints TEXT[] NOT NULL, -- aggregator HTTPS endpoints, leader first + query_type JSONB NOT NULL, -- the query type in use for this task, along with its parameters + vdaf JSON NOT NULL, -- the VDAF instance in use for this task, along with its parameters + max_batch_query_count BIGINT NOT NULL, -- the maximum number of times a given batch may be collected + task_expiration TIMESTAMP NOT NULL, -- the time after which client reports are no longer accepted + report_expiry_age BIGINT, -- the maximum age of a report before it is considered expired (and acceptable for garbage collection), in seconds. NULL means that GC is disabled. + min_batch_size BIGINT NOT NULL, -- the minimum number of reports in a batch to allow it to be collected + time_precision BIGINT NOT NULL, -- the duration to which clients are expected to round their report timestamps, in seconds + tolerable_clock_skew BIGINT NOT NULL, -- the maximum acceptable clock skew to allow between client and aggregator, in seconds + collector_hpke_config BYTEA NOT NULL, -- the HPKE config of the collector (encoded HpkeConfig message) + input_share_aad_public_share_length_prefix BOOLEAN NOT NULL DEFAULT false -- selects which input share AAD format should be used (true: includes a length prefix for the public share, akin to DAP-03, false: no length prefix for the public share) +); + +-- The aggregator authentication tokens used by a given task. +CREATE TABLE task_aggregator_auth_tokens( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + task_id BIGINT NOT NULL, -- task ID the token is associated with + ord BIGINT NOT NULL, -- a value used to specify the ordering of the authentication tokens + token BYTEA NOT NULL, -- bearer token used to authenticate messages to/from the other aggregator (encrypted) + + CONSTRAINT aggregator_auth_token_unique_task_id_and_ord UNIQUE(task_id, ord), + CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) +); + +-- The collector authentication tokens used by a given task. +CREATE TABLE task_collector_auth_tokens( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + task_id BIGINT NOT NULL, -- task ID the token is associated with + ord BIGINT NOT NULL, -- a value used to specify the ordering of the authentication tokens + token BYTEA NOT NULL, -- bearer token used to authenticate messages from the collector (encrypted) + + CONSTRAINT collector_auth_token_unique_task_id_and_ord UNIQUE(task_id, ord), + CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) +); + +-- The HPKE public keys (aka configs) and private keys used by a given task. +CREATE TABLE task_hpke_keys( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + task_id BIGINT NOT NULL, -- task ID the HPKE key is associated with + config_id SMALLINT NOT NULL, -- HPKE config ID + config BYTEA NOT NULL, -- HPKE config, including public key (encoded HpkeConfig message) + private_key BYTEA NOT NULL, -- private key (encrypted) + + CONSTRAINT unique_task_id_and_config_id UNIQUE(task_id, config_id), + CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) +); + +-- The VDAF verification keys used by a given task. 
+-- TODO(#229): support multiple verification parameters per task
+CREATE TABLE task_vdaf_verify_keys(
+ id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only
+ task_id BIGINT NOT NULL, -- task ID the verification key is associated with
+ vdaf_verify_key BYTEA NOT NULL, -- the VDAF verification key (encrypted)
+
+ CONSTRAINT vdaf_verify_key_unique_task_id UNIQUE(task_id),
+ CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id)
+);
+
+-- Individual reports received from clients.
+CREATE TABLE client_reports(
+ id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only
+ task_id BIGINT NOT NULL, -- task ID the report is associated with
+ report_id BYTEA NOT NULL, -- 16-byte ReportID as defined by the DAP specification
+ client_timestamp TIMESTAMP NOT NULL, -- report timestamp, from client
+ extensions BYTEA, -- encoded sequence of Extension messages (populated for leader only)
+ public_share BYTEA, -- encoded public share (opaque VDAF message, populated for leader only)
+ leader_input_share BYTEA, -- encoded, decrypted leader input share (populated for leader only)
+ helper_encrypted_input_share BYTEA, -- encoded HpkeCiphertext message containing the helper's input share (populated for leader only)
+
+ CONSTRAINT unique_task_id_and_report_id UNIQUE(task_id, report_id),
+ CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id)
+);
+CREATE INDEX client_reports_task_and_timestamp_index ON client_reports(task_id, client_timestamp);
+
+-- Specifies the possible state of an aggregation job.
+CREATE TYPE AGGREGATION_JOB_STATE AS ENUM(
+ 'IN_PROGRESS', -- at least one included report is in a non-terminal (START, WAITING) state, processing can continue
+ 'FINISHED', -- all reports have reached a terminal state (FINISHED, FAILED, INVALID)
+ 'ABANDONED' -- we have given up on the aggregation job entirely
+);
+
+-- An aggregation job, representing the aggregation of a number of client reports.
+CREATE TABLE aggregation_jobs(
+ id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only
+ task_id BIGINT NOT NULL, -- ID of related task
+ aggregation_job_id BYTEA NOT NULL, -- 32-byte AggregationJobID as defined by the DAP specification
+ batch_identifier BYTEA, -- encoded query-type-specific batch identifier (corresponds to identifier in BatchSelector; present for leader tasks and fixed size tasks, NULL otherwise)
+ batch_interval TSRANGE, -- batch interval, as a TSRANGE, populated only for leader time-interval tasks.
(will match batch_identifier when present)
+ aggregation_param BYTEA NOT NULL, -- encoded aggregation parameter (opaque VDAF message)
+ state AGGREGATION_JOB_STATE NOT NULL, -- current state of the aggregation job
+
+ lease_expiry TIMESTAMP NOT NULL DEFAULT TIMESTAMP '-infinity', -- when lease on this aggregation job expires; -infinity implies no current lease
+ lease_token BYTEA, -- a value identifying the current leaseholder; NULL implies no current lease
+ lease_attempts BIGINT NOT NULL DEFAULT 0, -- the number of lease acquisitions since the last successful lease release
+
+ CONSTRAINT unique_aggregation_job_id UNIQUE(aggregation_job_id),
+ CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id)
+);
+CREATE INDEX aggregation_jobs_state_and_lease_expiry ON aggregation_jobs(state, lease_expiry) WHERE state = 'IN_PROGRESS';
+CREATE INDEX aggregation_jobs_task_and_batch_id ON aggregation_jobs(task_id, batch_identifier);
+CREATE INDEX aggregation_jobs_task_and_batch_interval ON aggregation_jobs USING gist (task_id, batch_interval) WHERE batch_interval IS NOT NULL;
+
+-- Specifies the possible state of aggregating a single report.
+CREATE TYPE REPORT_AGGREGATION_STATE AS ENUM(
+ 'START', -- the aggregator is waiting to decrypt its input share & compute initial preparation state
+ 'WAITING', -- the aggregator is waiting for a message from its peer before proceeding
+ 'FINISHED', -- the aggregator has completed the preparation process and recovered an output share
+ 'FAILED', -- an error has occurred and an output share cannot be recovered
+ 'INVALID' -- an aggregator received an unexpected message
+);
+
+-- An aggregation attempt for a single client report. An aggregation job logically contains a number
+-- of report aggregations. A single client report might be aggregated in multiple aggregation jobs &
+-- therefore have multiple associated report aggregations.
+CREATE TABLE report_aggregations(
+ id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only
+ aggregation_job_id BIGINT NOT NULL, -- the aggregation job ID this report aggregation is associated with
+ client_report_id BIGINT NOT NULL, -- the client report ID this report aggregation is associated with
+ ord BIGINT NOT NULL, -- a value used to specify the ordering of client reports in the aggregation job
+ state REPORT_AGGREGATION_STATE NOT NULL, -- the current state of this report aggregation
+ prep_state BYTEA, -- the current preparation state (opaque VDAF message, only if in state WAITING)
+ prep_msg BYTEA, -- the next preparation message to be sent to the helper (opaque VDAF message, only if in state WAITING and this aggregator is the leader)
+ out_share BYTEA, -- the output share (opaque VDAF message, only if in state FINISHED)
+ error_code SMALLINT, -- error code corresponding to a DAP ReportShareError value; null if in a state other than FAILED
+
+ CONSTRAINT unique_ord UNIQUE(aggregation_job_id, ord),
+ CONSTRAINT fk_aggregation_job_id FOREIGN KEY(aggregation_job_id) REFERENCES aggregation_jobs(id),
+ CONSTRAINT fk_client_report_id FOREIGN KEY(client_report_id) REFERENCES client_reports(id)
+);
+CREATE INDEX report_aggregations_aggregation_job_id_index ON report_aggregations(aggregation_job_id);
+CREATE INDEX report_aggregations_client_report_id_index ON report_aggregations(client_report_id);
+
+-- Information on aggregation for a single batch. This information may be incremental if the VDAF
+-- supports incremental aggregation.
+CREATE TABLE batch_aggregations(
+ id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only
+ task_id BIGINT NOT NULL, -- the task ID
+ batch_identifier BYTEA NOT NULL, -- encoded query-type-specific batch identifier (corresponds to identifier in BatchSelector)
+ aggregation_param BYTEA NOT NULL, -- the aggregation parameter (opaque VDAF message)
+ aggregate_share BYTEA NOT NULL, -- the (possibly-incremental) aggregate share
+ report_count BIGINT NOT NULL, -- the (possibly-incremental) client report count
+ checksum BYTEA NOT NULL, -- the (possibly-incremental) checksum
+
+ CONSTRAINT unique_task_id_interval_aggregation_param UNIQUE(task_id, batch_identifier, aggregation_param),
+ CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id)
+);
+
+-- Specifies the possible state of a collect job.
+CREATE TYPE COLLECT_JOB_STATE AS ENUM(
+ 'START', -- the aggregator is waiting to run this collect job
+ 'FINISHED', -- this collect job has run successfully and is ready for collection
+ 'ABANDONED', -- this collect job has been abandoned & will never be run again
+ 'DELETED' -- this collect job has been deleted
+);
+
+-- The leader's view of collect requests from the Collector.
+CREATE TABLE collect_jobs(
+ id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only
+ collect_job_id UUID NOT NULL, -- UUID used by collector to refer to this job
+ task_id BIGINT NOT NULL, -- the task ID being collected
+ batch_identifier BYTEA NOT NULL, -- encoded query-type-specific batch identifier (corresponds to identifier in BatchSelector)
+ batch_interval TSRANGE, -- batch interval, as a TSRANGE, populated only for time-interval tasks. (will always match batch_identifier)
+ aggregation_param BYTEA NOT NULL, -- the aggregation parameter (opaque VDAF message)
+ state COLLECT_JOB_STATE NOT NULL, -- the current state of this collect job
+ report_count BIGINT, -- the number of reports included in this collect job (only if in state FINISHED)
+ helper_aggregate_share BYTEA, -- the helper's encrypted aggregate share (HpkeCiphertext, only if in state FINISHED)
+ leader_aggregate_share BYTEA, -- the leader's unencrypted aggregate share (opaque VDAF message, only if in state FINISHED)
+
+ lease_expiry TIMESTAMP NOT NULL DEFAULT TIMESTAMP '-infinity', -- when lease on this collect job expires; -infinity implies no current lease
+ lease_token BYTEA, -- a value identifying the current leaseholder; NULL implies no current lease
+ lease_attempts BIGINT NOT NULL DEFAULT 0, -- the number of lease acquisitions since the last successful lease release
+
+ CONSTRAINT unique_collect_job_task_id_interval_aggregation_param UNIQUE(task_id, batch_identifier, aggregation_param),
+ CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id)
+);
+-- TODO(#224): verify that this index is optimal for purposes of acquiring collect jobs.
+CREATE INDEX collect_jobs_lease_expiry ON collect_jobs(lease_expiry);
+CREATE INDEX collect_jobs_interval_containment_index ON collect_jobs USING gist (task_id, batch_interval);
+
+-- The helper's view of aggregate share jobs.
+CREATE TABLE aggregate_share_jobs(
+ id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only
+ task_id BIGINT NOT NULL, -- the task ID being collected
+ batch_identifier BYTEA NOT NULL, -- encoded query-type-specific batch identifier (corresponds to identifier in BatchSelector)
+ batch_interval TSRANGE, -- batch interval, as a TSRANGE, populated only for time-interval tasks.
(will always match batch_identifier)
+ aggregation_param BYTEA NOT NULL, -- the aggregation parameter (opaque VDAF message)
+ helper_aggregate_share BYTEA NOT NULL, -- the helper's unencrypted aggregate share
+ report_count BIGINT NOT NULL, -- the count of reports included in helper_aggregate_share
+ checksum BYTEA NOT NULL, -- the checksum over the reports included in helper_aggregate_share
+
+ CONSTRAINT unique_aggregate_share_job_task_id_interval_aggregation_param UNIQUE(task_id, batch_identifier, aggregation_param),
+ CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id)
+);
+CREATE INDEX aggregate_share_jobs_interval_containment_index ON aggregate_share_jobs USING gist (task_id, batch_interval);
+
+-- The leader's view of outstanding batches, which are batches that have not yet started
+-- collection. Used for fixed-size tasks only.
+CREATE TABLE outstanding_batches(
+ id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only
+ task_id BIGINT NOT NULL, -- the task ID containing the batch
+ batch_id BYTEA NOT NULL, -- 32-byte BatchID as defined by the DAP specification.
+
+ CONSTRAINT unique_task_id_batch_id UNIQUE(task_id, batch_id),
+ CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id)
+);
diff --git a/interop_binaries/Cargo.toml b/interop_binaries/Cargo.toml index cfbacc7b8..5b2d5b514 100644 --- a/interop_binaries/Cargo.toml +++ b/interop_binaries/Cargo.toml
@@ -42,6 +42,7 @@ reqwest = { version = "0.11.16", default-features = false, features = ["rustls-t ring = "0.16.20" serde = { version = "1.0.159", features = ["derive"] } serde_json = "1.0.95"
+sqlx = { version = "0.6.3", features = ["runtime-tokio-rustls", "migrate", "postgres"] }
testcontainers = { version = "0.14" } tokio = { version = "1.27", features = ["full", "tracing"] } tracing = "0.1.37"
diff --git a/interop_binaries/build.rs b/interop_binaries/build.rs index 37e0991d9..4cfc179b4 100644 --- a/interop_binaries/build.rs +++ b/interop_binaries/build.rs
@@ -36,7 +36,7 @@ fn main() { println!("cargo:rerun-if-changed=../aggregator"); println!("cargo:rerun-if-changed=../client"); println!("cargo:rerun-if-changed=../core");
- println!("cargo:rerun-if-changed=../db/schema.sql");
+ println!("cargo:rerun-if-changed=../db");
println!("cargo:rerun-if-changed=../integration_tests"); println!("cargo:rerun-if-changed=../interop_binaries");
diff --git a/interop_binaries/config/janus_cli.yaml b/interop_binaries/config/janus_cli.yaml deleted file mode 100644 index 10a655bb1..000000000 --- a/interop_binaries/config/janus_cli.yaml +++ /dev/null
@@ -1,4 +0,0 @@
-database:
- url: postgres://postgres@127.0.0.1:5432/postgres
-logging_config:
- force_json_output: true
diff --git a/interop_binaries/config/janus_interop_aggregator.yaml b/interop_binaries/config/janus_interop_aggregator.yaml index 7ff5cb9c5..dbaf77776 100644 --- a/interop_binaries/config/janus_interop_aggregator.yaml +++ b/interop_binaries/config/janus_interop_aggregator.yaml
@@ -4,3 +4,4 @@ health_check_listen_address: 0.0.0.0:8000 logging_config: force_json_output: true listen_address: 0.0.0.0:8080
+sql_migrations_source: /etc/janus/migrations
diff --git a/interop_binaries/setup.sh b/interop_binaries/setup.sh index 188753bfe..77fe0575b 100755 --- a/interop_binaries/setup.sh +++ b/interop_binaries/setup.sh
@@ -1,5 +1,4 @@ #!/bin/bash
-/usr/local/bin/janus_cli write-schema --config-file /etc/janus/janus_cli.yaml
/usr/bin/supervisorctl -c /etc/janus/supervisord.conf start janus_interop_aggregator /usr/bin/supervisorctl -c
/etc/janus/supervisord.conf start aggregation_job_creator /usr/bin/supervisorctl -c /etc/janus/supervisord.conf start aggregation_job_driver diff --git a/interop_binaries/src/bin/janus_interop_aggregator.rs b/interop_binaries/src/bin/janus_interop_aggregator.rs index 74d5b8fa2..bd899b833 100644 --- a/interop_binaries/src/bin/janus_interop_aggregator.rs +++ b/interop_binaries/src/bin/janus_interop_aggregator.rs @@ -17,9 +17,11 @@ use janus_interop_binaries::{ use janus_messages::{BatchId, Duration, HpkeConfig, TaskId, Time}; use prio::codec::Decode; use serde::{Deserialize, Serialize}; +use sqlx::{migrate::Migrator, Connection, PgConnection}; use std::{ collections::{HashMap, HashSet}, net::SocketAddr, + path::PathBuf, sync::Arc, }; use tokio::sync::Mutex; @@ -316,6 +318,9 @@ struct Config { /// Path prefix, e.g. `/dap/`, to serve DAP from. #[serde(default = "default_dap_serving_prefix")] dap_serving_prefix: String, + + /// Path at which `sqlx` migration files can be found. Migrations will be applied at startup. + sql_migrations_source: PathBuf, } impl BinaryConfig for Config { @@ -333,6 +338,12 @@ async fn main() -> anyhow::Result<()> { janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move { let datastore = Arc::new(ctx.datastore); + // Apply SQL migrations to database + let mut connection = + PgConnection::connect(ctx.config.common_config.database.url.as_str()).await?; + let migrator = Migrator::new(ctx.config.sql_migrations_source).await?; + migrator.run(&mut connection).await?; + // Run an HTTP server with both the DAP aggregator endpoints and the interoperation test // endpoints. let filter = make_filter(Arc::clone(&datastore), ctx.config.dap_serving_prefix)?;
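
For readers who want to exercise these migrations outside the test harness or the interop container, the sketch below shows the same runtime-discovery flow that `ephemeral_datastore` and `janus_interop_aggregator` now use. It is an illustration, not part of the patch: the database URL is a placeholder, the `db` path is assumed to be the repository's migration directory relative to the working directory, and it assumes `sqlx` 0.6 with the `runtime-tokio-rustls`, `migrate`, and `postgres` features (plus `tokio` with its macros), matching the Cargo.toml changes above.

    use sqlx::{migrate::Migrator, Connection, PgConnection};
    use std::path::Path;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Placeholder URL; point this at a disposable database.
        let mut connection =
            PgConnection::connect("postgres://postgres@127.0.0.1:5432/postgres").await?;

        // Discover `{version}_{description}.up.sql` / `.down.sql` pairs at
        // runtime rather than embedding them with the `sqlx::migrate!` macro,
        // so edited scripts take effect without a rebuild.
        let migrator = Migrator::new(Path::new("db")).await?;

        // Apply any versions the database hasn't recorded yet; sqlx tracks
        // applied migrations in its `_sqlx_migrations` bookkeeping table.
        migrator.run(&mut connection).await?;

        // The `.down.sql` scripts are what make rollbacks possible:
        // `Migrator::undo` reverts applied migrations down to the given
        // target version (here 0, i.e. revert everything).
        migrator.undo(&mut connection, 0).await?;

        Ok(())
    }

Running `run` and then `undo` back to back like this only demonstrates both directions; a real deployment would invoke one or the other, and, as the commit message notes, anything beyond additive changes still needs planning and tooling beyond these scripts.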