diff --git a/.github/workflows/publish-sql-schema.yml b/.github/workflows/publish-sql-schema.yml index 4fd915c4e..d8945c3f1 100644 --- a/.github/workflows/publish-sql-schema.yml +++ b/.github/workflows/publish-sql-schema.yml @@ -29,4 +29,7 @@ jobs: id: get_version run: echo VERSION=${GITHUB_REF/refs\/tags\//} >> $GITHUB_OUTPUT - name: "Upload schema file(s)" - run: gcloud alpha storage cp db/schema.sql gs://janus-artifacts-sql-schemas/${{ steps.get_version.outputs.VERSION }}/schema.sql + run: |- + gcloud alpha storage cp --recursive \ + db \ + gs://janus-artifacts-sql-schemas/${{ steps.get_version.outputs.VERSION }}/ diff --git a/Cargo.lock b/Cargo.lock index db4a809b1..f3a1d9c47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -53,6 +53,17 @@ dependencies = [ "subtle", ] +[[package]] +name = "ahash" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "0.7.19" @@ -166,7 +177,7 @@ checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -180,6 +191,15 @@ dependencies = [ "syn 2.0.10", ] +[[package]] +name = "atoi" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e" +dependencies = [ + "num-traits", +] + [[package]] name = "atty" version = "0.2.14" @@ -567,6 +587,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" + [[package]] name = "crc32fast" version = "1.3.2" @@ -611,6 +646,16 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.11" @@ -693,7 +738,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.10.0", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -704,7 +749,7 @@ checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" dependencies = [ "darling_core", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -717,7 +762,7 @@ dependencies = [ "hashbrown", "lock_api", "once_cell", - "parking_lot_core", + "parking_lot_core 0.9.3", ] [[package]] @@ -780,7 +825,7 @@ checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -803,6 +848,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys", +] + [[package]] name = "dirs-next" version = "2.0.0" @@ -813,6 +867,17 @@ dependencies = [ "dirs-sys-next", ] +[[package]] +name 
= "dirs-sys" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -824,6 +889,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "dunce" version = "1.0.2" @@ -897,6 +968,12 @@ dependencies = [ "libc", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -1034,6 +1111,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-intrusive" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot 0.11.2", +] + [[package]] name = "futures-io" version = "0.3.28" @@ -1164,6 +1252,18 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash", +] + +[[package]] +name = "hashlink" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa" +dependencies = [ + "hashbrown", +] [[package]] name = "hdrhistogram" @@ -1217,6 +1317,9 @@ name = "heck" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" +dependencies = [ + "unicode-segmentation", +] [[package]] name = "hermit-abi" @@ -1400,7 +1503,7 @@ dependencies = [ "once_cell", "openssl", "openssl-sys", - "parking_lot", + "parking_lot 0.12.1", "tokio", "tokio-openssl", "tower-layer", @@ -1587,6 +1690,7 @@ dependencies = [ "serde_yaml 0.9.19", "signal-hook", "signal-hook-tokio", + "sqlx", "tempfile", "testcontainers", "thiserror", @@ -1758,6 +1862,7 @@ dependencies = [ "ring", "serde", "serde_json", + "sqlx", "testcontainers", "tokio", "tracing", @@ -2169,7 +2274,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -2219,7 +2324,7 @@ checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -2401,6 +2506,17 @@ dependencies = [ "elliptic-curve", ] +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -2408,7 +2524,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.3", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -2424,6 +2554,12 @@ dependencies = [ "windows-sys 0.36.1", ] +[[package]] +name = "paste" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79" + [[package]] name = "pem" version = "1.1.0" @@ -2484,7 +2620,7 @@ checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -2545,7 +2681,7 @@ checksum = "d0c2c18e40b92144b05e6f3ae9d1ee931f0d1afa9410ac8b97486c6eaaf91201" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -2596,7 +2732,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a49e86d2c26a24059894a3afa13fd17d063419b05dfb83f06d9c3566060c3f5a" dependencies = [ "proc-macro2", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -2638,7 +2774,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", "version_check", ] @@ -2672,7 +2808,7 @@ dependencies = [ "fnv", "lazy_static", "memchr", - "parking_lot", + "parking_lot 0.12.1", "protobuf", "thiserror", ] @@ -2717,7 +2853,7 @@ dependencies = [ "itertools", "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -3182,7 +3318,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -3352,6 +3488,110 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "sqlformat" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e" +dependencies = [ + "itertools", + "nom", + "unicode_categories", +] + +[[package]] +name = "sqlx" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8de3b03a925878ed54a954f621e64bf55a3c1bd29652d0d1a17830405350188" +dependencies = [ + "sqlx-core", + "sqlx-macros", +] + +[[package]] +name = "sqlx-core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa8241483a83a3f33aa5fff7e7d9def398ff9990b2752b6c6112b83c6d246029" +dependencies = [ + "ahash", + "atoi", + "base64 0.13.1", + "bitflags", + "byteorder", + "bytes", + "crc", + "crossbeam-queue", + "dirs", + "dotenvy", + "either", + "event-listener", + "futures-channel", + "futures-core", + "futures-intrusive", + "futures-util", + "hashlink", + "hex", + "hkdf", + "hmac", + "indexmap", + "itoa", + "libc", + "log", + "md-5", + "memchr", + "once_cell", + "paste", + "percent-encoding", + "rand", + "rustls", + "rustls-pemfile", + "serde", + "serde_json", + "sha1", + "sha2", + "smallvec", + "sqlformat", + "sqlx-rt", + "stringprep", + "thiserror", + "tokio-stream", + "url", + "webpki-roots", + "whoami", +] + +[[package]] +name = "sqlx-macros" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9966e64ae989e7e575b19d7265cb79d7fc3cbbdf179835cb0d716f294c2049c9" +dependencies = [ + "dotenvy", + "either", + "heck 0.4.0", + "once_cell", + "proc-macro2", + "quote", + 
"sha2", + "sqlx-core", + "sqlx-rt", + "syn 1.0.109", + "url", +] + +[[package]] +name = "sqlx-rt" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804d3f245f894e61b1e6263c84b23ca675d96753b5abfd5cc8597d86806e8024" +dependencies = [ + "once_cell", + "tokio", + "tokio-rustls", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -3401,7 +3641,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -3412,9 +3652,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.104" +version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ae548ec36cf198c0ef7710d3c230987c2d6d7bd98ad6edc0274462724c585ce" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ "proc-macro2", "quote", @@ -3446,7 +3686,7 @@ checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", "unicode-xid", ] @@ -3604,7 +3844,7 @@ dependencies = [ "libc", "mio", "num_cpus", - "parking_lot", + "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", "socket2", @@ -3659,7 +3899,7 @@ dependencies = [ "futures-channel", "futures-util", "log", - "parking_lot", + "parking_lot 0.12.1", "percent-encoding", "phf", "pin-project-lite", @@ -3794,7 +4034,7 @@ dependencies = [ "proc-macro2", "prost-build", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -3871,7 +4111,7 @@ checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", ] [[package]] @@ -4060,6 +4300,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "universal-hash" version = "0.5.0" @@ -4234,7 +4480,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", "wasm-bindgen-shared", ] @@ -4268,7 +4514,7 @@ checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4319,6 +4565,16 @@ dependencies = [ "once_cell", ] +[[package]] +name = "whoami" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c70234412ca409cc04e864e89523cb0fc37f5e1344ebed5a3ebf4192b6b9f68" +dependencies = [ + "wasm-bindgen", + "web-sys", +] + [[package]] name = "winapi" version = "0.3.9" @@ -4520,7 +4776,7 @@ checksum = "3f8f187641dad4f680d25c4bfc4225b418165984179f26ca76ec4fb6441d3a17" dependencies = [ "proc-macro2", "quote", - "syn 1.0.104", + "syn 1.0.109", "synstructure", ] diff --git a/Dockerfile.interop b/Dockerfile.interop index 868a60ef2..2bcbfc899 100644 --- a/Dockerfile.interop +++ b/Dockerfile.interop @@ -22,7 +22,6 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry --mount=type=cache,targe FROM alpine:3.17.3 ARG BINARY RUN mkdir /logs -COPY --from=builder /src/db/schema.sql /db/schema.sql COPY --from=builder /$BINARY /$BINARY EXPOSE 8080 # Store the build argument in an 
environment variable so we can reference it diff --git a/Dockerfile.interop_aggregator b/Dockerfile.interop_aggregator index fff750664..6a0885bfd 100644 --- a/Dockerfile.interop_aggregator +++ b/Dockerfile.interop_aggregator @@ -25,27 +25,24 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \ cargo build --profile $PROFILE -p janus_aggregator \ --bin aggregation_job_creator \ --bin aggregation_job_driver \ - --bin collect_job_driver \ - --bin janus_cli && \ + --bin collect_job_driver && \ cp /src/target/$PROFILE/aggregation_job_creator /aggregation_job_creator && \ cp /src/target/$PROFILE/aggregation_job_driver /aggregation_job_driver && \ - cp /src/target/$PROFILE/collect_job_driver /collect_job_driver && \ - cp /src/target/$PROFILE/janus_cli /janus_cli + cp /src/target/$PROFILE/collect_job_driver /collect_job_driver FROM postgres:14-alpine RUN mkdir /logs && mkdir /etc/janus RUN apk add --update supervisor && rm -rf /tmp/* /var/cache/apk/* +COPY db /etc/janus/migrations COPY interop_binaries/setup.sh /usr/local/bin/setup.sh COPY interop_binaries/config/supervisord.conf /etc/janus/supervisord.conf COPY interop_binaries/config/janus_interop_aggregator.yaml /etc/janus/janus_interop_aggregator.yaml COPY interop_binaries/config/aggregation_job_creator.yaml /etc/janus/aggregation_job_creator.yaml COPY interop_binaries/config/aggregation_job_driver.yaml /etc/janus/aggregation_job_driver.yaml COPY interop_binaries/config/collect_job_driver.yaml /etc/janus/collect_job_driver.yaml -COPY interop_binaries/config/janus_cli.yaml /etc/janus/janus_cli.yaml COPY --from=builder /janus_interop_aggregator /usr/local/bin/janus_interop_aggregator COPY --from=builder /aggregation_job_creator /usr/local/bin/aggregation_job_creator COPY --from=builder /aggregation_job_driver /usr/local/bin/aggregation_job_driver COPY --from=builder /collect_job_driver /usr/local/bin/collect_job_driver -COPY --from=builder /janus_cli /usr/local/bin/janus_cli EXPOSE 8080 ENTRYPOINT ["/usr/bin/supervisord", "-c", "/etc/janus/supervisord.conf"] diff --git a/aggregator/Cargo.toml b/aggregator/Cargo.toml index 44604c151..ba9acef1d 100644 --- a/aggregator/Cargo.toml +++ b/aggregator/Cargo.toml @@ -23,6 +23,7 @@ test-util = [ "janus_core/test-util", "janus_messages/test-util", "dep:lazy_static", + "dep:sqlx", "dep:testcontainers", ] kube-rustls = ["kube/rustls-tls"] @@ -68,6 +69,7 @@ serde_json = "1.0.95" serde_yaml = "0.9.19" signal-hook = "0.3.15" signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] } +sqlx = { version = "0.6.3", optional = true, features = ["runtime-tokio-rustls", "migrate", "postgres"] } testcontainers = { version = "0.14.0", optional = true } thiserror = "1.0" tokio = { version = "1.27", features = ["full", "tracing"] } diff --git a/aggregator/src/datastore.rs b/aggregator/src/datastore.rs index 7aee8bc21..665012218 100644 --- a/aggregator/src/datastore.rs +++ b/aggregator/src/datastore.rs @@ -4900,8 +4900,10 @@ pub mod test_util { use lazy_static::lazy_static; use rand::{distributions::Standard, thread_rng, Rng}; use ring::aead::{LessSafeKey, UnboundKey, AES_128_GCM}; + use sqlx::{migrate::Migrator, Connection, PgConnection}; use std::{ env::{self, VarError}, + path::PathBuf, process::Command, str::FromStr, }; @@ -4909,9 +4911,6 @@ pub mod test_util { use tokio_postgres::{Config, NoTls}; use tracing::trace; - /// The Janus database schema. - pub const SCHEMA: &str = include_str!("../../db/schema.sql"); - lazy_static! 
{ static ref CONTAINER_CLIENT: testcontainers::clients::Cli = testcontainers::clients::Cli::default(); @@ -5071,8 +5070,20 @@ pub mod test_util { /// Dropping the second return value causes the database to be shut down & cleaned up. pub async fn ephemeral_datastore(clock: C) -> (Datastore, DbHandle) { let db_handle = ephemeral_db_handle(); - let client = db_handle.pool().get().await.unwrap(); - client.batch_execute(SCHEMA).await.unwrap(); + + let mut connection = PgConnection::connect(&db_handle.connection_string) + .await + .unwrap(); + + // We deliberately avoid using sqlx::migrate! or other compile-time macros to ensure that + // changes to the migration scripts will be picked up by every run of the tests. + let migrations_path = PathBuf::from_str(env!("CARGO_MANIFEST_DIR")) + .unwrap() + .join("..") + .join("db"); + let migrator = Migrator::new(migrations_path).await.unwrap(); + migrator.run(&mut connection).await.unwrap(); + (db_handle.datastore(clock), db_handle) } diff --git a/db/20230405185602_initial-schema.down.sql b/db/20230405185602_initial-schema.down.sql new file mode 100644 index 000000000..3b70bf57a --- /dev/null +++ b/db/20230405185602_initial-schema.down.sql @@ -0,0 +1,27 @@ +DROP TABLE IF EXISTS outstanding_batches CASCADE; +DROP INDEX IF EXISTS aggregate_share_jobs_interval_containment_index CASCADE; +DROP TABLE IF EXISTS aggregate_share_jobs CASCADE; +DROP INDEX IF EXISTS collect_jobs_interval_containment_index CASCADE; +DROP INDEX IF EXISTS collect_jobs_lease_expiry CASCADE; +DROP TABLE IF EXISTS collect_jobs CASCADE; +DROP TYPE IF EXISTS COLLECT_JOB_STATE CASCADE; +DROP TABLE IF EXISTS batch_aggregations CASCADE; +DROP INDEX IF EXISTS report_aggregations_client_report_id_index CASCADE; +DROP INDEX IF EXISTS report_aggregations_aggregation_job_id_index CASCADE; +DROP TABLE IF EXISTS report_aggregations CASCADE; +DROP TYPE IF EXISTS REPORT_AGGREGATION_STATE CASCADE; +DROP INDEX IF EXISTS aggregation_jobs_task_and_batch_interval CASCADE; +DROP INDEX IF EXISTS aggregation_jobs_task_and_batch_id CASCADE; +DROP INDEX IF EXISTS aggregation_jobs_state_and_lease_expiry CASCADE; +DROP TABLE IF EXISTS aggregation_jobs CASCADE; +DROP TYPE IF EXISTS AGGREGATION_JOB_STATE CASCADE; +DROP INDEX IF EXISTS client_reports_task_and_timestamp_index CASCADE; +DROP TABLE IF EXISTS client_reports CASCADE; +DROP TABLE IF EXISTS task_vdaf_verify_keys CASCADE; +DROP TABLE IF EXISTS task_hpke_keys CASCADE; +DROP TABLE IF EXISTS task_collector_auth_tokens CASCADE; +DROP TABLE IF EXISTS task_aggregator_auth_tokens CASCADE; +DROP TABLE IF EXISTS tasks CASCADE; +DROP TYPE IF EXISTS AGGREGATOR_ROLE CASCADE; +DROP EXTENSION IF EXISTS btree_gist CASCADE; +DROP EXTENSION IF EXISTS pgcrypto CASCADE; diff --git a/db/20230405185602_initial-schema.up.sql b/db/20230405185602_initial-schema.up.sql new file mode 100644 index 000000000..6add1f5c7 --- /dev/null +++ b/db/20230405185602_initial-schema.up.sql @@ -0,0 +1,221 @@ +-- Load pgcrypto for gen_random_bytes. +CREATE EXTENSION pgcrypto; +-- Load an extension to allow indexing over both BIGINT and TSRANGE in a multicolumn GiST index. +CREATE EXTENSION btree_gist; + +-- Identifies which aggregator role is being played for this task. +CREATE TYPE AGGREGATOR_ROLE AS ENUM( + 'LEADER', + 'HELPER' +); + +-- Corresponds to a DAP task, containing static data associated with the task.
+CREATE TABLE tasks( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + task_id BYTEA UNIQUE NOT NULL, -- 32-byte TaskID as defined by the DAP specification + aggregator_role AGGREGATOR_ROLE NOT NULL, -- the role of this aggregator for this task + aggregator_endpoints TEXT[] NOT NULL, -- aggregator HTTPS endpoints, leader first + query_type JSONB NOT NULL, -- the query type in use for this task, along with its parameters + vdaf JSON NOT NULL, -- the VDAF instance in use for this task, along with its parameters + max_batch_query_count BIGINT NOT NULL, -- the maximum number of times a given batch may be collected + task_expiration TIMESTAMP NOT NULL, -- the time after which client reports are no longer accepted + report_expiry_age BIGINT, -- the maximum age of a report before it is considered expired (and acceptable for garbage collection), in seconds. NULL means that GC is disabled. + min_batch_size BIGINT NOT NULL, -- the minimum number of reports in a batch to allow it to be collected + time_precision BIGINT NOT NULL, -- the duration to which clients are expected to round their report timestamps, in seconds + tolerable_clock_skew BIGINT NOT NULL, -- the maximum acceptable clock skew to allow between client and aggregator, in seconds + collector_hpke_config BYTEA NOT NULL, -- the HPKE config of the collector (encoded HpkeConfig message) + input_share_aad_public_share_length_prefix BOOLEAN NOT NULL DEFAULT false -- selects which input share AAD format should be used (true: includes a length prefix for the public share, akin to DAP-03, false: no length prefix for the public share) +); + +-- The aggregator authentication tokens used by a given task. +CREATE TABLE task_aggregator_auth_tokens( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + task_id BIGINT NOT NULL, -- task ID the token is associated with + ord BIGINT NOT NULL, -- a value used to specify the ordering of the authentication tokens + token BYTEA NOT NULL, -- bearer token used to authenticate messages to/from the other aggregator (encrypted) + + CONSTRAINT aggregator_auth_token_unique_task_id_and_ord UNIQUE(task_id, ord), + CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) +); + +-- The collector authentication tokens used by a given task. +CREATE TABLE task_collector_auth_tokens( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + task_id BIGINT NOT NULL, -- task ID the token is associated with + ord BIGINT NOT NULL, -- a value used to specify the ordering of the authentication tokens + token BYTEA NOT NULL, -- bearer token used to authenticate messages from the collector (encrypted) + + CONSTRAINT collector_auth_token_unique_task_id_and_ord UNIQUE(task_id, ord), + CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) +); + +-- The HPKE public keys (aka configs) and private keys used by a given task. +CREATE TABLE task_hpke_keys( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + task_id BIGINT NOT NULL, -- task ID the HPKE key is associated with + config_id SMALLINT NOT NULL, -- HPKE config ID + config BYTEA NOT NULL, -- HPKE config, including public key (encoded HpkeConfig message) + private_key BYTEA NOT NULL, -- private key (encrypted) + + CONSTRAINT unique_task_id_and_config_id UNIQUE(task_id, config_id), + CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) +); + +-- The VDAF verification keys used by a given task. 
+-- TODO(#229): support multiple verification parameters per task +CREATE TABLE task_vdaf_verify_keys( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + task_id BIGINT NOT NULL, -- task ID the verification key is associated with + vdaf_verify_key BYTEA NOT NULL, -- the VDAF verification key (encrypted) + + CONSTRAINT vdaf_verify_key_unique_task_id UNIQUE(task_id), + CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) +); + +-- Individual reports received from clients. +CREATE TABLE client_reports( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + task_id BIGINT NOT NULL, -- task ID the report is associated with + report_id BYTEA NOT NULL, -- 16-byte ReportID as defined by the DAP specification + client_timestamp TIMESTAMP NOT NULL, -- report timestamp, from client + extensions BYTEA, -- encoded sequence of Extension messages (populated for leader only) + public_share BYTEA, -- encoded public share (opaque VDAF message, populated for leader only) + leader_input_share BYTEA, -- encoded, decrypted leader input share (populated for leader only) + helper_encrypted_input_share BYTEA, -- encoded HpkeCiphertext message containing the helper's input share (populated for leader only) + + CONSTRAINT unique_task_id_and_report_id UNIQUE(task_id, report_id), + CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) +); +CREATE INDEX client_reports_task_and_timestamp_index ON client_reports(task_id, client_timestamp); + +-- Specifies the possible state of an aggregation job. +CREATE TYPE AGGREGATION_JOB_STATE AS ENUM( + 'IN_PROGRESS', -- at least one included report is in a non-terminal (START, WAITING) state, processing can continue + 'FINISHED', -- all reports have reached a terminal state (FINISHED, FAILED, INVALID) + 'ABANDONED' -- we have given up on the aggregation job entirely +); + +-- An aggregation job, representing the aggregation of a number of client reports. +CREATE TABLE aggregation_jobs( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + task_id BIGINT NOT NULL, -- ID of related task + aggregation_job_id BYTEA NOT NULL, -- 32-byte AggregationJobID as defined by the DAP specification + batch_identifier BYTEA, -- encoded query-type-specific batch identifier (corresponds to identifier in BatchSelector; present for leader tasks and fixed size tasks, NULL otherwise) + batch_interval TSRANGE, -- batch interval, as a TSRANGE, populated only for leader time-interval tasks.
(will match batch_identifier when present) + aggregation_param BYTEA NOT NULL, -- encoded aggregation parameter (opaque VDAF message) + state AGGREGATION_JOB_STATE NOT NULL, -- current state of the aggregation job + + lease_expiry TIMESTAMP NOT NULL DEFAULT TIMESTAMP '-infinity', -- when lease on this aggregation job expires; -infinity implies no current lease + lease_token BYTEA, -- a value identifying the current leaseholder; NULL implies no current lease + lease_attempts BIGINT NOT NULL DEFAULT 0, -- the number of lease acquisitions since the last successful lease release + + CONSTRAINT unique_aggregation_job_id UNIQUE(aggregation_job_id), + CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) +); +CREATE INDEX aggregation_jobs_state_and_lease_expiry ON aggregation_jobs(state, lease_expiry) WHERE state = 'IN_PROGRESS'; +CREATE INDEX aggregation_jobs_task_and_batch_id ON aggregation_jobs(task_id, batch_identifier); +CREATE INDEX aggregation_jobs_task_and_batch_interval ON aggregation_jobs USING gist (task_id, batch_interval) WHERE batch_interval IS NOT NULL; + +-- Specifies the possible state of aggregating a single report. +CREATE TYPE REPORT_AGGREGATION_STATE AS ENUM( + 'START', -- the aggregator is waiting to decrypt its input share & compute initial preparation state + 'WAITING', -- the aggregator is waiting for a message from its peer before proceeding + 'FINISHED', -- the aggregator has completed the preparation process and recovered an output share + 'FAILED', -- an error has occurred and an output share cannot be recovered + 'INVALID' -- an aggregator received an unexpected message +); + +-- An aggregation attempt for a single client report. An aggregation job logically contains a number +-- of report aggregations. A single client report might be aggregated in multiple aggregation jobs & +-- therefore have multiple associated report aggregations. +CREATE TABLE report_aggregations( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + aggregation_job_id BIGINT NOT NULL, -- the aggregation job ID this report aggregation is associated with + client_report_id BIGINT NOT NULL, -- the client report ID this report aggregation is associated with + ord BIGINT NOT NULL, -- a value used to specify the ordering of client reports in the aggregation job + state REPORT_AGGREGATION_STATE NOT NULL, -- the current state of this report aggregation + prep_state BYTEA, -- the current preparation state (opaque VDAF message, only if in state WAITING) + prep_msg BYTEA, -- the next preparation message to be sent to the helper (opaque VDAF message, only if in state WAITING and this aggregator is the leader) + out_share BYTEA, -- the output share (opaque VDAF message, only if in state FINISHED) + error_code SMALLINT, -- error code corresponding to a DAP ReportShareError value; null if in a state other than FAILED + + CONSTRAINT unique_ord UNIQUE(aggregation_job_id, ord), + CONSTRAINT fk_aggregation_job_id FOREIGN KEY(aggregation_job_id) REFERENCES aggregation_jobs(id), + CONSTRAINT fk_client_report_id FOREIGN KEY(client_report_id) REFERENCES client_reports(id) +); +CREATE INDEX report_aggregations_aggregation_job_id_index ON report_aggregations(aggregation_job_id); +CREATE INDEX report_aggregations_client_report_id_index ON report_aggregations(client_report_id); + +-- Information on aggregation for a single batch. This information may be incremental if the VDAF +-- supports incremental aggregation.
+CREATE TABLE batch_aggregations( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + task_id BIGINT NOT NULL, -- the task ID + batch_identifier BYTEA NOT NULL, -- encoded query-type-specific batch identifier (corresponds to identifier in BatchSelector) + aggregation_param BYTEA NOT NULL, -- the aggregation parameter (opaque VDAF message) + aggregate_share BYTEA NOT NULL, -- the (possibly-incremental) aggregate share + report_count BIGINT NOT NULL, -- the (possibly-incremental) client report count + checksum BYTEA NOT NULL, -- the (possibly-incremental) checksum + + CONSTRAINT unique_task_id_interval_aggregation_param UNIQUE(task_id, batch_identifier, aggregation_param), + CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) +); + +-- Specifies the possible state of a collect job. +CREATE TYPE COLLECT_JOB_STATE AS ENUM( + 'START', -- the aggregator is waiting to run this collect job + 'FINISHED', -- this collect job has run successfully and is ready for collection + 'ABANDONED', -- this collect job has been abandoned & will never be run again + 'DELETED' -- this collect job has been deleted +); + +-- The leader's view of collect requests from the Collector. +CREATE TABLE collect_jobs( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + collect_job_id UUID NOT NULL, -- UUID used by collector to refer to this job + task_id BIGINT NOT NULL, -- the task ID being collected + batch_identifier BYTEA NOT NULL, -- encoded query-type-specific batch identifier (corresponds to identifier in BatchSelector) + batch_interval TSRANGE, -- batch interval, as a TSRANGE, populated only for time-interval tasks. (will always match batch_identifier) + aggregation_param BYTEA NOT NULL, -- the aggregation parameter (opaque VDAF message) + state COLLECT_JOB_STATE NOT NULL, -- the current state of this collect job + report_count BIGINT, -- the number of reports included in this collect job (only if in state FINISHED) + helper_aggregate_share BYTEA, -- the helper's encrypted aggregate share (HpkeCiphertext, only if in state FINISHED) + leader_aggregate_share BYTEA, -- the leader's unencrypted aggregate share (opaque VDAF message, only if in state FINISHED) + + lease_expiry TIMESTAMP NOT NULL DEFAULT TIMESTAMP '-infinity', -- when lease on this collect job expires; -infinity implies no current lease + lease_token BYTEA, -- a value identifying the current leaseholder; NULL implies no current lease + lease_attempts BIGINT NOT NULL DEFAULT 0, -- the number of lease acquisitions since the last successful lease release + + CONSTRAINT unique_collect_job_task_id_interval_aggregation_param UNIQUE(task_id, batch_identifier, aggregation_param), + CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) +); +-- TODO(#224): verify that this index is optimal for purposes of acquiring collect jobs. +CREATE INDEX collect_jobs_lease_expiry ON collect_jobs(lease_expiry); +CREATE INDEX collect_jobs_interval_containment_index ON collect_jobs USING gist (task_id, batch_interval); + +-- The helper's view of aggregate share jobs. +CREATE TABLE aggregate_share_jobs( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + task_id BIGINT NOT NULL, -- the task ID being collected + batch_identifier BYTEA NOT NULL, -- encoded query-type-specific batch identifier (corresponds to identifier in BatchSelector) + batch_interval TSRANGE, -- batch interval, as a TSRANGE, populated only for time-interval tasks.
(will always match batch_identifier) + aggregation_param BYTEA NOT NULL, -- the aggregation parameter (opaque VDAF message) + helper_aggregate_share BYTEA NOT NULL, -- the helper's unencrypted aggregate share + report_count BIGINT NOT NULL, -- the count of reports included in helper_aggregate_share + checksum BYTEA NOT NULL, -- the checksum over the reports included in helper_aggregate_share + + CONSTRAINT unique_aggregate_share_job_task_id_interval_aggregation_param UNIQUE(task_id, batch_identifier, aggregation_param), + CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) +); +CREATE INDEX aggregate_share_jobs_interval_containment_index ON aggregate_share_jobs USING gist (task_id, batch_interval); + +-- The leader's view of outstanding batches, which are batches that have not yet started +-- collection. Used for fixed-size tasks only. +CREATE TABLE outstanding_batches( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + task_id BIGINT NOT NULL, -- the task ID containing the batch + batch_id BYTEA NOT NULL, -- 32-byte BatchID as defined by the DAP specification. + + CONSTRAINT unique_task_id_batch_id UNIQUE(task_id, batch_id), + CONSTRAINT fk_task_id FOREIGN KEY(task_id) REFERENCES tasks(id) +); diff --git a/interop_binaries/Cargo.toml b/interop_binaries/Cargo.toml index cfbacc7b8..5b2d5b514 100644 --- a/interop_binaries/Cargo.toml +++ b/interop_binaries/Cargo.toml @@ -42,6 +42,7 @@ reqwest = { version = "0.11.16", default-features = false, features = ["rustls-t ring = "0.16.20" serde = { version = "1.0.159", features = ["derive"] } serde_json = "1.0.95" +sqlx = { version = "0.6.3", features = ["runtime-tokio-rustls", "migrate", "postgres"] } testcontainers = { version = "0.14" } tokio = { version = "1.27", features = ["full", "tracing"] } tracing = "0.1.37" diff --git a/interop_binaries/build.rs b/interop_binaries/build.rs index 37e0991d9..4cfc179b4 100644 --- a/interop_binaries/build.rs +++ b/interop_binaries/build.rs @@ -36,7 +36,7 @@ fn main() { println!("cargo:rerun-if-changed=../aggregator"); println!("cargo:rerun-if-changed=../client"); println!("cargo:rerun-if-changed=../core"); - println!("cargo:rerun-if-changed=../db/schema.sql"); + println!("cargo:rerun-if-changed=../db"); println!("cargo:rerun-if-changed=../integration_tests"); println!("cargo:rerun-if-changed=../interop_binaries"); diff --git a/interop_binaries/config/janus_cli.yaml b/interop_binaries/config/janus_cli.yaml deleted file mode 100644 index 10a655bb1..000000000 --- a/interop_binaries/config/janus_cli.yaml +++ /dev/null @@ -1,4 +0,0 @@ -database: - url: postgres://postgres@127.0.0.1:5432/postgres -logging_config: - force_json_output: true diff --git a/interop_binaries/config/janus_interop_aggregator.yaml b/interop_binaries/config/janus_interop_aggregator.yaml index 7ff5cb9c5..dbaf77776 100644 --- a/interop_binaries/config/janus_interop_aggregator.yaml +++ b/interop_binaries/config/janus_interop_aggregator.yaml @@ -4,3 +4,4 @@ health_check_listen_address: 0.0.0.0:8000 logging_config: force_json_output: true listen_address: 0.0.0.0:8080 +sql_migrations_source: /etc/janus/migrations diff --git a/interop_binaries/setup.sh b/interop_binaries/setup.sh index 188753bfe..77fe0575b 100755 --- a/interop_binaries/setup.sh +++ b/interop_binaries/setup.sh @@ -1,5 +1,4 @@ #!/bin/bash -/usr/local/bin/janus_cli write-schema --config-file /etc/janus/janus_cli.yaml /usr/bin/supervisorctl -c /etc/janus/supervisord.conf start janus_interop_aggregator /usr/bin/supervisorctl -c
/etc/janus/supervisord.conf start aggregation_job_creator /usr/bin/supervisorctl -c /etc/janus/supervisord.conf start aggregation_job_driver diff --git a/interop_binaries/src/bin/janus_interop_aggregator.rs b/interop_binaries/src/bin/janus_interop_aggregator.rs index 74d5b8fa2..bd899b833 100644 --- a/interop_binaries/src/bin/janus_interop_aggregator.rs +++ b/interop_binaries/src/bin/janus_interop_aggregator.rs @@ -17,9 +17,11 @@ use janus_interop_binaries::{ use janus_messages::{BatchId, Duration, HpkeConfig, TaskId, Time}; use prio::codec::Decode; use serde::{Deserialize, Serialize}; +use sqlx::{migrate::Migrator, Connection, PgConnection}; use std::{ collections::{HashMap, HashSet}, net::SocketAddr, + path::PathBuf, sync::Arc, }; use tokio::sync::Mutex; @@ -316,6 +318,9 @@ struct Config { /// Path prefix, e.g. `/dap/`, to serve DAP from. #[serde(default = "default_dap_serving_prefix")] dap_serving_prefix: String, + + /// Path at which `sqlx` migration files can be found. Migrations will be applied at startup. + sql_migrations_source: PathBuf, } impl BinaryConfig for Config { @@ -333,6 +338,12 @@ async fn main() -> anyhow::Result<()> { janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move { let datastore = Arc::new(ctx.datastore); + // Apply SQL migrations to database + let mut connection = + PgConnection::connect(ctx.config.common_config.database.url.as_str()).await?; + let migrator = Migrator::new(ctx.config.sql_migrations_source).await?; + migrator.run(&mut connection).await?; + // Run an HTTP server with both the DAP aggregator endpoints and the interoperation test // endpoints. let filter = make_filter(Arc::clone(&datastore), ctx.config.dap_serving_prefix)?;
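Both `ephemeral_datastore` and `janus_interop_aggregator` above use the same `sqlx` runtime-migration flow: open a `PgConnection`, point `Migrator::new` at a directory of `{version}_{description}.up.sql` / `.down.sql` files, and call `run`. A minimal self-contained sketch of that flow follows; the connection string and migrations path are illustrative placeholders, and it assumes a crate with `tokio` plus `sqlx` 0.6 built with the `runtime-tokio-rustls`, `migrate`, and `postgres` features, matching the Cargo.toml changes in this diff.

```rust
use sqlx::{migrate::Migrator, Connection, PgConnection};
use std::path::Path;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative connection string; the binaries above read this from their
    // database configuration instead.
    let mut connection =
        PgConnection::connect("postgres://postgres@127.0.0.1:5432/postgres").await?;

    // Discover migrations at runtime from a directory of
    // `{version}_{description}.up.sql` / `.down.sql` files (e.g. the `db/`
    // directory added in this change), ordered by their numeric version prefix.
    let migrator = Migrator::new(Path::new("db")).await?;

    // Apply every migration not yet recorded in the `_sqlx_migrations`
    // bookkeeping table; reruns are no-ops as long as the files are unchanged.
    migrator.run(&mut connection).await?;

    Ok(())
}
```

One consequence of this approach is that `Migrator::run` applies each migration inside a transaction, which is why the down-migration above uses plain `DROP INDEX` rather than `DROP INDEX CONCURRENTLY`: PostgreSQL rejects `CONCURRENTLY` inside a transaction block, and does not allow combining it with `CASCADE`.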