diff --git a/.github/workflows/build-image.yml b/.github/workflows/client-build.yml similarity index 79% rename from .github/workflows/build-image.yml rename to .github/workflows/client-build.yml index 728ca589..cc5fa273 100644 --- a/.github/workflows/build-image.yml +++ b/.github/workflows/client-build.yml @@ -1,5 +1,5 @@ # -name: Create and publish a Docker image +name: Create and publish client docker image on: workflow_dispatch: @@ -37,7 +37,7 @@ jobs: id: meta uses: docker/metadata-action@v5.5.1 with: - images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}-client # This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages. # It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository. # It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step. 
@@ -48,7 +48,7 @@ jobs: push: true target: production platforms: linux/arm64,linux/amd64 - tags: ${{ steps.meta.outputs.tags }}-angular-client # argos-angular-client + tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} # for caching # cache-from: type=gha,scope=global @@ -59,18 +59,4 @@ jobs: PROD=true BACKEND_URL=http://192.168.100.1:8000 MAP_ACCESS_TOKEN=pk.eyJ1IjoibWNrZWVwIiwiYSI6ImNscXBrcmU1ZTBscWIya284cDFyYjR3Nm8ifQ.6TQHlxhAJzptZyV-W28dnw - - name: Build and push Docker image for scylla server - uses: docker/build-push-action@v5.4.0 - with: - context: ./scylla-server - push: true - target: production - platforms: linux/arm64,linux/amd64 - tags: ${{ steps.meta.outputs.tags }}-scylla-server # argos-scylla-server - labels: ${{ steps.meta.outputs.labels }} - # for caching - # cache-from: type=gha - # cache-to: type=gha,mode=max - # https://github.com/docker/build-push-action/issues/820 - provenance: false diff --git a/.github/workflows/scylla-server-rust-build.yml b/.github/workflows/scylla-server-rust-build.yml new file mode 100644 index 00000000..42bdb18b --- /dev/null +++ b/.github/workflows/scylla-server-rust-build.yml @@ -0,0 +1,57 @@ +# +name: Create and publish scylla docker image + +on: + workflow_dispatch: + + + +# Defines two custom environment variables for the workflow. These are used for the Container registry domain, and a name for the Docker image that this workflow builds. +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + +# There is a single job in this workflow. It's configured to run on the latest available version of Ubuntu. +jobs: + build-and-push-image: + runs-on: ubuntu-latest + # Sets the permissions granted to the `GITHUB_TOKEN` for the actions in this job. 
+ permissions: + contents: read + packages: write + # + steps: + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3.3.0 + - name: Checkout repository + uses: actions/checkout@v4 + # Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here. + - name: Log in to the Container registry + uses: docker/login-action@v3.2.0 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + # This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels. + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@v5.5.1 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}-scylla + # This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages. + # It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository. + # It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step. 
+ - name: Build and push Docker image for scylla server rust + uses: docker/build-push-action@v5.4.0 + with: + context: ./scylla-server-rust + push: true + platforms: linux/arm64,linux/amd64 + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + # for caching + # cache-from: type=gha + # cache-to: type=gha,mode=max + # https://github.com/docker/build-push-action/issues/820 + provenance: false + diff --git a/.gitignore b/.gitignore index 65cc1e4f..0b475aa1 100644 --- a/.gitignore +++ b/.gitignore @@ -95,4 +95,7 @@ fastlane/test_output # After new code Injection tools there's a generated folder /iOSInjectionProject # https://github.com/johnno1962/injectionforxcode -iOSInjectionProject/ \ No newline at end of file +iOSInjectionProject/ + +# user compose override +compose.override.yml diff --git a/README.md b/README.md index f5996cfa..759253c5 100644 --- a/README.md +++ b/README.md @@ -17,34 +17,51 @@ Once you've sucessfully setup Scylla and the Client, you can either run them sep ## Production -### Running the Project in Prod Mode - -There is a `docker-compose-dev.yml` file for a dev which varies from the router deployment: +The base docker compose (`compose.yml`) contains some important features to note. However, it is useless standalone. Please read the profile customization selection below before using the base compose. - It matches the number of CPUs as the router to roughly simulate router CPU (your CPU is still faster) -- You must build it locally first! -- It does not persist the database between `down` commands +- It persists the database between `down` commands via a volume called `argos_db-data`. Delete it with `docker volume rm argos_db-data` to start with a new database next `up`. +- It weighs the CPU usage of siren higher, so it is prioritized in CPU starvation scenarios. +- It limits memory according to the capacity of the router. 
+ + +### Customizing runtime profiles of the project via docker compose + +This project uses docker compose overrides to specify configurations. Therefore there are multiple "profiles" to choose from when running in production, and there are some profiles for development testing. Also, there are fragment files for siren and client in `siren-base` and `angular-client` respectively, as they are services only used in certain cases. These profiles are specified via the command line on top of the base `compose.yml` file as follows. + +``` +docker compose -f compose.yml -f <override file> +``` + +Additionally if you need to create your own options, you can create a `compose.override.yml` file in this directory and specify what settings should be changed, which is ignored by git. If you think the override would become useful, document it here and name it `compose.<name>.yml`. Here is the current list of overrides, designed so only one is used at any given time: + +- `scylla-dev`*: Testing the client and interactions of scylla (scylla prod, siren local, client pt local) +- `client-dev`*: Testing the client development using the scylla mock data mode (scylla mock, client pt local) +- `router`: For production deployment to the base gateway node (scylla prod, siren local, client pt 192.168.100.1) +- `tpu`: Production deployment to the TPU on the car (no client, no siren, scylla pt siren external) -Note that both compose files limit memory to the same amount. However, the disk I/O of the router is **much** slower than yours. +***Note that since client settings are changed via rebuild, overrides with a * must be rebuilt via `docker compose -f compose.yml -f compose.<name>.yml build client`. Further, a build should be done when reverting to profiles without stars. 
** -This will build the docker images that will be run: +Examples: -`docker-compose -f ./docker-compose-dev.yml build` +Router deploy and send to background: `docker compose -f compose.yml -f compose.router.yml up -d` -This will start the containers and output all the outputs from both of them to the terminal. If the container is not already an image through docker-compose build, it will attempt to pull the images from docker hub. +### Build and deploy -`docker-compose up` +Using the above profiles, one can `build` the app. Then, with correct permissions, you can `push` the app then `pull` it elsewhere for usage. Note that you must `push` and `pull` on the same architecture, so you cannot use for example a dell laptop to build for the router! To get `push` permissions, create a PAT [here](https://github.com/settings/tokens/new?scopes=write:packages) and copy the token into this command: -If changes are made to either the client or scylla you will need to rebuild and push to docker hub then pull on the router. Contact the current Argos lead or Chief Software Engineer to get access to the docker hub. +``` +sudo docker login ghcr.io -u <username> -p <token> +``` -### Running on the Openwrt router +Now you can update the image on a remote server. Note to save time you can just specify which service to upload, like `scylla-server-rust` or `client`. +``` +sudo docker compose -f compose.yml -f compose.router.yml build && sudo docker compose -f compose.yml -f compose.router.yml push +``` -The `docker-compose.yml` file is made for the router. When you push a commit it automatically gets built for the router in 20-30 minutes. -To use a non-standard branch edit the docker-compose.yml file to the name of the tag specified by the name [here](https://github.com/Northeastern-Electric-Racing/Argos/pkgs/container/argos). -Then do `docker compose down` and `docker compose pull` and `docker compose up -d`. 
-**The database is stored in a volume called `argos_db-data`, delete the volume to start the database fresh!** +## Codegen Protobuf Types (client only) -## Codegen Protobuf Types +Server protobuf generation is automatic. See below for client protobuf generation. ##### Mac diff --git a/angular-client/compose.client.yml b/angular-client/compose.client.yml new file mode 100644 index 00000000..472f12e5 --- /dev/null +++ b/angular-client/compose.client.yml @@ -0,0 +1,17 @@ +services: + client: + container_name: client + restart: unless-stopped + image: ghcr.io/northeastern-electric-racing/argos-client:rust-rewrite-socket + build: + context: . + args: + PROD: "true" + BACKEND_URL: http://192.168.100.1:8000 + MAP_ACCESS_TOKEN: pk.eyJ1IjoibWNrZWVwIiwiYSI6ImNscXBrcmU1ZTBscWIya284cDFyYjR3Nm8ifQ.6TQHlxhAJzptZyV-W28dnw + target: production + dockerfile: Dockerfile + ports: + - 80:80 + cpu_shares: 512 + mem_limit: 1gb diff --git a/compose.client-dev.yml b/compose.client-dev.yml new file mode 100644 index 00000000..0030c37c --- /dev/null +++ b/compose.client-dev.yml @@ -0,0 +1,14 @@ +services: + scylla-server-rust: + environment: + - PROD_SIREN_HOST_URL=siren:1883 + - PROD_SCYLLA=false + + client: + extends: + file: ./angular-client/compose.client.yml + service: client + build: + context: ./angular-client + args: + BACKEND_URL: http://localhost:8000 diff --git a/compose.router.yml b/compose.router.yml new file mode 100644 index 00000000..3061c642 --- /dev/null +++ b/compose.router.yml @@ -0,0 +1,16 @@ +services: + scylla-server-rust: + depends_on: + - siren + environment: + - PROD_SIREN_HOST_URL=siren:1883 + + client: + extends: + file: ./angular-client/compose.client.yml + service: client + + siren: + extends: + file: ./siren-base/compose.siren.yml + service: siren diff --git a/compose.scylla-dev.yml b/compose.scylla-dev.yml new file mode 100644 index 00000000..1392f26a --- /dev/null +++ b/compose.scylla-dev.yml @@ -0,0 +1,21 @@ + services: + scylla-server-rust: + depends_on: + 
- siren + environment: + - PROD_SIREN_HOST_URL=siren:1883 + - RUST_LOG=none,scylla_server_rust=DEBUG + + client: + extends: + file: ./angular-client/compose.client.yml + service: client + build: + context: ./angular-client + args: + BACKEND_URL: http://localhost:8000 + + siren: + extends: + file: ./siren-base/compose.siren.yml + service: siren diff --git a/compose.tpu.yml b/compose.tpu.yml new file mode 100644 index 00000000..d94fe873 --- /dev/null +++ b/compose.tpu.yml @@ -0,0 +1,6 @@ +services: + scylla-server-rust: + environment: + - PROD_SIREN_HOST_URL=host.docker.internal:1883 + extra_hosts: + - "host.docker.internal:host-gateway" # for external siren diff --git a/compose.yml b/compose.yml new file mode 100644 index 00000000..49665376 --- /dev/null +++ b/compose.yml @@ -0,0 +1,46 @@ +services: + odyssey-timescale: + container_name: odyssey-timescale + image: timescale/timescaledb:2.13.1-pg15 + restart: unless-stopped + environment: + POSTGRES_HOST_AUTH_METHOD: trust + ports: # needs to be published for charybdis, which runs outside of docker + - 5432:5432 + expose: + - 5432 + volumes: + - db-data:/var/lib/postgresql/data + cpu_shares: 1024 + mem_limit: 3gb + stop_grace_period: 1m + + scylla-server-rust: + container_name: scylla-server-rust + image: ghcr.io/northeastern-electric-racing/argos-scylla:rust-rewrite-socket + build: + context: ./scylla-server-rust + restart: unless-stopped + ports: + - 8000:8000 + depends_on: + - odyssey-timescale + environment: + - SOURCE_DATABASE_URL=postgresql://postgres:password@odyssey-timescale:5432/timescaledb + # - PROD_SIREN_HOST_URL=siren:1883 + - PROD_SCYLLA=true + - RUST_LOG=none,scylla_server_rust=INFO # default log setting for docker + cpu_shares: 1024 + mem_limit: 2gb + stop_grace_period: 10s + stop_signal: SIGINT + init: true + entrypoint: ["./docker_run.sh"] + + + +volumes: + db-data: + + + diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml deleted file mode 100644 index 03f8f3ba..00000000 --- 
a/docker-compose-dev.yml +++ /dev/null @@ -1,79 +0,0 @@ -services: - odyssey-timescale: - container_name: odyssey-timescale - image: timescale/timescaledb:2.13.1-pg15 - restart: unless-stopped - environment: - POSTGRES_HOST_AUTH_METHOD: trust - ports: # needs to be published for charybdis, which runs outside of docker - - 5432:5432 - expose: - - 5432 - cpu_shares: 1024 - mem_limit: 3gb - cpuset: 0-3 - stop_grace_period: 1m - - scylla-server: - container_name: scylla-server - restart: unless-stopped - # image: ghcr.io/northeastern-electric-racing/argos:ghcr-action-scylla-server - build: - context: ./scylla-server - target: production - dockerfile: Dockerfile - ports: - - 8000:8000 - depends_on: - - odyssey-timescale - - siren - environment: - - SOURCE_DATABASE_URL=postgresql://postgres:password@odyssey-timescale:5432/timescaledb - - PROD_SIREN_HOST_URL=siren - - PROD=true - # extra_hosts: - # - "host.docker.internal:host-gateway" # for external siren - cpu_shares: 1024 - cpuset: 0-3 - mem_limit: 2gb - stop_grace_period: 10s - - - client: - container_name: client - restart: unless-stopped - # image: ghcr.io/northeastern-electric-racing/argos:ghcr-action-angular-client - build: - context: ./angular-client - args: - PROD: "true" - BACKEND_URL: http://localhost:8000 - MAP_ACCESS_TOKEN: pk.eyJ1IjoibWNrZWVwIiwiYSI6ImNscXBrcmU1ZTBscWIya284cDFyYjR3Nm8ifQ.6TQHlxhAJzptZyV-W28dnw - target: production - dockerfile: Dockerfile - ports: - - 80:80 - depends_on: - - scylla-server - cpu_shares: 512 - cpuset: 0-3 - mem_limit: 1gb - - siren: - container_name: siren - restart: unless-stopped - image: eclipse-mosquitto:latest - ports: - - 1883:1883 - - 9001:9001 # why? 
- expose: - - 1883 - volumes: - - ./siren-base/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf - cpu_shares: 2048 - cpuset: 0-3 - mem_limit: 2gb - oom_kill_disable: true - - - diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index 85ed3d71..00000000 --- a/docker-compose.yml +++ /dev/null @@ -1,83 +0,0 @@ -version: "3.8" - -services: - odyssey-timescale: - container_name: odyssey-timescale - image: timescale/timescaledb:2.13.1-pg15 - restart: unless-stopped - environment: - POSTGRES_HOST_AUTH_METHOD: trust - ports: # needs to be published for charybdis, which runs outside of docker - - 5432:5432 - expose: - - 5432 - volumes: - - db-data:/var/lib/postgresql/data - cpu_shares: 1024 - mem_limit: 3gb - stop_grace_period: 1m - - scylla-server: - container_name: scylla-server - restart: unless-stopped - image: ghcr.io/northeastern-electric-racing/argos:Develop-scylla-server - build: - context: ./scylla-server - target: production - dockerfile: Dockerfile - ports: - - 8000:8000 - depends_on: - - odyssey-timescale - - siren - environment: - - SOURCE_DATABASE_URL=postgresql://postgres:password@odyssey-timescale:5432/timescaledb - - PROD_SIREN_HOST_URL=siren - - PROD=true - # extra_hosts: - # - "host.docker.internal:host-gateway" # for external siren - cpu_shares: 1024 - mem_limit: 2gb - stop_grace_period: 10s - - - client: - container_name: client - restart: unless-stopped - image: ghcr.io/northeastern-electric-racing/argos:Develop-angular-client - build: - context: ./angular-client - args: - PROD: "true" - BACKEND_URL: http://192.168.100.1:8000 - MAP_ACCESS_TOKEN: pk.eyJ1IjoibWNrZWVwIiwiYSI6ImNscXBrcmU1ZTBscWIya284cDFyYjR3Nm8ifQ.6TQHlxhAJzptZyV-W28dnw - target: production - dockerfile: Dockerfile - ports: - - 80:80 - depends_on: - - scylla-server - cpu_shares: 512 - mem_limit: 1gb - - siren: - container_name: siren - restart: unless-stopped - image: eclipse-mosquitto:latest - ports: - - 1883:1883 - - 9001:9001 # why? 
- expose: - - 1883 - volumes: - - ./siren-base/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf - cpu_shares: 2048 - mem_limit: 2gb - oom_kill_disable: true - - -volumes: - db-data: - - - diff --git a/scylla-server-rust/.cargo/config.toml b/scylla-server-rust/.cargo/config.toml index eea34c5d..09ffa116 100644 --- a/scylla-server-rust/.cargo/config.toml +++ b/scylla-server-rust/.cargo/config.toml @@ -1,2 +1,3 @@ [alias] -prisma = "run --package prisma-cli --" \ No newline at end of file +prisma = "run --release --package prisma-cli --" +prisma-seed = "run --release --bin seed" \ No newline at end of file diff --git a/scylla-server-rust/.dockerignore b/scylla-server-rust/.dockerignore new file mode 100644 index 00000000..690dff8c --- /dev/null +++ b/scylla-server-rust/.dockerignore @@ -0,0 +1,31 @@ +# editor things +.idea/ +.zed/ +.vscode/ + +# misc things +.DS_Store +.gitignore +*.nix + +# python things +pyrightconfig.json +__pycache__/ + +# Added by cargo (rust things) +/target +build/ +dist/ +logs/ + +# prisma +prisma.rs + +# protobuf +serverdata.rs + +target +Dockerfile +.dockerignore +.git +.gitignore diff --git a/scylla-server-rust/.gitignore b/scylla-server-rust/.gitignore new file mode 100644 index 00000000..11d3f83a --- /dev/null +++ b/scylla-server-rust/.gitignore @@ -0,0 +1,24 @@ +# editor things +.idea/ +.zed/ +.vscode/ + +# misc things +.DS_Store +*.nix + +# python things +pyrightconfig.json +__pycache__/ + +# Added by cargo (rust things) +/target +build/ +dist/ +logs/ + +# prisma +prisma.rs + +# protobuf +serverdata.rs diff --git a/scylla-server-rust/Cargo.lock b/scylla-server-rust/Cargo.lock index c2936845..22090201 100755 --- a/scylla-server-rust/Cargo.lock +++ b/scylla-server-rust/Cargo.lock @@ -28,6 +28,18 @@ dependencies = [ "version_check", ] +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" 
+dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "0.7.20" @@ -46,6 +58,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -73,6 +91,28 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eab1c04a571841102f5345a8fc0f6bb3d31c315dec879b5c6e42e40ce7ffa34e" +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.67", +] + [[package]] name = "async-trait" version = "0.1.80" @@ -99,6 +139,34 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core 0.3.4", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.29", + "itoa", + "matchit 0.7.3", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 0.1.2", + "tower", + "tower-layer", + "tower-service", +] + [[package]] name = "axum" 
version = "0.7.5" @@ -106,7 +174,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.4.3", "bytes", "futures-util", "http 1.1.0", @@ -115,7 +183,7 @@ dependencies = [ "hyper 1.4.0", "hyper-util", "itoa", - "matchit", + "matchit 0.7.3", "memchr", "mime", "percent-encoding", @@ -133,6 +201,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-core" version = "0.4.3" @@ -203,6 +288,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bigdecimal" version = "0.3.1" @@ -283,6 +374,9 @@ name = "bytes" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +dependencies = [ + "serde", +] [[package]] name = "cc" @@ -344,6 +438,44 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "console-api" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a257c22cd7e487dd4a13d413beabc512c5052f0bc048db0da6a84c3d8a6142fd" +dependencies = [ + "futures-core", + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.3.0" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31c4cc54bae66f7d9188996404abdf7fdfa23034ef8e43478c8810828abad758" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "prost", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "convert_case" version = "0.5.0" @@ -469,6 +601,12 @@ dependencies = [ "sha3", ] +[[package]] +name = "data-encoding" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" + [[package]] name = "datamodel-renderer" version = "0.1.0" @@ -590,6 +728,33 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" +[[package]] +name = "engineioxide" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33b9cfc311d0ac3237b8177d2ee8962aa5bb4cfa22faf284356f2ebaf9d698f0" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.4.0", + "hyper-util", + "pin-project-lite", + "rand 0.8.5", + "serde", + "serde_json", + "smallvec", + "thiserror", + "tokio", + "tokio-tungstenite", + "tower", + "tracing", +] + [[package]] name = "enumflags2" version = "0.7.10" @@ -666,6 +831,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86d4de0081402f5e88cdac65c8dcdcc73118c1a7a465e2a05f0da05843a8ea33" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flate2" version = 
"1.0.30" @@ -676,6 +847,17 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -880,7 +1062,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" dependencies = [ - "ahash", + "ahash 0.7.8", ] [[package]] @@ -889,7 +1071,7 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" dependencies = [ - "ahash", + "ahash 0.7.8", ] [[package]] @@ -897,6 +1079,23 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash 0.8.11", + "allocator-api2", +] + +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.7", + "byteorder", + "flate2", + "nom", + "num-traits", +] [[package]] name = "heck" @@ -1016,6 +1215,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.29" @@ -1059,6 +1264,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper 0.14.29", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1351,12 +1568,27 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "matchit" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d3c2fcf089c060eb333302d80c5f3ffa8297abecf220f788e4a09ef85f59420" + [[package]] name = "md-5" version = "0.10.6" @@ -1379,7 +1611,7 @@ version = "0.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e52eb6380b6d2a10eb3434aec0885374490f5b82c8aaf5cd487a183c98be834" dependencies = [ - "ahash", + "ahash 0.7.8", "metrics-macros", ] @@ -1389,7 +1621,7 @@ version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "142c53885123b68d94108295a09d4afe1a1388ed95b54d5dacd9a454753030f2" dependencies = [ - "ahash", + "ahash 0.7.8", "metrics-macros", ] @@ -1906,7 +2138,7 @@ version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3659d1ee90221741f65dd128d9998311b0e40c5d3c23a62445938214abce4f" dependencies = [ - "fixedbitset", + "fixedbitset 0.1.9", "ordermap", ] @@ -2148,6 +2380,38 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = 
"0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.67", +] + +[[package]] +name = "prost-types" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" +dependencies = [ + "prost", +] + [[package]] name = "protobuf" version = "3.5.0" @@ -2535,8 +2799,17 @@ checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f" dependencies = [ "aho-corasick 1.1.3", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.7", + "regex-syntax 0.8.4", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -2547,9 +2820,15 @@ checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df" dependencies = [ "aho-corasick 1.1.3", "memchr", - "regex-syntax", + "regex-syntax 0.8.4", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.4" @@ -2611,7 +2890,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", @@ -2627,6 +2906,47 @@ dependencies = [ "winreg", ] +[[package]] +name = "ring" 
+version = "0.17.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" +dependencies = [ + "cc", + "cfg-if", + "getrandom 0.2.15", + "libc", + "spin", + "untrusted", + "windows-sys 0.52.0", +] + +[[package]] +name = "ringbuffer" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3df6368f71f205ff9c33c076d170dd56ebf68e8161c733c0caa07a7a5509ed53" + +[[package]] +name = "rumqttc" +version = "0.24.0" +source = "git+https://github.com/bytebeamio/rumqtt?branch=main#db1f261dd5cd6c69bbfd1058ba69ea8ef5f4fc38" +dependencies = [ + "bytes", + "fixedbitset 0.5.7", + "flume", + "futures-util", + "log", + "rustls-native-certs", + "rustls-pemfile 2.1.2", + "rustls-webpki", + "thiserror", + "tokio", + "tokio-rustls", + "tokio-stream", + "tokio-util", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -2646,6 +2966,33 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-native-certs" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -2655,6 +3002,33 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +dependencies = [ + "base64 0.22.1", 
+ "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" + +[[package]] +name = "rustls-webpki" +version = "0.102.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9a6fccd794a42c2c105b513a2f62bc3fd8f3ba57a4593677ceb0bd035164d78" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.17" @@ -2719,14 +3093,22 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" name = "scylla-server-rust" version = "0.0.1" dependencies = [ - "axum", + "axum 0.7.5", + "console-subscriber", "prisma-client-rust", "protobuf", "protobuf-codegen", + "rand 0.8.5", + "ringbuffer", + "rumqttc", "serde", + "socketioxide", "tokio", + "tokio-util", "tower", "tower-http", + "tracing", + "tracing-subscriber", ] [[package]] @@ -2827,6 +3209,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha2" version = "0.9.9" @@ -2926,6 +3319,39 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socketioxide" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23f50a295325631d230022f1562fde3d1351edf4d8eac73265f657cc762f655c" +dependencies = [ + "bytes", + "engineioxide", + "futures-core", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.4.0", + "itoa", + "matchit 0.8.3", + "pin-project-lite", + "serde", + "serde_json", + "thiserror", + "tokio", + "tower", + "tracing", +] + +[[package]] +name = "spin" +version = "0.9.8" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "sql-ddl" version = "0.1.0" @@ -3211,9 +3637,20 @@ dependencies = [ "signal-hook-registry", "socket2 0.5.7", "tokio-macros", + "tracing", "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.3.0" @@ -3267,6 +3704,40 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls", + "rustls-pki-types", + "tokio", +] + +[[package]] +name = "tokio-stream" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.11" @@ -3275,8 +3746,12 @@ checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", + "futures-util", + "hashbrown 0.14.5", "pin-project-lite", + "slab", "tokio", ] @@ -3289,6 +3764,33 @@ dependencies = [ "serde", ] +[[package]] +name = "tonic" +version = "0.11.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-stream", + "async-trait", + "axum 0.6.20", + "base64 0.21.7", + "bytes", + "h2", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.29", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -3297,9 +3799,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand 0.8.5", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -3319,6 +3825,7 @@ dependencies = [ "pin-project-lite", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -3428,10 +3935,14 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log 0.2.0", ] @@ -3442,6 +3953,24 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e2ce1e47ed2994fd43b04c8f618008d4cabdd5ee34027cf14f9d918edd9c8" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -3508,6 +4037,12 @@ dependencies = [ "void", ] +[[package]] +name = "untrusted" +version = "0.9.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.2" @@ -3544,6 +4079,12 @@ dependencies = [ "user-facing-error-macros", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "uuid" version = "1.8.0" @@ -3872,3 +4413,29 @@ dependencies = [ "cfg-if", "windows-sys 0.48.0", ] + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.67", +] + +[[package]] +name = "zeroize" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" diff --git a/scylla-server-rust/Cargo.toml b/scylla-server-rust/Cargo.toml index dbda33fb..e7532679 100644 --- a/scylla-server-rust/Cargo.toml +++ b/scylla-server-rust/Cargo.toml @@ -2,16 +2,28 @@ name = "scylla-server-rust" version = "0.0.1" edition = "2021" +default-run = "scylla-server-rust" [dependencies] prisma-client-rust = {git = "https://github.com/Brendonovich/prisma-client-rust", tag = "0.6.11", default-features = false,features = ["postgresql", "migrations"] } serde = "1.0.203" protobuf-codegen = "3.3.0" protobuf = "3.3.0" -tokio = { version = "1.38.0", features = ["full"] } +tokio = { version = "1.38.0", features = ["full", "tracing"] } axum = "0.7.5" tower = { version = "0.4.13", features = ["timeout"] } 
-tower-http = { version = "0.5.2", features = ["cors"] } +tower-http = { version = "0.5.2", features = ["cors", "trace"] } +socketioxide = { version = "0.14.0", features = ["tracing"] } +rumqttc = { git = "https://github.com/bytebeamio/rumqtt", branch = "main"} +tokio-util = { version= "0.7.11", features = ["full"] } +tracing = "0.1.40" +tracing-subscriber = { version = "0.3.18", features = ["ansi", "env-filter"] } +rand = "0.8.5" +console-subscriber = { version = "0.3.0", optional = true } +ringbuffer = "0.15.0" + +[features] +top = ["dep:console-subscriber"] [workspace] members = ["prisma-cli"] @@ -23,3 +35,12 @@ protobuf-codegen = "3.3.0" lto = true codegen-units = 1 panic = "abort" +strip = true # Automatically strip symbols from the binary. + +[[bin]] +name = "scylla-server-rust" +path = "src/main.rs" + +[[bin]] +name = "seed" +path = "prisma/seed.rs" \ No newline at end of file diff --git a/scylla-server-rust/Dockerfile b/scylla-server-rust/Dockerfile index 2f03ccd9..19c57654 100755 --- a/scylla-server-rust/Dockerfile +++ b/scylla-server-rust/Dockerfile @@ -2,3 +2,7 @@ FROM rust:latest WORKDIR /usr/src/myapp +COPY . . 
+RUN cargo prisma generate +RUN cargo build --release --locked +ENTRYPOINT ["./docker_run.sh"] diff --git a/scylla-server-rust/README.md b/scylla-server-rust/README.md index e39a5082..7bee2651 100644 --- a/scylla-server-rust/README.md +++ b/scylla-server-rust/README.md @@ -11,7 +11,7 @@ cargo prisma generate ``` # in argos proper -docker compose run -P odyssey-timescale +docker compose up odyssey-timescale ``` ``` @@ -21,10 +21,60 @@ SOURCE_DATABASE_URL=postgresql://postgres:password@127.0.0.1:5432/timescaledb ca ### Test this app +#### Seed some data + +Run `cargo prisma-seed` + + +#### Integration tests + Since this app uses the database for testing, you must follow these steps, or run `./integration_test.sh`: ``` docker volume rm argos_db-data -docker compose run -Pd odyssey-timescale +docker compose up odyssey-timescale cargo prisma migrate deploy SOURCE_DATABASE_URL=postgresql://postgres:password@127.0.0.1:5432/timescaledb cargo test -- --test-threads=1 -``` \ No newline at end of file +``` + +#### Test it yourself! + +You can send your own messages to the app running in production mode, and test how the client, database, etc. reacts! + +Follow this confluence doc: https://nerdocs.atlassian.net/wiki/spaces/NER/pages/473727054/How+to+run+data+through+Argos+without+the+car + +#### View threads and resources + +1. Build or run as: `RUSTFLAGS="--cfg tokio_unstable" cargo run --features top` +2. Install tokio console: ex `cargo install --locked tokio-console` +3. Run app and `tokio-console` + +#### Debug logging + +#### Activate logs +Modify the RUST_LOG env variable. Usually you dont want third party crate logs, so `RUST_LOG=none,scylla_server_rust=trace`. You can replace both none and trace with the levels you want. 
The levels are: +- none: not a darn thing +- trace: very verbose, could print on every message, which would flood the log especially if run on a server receiving millions of the cars messages +- debug: helpful info not constantly printed in high load situations, good for periodic task info or init/end status checks +- info: user-facing info that is optional, usually to notify of a setting change or whatnot +- warn: info the user should take note of as an error may have occured +- error: a critical byt survivable issue in the application + +#### Develop with logs + +When developing, take advantage of easy logging. Use `debug!`, `trace!` etc macros. with anything you may need, but be sure to abide by the conventions above when making a final PR. + +Have an async function that takes time and is somewhat important for performance? Use tracing::instrument macro on the function definition to capture performance data. + + + +### Deploy this app + +See main README. + + +#### Env variables + +- `SOURCE_DATABASE_URL` The timescale URL +- `PROD_SCYLLA` false=use mock instead of production (mqtt) as source of data +- `RUST_LOG=none,scylla_server_rust` levels of logging for this create, see above +- `PROD_SIREN_HOST_URL` URL:Port of the MQTT server, using when `PROD_SCYLLA=/false` diff --git a/scylla-server-rust/docker_run.sh b/scylla-server-rust/docker_run.sh new file mode 100755 index 00000000..b84d990b --- /dev/null +++ b/scylla-server-rust/docker_run.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +cargo prisma migrate deploy && exec /usr/src/myapp/target/release/scylla-server-rust \ No newline at end of file diff --git a/scylla-server-rust/prisma/seed.rs b/scylla-server-rust/prisma/seed.rs new file mode 100644 index 00000000..d8af8827 --- /dev/null +++ b/scylla-server-rust/prisma/seed.rs @@ -0,0 +1,229 @@ +use std::{sync::Arc, time::Duration}; + +use prisma_client_rust::{chrono, QueryError}; +use scylla_server_rust::{ + prisma::PrismaClient, + processors::ClientData, + services::{ + 
data_service, data_type_service, driver_service, location_service, node_service, + run_service, system_service, + }, + Database, +}; + +#[tokio::main] +async fn main() -> Result<(), QueryError> { + println!("Connecting and seeding!"); + let client: Database = Arc::new( + PrismaClient::_builder() + .build() + .await + .expect("Could not build prisma DB"), + ); + + client.data().delete_many(vec![]).exec().await?; + + client.data_type().delete_many(vec![]).exec().await?; + + client.driver().delete_many(vec![]).exec().await?; + + client.location().delete_many(vec![]).exec().await?; + + client.node().delete_many(vec![]).exec().await?; + + client.run().delete_many(vec![]).exec().await?; + + client.system().delete_many(vec![]).exec().await?; + + let created_run = + run_service::create_run(&client, chrono::offset::Utc::now().timestamp_millis()).await?; + + system_service::upsert_system(&client, "Data And Controls".to_string(), created_run.id).await?; + driver_service::upsert_driver(&client, "Fergus".to_string(), created_run.id).await?; + location_service::upsert_location( + &client, + "Gainsborough".to_string(), + 1.0, + 1.0, + 1.0, + created_run.id, + ) + .await?; + + node_service::upsert_node(&client, "BMS".to_string()).await?; + node_service::upsert_node(&client, "MPU".to_string()).await?; + + data_type_service::upsert_data_type( + &client, + "Pack-Temp".to_string(), + "C".to_string(), + "BMS".to_string(), + ) + .await?; + data_service::add_many( + &client, + vec![ + ClientData { + run_id: created_run.id, + name: "Pack-Temp".to_string(), + unit: "C".to_string(), + values: vec!["20".to_string()], + timestamp: chrono::offset::Utc::now().timestamp_millis(), + node: "BMS".to_string(), + }, + ClientData { + run_id: created_run.id, + name: "Pack-Temp".to_string(), + unit: "C".to_string(), + values: vec!["21".to_string()], + timestamp: chrono::offset::Utc::now().timestamp_millis() + 1000, + node: "BMS".to_string(), + }, + ClientData { + run_id: created_run.id, + name: 
"Pack-Temp".to_string(), + unit: "C".to_string(), + values: vec!["22".to_string()], + timestamp: chrono::offset::Utc::now().timestamp_millis() + 2000, + node: "BMS".to_string(), + }, + ClientData { + run_id: created_run.id, + name: "Pack-Temp".to_string(), + unit: "C".to_string(), + values: vec!["17".to_string()], + timestamp: chrono::offset::Utc::now().timestamp_millis() + 3000, + node: "BMS".to_string(), + }, + ClientData { + run_id: created_run.id, + name: "Pack-Temp".to_string(), + unit: "C".to_string(), + values: vec!["25".to_string()], + timestamp: chrono::offset::Utc::now().timestamp_millis() + 4000, + node: "BMS".to_string(), + }, + ClientData { + run_id: created_run.id, + name: "Pack-Temp".to_string(), + unit: "C".to_string(), + values: vec!["30".to_string()], + timestamp: chrono::offset::Utc::now().timestamp_millis() + 5000, + node: "BMS".to_string(), + }, + ClientData { + run_id: created_run.id, + name: "Pack-Temp".to_string(), + unit: "C".to_string(), + values: vec!["38".to_string()], + timestamp: chrono::offset::Utc::now().timestamp_millis() + 6000, + node: "BMS".to_string(), + }, + ClientData { + run_id: created_run.id, + name: "Pack-Temp".to_string(), + unit: "C".to_string(), + values: vec!["32".to_string()], + timestamp: chrono::offset::Utc::now().timestamp_millis() + 7000, + node: "BMS".to_string(), + }, + ClientData { + run_id: created_run.id, + name: "Pack-Temp".to_string(), + unit: "C".to_string(), + values: vec!["26".to_string()], + timestamp: chrono::offset::Utc::now().timestamp_millis() + 8000, + node: "BMS".to_string(), + }, + ], + ) + .await?; + + data_type_service::upsert_data_type( + &client, + "Pack-Voltage".to_string(), + "V".to_string(), + "BMS".to_string(), + ) + .await?; + data_type_service::upsert_data_type( + &client, + "Pack-SOC".to_string(), + "%".to_string(), + "BMS".to_string(), + ) + .await?; + data_type_service::upsert_data_type( + &client, + "Pack-Current".to_string(), + "A".to_string(), + "BMS".to_string(), + ) + .await?; + 
data_type_service::upsert_data_type( + &client, + "Sense-Accel".to_string(), + "G".to_string(), + "MPU".to_string(), + ) + .await?; + data_type_service::upsert_data_type( + &client, + "Sense-Temperature".to_string(), + "C".to_string(), + "MPU".to_string(), + ) + .await?; + data_type_service::upsert_data_type( + &client, + "State-Speed".to_string(), + "mph".to_string(), + "MPU".to_string(), + ) + .await?; + + node_service::upsert_node(&client, "TPU".to_string()).await?; + data_type_service::upsert_data_type( + &client, + "Points".to_string(), + "coords".to_string(), + "TPU".to_string(), + ) + .await?; + + simulate_route(client, created_run.id).await?; + + Ok(()) +} + +// lat,long +const NYC_COORDS: (f64, f64) = (40.7128, -74.006); +const LA_COORDS: (f64, f64) = (34.0522, -118.2437); +const STEP_NUM: u8 = 10; +async fn simulate_route(db: Database, curr_run: i32) -> Result<(), QueryError> { + let step_lat = (LA_COORDS.0 - NYC_COORDS.0) / STEP_NUM as f64; + let step_long = (LA_COORDS.1 - NYC_COORDS.1) / STEP_NUM as f64; + + for i in 0..STEP_NUM { + // clamp to [-90,90] + let inter_lat = (NYC_COORDS.0 + step_lat * i as f64).clamp(-90.0, 90.0); + let inter_long = NYC_COORDS.1 + step_long * i as f64; + + data_service::add_data( + &db, + ClientData { + run_id: curr_run, + name: "Points".to_string(), + unit: "Coord".to_string(), + values: vec![inter_lat.to_string(), inter_long.to_string()], + timestamp: chrono::offset::Utc::now().timestamp_millis(), + node: "TPU".to_string(), + }, + ) + .await?; + + tokio::time::sleep(Duration::from_secs(1)).await; + } + + Ok(()) +} diff --git a/scylla-server-rust/src/controllers/run_controller.rs b/scylla-server-rust/src/controllers/run_controller.rs index 4c26eac4..984f6db2 100644 --- a/scylla-server-rust/src/controllers/run_controller.rs +++ b/scylla-server-rust/src/controllers/run_controller.rs @@ -1,7 +1,10 @@ use axum::{ extract::{Path, State}, - Json, + Extension, Json, }; +use prisma_client_rust::chrono; +use tokio::sync::mpsc; +use 
tracing::warn; use crate::{ error::ScyllaError, services::run_service, transformers::run_transformer::PublicRun, Database, @@ -31,3 +34,18 @@ pub async fn get_run_by_id( Ok(Json::from(transformed_run_data)) } + +pub async fn new_run( + State(db): State, + Extension(channel): Extension>, +) -> Result, ScyllaError> { + let run_data = + run_service::create_run(&db, chrono::offset::Utc::now().timestamp_millis()).await?; + + // notify the mqtt receiver a new run has been created + if let Err(err) = channel.send(run_data.clone()).await { + warn!("Could not notify system about an updated run: {}", err); + } + + Ok(Json::from(PublicRun::from(&run_data))) +} diff --git a/scylla-server-rust/src/error.rs b/scylla-server-rust/src/error.rs index 9c57b350..221b4f13 100644 --- a/scylla-server-rust/src/error.rs +++ b/scylla-server-rust/src/error.rs @@ -6,6 +6,7 @@ use prisma_client_rust::{ prisma_errors::query_engine::{RecordNotFound, UniqueKeyViolation}, QueryError, }; +use tracing::warn; pub enum ScyllaError { PrismaError(QueryError), @@ -14,6 +15,7 @@ pub enum ScyllaError { impl From for ScyllaError { fn from(error: QueryError) -> Self { + warn!("Query error: {:?}", error); match error { e if e.is_prisma_error::() => ScyllaError::NotFound, e => ScyllaError::PrismaError(e), diff --git a/scylla-server-rust/src/lib.rs b/scylla-server-rust/src/lib.rs index df6b8493..d0b47a4c 100644 --- a/scylla-server-rust/src/lib.rs +++ b/scylla-server-rust/src/lib.rs @@ -1,5 +1,6 @@ pub mod controllers; pub mod error; +pub mod processors; pub mod services; pub mod transformers; diff --git a/scylla-server-rust/src/main.rs b/scylla-server-rust/src/main.rs index 533d9f76..52acea93 100755 --- a/scylla-server-rust/src/main.rs +++ b/scylla-server-rust/src/main.rs @@ -1,50 +1,195 @@ use std::sync::Arc; -use axum::{http::Method, routing::get, Router}; +use axum::{ + http::Method, + routing::{get, post}, + Extension, Router, +}; +use prisma_client_rust::chrono; use scylla_server_rust::{ controllers::{ 
self, data_type_controller, driver_controller, location_controller, node_controller, run_controller, system_controller, }, prisma::PrismaClient, + processors::{ + db_handler, mock_processor::MockProcessor, mqtt_processor::MqttProcessor, ClientData, + }, + services::run_service::{self, public_run}, Database, }; -use tower_http::cors::{Any, CorsLayer}; +use socketioxide::{extract::SocketRef, SocketIo}; +use tokio::{signal, sync::mpsc}; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; +use tower::ServiceBuilder; +use tower_http::{ + cors::{Any, CorsLayer}, + trace::TraceLayer, +}; +use tracing::{debug, info, level_filters::LevelFilter}; +use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter}; #[tokio::main] async fn main() { - let client: Database = Arc::new(PrismaClient::_builder().build().await.unwrap()); + println!("Initializing scylla server..."); + + #[cfg(feature = "top")] + { + println!("Initializing tokio console subscriber"); + console_subscriber::init(); + } + + #[cfg(not(feature = "top"))] + { + println!("Initializing fmt subscriber"); + // construct a subscriber that prints formatted traces to stdout + // if RUST_LOG is not set, defaults to loglevel INFO + let subscriber = tracing_subscriber::fmt() + .pretty() + .with_thread_ids(true) + .with_ansi(true) + .with_thread_names(true) + .with_span_events(FmtSpan::CLOSE) + .with_env_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(), + ) + .finish(); + // use that subscriber to process traces emitted after this point + tracing::subscriber::set_global_default(subscriber).expect("Could not init tracing"); + } + + // create the database stuff + let db: Database = Arc::new( + PrismaClient::_builder() + .build() + .await + .expect("Could not build prisma DB"), + ); + + // create the socket stuff + let (socket_layer, io) = SocketIo::new_layer(); + io.ns("/", |s: SocketRef| { + s.on_disconnect(|_: SocketRef| debug!("Socket: Client disconnected from 
socket")) + }); + + // channel to pass the mqtt data + // TODO tune buffer size + let (mqtt_send, mqtt_receive) = mpsc::channel::(10000); + + // channel to pass the processed data to the db thread + // TODO tune buffer size + let (db_send, db_receive) = mpsc::channel::>(1000); + + // channel to update the run to a new value + let (new_run_send, new_run_receive) = mpsc::channel::(5); + + // the below two threads need to cancel cleanly to ensure all queued messages are sent. therefore they are part of the a task tracker group. + // create a task tracker and cancellation token + let task_tracker = TaskTracker::new(); + let token = CancellationToken::new(); + // spawn the database handler + task_tracker.spawn( + db_handler::DbHandler::new(mqtt_receive, Arc::clone(&db)) + .handling_loop(db_send, token.clone()), + ); + // spawn the database inserter + task_tracker.spawn(db_handler::DbHandler::batching_loop( + db_receive, + Arc::clone(&db), + token.clone(), + )); + + // if PROD_SCYLLA=false + if std::env::var("PROD_SCYLLA").is_ok_and(|f| f == "false") { + info!("Running processor in mock mode, no data will be stored"); + let recv = MockProcessor::new(io); + tokio::spawn(recv.generate_mock()); + } else { + // creates the initial run + let curr_run = run_service::create_run(&db, chrono::offset::Utc::now().timestamp_millis()) + .await + .expect("Could not create initial run!"); + debug!("Configuring current run: {:?}", curr_run); + + // run prod if this isnt present + // create and spawn the mqtt processor + info!("Running processor in MQTT (production) mode"); + let recv = MqttProcessor::new( + mqtt_send, + new_run_receive, + std::env::var("PROD_SIREN_HOST_URL").unwrap_or("localhost:1883".to_string()), + curr_run.id, + io, + ) + .await; + tokio::spawn(recv.process_mqtt()); + } let app = Router::new() - // get all data with the name dataTypeName and runID as specified + // DATA ROUTES .route( "/data/:dataTypeName/:runId", get(controllers::data_controller::get_data), ) - // 
get all datatypes + // DATA TYPE ROUTES .route("/datatypes", get(data_type_controller::get_all_data_types)) - // get all drivers + // DRIVERS .route("/drivers", get(driver_controller::get_all_drivers)) - // get all locations + // LOCATIONS .route("/locations", get(location_controller::get_all_locations)) - // get all nodes + // NODES .route("/nodes", get(node_controller::get_all_nodes)) - // runs: - // get all runs + // RUNS .route("/runs", get(run_controller::get_all_runs)) - // get run with id .route("/runs/:id", get(run_controller::get_run_by_id)) - // get all systems + .route( + "/runs/new", + post(run_controller::new_run).layer(Extension(new_run_send)), + ) + // SYSTEMS .route("/systems", get(system_controller::get_all_systems)) + // for CORS handling .layer( CorsLayer::new() // allow `GET` - .allow_methods([Method::GET]) + .allow_methods([Method::GET, Method::POST]) // allow requests from any origin .allow_origin(Any), ) - .with_state(client); + // for socketio integration + .layer( + ServiceBuilder::new() + .layer(CorsLayer::permissive()) + .layer(socket_layer), + ) + .layer(TraceLayer::new_for_http()) + .with_state(db.clone()); + + let listener = tokio::net::TcpListener::bind("0.0.0.0:8000") + .await + .expect("Could not bind to 8000!"); + let axum_token = token.clone(); + tokio::spawn(async { + axum::serve(listener, app) + .with_graceful_shutdown(async move { + _ = axum_token.cancelled().await; + }) + .await + .expect("Failed shutdown init for axum"); + }); + + task_tracker.close(); + + info!("Initialization complete, ready..."); + info!("Use Ctrl+C or SIGINT to exit cleanly!"); - let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await.unwrap(); - axum::serve(listener, app).await.unwrap(); + // listen for ctrl_c, then cancel, close, and await for all tasks in the tracker. 
Other tasks cancel vai the default tokio system + signal::ctrl_c() + .await + .expect("Could not read cancellation trigger (ctr+c)"); + info!("Received exit signal, shutting down!"); + token.cancel(); + task_tracker.wait().await; } diff --git a/scylla-server-rust/src/processors/db_handler.rs b/scylla-server-rust/src/processors/db_handler.rs new file mode 100644 index 00000000..5ad20d2b --- /dev/null +++ b/scylla-server-rust/src/processors/db_handler.rs @@ -0,0 +1,317 @@ +use tokio::sync::mpsc::Receiver; + +use tokio::{sync::mpsc::Sender, time::Duration}; + +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, instrument, trace, warn, Level}; + +use crate::{ + services::{ + data_service, data_type_service, driver_service, location_service, node_service, + system_service, + }, + Database, +}; + +/// The upload interval for batch uploads +const UPLOAD_INTERVAL: u64 = 5000; + +use super::{ClientData, LocationData}; + +/// A struct defining an in progress location packet +struct LocLock { + location_name: Option, + points: Option<(f64, f64)>, + radius: Option, +} + +impl LocLock { + pub fn new() -> LocLock { + LocLock { + location_name: None, + points: None, + radius: None, + } + } + + /// Add the location name to the packet + pub fn add_loc_name(&mut self, loc_name: String) { + self.location_name = Some(loc_name); + } + + /// Add points to the packet + pub fn add_points(&mut self, lat: f64, long: f64) { + self.points = Some((lat, long)); + } + + /// Add a radius to the packet + pub fn add_radius(&mut self, radius: f64) { + self.radius = Some(radius); + } + + /// Attempt to finalize the packet, returning a location data and clearing this object or None if still in progress + pub fn finalize(&mut self) -> Option { + if self.location_name.is_some() && self.points.is_some() && self.radius.is_some() { + self.clear(); + return Some(LocationData { + location_name: self.location_name.clone().unwrap(), + lat: self.points.unwrap().0, + long: 
self.points.unwrap().1, + radius: self.radius.unwrap(), + }); + } + None + } + + /// Clear the internal state + fn clear(&mut self) { + self.location_name = None; + self.points = None; + self.radius = None; + } +} + +/// A few threads to manage the processing and inserting of special types, +/// upserting of metadata for data, and batch uploading the database +pub struct DbHandler { + /// The list of nodes seen by this instance, used for when to upsert + node_list: Vec, + /// The list of data types seen by this instance, used for when to upsert + datatype_list: Vec, + /// The broadcast channel which provides serial datapoints for processing + receiver: Receiver, + /// The database + db: Database, + /// An internal state of an in progress location packet + location_lock: LocLock, + /// Whether the location has been modified this loop + is_location: bool, + /// the queue of data + data_queue: Vec, + /// the time since last batch + last_time: tokio::time::Instant, +} + +impl DbHandler { + /// Make a new db handler + /// * `recv` - the broadcast reciver of which clientdata will be sent + pub fn new(receiver: Receiver, db: Database) -> DbHandler { + DbHandler { + node_list: vec![], + datatype_list: vec![], + receiver, + db, + location_lock: LocLock::new(), + is_location: false, + data_queue: vec![], + last_time: tokio::time::Instant::now(), + } + } + + /// This loop handles batch uploading, and has no internal state or requirements + /// It uses the queue from data queue to insert to the database specified + /// On cancellation, will await one final queue message to cleanup anything remaining in the channel + pub async fn batching_loop( + mut data_queue: Receiver>, + database: Database, + cancel_token: CancellationToken, + ) { + loop { + tokio::select! 
{ + _ = cancel_token.cancelled() => { + if let Some(final_msgs) = data_queue.recv().await { + info!( + "Final Batch uploaded: {:?}", + data_service::add_many(&database, final_msgs).await + ); + } else { + info!("No messages to cleanup.") + } + break; + }, + Some(msgs) = data_queue.recv() => { + Self::batch_upload(msgs, &database).await; + trace!( + "DB send: {} of {}", + data_queue.len(), + data_queue.max_capacity() + ); + } + } + } + } + + #[instrument(level = Level::DEBUG, skip(msg))] + async fn batch_upload(msg: Vec, db: &Database) { + match data_service::add_many(db, msg).await { + Ok(count) => info!("Batch uploaded: {:?}", count), + Err(err) => warn!("Error in batch upload: {:?}", err), + } + } + + /// A loop which uses self and a sender channel to process data + /// If the data is special, i.e. coordinates, driver, etc. it will store it in its special location of the db immediately + /// For all data points it will add the to the data_channel for batch uploading logic when a certain time has elapsed + /// Before this time the data is stored in an internal queue. + /// On cancellation, the messages currently in the queue will be sent as a final flush of any remaining messages received before cancellation + pub async fn handling_loop( + mut self, + data_channel: Sender>, + cancel_token: CancellationToken, + ) { + loop { + tokio::select! 
{ + _ = cancel_token.cancelled() => { + debug!("Pushing final messages to queue"); + data_channel.send(self.data_queue.clone()).await.expect("Could not comm data to db thread, shutdown"); + self.data_queue.clear(); + break; + }, + Some(msg) = self.receiver.recv() => { + self.handle_msg(msg, &data_channel).await; + } + } + } + } + + #[instrument(skip(self), level = Level::TRACE)] + async fn handle_msg(&mut self, msg: ClientData, data_channel: &Sender>) { + trace!( + "Mqtt dispatcher: {} of {}", + self.receiver.len(), + self.receiver.max_capacity() + ); + + // If the time is greater than upload interval, push to batch upload thread and clear queue + if tokio::time::Instant::now().duration_since(self.last_time) + > Duration::from_millis(UPLOAD_INTERVAL) + && !self.data_queue.is_empty() + { + data_channel + .send(self.data_queue.clone()) + .await + .expect("Could not comm data to db thread"); + self.data_queue.clear(); + self.last_time = tokio::time::Instant::now(); + } + + // upsert if not present, a sort of cache of upserted types really + if !self.node_list.contains(&msg.node) { + info!("Upserting node: {}", msg.node); + if let Err(msg) = node_service::upsert_node(&self.db, msg.node.clone()).await { + warn!("DB error node upsert: {:?}", msg); + } + self.node_list.push(msg.node.clone()); + } + if !self.datatype_list.contains(&msg.name) { + info!("Upserting data type: {}", msg.name); + if let Err(msg) = data_type_service::upsert_data_type( + &self.db, + msg.name.clone(), + msg.unit.clone(), + msg.node.clone(), + ) + .await + { + warn!("DB error datatype upsert: {:?}", msg); + } + self.datatype_list.push(msg.name.clone()); + } + + // if data has some special meanings, push them to the database immediately, notably no matter what also enter batching logic + match msg.name.as_str() { + "Driver" => { + debug!("Upserting driver: {:?}", msg.values); + if let Err(err) = driver_service::upsert_driver( + &self.db, + msg.values + .first() + 
.unwrap_or(&"PizzaTheHut".to_string()) + .to_string(), + msg.run_id, + ) + .await + { + warn!("Driver upsert error: {:?}", err); + } + } + "location" => { + debug!("Upserting location name: {:?}", msg.values); + self.location_lock.add_loc_name( + msg.values + .first() + .unwrap_or(&"PizzaTheHut".to_string()) + .to_string(), + ); + self.is_location = true; + } + "system" => { + debug!("Upserting system: {:?}", msg.values); + if let Err(err) = system_service::upsert_system( + &self.db, + msg.values + .first() + .unwrap_or(&"PizzaTheHut".to_string()) + .to_string(), + msg.run_id, + ) + .await + { + warn!("System upsert error: {:?}", err); + } + } + "GPS-Location" => { + debug!("Upserting location points: {:?}", msg.values); + self.location_lock.add_points( + msg.values + .first() + .unwrap_or(&"PizzaTheHut".to_string()) + .parse::() + .unwrap_or_default(), + msg.values + .get(1) + .unwrap_or(&"PizzaTheHut".to_string()) + .parse::() + .unwrap_or_default(), + ); + self.is_location = true; + } + "Radius" => { + debug!("Upserting location radius: {:?}", msg.values); + self.location_lock.add_radius( + msg.values + .first() + .unwrap_or(&"PizzaTheHut".to_string()) + .parse::() + .unwrap_or_default(), + ); + self.is_location = true; + } + _ => {} + } + // if location has been modified, push a new location of the loc lock object returns Some + if self.is_location { + trace!("Checking location status..."); + if let Some(loc) = self.location_lock.finalize() { + debug!("Upserting location: {:?}", loc); + if let Err(err) = location_service::upsert_location( + &self.db, + loc.location_name, + loc.lat, + loc.long, + loc.radius, + msg.run_id, + ) + .await + { + warn!("Location upsert error: {:?}", err); + } + } + self.is_location = false; + } + + // no matter what, batch upload the message + trace!("Pushing msg to queue: {:?}", msg); + self.data_queue.push(msg); + } +} diff --git a/scylla-server-rust/src/processors/mock_data.rs b/scylla-server-rust/src/processors/mock_data.rs new 
file mode 100644 index 00000000..0ca1c447 --- /dev/null +++ b/scylla-server-rust/src/processors/mock_data.rs @@ -0,0 +1,136 @@ +use super::mock_processor::{MockData, MockStringData}; + +pub const BASE_MOCK_DATA: [MockData; 17] = [ + MockData { + name: "Status-Temp_Average", + unit: "C", + num_of_vals: 1, + min: -20.0, + max: 54.0, + }, + MockData { + name: "Temps-Motor_Temperature", + unit: "C", + num_of_vals: 1, + min: -20.0, + max: 54.0, + }, + MockData { + name: "Pack-SOC", + unit: "%", + num_of_vals: 1, + min: 0.0, + max: 100.0, + }, + MockData { + name: "Sense-Accel", + unit: "G", + num_of_vals: 3, + min: -6.0, + max: 6.0, + }, + MockData { + name: "GPS-Location", + unit: "coordinates", + num_of_vals: 2, + min: -90.0, + max: 90.0, + }, + MockData { + name: "Sense-SteeringAngle", + unit: "degrees", + num_of_vals: 1, + min: 0.0, + max: 360.0, + }, + MockData { + name: "Pack-Voltage", + unit: "V", + num_of_vals: 1, + min: 0.0, + max: 5.0, + }, + MockData { + name: "OnBoard-CpuUsage", + unit: "%", + num_of_vals: 1, + min: 0.0, + max: 100.0, + }, + MockData { + name: "OnBoard-CpuTemp", + unit: "C", + num_of_vals: 1, + min: 0.0, + max: 100.0, + }, + MockData { + name: "OnBoard-MemAvailable", + unit: "mb", + num_of_vals: 1, + min: 0.0, + max: 8000.0, + }, + MockData { + name: "HaLow-RSSI", + unit: "dbm", + num_of_vals: 1, + min: -150.0, + max: 80.0, + }, + MockData { + name: "HaLow-StaMCS", + unit: "", + num_of_vals: 1, + min: 0.0, + max: 10.0, + }, + MockData { + name: "Status/MPH", + unit: "mph", + num_of_vals: 1, + min: 0.0, + max: 88.0, + }, + MockData { + name: "Pack-CCL", + unit: "A", + num_of_vals: 1, + min: -35.0, + max: 0.0, + }, + MockData { + name: "Pack-DCL", + unit: "A", + num_of_vals: 1, + min: 0.0, + max: 550.0, + }, + MockData { + name: "Pedals-Brake1", + unit: "", + num_of_vals: 1, + min: 0.0, + max: 3000.0, + }, + MockData { + name: "Power-AC_Current", + unit: "A", + num_of_vals: 1, + min: 0.0, + max: 600.0, + }, +]; + +pub const 
BASE_MOCK_STRING_DATA: [MockStringData; 2] = [ + MockStringData { + name: "Driver", + unit: "String", + vals: "Fergus", + }, + MockStringData { + name: "Location", + unit: "String", + vals: "Max", + }, +]; diff --git a/scylla-server-rust/src/processors/mock_processor.rs b/scylla-server-rust/src/processors/mock_processor.rs new file mode 100644 index 00000000..c6f8b789 --- /dev/null +++ b/scylla-server-rust/src/processors/mock_processor.rs @@ -0,0 +1,96 @@ +use std::time::Duration; + +use prisma_client_rust::{chrono, serde_json}; +use rand::Rng; +use socketioxide::SocketIo; +use tracing::warn; + +use super::{ + mock_data::{BASE_MOCK_DATA, BASE_MOCK_STRING_DATA}, + ClientData, +}; + +#[derive(Clone, Copy)] +pub struct MockData { + pub name: &'static str, + pub unit: &'static str, + pub num_of_vals: u8, + pub min: f64, + pub max: f64, +} + +impl MockData { + fn get_values(&self) -> Vec { + let mut val_vec: Vec = vec![]; + // for each point, get a random number in the range + for _ in 0..self.num_of_vals { + val_vec.push( + rand::thread_rng() + .gen_range((self.min)..(self.max)) + .to_string(), + ); + } + + val_vec + } +} + +#[derive(Clone, Copy)] +pub struct MockStringData { + pub name: &'static str, + pub unit: &'static str, + pub vals: &'static str, +} + +pub struct MockProcessor { + curr_run: i32, + io: SocketIo, +} + +impl MockProcessor { + pub fn new(io: SocketIo) -> Self { + MockProcessor { curr_run: 1, io } + } + + pub async fn generate_mock(self) { + loop { + // get a random mock datapoint the first 0 to len of number mock data is for the non string and x to len of string mocks is a string mock index. 
+ let index = rand::thread_rng() + .gen_range(0..(BASE_MOCK_DATA.len() + BASE_MOCK_STRING_DATA.len())); + + // if we are doing non-string mock this loop + let client_data: ClientData = if index < BASE_MOCK_DATA.len() { + let dat = BASE_MOCK_DATA[index]; + + ClientData { + run_id: self.curr_run, + name: dat.name.to_string(), + unit: dat.unit.to_string(), + values: dat.get_values(), + timestamp: chrono::offset::Utc::now().timestamp_millis(), + node: "".to_string(), // uneeded for socket use only + } + // do a string mock + } else { + let dat = BASE_MOCK_STRING_DATA[index - BASE_MOCK_DATA.len()]; + ClientData { + run_id: self.curr_run, + name: dat.name.to_string(), + unit: dat.unit.to_string(), + values: vec![dat.vals.to_string()], + timestamp: chrono::offset::Utc::now().timestamp_millis(), + node: "".to_string(), // uneeded for socket use only + } + }; + + match self.io.emit( + "message", + serde_json::to_string(&client_data).expect("Could not serialize ClientData"), + ) { + Ok(_) => (), + Err(err) => warn!("Socket: Broadcast error: {}", err), + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + } +} diff --git a/scylla-server-rust/src/processors/mod.rs b/scylla-server-rust/src/processors/mod.rs new file mode 100644 index 00000000..e28de9c8 --- /dev/null +++ b/scylla-server-rust/src/processors/mod.rs @@ -0,0 +1,31 @@ +pub mod db_handler; +mod mock_data; +pub mod mock_processor; +pub mod mqtt_processor; + +/// Represents the client data +/// This has the dual purposes of +/// * - representing the packet sent over the socket for live data +/// * - representing the struct for the service layer to unpack for insertion +/// Note: node name is only considered for database storage and convenience, it is not serialized in a socket packet +#[derive(serde::Serialize, Clone, Debug)] +pub struct ClientData { + pub run_id: i32, + pub name: String, + pub unit: String, + pub values: Vec, + pub timestamp: i64, + + #[serde(skip_serializing)] + pub node: String, +} + +/// 
A final location packet +/// This has the purpose of representing the struct for the service layer to unpack for insertion, and therefore is not serialized +#[derive(Debug)] +struct LocationData { + location_name: String, + lat: f64, + long: f64, + radius: f64, +} diff --git a/scylla-server-rust/src/processors/mqtt_processor.rs b/scylla-server-rust/src/processors/mqtt_processor.rs new file mode 100644 index 00000000..b824df0d --- /dev/null +++ b/scylla-server-rust/src/processors/mqtt_processor.rs @@ -0,0 +1,242 @@ +use core::fmt; +use std::time::Duration; + +use prisma_client_rust::{bigdecimal::ToPrimitive, chrono, serde_json}; +use protobuf::Message; +use ringbuffer::RingBuffer; +use rumqttc::v5::{ + mqttbytes::v5::{LastWill, Packet, Publish}, + AsyncClient, Event, MqttOptions, +}; +use socketioxide::SocketIo; +use tokio::sync::mpsc::{Receiver, Sender}; +use tracing::{debug, instrument, trace, warn, Level}; + +use crate::{serverdata, services::run_service}; + +use super::ClientData; +use std::borrow::Cow; + +pub struct MqttProcessor { + channel: Sender, + new_run_channel: Receiver, + curr_run: i32, + io: SocketIo, + mqtt_ops: MqttOptions, +} + +impl MqttProcessor { + /// Creates a new mqtt receiver and socketio and db sender + /// * `channel` - The mpsc channel to send the database data to + /// * `mqtt_path` - The mqtt URI, including port, (without the mqtt://) to subscribe to + /// * `db` - The database to store the data in + /// * `io` - The socketio layer to send the data to + /// + /// This is async as it creates the initial run and gets the ID, as well as connecting to and subbing Siren + /// Returns the instance and the event loop, which can be passed into the process_mqtt func to begin recieiving + pub async fn new( + channel: Sender, + new_run_channel: Receiver, + mqtt_path: String, + initial_run: i32, + io: SocketIo, + ) -> MqttProcessor { + // create the mqtt client and configure it + let mut create_opts = MqttOptions::new( + "ScyllaServer", + 
mqtt_path.split_once(':').expect("Invalid Siren URL").0, + mqtt_path + .split_once(':') + .unwrap() + .1 + .parse::() + .expect("Invalid Siren port"), + ); + create_opts + .set_last_will(LastWill::new( + "Scylla/Status", + "Scylla has disconnected!", + rumqttc::v5::mqttbytes::QoS::ExactlyOnce, + true, + None, + )) + .set_keep_alive(Duration::from_secs(20)) + .set_clean_start(false) + .set_connection_timeout(3) + .set_session_expiry_interval(Some(u32::MAX)) + .set_topic_alias_max(Some(600)); + + MqttProcessor { + channel, + new_run_channel, + curr_run: initial_run, + io, + mqtt_ops: create_opts, + } + } + + /// This handles the reception of mqtt messages, will not return + /// * `connect` - The eventloop returned by ::new to connect to + pub async fn process_mqtt(mut self) { + let mut view_interval = tokio::time::interval(Duration::from_secs(3)); + + let mut latency_interval = tokio::time::interval(Duration::from_millis(250)); + let mut latency_ringbuffer = ringbuffer::AllocRingBuffer::::new(20); + + // process over messages, non blocking + // TODO mess with incoming message cap if db, etc. cannot keep up + let (client, mut connect) = AsyncClient::new(self.mqtt_ops.clone(), 600); + + debug!("Subscribing to siren"); + client + .subscribe("#", rumqttc::v5::mqttbytes::QoS::ExactlyOnce) + .await + .expect("Could not subscribe to Siren"); + + loop { + #[rustfmt::skip] // rust cannot format this macro for some reason + tokio::select! 
{ + msg = connect.poll() => match msg { + Ok(Event::Incoming(Packet::Publish(msg))) => { + trace!("Received mqtt message: {:?}", msg); + // parse the message into the data and the node name it falls under + let msg = match self.parse_msg(msg) { + Ok(msg) => msg, + Err(err) => { + warn!("Message parse error: {:?}", err); + continue; + } + }; + latency_ringbuffer.push(chrono::offset::Utc::now().timestamp_millis() - msg.timestamp); + self.send_db_msg(msg.clone()).await; + self.send_socket_msg(msg); + }, + Err(msg) => trace!("Received mqtt error: {:?}", msg), + _ => trace!("Received misc mqtt: {:?}", msg), + }, + _ = view_interval.tick() => { + trace!("Updating viewership data!"); + if let Ok(sockets) = self.io.sockets() { + let client_data = ClientData { + name: "Viewers".to_string(), + node: "Internal".to_string(), + unit: "".to_string(), + run_id: self.curr_run, + timestamp: chrono::offset::Utc::now().timestamp_millis(), + values: vec![sockets.len().to_string()] + }; + self.send_socket_msg(client_data); + } else { + warn!("Could not fetch socket count"); + } + } + _ = latency_interval.tick() => { + // set latency to 0 if no messages are in buffer + let avg_latency = if latency_ringbuffer.is_empty() { + 0 + } else { + latency_ringbuffer.iter().sum::() / latency_ringbuffer.len().to_i64().unwrap_or_default() + }; + + let client_data = ClientData { + name: "Latency".to_string(), + node: "Internal".to_string(), + unit: "ms".to_string(), + run_id: self.curr_run, + timestamp: chrono::offset::Utc::now().timestamp_millis(), + values: vec![avg_latency.to_string()] + }; + trace!("Latency update sending: {}", client_data.values.first().unwrap_or(&"n/a".to_string())); + self.send_socket_msg(client_data); + } + Some(new_run) = self.new_run_channel.recv() => { + trace!("New run: {:?}", new_run); + self.curr_run = new_run.id; + } + } + } + } + + /// Parse the message + /// * `msg` - The mqtt message to parse + /// returns the ClientData, or the Err of something that can be debug 
printed + #[instrument(skip(self), level = Level::TRACE)] + fn parse_msg(&self, msg: Publish) -> Result { + let data = serverdata::ServerData::parse_from_bytes(&msg.payload) + .map_err(|f| format!("Could not parse message topic:{:?} error: {}", msg.topic, f))?; + + let split = std::str::from_utf8(&msg.topic) + .map_err(|f| format!("Could not parse topic: {}, topic: {:?}", f, msg.topic))? + .split_once('/') + .ok_or(&format!("Could not parse nesting: {:?}", msg.topic))?; + + let node = split.0; + + let data_type = split.1.replace('/', "-"); + + // extract the unix time, returning the current time instead if either the "ts" user property isnt present or it isnt parsable + // note the Cow magic involves the return from the map is a borrow, but the unwrap cannot as we dont own it + let unix_time = msg + .properties + .unwrap_or_default() + .user_properties + .iter() + .map(Cow::Borrowed) + .find(|f| f.0 == "ts") + .unwrap_or_else(|| { + debug!("Could not find timestamp in mqtt, using system time"); + Cow::Owned(( + "ts".to_string(), + chrono::offset::Utc::now().timestamp_millis().to_string(), + )) + }) + .1 + .parse::() + .unwrap_or_else(|err| { + warn!("Invalid timestamp in mqtt, using system time: {}", err); + chrono::offset::Utc::now().timestamp_millis() + }); + + // ts check for bad sources of time which may return 1970 + // if both system time and packet timestamp are before year 2000, the message cannot be recorded + let unix_clean = if unix_time < 963014966000 { + debug!("Timestamp before year 2000: {}", unix_time); + let sys_time = chrono::offset::Utc::now().timestamp_millis(); + if sys_time < 963014966000 { + return Err("System has no good time, discarding message!".to_string()); + } + sys_time + } else { + unix_time + }; + + Ok(ClientData { + run_id: self.curr_run, + name: data_type, + unit: data.unit, + values: data.value, + timestamp: unix_clean, + node: node.to_string(), + }) + } + + /// Send a message to the channel, printing and IGNORING any error that 
may occur + /// * `client_data` - The client data to send over the broadcast + async fn send_db_msg(&self, client_data: ClientData) { + if let Err(err) = self.channel.send(client_data.clone()).await { + warn!("Error sending through channel: {:?}", err) + } + } + + /// Sends a message to the socket, printing and IGNORING any error that may occur + /// * `client_data` - The client data to send over the broadcast + fn send_socket_msg(&self, client_data: ClientData) { + match self.io.emit( + "message", + serde_json::to_string(&client_data).expect("Could not serialize ClientData"), + ) { + Ok(_) => (), + Err(err) => warn!("Socket: Broadcast error: {}", err), + } + } +} diff --git a/scylla-server-rust/src/services/data_service.rs b/scylla-server-rust/src/services/data_service.rs index 3283b1db..d3a65be1 100644 --- a/scylla-server-rust/src/services/data_service.rs +++ b/scylla-server-rust/src/services/data_service.rs @@ -1,26 +1,30 @@ use prisma_client_rust::{chrono::DateTime, QueryError}; -use crate::{ - prisma::{self}, - serverdata::ServerData, - Database, -}; +use crate::{prisma, processors::ClientData, Database}; + +prisma::data::select! 
{public_data { + time + values +}} /// Get datapoints that mach criteria /// * `db` - The prisma client to make the call to /// * `data_type_name` - The data type name to filter the data by -/// * `run_id` - The run id to filter the data by +/// * `run_id` - The run id to filter the data +/// * `fetch_run` whether to fetch the run assocaited with this data +/// * `fetch_data_type` whether to fetch the data type associated with this data /// returns: A result containing the data or the QueryError propogated by the db pub async fn get_data( db: &Database, data_type_name: String, run_id: i32, -) -> Result, QueryError> { +) -> Result, QueryError> { db.data() .find_many(vec![ prisma::data::data_type_name::equals(data_type_name), prisma::data::run_id::equals(run_id), ]) + .select(public_data::select()) .exec() .await } @@ -34,26 +38,50 @@ pub async fn get_data( /// returns: A result containing the data or the QueryError propogated by the db pub async fn add_data( db: &Database, - serverdata: ServerData, - unix_time: i64, - data_type_name: String, - run_id: i32, -) -> Result { + client_data: ClientData, +) -> Result { db.data() .create( - prisma::data_type::name::equals(data_type_name), - DateTime::from_timestamp_millis(unix_time) - .unwrap() + prisma::data_type::name::equals(client_data.name), + DateTime::from_timestamp_millis(client_data.timestamp) + .expect("Could not parse timestamp") .fixed_offset(), - prisma::run::id::equals(run_id), + prisma::run::id::equals(client_data.run_id), vec![prisma::data::values::set( - serverdata - .value + client_data + .values .iter() - .map(|f| f.parse::().unwrap()) + .map(|f| f.parse::().unwrap_or_default()) .collect(), )], ) + .select(public_data::select()) + .exec() + .await +} + +pub async fn add_many(db: &Database, client_data: Vec) -> Result { + db.data() + .create_many( + client_data + .iter() + .map(|f| { + prisma::data::create_unchecked( + f.name.to_string(), + DateTime::from_timestamp_millis(f.timestamp) + .expect("Could not 
parse timestamp") + .fixed_offset(), + f.run_id, + vec![prisma::data::values::set( + f.values + .iter() + .map(|f| f.parse::().unwrap_or_default()) + .collect(), + )], + ) + }) + .collect(), + ) .exec() .await } diff --git a/scylla-server-rust/src/services/data_type_service.rs b/scylla-server-rust/src/services/data_type_service.rs index 00bf25e2..8e1aabb5 100644 --- a/scylla-server-rust/src/services/data_type_service.rs +++ b/scylla-server-rust/src/services/data_type_service.rs @@ -2,11 +2,20 @@ use prisma_client_rust::QueryError; use crate::{prisma, Database}; +prisma::data_type::select! {public_datatype { + name + unit +}} + /// Gets all datatypes /// * `db` - The prisma client to make the call to /// returns: A result containing the data or the QueryError propogated by the db -pub async fn get_all_data_types(db: &Database) -> Result, QueryError> { - db.data_type().find_many(vec![]).exec().await +pub async fn get_all_data_types(db: &Database) -> Result, QueryError> { + db.data_type() + .find_many(vec![]) + .select(public_datatype::select()) + .exec() + .await } /// Upserts a datatype, either creating or updating one depending on its existence @@ -20,21 +29,23 @@ pub async fn upsert_data_type( data_type_name: String, unit: String, node_name: String, -) -> Result { +) -> Result { db.data_type() .upsert( prisma::data_type::name::equals(data_type_name.clone()), prisma::data_type::create( - data_type_name, + data_type_name.clone(), unit.clone(), prisma::node::name::equals(node_name.clone()), vec![], ), vec![ prisma::data_type::unit::set(unit), - prisma::data_type::node_name::set(node_name), + prisma::data_type::name::set(data_type_name.clone()), + prisma::data_type::node::connect(prisma::node::name::equals(node_name.clone())), ], ) + .select(public_datatype::select()) .exec() .await } diff --git a/scylla-server-rust/src/services/driver_service.rs b/scylla-server-rust/src/services/driver_service.rs index 18145aca..247fd179 100644 --- 
a/scylla-server-rust/src/services/driver_service.rs +++ b/scylla-server-rust/src/services/driver_service.rs @@ -5,11 +5,26 @@ use crate::{ Database, }; +prisma::driver::select! { public_driver{ + username + runs: select { + id + location_name + driver_name + system_name + time + } +}} + /// Gets all drivers /// * `db` - The prisma client to make the call to /// returns: A result containing the data or the QueryError propogated by the db -pub async fn get_all_drivers(db: &Database) -> Result, QueryError> { - db.driver().find_many(vec![]).exec().await +pub async fn get_all_drivers(db: &Database) -> Result, QueryError> { + db.driver() + .find_many(vec![]) + .select(public_driver::select()) + .exec() + .await } /// Upserts a driver, either creating or updating one depending on its existence @@ -21,7 +36,7 @@ pub async fn upsert_driver( db: &Database, driver_name: String, run_id: i32, -) -> Result { +) -> Result { let drive = db .driver() .upsert( @@ -29,6 +44,7 @@ pub async fn upsert_driver( prisma::driver::create(driver_name.clone(), vec![]), vec![], ) + .select(public_driver::select()) .exec() .await?; diff --git a/scylla-server-rust/src/services/location_service.rs b/scylla-server-rust/src/services/location_service.rs index 2e1483d8..04cf5796 100644 --- a/scylla-server-rust/src/services/location_service.rs +++ b/scylla-server-rust/src/services/location_service.rs @@ -5,11 +5,29 @@ use crate::{ Database, }; +prisma::location::select! 
{public_location{ + name + latitude + longitude + radius + runs: select { + id + location_name + driver_name + system_name + time + } +}} + /// Gets all locations /// * `db` - The prisma client to make the call to /// returns: A result containing the data or the QueryError propogated by the db -pub async fn get_all_locations(db: &Database) -> Result, QueryError> { - db.location().find_many(vec![]).exec().await +pub async fn get_all_locations(db: &Database) -> Result, QueryError> { + db.location() + .find_many(vec![]) + .select(public_location::select()) + .exec() + .await } /// Upserts a location, either creating or updating one depending on its existence @@ -27,7 +45,7 @@ pub async fn upsert_location( longitude: f64, radius: f64, run_id: i32, -) -> Result { +) -> Result { let loc = db .location() .upsert( @@ -39,6 +57,7 @@ pub async fn upsert_location( prisma::location::radius::set(radius), ], ) + .select(public_location::select()) .exec() .await?; diff --git a/scylla-server-rust/src/services/node_service.rs b/scylla-server-rust/src/services/node_service.rs index 5f8608af..4ecd9430 100644 --- a/scylla-server-rust/src/services/node_service.rs +++ b/scylla-server-rust/src/services/node_service.rs @@ -2,11 +2,23 @@ use prisma_client_rust::QueryError; use crate::{prisma, Database}; +prisma::node::include! 
{public_node{ + data_types: select { + name + unit + } +}} + /// Gets all nodes /// * `db` - The prisma client to make the call to /// returns: A result containing the data or the QueryError propogated by the db -pub async fn get_all_nodes(db: &Database) -> Result, QueryError> { - db.node().find_many(vec![]).exec().await +pub async fn get_all_nodes(db: &Database) -> Result, QueryError> { + db.node() + .find_many(vec![]) + .with(prisma::node::data_types::fetch(vec![])) + .include(public_node::include()) + .exec() + .await } /// Upserts a node, either creating or updating one depending on its existence @@ -16,13 +28,14 @@ pub async fn get_all_nodes(db: &Database) -> Result, Que pub async fn upsert_node( db: &Database, node_name: String, -) -> Result { +) -> Result { db.node() .upsert( prisma::node::name::equals(node_name.clone()), prisma::node::create(node_name, vec![]), vec![], ) + .include(public_node::include()) .exec() .await } diff --git a/scylla-server-rust/src/services/run_service.rs b/scylla-server-rust/src/services/run_service.rs index 1a126040..3e1581fa 100644 --- a/scylla-server-rust/src/services/run_service.rs +++ b/scylla-server-rust/src/services/run_service.rs @@ -7,11 +7,23 @@ use crate::{ Database, }; +prisma::run::select! 
{public_run{ + id + location_name + driver_name + system_name + time +}} + /// Gets all runs /// * `db` - The prisma client to make the call to /// returns: A result containing the data or the QueryError propogated by the db -pub async fn get_all_runs(db: &Database) -> Result, QueryError> { - db.run().find_many(vec![]).exec().await +pub async fn get_all_runs(db: &Database) -> Result, QueryError> { + db.run() + .find_many(vec![]) + .select(public_run::select()) + .exec() + .await } /// Gets a single run by its id @@ -21,9 +33,10 @@ pub async fn get_all_runs(db: &Database) -> Result, Query pub async fn get_run_by_id( db: &Database, run_id: i32, -) -> Result, QueryError> { +) -> Result, QueryError> { db.run() .find_unique(prisma::run::id::equals(run_id)) + .select(public_run::select()) .exec() .await } @@ -32,14 +45,15 @@ pub async fn get_run_by_id( /// * `db` - The prisma client to make the call to /// * `timestamp` - The unix time since epoch in miliseconds when the run starts /// returns: A result containing the data or the QueryError propogated by the db -pub async fn create_run(db: &Database, timestamp: i64) -> Result { +pub async fn create_run(db: &Database, timestamp: i64) -> Result { db.run() .create( DateTime::from_timestamp_millis(timestamp) - .unwrap() + .expect("Could not parse timestamp") .fixed_offset(), vec![], ) + .select(public_run::select()) .exec() .await } @@ -53,14 +67,15 @@ pub async fn create_run_with_id( db: &Database, timestamp: i64, run_id: i32, -) -> Result { +) -> Result { db.run() .create( DateTime::from_timestamp_millis(timestamp) - .unwrap() + .expect("Could not parse timestamp") .fixed_offset(), vec![prisma::run::id::set(run_id)], ) + .select(public_run::select()) .exec() .await } diff --git a/scylla-server-rust/src/services/system_service.rs b/scylla-server-rust/src/services/system_service.rs index 54f7ca08..a8b18e2c 100644 --- a/scylla-server-rust/src/services/system_service.rs +++ b/scylla-server-rust/src/services/system_service.rs 
@@ -5,14 +5,29 @@ use crate::{ Database, }; +prisma::system::select! {public_system{ + name + runs: select { + id + location_name + driver_name + system_name + time + } +}} + /// Upserts a datatype, either creating or updating one depending on its existence /// * `db` - The prisma client to make the call to /// * `data_type_name` - The data type name to upsert /// * `unit` - The unit of the data /// * `node_name` - The name of the node linked to the data type, must already exist! /// returns: A result containing the data or the QueryError propogated by the db -pub async fn get_all_systems(db: &Database) -> Result, QueryError> { - db.system().find_many(vec![]).exec().await +pub async fn get_all_systems(db: &Database) -> Result, QueryError> { + db.system() + .find_many(vec![]) + .select(public_system::select()) + .exec() + .await } /// Upserts a system, either creating or updating one depending on its existence @@ -24,7 +39,7 @@ pub async fn upsert_system( db: &Database, system_name: String, run_id: i32, -) -> Result { +) -> Result { let system = db .system() .upsert( @@ -32,6 +47,7 @@ pub async fn upsert_system( prisma::system::create(system_name.clone(), vec![]), vec![], ) + .select(public_system::select()) .exec() .await?; diff --git a/scylla-server-rust/src/transformers/data_transformer.rs b/scylla-server-rust/src/transformers/data_transformer.rs index ab183948..6fd20b3d 100644 --- a/scylla-server-rust/src/transformers/data_transformer.rs +++ b/scylla-server-rust/src/transformers/data_transformer.rs @@ -1,6 +1,6 @@ use serde::Serialize; -use crate::prisma; +use crate::{processors::ClientData, services::data_service}; /// The struct defining the data format sent to the client #[derive(Serialize, Debug, PartialEq, Eq, PartialOrd, Ord)] @@ -10,11 +10,21 @@ pub struct PublicData { } /// convert the prisma type to the client type for JSON encoding -impl From<&prisma::data::Data> for PublicData { - fn from(value: &prisma::data::Data) -> Self { +impl 
From<&data_service::public_data::Data> for PublicData { + fn from(value: &data_service::public_data::Data) -> Self { PublicData { - values: value.values.iter().map(|f| f.to_string()).collect(), + values: value.values.iter().map(f64::to_string).collect(), time: value.time.timestamp_millis(), } } } + +/// convert from the client (socket) type to the client type, for debugging and testing only probably +impl From for PublicData { + fn from(value: ClientData) -> Self { + PublicData { + time: value.timestamp, + values: value.values, + } + } +} diff --git a/scylla-server-rust/src/transformers/data_type_transformer.rs b/scylla-server-rust/src/transformers/data_type_transformer.rs index 34a92615..7075250e 100644 --- a/scylla-server-rust/src/transformers/data_type_transformer.rs +++ b/scylla-server-rust/src/transformers/data_type_transformer.rs @@ -1,6 +1,6 @@ use serde::Serialize; -use crate::prisma; +use crate::services::{data_type_service, node_service}; /// The struct defining the data type format sent to the client #[derive(Serialize, Debug, PartialEq)] @@ -9,8 +9,17 @@ pub struct PublicDataType { pub unit: String, } -impl From<&prisma::data_type::Data> for PublicDataType { - fn from(value: &prisma::data_type::Data) -> Self { +impl From<&data_type_service::public_datatype::Data> for PublicDataType { + fn from(value: &data_type_service::public_datatype::Data) -> Self { + PublicDataType { + name: value.name.clone(), + unit: value.unit.clone(), + } + } +} + +impl From<&node_service::public_node::data_types::Data> for PublicDataType { + fn from(value: &node_service::public_node::data_types::Data) -> Self { PublicDataType { name: value.name.clone(), unit: value.unit.clone(), diff --git a/scylla-server-rust/src/transformers/driver_transformer.rs b/scylla-server-rust/src/transformers/driver_transformer.rs index b7a0520f..5bba8b4b 100644 --- a/scylla-server-rust/src/transformers/driver_transformer.rs +++ b/scylla-server-rust/src/transformers/driver_transformer.rs @@ -1,6 +1,6 
@@ use serde::Serialize; -use crate::prisma; +use crate::services::driver_service; use super::run_transformer::PublicRun; @@ -11,17 +11,11 @@ pub struct PublicDriver { runs: Vec, } -impl From<&prisma::driver::Data> for PublicDriver { - fn from(value: &prisma::driver::Data) -> Self { +impl From<&driver_service::public_driver::Data> for PublicDriver { + fn from(value: &driver_service::public_driver::Data) -> Self { PublicDriver { username: value.username.clone(), - runs: value - .runs - .clone() - .unwrap_or_default() - .iter() - .map(PublicRun::from) - .collect(), + runs: value.runs.clone().iter().map(PublicRun::from).collect(), } } } diff --git a/scylla-server-rust/src/transformers/location_transformer.rs b/scylla-server-rust/src/transformers/location_transformer.rs index c8dedc90..7fef3d21 100644 --- a/scylla-server-rust/src/transformers/location_transformer.rs +++ b/scylla-server-rust/src/transformers/location_transformer.rs @@ -1,6 +1,6 @@ use serde::Serialize; -use crate::prisma; +use crate::services::location_service; use super::run_transformer::PublicRun; @@ -14,20 +14,14 @@ pub struct PublicLocation { pub runs: Vec, } -impl From<&prisma::location::Data> for PublicLocation { - fn from(value: &prisma::location::Data) -> Self { +impl From<&location_service::public_location::Data> for PublicLocation { + fn from(value: &location_service::public_location::Data) -> Self { PublicLocation { name: value.name.clone(), latitude: value.latitude, longitude: value.longitude, radius: value.radius, - runs: value - .runs - .clone() - .unwrap_or_default() - .iter() - .map(PublicRun::from) - .collect(), + runs: value.runs.clone().iter().map(PublicRun::from).collect(), } } } diff --git a/scylla-server-rust/src/transformers/node_transformer.rs b/scylla-server-rust/src/transformers/node_transformer.rs index c8ebbca4..d1551e9a 100644 --- a/scylla-server-rust/src/transformers/node_transformer.rs +++ b/scylla-server-rust/src/transformers/node_transformer.rs @@ -1,6 +1,6 @@ use 
serde::Serialize; -use crate::prisma; +use crate::services::node_service; use super::data_type_transformer::PublicDataType; @@ -12,14 +12,13 @@ pub struct PublicNode { data_types: Vec, } -impl From<&prisma::node::Data> for PublicNode { - fn from(value: &prisma::node::Data) -> Self { +impl From<&node_service::public_node::Data> for PublicNode { + fn from(value: &node_service::public_node::Data) -> Self { PublicNode { name: value.name.clone(), data_types: value .data_types .clone() - .unwrap_or_default() .iter() .map(PublicDataType::from) .collect(), diff --git a/scylla-server-rust/src/transformers/run_transformer.rs b/scylla-server-rust/src/transformers/run_transformer.rs index 0f2ca2bf..b1e18b6b 100644 --- a/scylla-server-rust/src/transformers/run_transformer.rs +++ b/scylla-server-rust/src/transformers/run_transformer.rs @@ -1,6 +1,9 @@ use serde::Serialize; -use crate::prisma; +use crate::services::{ + driver_service::public_driver, location_service::public_location, run_service::public_run, + system_service::public_system, +}; /// The struct defining the run format sent to the client #[derive(Serialize, Debug, PartialEq)] @@ -15,8 +18,46 @@ pub struct PublicRun { pub time: i64, } -impl From<&prisma::run::Data> for PublicRun { - fn from(value: &prisma::run::Data) -> Self { +impl From<&public_run::Data> for PublicRun { + fn from(value: &public_run::Data) -> Self { + PublicRun { + id: value.id, + location_name: value.location_name.clone().unwrap_or_default(), + driver_name: value.driver_name.clone().unwrap_or_default(), + system_name: value.system_name.clone().unwrap_or_default(), + time: value.time.timestamp_millis(), + } + } +} + +// why are these three needed? basically the nested relations via select do not "share" nested relations elsewhere. 
+// ultimately this means structs with identical fields have non identical types, and as they are macro generated they cannot be derived together +impl From<&public_driver::runs::Data> for PublicRun { + fn from(value: &public_driver::runs::Data) -> Self { + PublicRun { + id: value.id, + location_name: value.location_name.clone().unwrap_or_default(), + driver_name: value.driver_name.clone().unwrap_or_default(), + system_name: value.system_name.clone().unwrap_or_default(), + time: value.time.timestamp_millis(), + } + } +} + +impl From<&public_location::runs::Data> for PublicRun { + fn from(value: &public_location::runs::Data) -> Self { + PublicRun { + id: value.id, + location_name: value.location_name.clone().unwrap_or_default(), + driver_name: value.driver_name.clone().unwrap_or_default(), + system_name: value.system_name.clone().unwrap_or_default(), + time: value.time.timestamp_millis(), + } + } +} + +impl From<&public_system::runs::Data> for PublicRun { + fn from(value: &public_system::runs::Data) -> Self { PublicRun { id: value.id, location_name: value.location_name.clone().unwrap_or_default(), diff --git a/scylla-server-rust/src/transformers/system_transformer.rs b/scylla-server-rust/src/transformers/system_transformer.rs index 34f76d3f..8985239c 100644 --- a/scylla-server-rust/src/transformers/system_transformer.rs +++ b/scylla-server-rust/src/transformers/system_transformer.rs @@ -1,6 +1,6 @@ use serde::Serialize; -use crate::prisma; +use crate::services::system_service; use super::run_transformer::PublicRun; @@ -11,17 +11,11 @@ pub struct PublicSystem { pub runs: Vec, } -impl From<&prisma::system::Data> for PublicSystem { - fn from(value: &prisma::system::Data) -> Self { +impl From<&system_service::public_system::Data> for PublicSystem { + fn from(value: &system_service::public_system::Data) -> Self { PublicSystem { name: value.name.clone(), - runs: value - .runs - .clone() - .unwrap_or_default() - .iter() - .map(PublicRun::from) - .collect(), + runs: 
value.runs.clone().iter().map(PublicRun::from).collect(), } } } diff --git a/scylla-server-rust/tests/data_service_test.rs b/scylla-server-rust/tests/data_service_test.rs index d32b58ea..327836d6 100644 --- a/scylla-server-rust/tests/data_service_test.rs +++ b/scylla-server-rust/tests/data_service_test.rs @@ -2,9 +2,8 @@ mod test_utils; use prisma_client_rust::QueryError; -use protobuf::SpecialFields; use scylla_server_rust::{ - serverdata::ServerData, + processors::ClientData, services::{data_service, data_type_service, node_service, run_service}, transformers::data_transformer::PublicData, }; @@ -46,14 +45,14 @@ async fn test_data_add() -> Result<(), QueryError> { let data = data_service::add_data( &db, - ServerData { - value: vec!["0".to_owned()], + ClientData { + values: vec!["0".to_owned()], unit: "A".to_owned(), - special_fields: SpecialFields::new(), + run_id: run_data.id, + name: TEST_KEYWORD.to_owned(), + timestamp: 1000, + node: "Irrelevant".to_string(), }, - 1000, - TEST_KEYWORD.to_owned(), - run_data.id, ) .await?; @@ -87,14 +86,14 @@ async fn test_data_no_prereqs() -> Result<(), QueryError> { // should err out as data type name doesnt exist yet data_service::add_data( &db, - ServerData { - value: vec!["0".to_owned()], + ClientData { + values: vec!["0".to_owned()], unit: "A".to_owned(), - special_fields: SpecialFields::new(), + run_id: 0, + name: TEST_KEYWORD.to_owned(), + timestamp: 1000, + node: "Irrelevant".to_string(), }, - 1000, - TEST_KEYWORD.to_owned(), - 0, ) .await .expect_err("Should have errored, datatype doesnt exist!"); @@ -113,14 +112,14 @@ async fn test_data_no_prereqs() -> Result<(), QueryError> { // now shouldnt fail as it and node does exist data_service::add_data( &db, - ServerData { - value: vec!["0".to_owned()], + ClientData { + values: vec!["0".to_owned()], unit: "A".to_owned(), - special_fields: SpecialFields::new(), + run_id: 0, + name: TEST_KEYWORD.to_owned(), + timestamp: 1000, + node: "Irrelevant".to_string(), }, - 1000, - 
TEST_KEYWORD.to_owned(), - 0, ) .await?; diff --git a/scylla-server-rust/tests/data_type_service_test.rs b/scylla-server-rust/tests/data_type_service_test.rs index 86037f0f..36c156b0 100644 --- a/scylla-server-rust/tests/data_type_service_test.rs +++ b/scylla-server-rust/tests/data_type_service_test.rs @@ -4,7 +4,10 @@ mod test_utils; use prisma_client_rust::QueryError; use scylla_server_rust::{ prisma, - services::{data_type_service, node_service}, + services::{ + data_type_service::{self, public_datatype}, + node_service, + }, transformers::data_type_transformer::PublicDataType, }; use test_utils::cleanup_and_prepare; @@ -56,6 +59,7 @@ async fn test_datatype_create() -> Result<(), QueryError> { let data = db .data_type() .find_unique(prisma::data_type::name::equals(data_type_name.clone())) + .select(public_datatype::select()) .exec() .await? .expect("This should not be empty"); diff --git a/scylla-server-rust/tests/system_service_test.rs b/scylla-server-rust/tests/system_service_test.rs index aa43d3c6..82e45d1c 100644 --- a/scylla-server-rust/tests/system_service_test.rs +++ b/scylla-server-rust/tests/system_service_test.rs @@ -1,8 +1,10 @@ use prisma_client_rust::QueryError; use scylla_server_rust::{ prisma, - services::{run_service, system_service}, - transformers::system_transformer::PublicSystem, + services::{ + run_service, + system_service::{self, public_system}, + }, }; use test_utils::cleanup_and_prepare; @@ -15,22 +17,18 @@ const TEST_KEYWORD: &str = "test"; async fn test_upsert_system_create() -> Result<(), QueryError> { let db = cleanup_and_prepare().await?; - let res_c = system_service::upsert_system( - &db, - TEST_KEYWORD.to_owned(), - run_service::create_run(&db, 101).await?.id, - ) - .await?; + let run = run_service::create_run(&db, 101).await?; + + let _ = system_service::upsert_system(&db, TEST_KEYWORD.to_owned(), run.id).await?; - let res = db + let _ = db .system() .find_unique(prisma::system::name::equals(TEST_KEYWORD.to_owned())) + 
.select(public_system::select()) .exec() .await? .expect("System should exist, was just upserted!"); - assert_eq!(PublicSystem::from(&res_c), PublicSystem::from(&res)); - Ok(()) } diff --git a/siren-base/compose.siren.yml b/siren-base/compose.siren.yml new file mode 100644 index 00000000..3fde4fda --- /dev/null +++ b/siren-base/compose.siren.yml @@ -0,0 +1,15 @@ +services: + siren: + container_name: siren + restart: unless-stopped + image: eclipse-mosquitto:latest + ports: + - 1883:1883 + - 9001:9001 # why? + expose: + - 1883 + volumes: + - ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf + cpu_shares: 2048 + mem_limit: 2gb + oom_kill_disable: true diff --git a/siren-base/mosquitto/mosquitto.conf b/siren-base/mosquitto/mosquitto.conf index f4cee72f..e281ff79 100755 --- a/siren-base/mosquitto/mosquitto.conf +++ b/siren-base/mosquitto/mosquitto.conf @@ -109,21 +109,27 @@ autosave_interval 30 #autosave_on_changes false -#persistence false +# *** diff from tpu +persistence true #persistence_file mosquitto.db -#persistence_location +# *** diff from tpu +persistence_location /mosquitto/data # ================================================================= # Logging # ================================================================= # *** diff from tpu (for docker) -log_dest file /mosquitto/log/mosquitto.log - +log_dest stdout +log_type error +log_type warning log_type notice +log_type information +log_type subscribe +log_type unsubscribe #log_type information connection_messages true