From 61cac476c4f4f17c2ee128d6f1017d7fec93ae68 Mon Sep 17 00:00:00 2001 From: Gabriel Martinez Date: Fri, 4 Aug 2023 08:43:26 -0500 Subject: [PATCH] externalized configuration --- channel-sender/Dockerfile | 10 +- channel-sender/README.md | 4 +- channel-sender/config/config-local.yaml | 30 ++++ channel-sender/config/config.exs | 3 - channel-sender/config/dev.exs | 11 +- channel-sender/config/dev1.exs | 12 +- channel-sender/config/dev2.exs | 11 +- channel-sender/config/prod.exs | 8 +- channel-sender/config/runtime.exs | 24 --- channel-sender/container_utils/runner.sh | 12 ++ channel-sender/deploy_samples/k8s/README.md | 2 +- .../k8s/libcluster-kubernetes-dns/README.md | 69 ++++---- .../k8s/libcluster-kubernetes-dns/app.yaml | 8 +- .../libcluster-kubernetes-dns/configmap.yaml | 46 ++--- .../libcluster-kubernetes-dnssrv/README.md | 69 ++++---- .../k8s/libcluster-kubernetes-dnssrv/app.yaml | 8 +- .../configmap.yaml | 46 ++--- .../k8s/libcluster-kubernetes/README.md | 74 ++++---- .../k8s/libcluster-kubernetes/app.yaml | 8 +- .../k8s/libcluster-kubernetes/configmap.yaml | 52 +++--- .../lib/channel_sender_ex/application.ex | 37 ++-- .../channel_sender_ex/application_config.ex | 161 ++++++++++++++++++ .../lib/channel_sender_ex/core/channel.ex | 27 +-- .../core/channel_id_generator.ex | 8 +- .../core/pubsub/re_connect_process.ex | 9 +- .../core/pubsub/socket_event_bus.ex | 4 +- channel-sender/mix.exs | 5 +- channel-sender/mix.lock | 8 +- channel-sender/rel/env.sh.eex | 3 + .../core/channel_id_generator_test.exs | 10 ++ .../core/channel_integration_test.exs | 26 ++- .../channel_sender_ex/core/channel_test.exs | 13 ++ .../transport/rest/rest_controller_test.exs | 24 ++- .../transport/socket_integration_test.exs | 27 ++- 34 files changed, 560 insertions(+), 309 deletions(-) create mode 100644 channel-sender/config/config-local.yaml delete mode 100644 channel-sender/config/runtime.exs create mode 100644 channel-sender/container_utils/runner.sh create mode 100644 channel-sender/lib/channel_sender_ex/application_config.ex diff --git a/channel-sender/Dockerfile b/channel-sender/Dockerfile index 4ab47cd..f48d3ac 100644 --- a/channel-sender/Dockerfile +++ b/channel-sender/Dockerfile @@ -22,6 +22,7 @@ RUN export MIX_ENV=${BUILD_ENV} && \ # Extract Release archive to /rel for copying in next stage RUN RELEASE_FILE=`ls -d _build/${BUILD_ENV}/*.gz` && \ mkdir /export && \ + cp container_utils/runner.sh /export && \ tar -xf "${RELEASE_FILE}" -C /export #=================== @@ -41,11 +42,14 @@ RUN apk upgrade --no-cache && \ RUN addgroup -S adfuser && adduser -S adfuser -G adfuser -WORKDIR /home/adfuser +WORKDIR /app COPY --from=build --chown=adfuser:adfuser /export/ . USER adfuser -ENTRYPOINT ["/home/adfuser/bin/channel_sender_ex"] -CMD ["start"] +VOLUME /app/config + +ENTRYPOINT ["/bin/bash"] +CMD ["/app/runner.sh"] + diff --git a/channel-sender/README.md b/channel-sender/README.md index 3855ada..3e39c3b 100644 --- a/channel-sender/README.md +++ b/channel-sender/README.md @@ -29,7 +29,7 @@ mix compile ### Configuration -Open and edit the config/dev.exs file to configure, you can create and set new environment files. +Open and edit the `config.yaml` file to set up configurations. | **Parameters** | Description | Default Value | | -------------------------------- | -------------------------------------- | ------------------ | | `socket_port` | Port to atend Web Sockets requests | 8082 | @@ -63,7 +63,7 @@ $ MIX_ENV= iex --erl "-name async-node1@127.0.0.1" -S mix ADF Sender incorporate `libcluster` dependency in order to facilitate the automatic configuration of erlang clusters in kubernetes. -In folder `deploy_samples\k8s` we have included manifests to deploy ADF sender on kubernetes (and also if istio is present). +In folder [deploy_samples\k8s](./deploy_samples/k8s/README.md) we have included manifests to deploy ADF sender on kubernetes (and also if istio is present), using 3 of the strategies supported by `libcluster`. ## Clients diff --git a/channel-sender/config/config-local.yaml b/channel-sender/config/config-local.yaml new file mode 100644 index 0000000..db73859 --- /dev/null +++ b/channel-sender/config/config-local.yaml @@ -0,0 +1,30 @@ +channel_sender_ex: + rest_port: 8081 + socket_port: 8082 + secret_generator: + base: "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc" + salt: "socket auth" + max_age: 900 + initial_redelivery_time: 900 + socket_idle_timeout: 30000 + channel_shutdown_tolerance: 10000 + min_disconnection_tolerance: 50 + on_connected_channel_reply_timeout: 2000 + accept_channel_reply_timeout: 1000 + no_start: false + topology: + strategy: Elixir.Cluster.Strategy.Kubernetes + config: + mode: :hostname + kubernetes_ip_lookup_mode: :pods + kubernetes_service_name: "adfsender-headless" + kubernetes_node_basename: "channel_sender_ex" + kubernetes_selector: "cluster=beam" + namespace: "sendernm" + polling_interval: 5000 + +logger: + level: debug + + + diff --git a/channel-sender/config/config.exs b/channel-sender/config/config.exs index 5b2cce8..96e372d 100644 --- a/channel-sender/config/config.exs +++ b/channel-sender/config/config.exs @@ -1,11 +1,8 @@ import Config config :channel_sender_ex, - channel_supervisor_module: Horde.DynamicSupervisor, - registry_module: Horde.Registry, app_repo: ChannelSenderEx.Repository.ApplicationRepo, channel_shutdown_tolerance: 10_000, - no_start: false, min_disconnection_tolerance: 50, socket_event_bus: ChannelSenderEx.Core.PubSub.SocketEventBus diff --git a/channel-sender/config/dev.exs b/channel-sender/config/dev.exs index 52fd6bc..eeb08b9 100644 --- a/channel-sender/config/dev.exs +++ b/channel-sender/config/dev.exs @@ -1,13 +1,4 @@ import Config config :channel_sender_ex, - secret_base: - {"aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", "socket auth"}, - socket_port: 8082, - initial_redelivery_time: 900, - socket_idle_timeout: 30000, - rest_port: 8081, - max_age: 900, - topology: [ - strategy: Cluster.Strategy.Gossip - ] + config_file: "./config/config-local.yaml" diff --git a/channel-sender/config/dev1.exs b/channel-sender/config/dev1.exs index 1f29377..c9463eb 100644 --- a/channel-sender/config/dev1.exs +++ b/channel-sender/config/dev1.exs @@ -1,13 +1,5 @@ import Config + config :channel_sender_ex, - secret_base: - {"aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", "socket auth"}, - socket_port: 8092, - initial_redelivery_time: 900, - socket_idle_timeout: 30000, - rest_port: 8091, - max_age: 900, - topology: [ - strategy: Cluster.Strategy.Gossip - ] + config_file: "./config/config-local1.yaml" diff --git a/channel-sender/config/dev2.exs b/channel-sender/config/dev2.exs index a8badad..361b742 100644 --- a/channel-sender/config/dev2.exs +++ b/channel-sender/config/dev2.exs @@ -1,13 +1,4 @@ import Config config :channel_sender_ex, - secret_base: - {"aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", "socket auth"}, - socket_port: 8072, - initial_redelivery_time: 900, - socket_idle_timeout: 30000, - rest_port: 8071, - max_age: 900, - topology: [ - strategy: Cluster.Strategy.Gossip - ] + config_file: "./config/config-local2.yaml" diff --git a/channel-sender/config/prod.exs b/channel-sender/config/prod.exs index 3495be5..0a1927d 100644 --- a/channel-sender/config/prod.exs +++ b/channel-sender/config/prod.exs @@ -1,10 +1,4 @@ import Config config :channel_sender_ex, - secret_base: - {"aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", "socket auth"}, - socket_port: 8082, - initial_redelivery_time: 900, - socket_idle_timeout: 30000, - rest_port: 8081, - max_age: 900 + config_file: "/app/config/config.yaml" diff --git a/channel-sender/config/runtime.exs b/channel-sender/config/runtime.exs deleted file mode 100644 index 46797a7..0000000 --- a/channel-sender/config/runtime.exs +++ /dev/null @@ -1,24 +0,0 @@ -import Config - -if (config_env() == :prod) do - config :channel_sender_ex, - secret_base: - {"aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", "socket auth"}, - socket_port: 8082, - initial_redelivery_time: 900, - socket_idle_timeout: 30000, - rest_port: 8081, - max_age: 900, - topology: [ - strategy: Cluster.Strategy.Kubernetes, - config: [ - mode: :hostname, - kubernetes_ip_lookup_mode: :pods, - kubernetes_service_name: "adfsender-headless", - kubernetes_node_basename: "channel_sender_ex", - kubernetes_selector: "cluster=beam", - namespace: "sendernm", - polling_interval: 5000 - ] - ] -end diff --git a/channel-sender/container_utils/runner.sh b/channel-sender/container_utils/runner.sh new file mode 100644 index 0000000..7a93a25 --- /dev/null +++ b/channel-sender/container_utils/runner.sh @@ -0,0 +1,12 @@ +#!/bin/sh + +trap "echo; exit" INT + +# load any needed env vars +File=/app/config/env.sh +if test -f "$File"; then + . $File +fi + +# run release +/app/bin/channel_sender_ex start diff --git a/channel-sender/deploy_samples/k8s/README.md b/channel-sender/deploy_samples/k8s/README.md index e60c707..68758e6 100644 --- a/channel-sender/deploy_samples/k8s/README.md +++ b/channel-sender/deploy_samples/k8s/README.md @@ -10,7 +10,7 @@ The demo templates rely on [libcluster](https://hexdocs.pm/libcluster/readme.htm These templates also asume istio is installed and enforcing mtls cluster-wide. -## Folders +## Strategies a. [libcluster-kubernetes](./libcluster-kubernetes/README.md): Templates for deploying sender using `libcluster` strategy `Cluster.Strategy.Kubernetes`. diff --git a/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dns/README.md b/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dns/README.md index eba1ca6..c7cb881 100644 --- a/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dns/README.md +++ b/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dns/README.md @@ -110,34 +110,43 @@ Building an Erlang Cluster with `libcluster` strategy named: [`Cluster.Strategy ### 2.1. ADF Sender configuration -a. **Topology config** - -You can set topology related configuration in `config\runtime.exs`: - -```elixir -import Config - -config :logger, level: :info - -config :channel_sender_ex, - secret_base: - {"aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", "socket auth"}, - socket_port: 8082, - initial_redelivery_time: 900, - socket_idle_timeout: 30000, - rest_port: 8081, - max_age: 900, - topology: [ - strategy: Elixir.Cluster.Strategy.Kubernetes.DNS, - config: [ - service: "adfsender-headless" - application_name: "channel_sender_ex" - namespace: "sendernm" - polling_interval: 5000 - ] - ] +a. **Basic Sender Configuration** + +You can provide all sender configuration via a yaml file. + +For containers using prod release, path should be: `/app/config/config.yaml` for mounting the file. + +config.yaml: +```yaml +channel_sender_ex: + rest_port: 8081 + socket_port: 8082 + secret_generator: + base: "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc" + salt: "socket auth" + max_age: 900 + initial_redelivery_time: 900 + socket_idle_timeout: 30000 + channel_shutdown_tolerance: 10000 + min_disconnection_tolerance: 50 + on_connected_channel_reply_timeout: 2000 + accept_channel_reply_timeout: 1000 + no_start: false + # --- libcluster related config --- + topology: + strategy: Elixir.Cluster.Strategy.Kubernetes + config: + service: "adfsender-headless" + application_name: "channel_sender_ex" + namespace: "sendernm" + polling_interval: 5000 + # --- end libcluster configuration --- + +logger: + level: debug ``` -where: + +Note the specifics in the libcluster configuration: - `service` it's the headless service name defined in `app.yaml`. - `application_name` it's the elixir release name. See `mix.exs`. Default release name is `channel_sender_ex`. @@ -145,11 +154,11 @@ where: b. **Define related env vars for release** -via `rel/env.sh.eex` file: +You must mount a file in the following path `/app/config/env.sh`, performing any env configuration: -```elixir +```bash export RELEASE_DISTRIBUTION=name -export RELEASE_NODE=<%= @release.name %>@${POD_IP} +export RELEASE_NODE=channel_sender_ex@${POD_IP} ``` check env var POD_IP being injected in `app.yaml`. diff --git a/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dns/app.yaml b/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dns/app.yaml index 020b9a9..c2b530c 100644 --- a/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dns/app.yaml +++ b/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dns/app.yaml @@ -61,7 +61,7 @@ spec: spec: containers: - name: adfsender - image: bancolombia/async-dataflow-channel-sender:0.1.4 + image: bancolombia/async-dataflow-channel-sender:0.1.5 env: - name: POD_IP valueFrom: @@ -90,11 +90,11 @@ spec: memory: 250M volumeMounts: - name: config-volume - mountPath: /home/adfuser/releases/0.1.4/env.sh + mountPath: /app/config/env.sh subPath: env.sh - name: config-volume - mountPath: /home/adfuser/releases/0.1.4/runtime.exs - subPath: runtime.exs + mountPath: /app/config/config.yaml + subPath: config.yaml volumes: - name: config-volume configMap: diff --git a/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dns/configmap.yaml b/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dns/configmap.yaml index 89d5527..a8d9fc5 100644 --- a/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dns/configmap.yaml +++ b/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dns/configmap.yaml @@ -8,26 +8,28 @@ data: #!/bin/sh export RELEASE_DISTRIBUTION=name export RELEASE_NODE=channel_sender_ex@${POD_IP} - runtime.exs: |- - import Config - - config :channel_sender_ex, - secret_base: - {"aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", "socket auth"}, - socket_port: 8082, - initial_redelivery_time: 900, - socket_idle_timeout: 30000, - rest_port: 8091, - max_age: 900, - # libcluster topology for this demostration. - # To see other topologies supported by libcluster - # see library documentation. - topology: [ - strategy: Cluster.Strategy.Kubernetes.DNS, - config: [ - service: "adfsender-headless", - application_name: "channel_sender_ex", - namespace: "sendernm", + config.yaml: |- + channel_sender_ex: + rest_port: 8081 + socket_port: 8082 + secret_generator: + base: "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc" + salt: "socket auth" + max_age: 900 + initial_redelivery_time: 900 + socket_idle_timeout: 30000 + channel_shutdown_tolerance: 10000 + min_disconnection_tolerance: 50 + on_connected_channel_reply_timeout: 2000 + accept_channel_reply_timeout: 1000 + no_start: false + topology: + strategy: Elixir.Cluster.Strategy.Kubernetes.DNS + config: + service: "adfsender-headless" + application_name: "channel_sender_ex" + namespace: "sendernm" polling_interval: 5000 - ] - ] \ No newline at end of file + + logger: + level: debug diff --git a/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dnssrv/README.md b/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dnssrv/README.md index 03d07fb..fc8da12 100644 --- a/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dnssrv/README.md +++ b/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dnssrv/README.md @@ -112,34 +112,43 @@ Building an Erlang Cluster with `libcluster` strategy named: [`Cluster.Strategy ### 2.1. ADF Sender configuration -a. **Topology config** - -You can set topology related configuration in `config\runtime.exs`: - -```elixir -import Config - -config :logger, level: :info - -config :channel_sender_ex, - secret_base: - {"aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", "socket auth"}, - socket_port: 8082, - initial_redelivery_time: 900, - socket_idle_timeout: 30000, - rest_port: 8081, - max_age: 900, - topology: [ - strategy: Elixir.Cluster.Strategy.Kubernetes.DNSSRV, - config: [ - service: "adfsender-headless" - application_name: "channel_sender_ex" - namespace: "sendernm" - polling_interval: 5000 - ] - ] +a. **Basic Sender Configuration** + +You can provide all sender configuration via a yaml file. + +For containers using prod release, path should be: `/app/config/config.yaml` for mounting the file. + +config.yaml: +```yaml +channel_sender_ex: + rest_port: 8081 + socket_port: 8082 + secret_generator: + base: "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc" + salt: "socket auth" + max_age: 900 + initial_redelivery_time: 900 + socket_idle_timeout: 30000 + channel_shutdown_tolerance: 10000 + min_disconnection_tolerance: 50 + on_connected_channel_reply_timeout: 2000 + accept_channel_reply_timeout: 1000 + no_start: false + # --- libcluster related config --- + topology: + strategy: Elixir.Cluster.Strategy.Kubernetes + config: + service: "adfsender-headless" + application_name: "channel_sender_ex" + namespace: "sendernm" + polling_interval: 5000 + # --- end libcluster configuration --- + +logger: + level: debug ``` -where: + +Note the specifics in the libcluster configuration: - `service` it's the headless service name defined in `app.yaml`. - `application_name` it's the elixir release name. See `mix.exs`. Default release name is `channel_sender_ex`. @@ -147,11 +156,11 @@ where: b. **Define related env vars for release** -via `rel/env.sh.eex` file: +You must mount a file in the following path `/app/config/env.sh`, performing any env configuration: -```elixir +```bash export RELEASE_DISTRIBUTION=name -export RELEASE_NODE=<%= @release.name %>@(hostname -f) +export RELEASE_NODE=channel_sender_ex@(hostname -f) ``` This pair of files can be mounted as a volume and passed to the container. See `configmap.yaml` and volume mount definition in `app.yaml`. diff --git a/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dnssrv/app.yaml b/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dnssrv/app.yaml index 54131ee..d291b43 100644 --- a/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dnssrv/app.yaml +++ b/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dnssrv/app.yaml @@ -61,7 +61,7 @@ spec: spec: containers: - name: adfsender - image: bancolombia/async-dataflow-channel-sender:0.1.4 + image: bancolombia/async-dataflow-channel-sender:0.1.5 env: - name: ERLANG_COOKIE value: "secret" @@ -86,11 +86,11 @@ spec: memory: 250M volumeMounts: - name: config-volume - mountPath: /home/adfuser/releases/0.1.4/env.sh + mountPath: /app/config/env.sh subPath: env.sh - name: config-volume - mountPath: /home/adfuser/releases/0.1.4/runtime.exs - subPath: runtime.exs + mountPath: /app/config/config.yaml + subPath: config.yaml volumes: - name: config-volume configMap: diff --git a/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dnssrv/configmap.yaml b/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dnssrv/configmap.yaml index 514e5c6..f1922de 100644 --- a/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dnssrv/configmap.yaml +++ b/channel-sender/deploy_samples/k8s/libcluster-kubernetes-dnssrv/configmap.yaml @@ -8,26 +8,28 @@ data: #!/bin/sh export RELEASE_DISTRIBUTION=name export RELEASE_NODE=channel_sender_ex@$(hostname -f) - runtime.exs: |- - import Config - - config :channel_sender_ex, - secret_base: - {"aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", "socket auth"}, - socket_port: 8082, - initial_redelivery_time: 900, - socket_idle_timeout: 30000, - rest_port: 8091, - max_age: 900, - # libcluster topology for this demostration. - # To see other topologies supported by libcluster - # see library documentation. - topology: [ - strategy: Cluster.Strategy.Kubernetes.DNSSRV, - config: [ - service: "adfsender-headless", - application_name: "channel_sender_ex", - namespace: "sendernm", + config.yaml: |- + channel_sender_ex: + rest_port: 8081 + socket_port: 8082 + secret_generator: + base: "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc" + salt: "socket auth" + max_age: 900 + initial_redelivery_time: 900 + socket_idle_timeout: 30000 + channel_shutdown_tolerance: 10000 + min_disconnection_tolerance: 50 + on_connected_channel_reply_timeout: 2000 + accept_channel_reply_timeout: 1000 + no_start: false + topology: + strategy: Elixir.Cluster.Strategy.Kubernetes.DNS + config: + service: "adfsender-headless" + application_name: "channel_sender_ex" + namespace: "sendernm" polling_interval: 5000 - ] - ] \ No newline at end of file + + logger: + level: debug \ No newline at end of file diff --git a/channel-sender/deploy_samples/k8s/libcluster-kubernetes/README.md b/channel-sender/deploy_samples/k8s/libcluster-kubernetes/README.md index b4a04a5..a2ec937 100644 --- a/channel-sender/deploy_samples/k8s/libcluster-kubernetes/README.md +++ b/channel-sender/deploy_samples/k8s/libcluster-kubernetes/README.md @@ -112,37 +112,45 @@ Building an Erlang Cluster with `libcluster` strategy named: [`Cluster.Strategy ### 2.1. ADF Sender configuration -a. **Topology config** - -You can set topology related configuration in `config\runtime.exs`: - -```elixir -import Config - -config :logger, level: :info - -config :channel_sender_ex, - secret_base: - {"aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", "socket auth"}, - socket_port: 8082, - initial_redelivery_time: 900, - socket_idle_timeout: 30000, - rest_port: 8081, - max_age: 900, - topology: [ - strategy: Elixir.Cluster.Strategy.Kubernetes, - config: [ - mode: :hostname - kubernetes_ip_lookup_mode: :pods - kubernetes_service_name: "adfsender-headless" - kubernetes_node_basename: "channel_sender_ex" - kubernetes_selector: "cluster=beam" - namespace: "sendernm" - polling_interval: 5000 - ] - ] +a. **Basic Sender Configuration** + +You can provide all sender configuration via a yaml file. + +For containers using prod release, path should be: `/app/config/config.yaml` for mounting the file. + +config.yaml: +```yaml +channel_sender_ex: + rest_port: 8081 + socket_port: 8082 + secret_generator: + base: "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc" + salt: "socket auth" + max_age: 900 + initial_redelivery_time: 900 + socket_idle_timeout: 30000 + channel_shutdown_tolerance: 10000 + min_disconnection_tolerance: 50 + on_connected_channel_reply_timeout: 2000 + accept_channel_reply_timeout: 1000 + no_start: false + # --- libcluster related config --- + topology: + strategy: Elixir.Cluster.Strategy.Kubernetes + config: + mode: :hostname + kubernetes_ip_lookup_mode: :pods + kubernetes_service_name: "adfsender-headless" + kubernetes_node_basename: "channel_sender_ex" + kubernetes_selector: "cluster=beam" + namespace: "sendernm" + polling_interval: 5000 + # --- end libcluster configuration --- +logger: + level: debug ``` -where: + +Note the specifics in the libcluster configuration: - `kubernetes_service_name` it's the name of the headless-service defined in app.yaml. - `kubernetes_node_basename` it's the elixir release name. See `mix.exs`. Default release name is `channel_sender_ex`. @@ -151,11 +159,11 @@ where: b. **Define related env vars for release** -via `rel/env.sh.eex` file: +You must mount a file in the following path `/app/config/env.sh`, performing any env configuration: -```elixir +```bash export RELEASE_DISTRIBUTION=name -export RELEASE_NODE=<%= @release.name %>@${POD_NAME}.${POD_NAME_DNS} +export RELEASE_NODE=channel_sender_ex@${POD_NAME}.${POD_NAME_DNS} ``` check env vars POD_NAME and POD_NAME_DNS in app.yaml. diff --git a/channel-sender/deploy_samples/k8s/libcluster-kubernetes/app.yaml b/channel-sender/deploy_samples/k8s/libcluster-kubernetes/app.yaml index 61058ef..136ba7f 100644 --- a/channel-sender/deploy_samples/k8s/libcluster-kubernetes/app.yaml +++ b/channel-sender/deploy_samples/k8s/libcluster-kubernetes/app.yaml @@ -59,7 +59,7 @@ spec: spec: containers: - name: adfsender - image: bancolombia/async-dataflow-channel-sender:0.1.4 + image: bancolombia/async-dataflow-channel-sender:0.1.5 env: - name: POD_NAME valueFrom: @@ -88,11 +88,11 @@ spec: memory: 250M volumeMounts: - name: config-volume - mountPath: /home/adfuser/releases/0.1.4/env.sh + mountPath: /app/config/env.sh subPath: env.sh - name: config-volume - mountPath: /home/adfuser/releases/0.1.4/runtime.exs - subPath: runtime.exs + mountPath: /app/config/config.yaml + subPath: config.yaml volumes: - name: config-volume configMap: diff --git a/channel-sender/deploy_samples/k8s/libcluster-kubernetes/configmap.yaml b/channel-sender/deploy_samples/k8s/libcluster-kubernetes/configmap.yaml index 649d7cb..5a9d260 100644 --- a/channel-sender/deploy_samples/k8s/libcluster-kubernetes/configmap.yaml +++ b/channel-sender/deploy_samples/k8s/libcluster-kubernetes/configmap.yaml @@ -8,31 +8,33 @@ data: #!/bin/sh export RELEASE_DISTRIBUTION=name export RELEASE_NODE=channel_sender_ex@${POD_NAME}.${POD_NAME_DNS} - runtime.exs: |- - import Config - - config :channel_sender_ex, - secret_base: - {"aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", "socket auth"}, - socket_port: 8082, - initial_redelivery_time: 900, - socket_idle_timeout: 30000, - rest_port: 8081, - max_age: 900, - # libcluster topology for this demostration. - # To see other topologies supported by libcluster - # see library documentation. - topology: [ - strategy: Cluster.Strategy.Kubernetes, - config: [ - mode: :hostname, - kubernetes_ip_lookup_mode: :pods, - kubernetes_service_name: "adfsender-headless", - kubernetes_node_basename: "channel_sender_ex", - kubernetes_selector: "cluster=beam", - namespace: "sendernm", + config.yaml: |- + channel_sender_ex: + rest_port: 8081 + socket_port: 8082 + secret_generator: + base: "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc" + salt: "socket auth" + max_age: 900 + initial_redelivery_time: 900 + socket_idle_timeout: 30000 + channel_shutdown_tolerance: 10000 + min_disconnection_tolerance: 50 + on_connected_channel_reply_timeout: 2000 + accept_channel_reply_timeout: 1000 + no_start: false + topology: + strategy: Elixir.Cluster.Strategy.Kubernetes + config: + mode: :hostname + kubernetes_ip_lookup_mode: :pods + kubernetes_service_name: "adfsender-headless" + kubernetes_node_basename: "channel_sender_ex" + kubernetes_selector: "cluster=beam" + namespace: "sendernm" polling_interval: 5000 - ] - ] + + logger: + level: debug diff --git a/channel-sender/lib/channel_sender_ex/application.ex b/channel-sender/lib/channel_sender_ex/application.ex index c053416..bfe7d9b 100644 --- a/channel-sender/lib/channel_sender_ex/application.ex +++ b/channel-sender/lib/channel_sender_ex/application.ex @@ -4,35 +4,44 @@ defmodule ChannelSenderEx.Application do @moduledoc false alias ChannelSenderEx.Transport.Rest.RestController alias ChannelSenderEx.Transport.EntryPoint + alias ChannelSenderEx.ApplicationConfig + use Application require Logger - @no_start Application.get_env(:channel_sender_ex, :no_start) - @http_port Application.get_env(:channel_sender_ex, :rest_port, 8080) def start(_type, _args) do + + _config = ApplicationConfig.load() + ChannelSenderEx.Utils.ClusterUtils.discover_and_connect_local() ChannelSenderEx.Core.RulesProvider.Helper.compile(:channel_sender_ex) - if !@no_start do + no_start_param = Application.get_env(:channel_sender_ex, :no_start) + if !no_start_param do EntryPoint.start() end opts = [strategy: :one_for_one, name: ChannelSenderEx.Supervisor] - Supervisor.start_link(children(@no_start), opts) + Supervisor.start_link(children(no_start_param), opts) end - defp children(_no_start = false) do - [ - {Cluster.Supervisor, [topologies(), [name: ChannelSenderEx.ClusterSupervisor]]}, - ChannelSenderEx.Core.ChannelRegistry, - ChannelSenderEx.Core.ChannelSupervisor, - ChannelSenderEx.Core.NodeObserver, - {Plug.Cowboy, scheme: :http, plug: RestController, options: [port: @http_port]} - ] + defp children(no_start_param) do + case no_start_param do + false -> + [ + {Cluster.Supervisor, [topologies(), [name: ChannelSenderEx.ClusterSupervisor]]}, + ChannelSenderEx.Core.ChannelRegistry, + ChannelSenderEx.Core.ChannelSupervisor, + ChannelSenderEx.Core.NodeObserver, + {Plug.Cowboy, scheme: :http, plug: RestController, options: [ + port: Application.get_env(:channel_sender_ex, :rest_port) + ]} + ] + true -> + [] + end end - defp children(_no_start = true), do: [] - defp topologies do topology = [ k8s: Application.get_env(:channel_sender_ex, :topology) diff --git a/channel-sender/lib/channel_sender_ex/application_config.ex b/channel-sender/lib/channel_sender_ex/application_config.ex new file mode 100644 index 0000000..a4a63a8 --- /dev/null +++ b/channel-sender/lib/channel_sender_ex/application_config.ex @@ -0,0 +1,161 @@ +defmodule ChannelSenderEx.ApplicationConfig do + @moduledoc false + + alias Vapor.Provider.File + + require Logger + + def load() do + config_file = Application.get_env(:channel_sender_ex, :config_file) + Logger.info("Loading configuration from #{inspect(config_file)}") + + # Vapor + providers = [ + %File{ + path: config_file, + bindings: [ + channel_sender_ex: "channel_sender_ex", + logger: "logger" + ] + } + ] + + try do + case Vapor.load(providers) do + {:error, err} -> + Logger.error("Error loading configuration, #{inspect(err)}") + setup_config(%{}) + {:ok, config} -> + setup_config(config) + end + rescue + e in Vapor.FileNotFoundError -> + Logger.error("Error loading configuration, #{inspect(e)}") + setup_config(%{}) + end + + + end + + def setup_config(config) do + + Logger.configure(level: String.to_existing_atom( + Map.get(fetch(config, :logger), "level", "info") + )) + + Application.put_env(:channel_sender_ex, :no_start, + Map.get(fetch(config, :channel_sender_ex), "no_start", false) + ) + + Application.put_env(:channel_sender_ex, :channel_shutdown_tolerance, + Map.get(fetch(config, :channel_sender_ex), "channel_shutdown_tolerance", 10_000) + ) + + Application.put_env(:channel_sender_ex, :min_disconnection_tolerance, + Map.get(fetch(config, :channel_sender_ex), "min_disconnection_tolerance", 50) + ) + + Application.put_env(:channel_sender_ex, :on_connected_channel_reply_timeout, + Map.get(fetch(config, :channel_sender_ex), "on_connected_channel_reply_timeout", 2000) + ) + + Application.put_env(:channel_sender_ex, :accept_channel_reply_timeout, + Map.get(fetch(config, :channel_sender_ex), "accept_channel_reply_timeout", 1000) + ) + + Application.put_env(:channel_sender_ex, :secret_base, + { + Map.get(fetch(config, :channel_sender_ex, "secret_generator"), "base", + "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc"), + Map.get(fetch(config, :channel_sender_ex, "secret_generator"), "salt", "10293846571") + } + ) + + Application.put_env(:channel_sender_ex, :max_age, + Map.get(fetch(config, :channel_sender_ex, "secret_generator"), "max_age", 900) + ) + + Application.put_env(:channel_sender_ex, :socket_port, + Map.get(fetch(config, :channel_sender_ex), "socket_port", 8082) + ) + + Application.put_env(:channel_sender_ex, :rest_port, + Map.get(fetch(config, :channel_sender_ex), "rest_port", 8081) + ) + + Application.put_env(:channel_sender_ex, :initial_redelivery_time, + Map.get(fetch(config, :channel_sender_ex), "initial_redelivery_time", 900) + ) + + Application.put_env(:channel_sender_ex, :socket_idle_timeout, + Map.get(fetch(config, :channel_sender_ex), "socket_idle_timeout", 30_000) + ) + + Application.put_env(:channel_sender_ex, :topology, parse_libcluster_topology(config)) + + if (config == %{}) do + Logger.warn("No valid configuration found!!!, Loading pre-defined default values : #{inspect(Application.get_all_env(:channel_sender_ex))}") + else + Logger.info("Succesfully loaded configuration: #{inspect(inspect(Application.get_all_env(:channel_sender_ex)))}") + end + + config + end + + defp parse_libcluster_topology(config) do + topology = get_in(config, [:channel_sender_ex, "topology"]) + case topology do + nil -> + Logger.warn("No libcluster topology defined!!! -> Using Default [Gossip]") + [ strategy: Cluster.Strategy.Gossip ] + _ -> + [ + strategy: String.to_existing_atom(topology["strategy"]), + config: parse_config_key(topology["config"]) + ] + end + end + + defp parse_config_key(cfg) do + case cfg do + nil -> + [] + _ -> + Enum.map(cfg, fn({key, value}) -> + {String.to_atom(key), process_param(value)} + end) + end + end + + defp process_param(param) when is_integer(param) do + param + end + + defp process_param(param) when is_binary(param) do + case String.starts_with?(param, ":") do + true -> + String.to_atom(String.replace_leading(param, ":", "")) + false -> + param + end + end + + defp fetch(config, base) do + case get_in(config, [base]) do + nil -> + %{} + data -> + data + end + end + + defp fetch(config, base, key) do + case get_in(config, [base, key]) do + nil -> + %{} + data -> + data + end + end + +end diff --git a/channel-sender/lib/channel_sender_ex/core/channel.ex b/channel-sender/lib/channel_sender_ex/core/channel.ex index 27516cc..a406e28 100644 --- a/channel-sender/lib/channel_sender_ex/core/channel.ex +++ b/channel-sender/lib/channel_sender_ex/core/channel.ex @@ -7,21 +7,7 @@ defmodule ChannelSenderEx.Core.Channel do alias ChannelSenderEx.Core.ProtocolMessage alias ChannelSenderEx.Core.RulesProvider - # Max allowed time in waiting before terminate the channel - # @waiting_timeout Application.get_env(:channel_sender_ex, :channel_waiting_timeout, 30) - # @message_time_to_live Application.get_env(:channel_sender_ex, :message_time_to_live, 8000) - @token_max_age :max_age - @min_disconnection_tolerance :min_disconnection_tolerance - @on_connected_channel_reply_timeout Application.get_env( - :channel_sender_ex, - :on_connected_channel_reply_timeout, - 2000 - ) - @accept_channel_reply_timeout Application.get_env( - :channel_sender_ex, - :accept_channel_reply_timeout, - 1000 - ) + @on_connected_channel_reply_timeout 2000 @type delivery_ref() :: {pid(), reference()} @type output_message() :: {delivery_ref(), ProtocolMessage.t()} @@ -62,7 +48,10 @@ defmodule ChannelSenderEx.Core.Channel do @type deliver_response :: :accepted_waiting | :accepted_connected @spec deliver_message(:gen_statem.server_ref(), ProtocolMessage.t()) :: deliver_response() def deliver_message(server, message) do - GenStateMachine.call(server, {:deliver_message, message}, @accept_channel_reply_timeout) + GenStateMachine.call(server, {:deliver_message, message}, Application.get_env( + :channel_sender_ex, + :accept_channel_reply_timeout + )) end @spec start_link(any()) :: :gen_statem.start_ret() @@ -90,7 +79,7 @@ defmodule ChannelSenderEx.Core.Channel do ### WAITING STATE #### ### waiting state callbacks definitions #### def waiting(:enter, _old_state, data) do - waiting_timeout = round(RulesProvider.get(@token_max_age) * 1000) + waiting_timeout = round(RulesProvider.get(:max_age) * 1000) {:keep_state, data, [{:state_timeout, waiting_timeout, :waiting_timeout}]} end @@ -325,8 +314,8 @@ defmodule ChannelSenderEx.Core.Channel do @spec calculate_refresh_token_timeout() :: integer() @compile {:inline, calculate_refresh_token_timeout: 0} defp calculate_refresh_token_timeout() do - token_validity = RulesProvider.get(@token_max_age) - tolerance = RulesProvider.get(@min_disconnection_tolerance) + token_validity = RulesProvider.get(:max_age) + tolerance = RulesProvider.get(:min_disconnection_tolerance) min_timeout = token_validity / 2 round(max(min_timeout, token_validity - tolerance) * 1000) end diff --git a/channel-sender/lib/channel_sender_ex/core/channel_id_generator.ex b/channel-sender/lib/channel_sender_ex/core/channel_id_generator.ex index 3722e7a..dd3552f 100644 --- a/channel-sender/lib/channel_sender_ex/core/channel_id_generator.ex +++ b/channel-sender/lib/channel_sender_ex/core/channel_id_generator.ex @@ -7,10 +7,6 @@ defmodule ChannelSenderEx.Core.ChannelIDGenerator do import Plug.Crypto, only: [verify: 4, sign: 3] alias ChannelSenderEx.Core.RulesProvider - @application_name :channel_sender_ex - @secret_key :secret_base - @max_age :max_age - @type application() :: String.t() @type user_ref() :: String.t() @type channel_ref() :: String.t() @@ -51,11 +47,11 @@ defmodule ChannelSenderEx.Core.ChannelIDGenerator do end defp get_secret_and_salt!() do - case get_env(@application_name, @secret_key) do + case get_env(:channel_sender_ex, :secret_base) do data = {_secret, _salt} -> data other -> raise "Secret base no properly configured for application: #{other}" end end - defp max_age(), do: RulesProvider.get(@max_age) + defp max_age(), do: RulesProvider.get(:max_age) end diff --git a/channel-sender/lib/channel_sender_ex/core/pubsub/re_connect_process.ex b/channel-sender/lib/channel_sender_ex/core/pubsub/re_connect_process.ex index 364657f..9d06b46 100644 --- a/channel-sender/lib/channel_sender_ex/core/pubsub/re_connect_process.ex +++ b/channel-sender/lib/channel_sender_ex/core/pubsub/re_connect_process.ex @@ -29,7 +29,10 @@ defmodule ChannelSenderEx.Core.PubSub.ReConnectProcess do def connect_socket_to_channel(channel_ref, socket_pid) do case ChannelRegistry.lookup_channel_addr(channel_ref) do :noproc -> :noproc - pid -> Channel.socket_connected(pid, socket_pid) + pid -> + timeout = Application.get_env(:channel_sender_ex, + :on_connected_channel_reply_timeout) + Channel.socket_connected(pid, socket_pid, timeout) end catch _type, _err -> :noproc @@ -37,7 +40,3 @@ defmodule ChannelSenderEx.Core.PubSub.ReConnectProcess do end - - - - diff --git a/channel-sender/lib/channel_sender_ex/core/pubsub/socket_event_bus.ex b/channel-sender/lib/channel_sender_ex/core/pubsub/socket_event_bus.ex index ffc50c8..b6be6f9 100644 --- a/channel-sender/lib/channel_sender_ex/core/pubsub/socket_event_bus.ex +++ b/channel-sender/lib/channel_sender_ex/core/pubsub/socket_event_bus.ex @@ -16,7 +16,9 @@ defmodule ChannelSenderEx.Core.PubSub.SocketEventBus do def connect_channel(channel, socket_pid, count) do case ChannelRegistry.lookup_channel_addr(channel) do pid when is_pid(pid) -> - :ok = Channel.socket_connected(pid, socket_pid) + timeout = Application.get_env(:channel_sender_ex, + :on_connected_channel_reply_timeout) + :ok = Channel.socket_connected(pid, socket_pid, timeout) pid :noproc -> Process.sleep(350) diff --git a/channel-sender/mix.exs b/channel-sender/mix.exs index bc1c19f..5d77784 100644 --- a/channel-sender/mix.exs +++ b/channel-sender/mix.exs @@ -4,7 +4,7 @@ defmodule ChannelSenderEx.MixProject do def project do [ app: :channel_sender_ex, - version: "0.1.4", + version: "0.1.5", elixir: "~> 1.12", start_permanent: Mix.env() == :prod, deps: deps(), @@ -44,7 +44,8 @@ defmodule ChannelSenderEx.MixProject do {:plug_crypto, "~> 1.2"}, {:stream_data, "~> 0.4", only: [:test]}, {:gun, "~> 1.3", only: [:test, :benchee]}, - {:libcluster, "~> 3.3"} + {:libcluster, "~> 3.3"}, + {:vapor, "~> 0.10.0"} # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"} ] end diff --git a/channel-sender/mix.lock b/channel-sender/mix.lock index 440a17b..2af008f 100644 --- a/channel-sender/mix.lock +++ b/channel-sender/mix.lock @@ -13,11 +13,12 @@ "hackney": {:hex, :hackney, "1.2.0", "41d352d5c272a150127517ab29e00cf5521a04af925f260667bf55161fbee68f", [:rebar], [{:idna, "~> 1.0.2", [hex: :idna, repo: "hexpm", optional: false]}, {:ssl_verify_hostname, "~> 1.0.5", [hex: :ssl_verify_hostname, repo: "hexpm", optional: false]}], "hexpm", "c7851a8a2e07ad18f9d92f8bce93ea96af0617fb873961bbd3993073bf27da4b"}, "horde": {:hex, :horde, "0.8.3", "82c5276109edf5e76c802aa51b5196a1624d57cfb92b64e4bcab1e95f9aab45f", [:mix], [{:delta_crdt, "~> 0.5.10", [hex: :delta_crdt, repo: "hexpm", optional: false]}, {:libring, "~> 1.4", [hex: :libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.5.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "cd9d3079711bc18dceb0bc3b207868df800fed59e29dea05aa3c1874496cc1d2"}, "idna": {:hex, :idna, "1.0.3", "d456a8761cad91c97e9788c27002eb3b773adaf5c893275fc35ba4e3434bbd9b", [:rebar3], [], "hexpm", "357d489a51112db4f216034406834f9172b3c0ff5a12f83fb28b25ca271541d1"}, - "jason": {:hex, :jason, "1.2.1", "12b22825e22f468c02eb3e4b9985f3d0cb8dc40b9bd704730efa11abd2708c44", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "b659b8571deedf60f79c5a608e15414085fa141344e2716fbd6988a084b5f993"}, + "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "libcluster": {:hex, :libcluster, "3.3.3", "a4f17721a19004cfc4467268e17cff8b1f951befe428975dd4f6f7b84d927fe0", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "7c0a2275a0bb83c07acd17dab3c3bfb4897b145106750eeccc62d302e3bdfee5"}, "libring": {:hex, :libring, "1.5.0", "44313eb6862f5c9168594a061e9d5f556a9819da7c6444706a9e2da533396d70", [:mix], [], "hexpm", "04e843d4fdcff49a62d8e03778d17c6cb2a03fe2d14020d3825a1761b55bd6cc"}, "merkle_map": {:hex, :merkle_map, "0.2.1", "01a88c87a6b9fb594c67c17ebaf047ee55ffa34e74297aa583ed87148006c4c8", [:mix], [], "hexpm", "fed4d143a5c8166eee4fa2b49564f3c4eace9cb252f0a82c1613bba905b2d04d"}, "mime": {:hex, :mime, "1.4.0", "5066f14944b470286146047d2f73518cf5cca82f8e4815cf35d196b58cf07c47", [:mix], [], "hexpm", "75fa42c4228ea9a23f70f123c74ba7cece6a03b1fd474fe13f6a7a85c6ea4ff6"}, + "norm": {:hex, :norm, "0.13.0", "2c562113f3205e3f195ee288d3bd1ab903743e7e9f3282562c56c61c4d95dec4", [:mix], [{:stream_data, "~> 0.5", [hex: :stream_data, repo: "hexpm", optional: true]}], "hexpm", "447cc96dd2d0e19dcb37c84b5fc0d6842aad69386e846af048046f95561d46d7"}, "phoenix": {:hex, :phoenix, "1.5.5", "9a5a197edc1828c5f138a8ef10524dfecc43e36ab435c14578b1e9b4bd98858c", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_html, "~> 2.13", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.10", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.2", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.1.2 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b10eaf86ad026eafad2ee3dd336f0fb1c95a3711789855d913244e270bde463b"}, "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.0.0", "a1ae76717bb168cdeb10ec9d92d1480fec99e3080f011402c0a2d68d47395ffb", [:mix], [], "hexpm", "c52d948c4f261577b9c6fa804be91884b381a7f8f18450c5045975435350f771"}, "plug": {:hex, :plug, "1.10.4", "41eba7d1a2d671faaf531fa867645bd5a3dce0957d8e2a3f398ccff7d2ef017f", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ad1e233fe73d2eec56616568d260777b67f53148a999dc2d048f4eb9778fe4a0"}, @@ -30,5 +31,8 @@ "stream_data": {:hex, :stream_data, "0.5.0", "b27641e58941685c75b353577dc602c9d2c12292dd84babf506c2033cd97893e", [:mix], [], "hexpm", "012bd2eec069ada4db3411f9115ccafa38540a3c78c4c0349f151fc761b9e271"}, "telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"}, "telemetry_poller": {:hex, :telemetry_poller, "0.5.1", "21071cc2e536810bac5628b935521ff3e28f0303e770951158c73eaaa01e962a", [:rebar3], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4cab72069210bc6e7a080cec9afffad1b33370149ed5d379b81c7c5f0c663fd4"}, - "toml": {:hex, :toml, "0.5.2", "e471388a8726d1ce51a6b32f864b8228a1eb8edc907a0edf2bb50eab9321b526", [:mix], [], "hexpm", "f1e3dabef71fb510d015fad18c0e05e7c57281001141504c6b69d94e99750a07"}, + "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"}, + "vapor": {:hex, :vapor, "0.10.0", "547a94b381093dea61a4ca2200109385b6e44b86d72d1ebf93e5ac1a8873bc3c", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:norm, "~> 0.9", [hex: :norm, repo: "hexpm", optional: false]}, {:toml, "~> 0.3", [hex: :toml, repo: "hexpm", optional: false]}, {:yaml_elixir, "~> 2.1", [hex: :yaml_elixir, repo: "hexpm", optional: false]}], "hexpm", "ee6d089a71309647a0a2a2ae6cf3bea61739a983e8c1310d53ff04b1493afbc1"}, + "yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"}, + "yaml_elixir": {:hex, :yaml_elixir, "2.9.0", "9a256da867b37b8d2c1ffd5d9de373a4fda77a32a45b452f1708508ba7bbcb53", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "0cb0e7d4c56f5e99a6253ed1a670ed0e39c13fc45a6da054033928607ac08dfc"}, } diff --git a/channel-sender/rel/env.sh.eex b/channel-sender/rel/env.sh.eex index 382cd83..ca5712a 100644 --- a/channel-sender/rel/env.sh.eex +++ b/channel-sender/rel/env.sh.eex @@ -16,3 +16,6 @@ # RELEASE_DISTRIBUTION variable below. Must be "sname", "name" or "none". # export RELEASE_DISTRIBUTION=name # export RELEASE_NODE=<%= @release.name %>@127.0.0.1 + +if [ -z ${RELEASE_DISTRIBUTION+x} ]; then echo "RELEASE_DISTRIBUTION is unset"; else echo "RELEASE_DISTRIBUTION is set to '$RELEASE_DISTRIBUTION'"; fi +if [ -z ${RELEASE_NODE+x} ]; then echo "RELEASE_NODE is unset"; else echo "RELEASE_NODE is set to '$RELEASE_NODE'"; fi diff --git a/channel-sender/test/channel_sender_ex/core/channel_id_generator_test.exs b/channel-sender/test/channel_sender_ex/core/channel_id_generator_test.exs index 62555aa..41efd81 100644 --- a/channel-sender/test/channel_sender_ex/core/channel_id_generator_test.exs +++ b/channel-sender/test/channel_sender_ex/core/channel_id_generator_test.exs @@ -7,8 +7,18 @@ defmodule ChannelSenderEx.Core.ChannelIdGeneratorTest do @moduletag :capture_log setup_all do + Application.put_env(:channel_sender_ex, :secret_base, { + "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", + "socket auth" + }) + {:ok, _} = Application.ensure_all_started(:plug_crypto) Helper.compile(:channel_sender_ex) + + on_exit(fn -> + Application.delete_env(:channel_sender_ex, :secret_base) + end) + :ok end diff --git a/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs b/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs index 69bc345..f38b945 100644 --- a/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs +++ b/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs @@ -9,11 +9,22 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do @moduletag :capture_log - @supervisor_module Application.get_env(:channel_sender_ex, :channel_supervisor_module) - @registry_module Application.get_env(:channel_sender_ex, :registry_module) - setup_all do IO.puts("Starting Applications for Socket Test") + + Application.put_env(:channel_sender_ex, + :accept_channel_reply_timeout, + 1000) + + Application.put_env(:channel_sender_ex, + :on_connected_channel_reply_timeout, + 2000) + + Application.put_env(:channel_sender_ex, :secret_base, { + "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", + "socket auth" + }) + {:ok, _} = Application.ensure_all_started(:cowboy) {:ok, _} = Application.ensure_all_started(:gun) {:ok, _} = Application.ensure_all_started(:plug_crypto) @@ -26,14 +37,16 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do event_name: "event.example" } - {:ok, pid_registry} = @registry_module.start_link(name: ChannelRegistry, keys: :unique) + {:ok, pid_registry} = Horde.Registry.start_link(name: ChannelRegistry, keys: :unique) {:ok, pid_supervisor} = - @supervisor_module.start_link(name: ChannelSupervisor, strategy: :one_for_one) + Horde.DynamicSupervisor.start_link(name: ChannelSupervisor, strategy: :one_for_one) on_exit(fn -> true = Process.exit(pid_registry, :normal) true = Process.exit(pid_supervisor, :normal) + Application.delete_env(:channel_sender_ex, :accept_channel_reply_timeout) + Application.delete_env(:channel_sender_ex, :on_connected_channel_reply_timeout) IO.puts("Supervisor and Registry was terminated") end) @@ -58,6 +71,7 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do channel: channel, secret: secret } do + {conn, stream} = assert_connect_and_authenticate(port, channel, secret) assert {:accepted_connected, _, _} = deliver_message(channel) assert_receive {:gun_ws, ^conn, ^stream, {:text, data_string}} @@ -123,7 +137,7 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do defp assert_connect_and_authenticate(port, channel, secret) do conn = connect(port, channel) - assert_receive {:gun_upgrade, ^conn, stream, ["websocket"], _headers} + assert_receive {:gun_upgrade, ^conn, stream, ["websocket"], _headers}, 500 :gun.ws_send(conn, {:text, "Auth::#{secret}"}) assert_receive {:gun_ws, ^conn, ^stream, {:text, data_string}} diff --git a/channel-sender/test/channel_sender_ex/core/channel_test.exs b/channel-sender/test/channel_sender_ex/core/channel_test.exs index 48959ac..7344a75 100644 --- a/channel-sender/test/channel_sender_ex/core/channel_test.exs +++ b/channel-sender/test/channel_sender_ex/core/channel_test.exs @@ -11,6 +11,19 @@ defmodule ChannelSenderEx.Core.ChannelTest do @moduletag :capture_log setup_all do + Application.put_env(:channel_sender_ex, + :accept_channel_reply_timeout, + 1000) + + Application.put_env(:channel_sender_ex, + :on_connected_channel_reply_timeout, + 2000) + + Application.put_env(:channel_sender_ex, :secret_base, { + "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", + "socket auth" + }) + {:ok, _} = Application.ensure_all_started(:plug_crypto) Helper.compile(:channel_sender_ex) :ok diff --git a/channel-sender/test/channel_sender_ex/transport/rest/rest_controller_test.exs b/channel-sender/test/channel_sender_ex/transport/rest/rest_controller_test.exs index 632c90a..b142836 100644 --- a/channel-sender/test/channel_sender_ex/transport/rest/rest_controller_test.exs +++ b/channel-sender/test/channel_sender_ex/transport/rest/rest_controller_test.exs @@ -7,25 +7,39 @@ defmodule ChannelSenderEx.Transport.Rest.RestControllerTest do alias ChannelSenderEx.Core.Security.ChannelAuthenticator alias ChannelSenderEx.Core.RulesProvider.Helper - @supervisor_module Application.get_env(:channel_sender_ex, :channel_supervisor_module) - @registry_module Application.get_env(:channel_sender_ex, :registry_module) - @moduletag :capture_log doctest RestController setup_all do + Application.put_env(:channel_sender_ex, + :accept_channel_reply_timeout, + 1000) + + Application.put_env(:channel_sender_ex, + :on_connected_channel_reply_timeout, + 2000) + + Application.put_env(:channel_sender_ex, :secret_base, { + "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", + "socket auth" + }) + {:ok, _} = Application.ensure_all_started(:cowboy) {:ok, _} = Application.ensure_all_started(:hackney) {:ok, _} = Application.ensure_all_started(:plug_crypto) {:ok, _} = Plug.Cowboy.http(RestController, [], port: 9085, protocol_options: []) - {:ok, pid_registry} = @registry_module.start_link(name: ChannelRegistry, keys: :unique) + {:ok, pid_registry} = Horde.Registry.start_link(name: ChannelRegistry, keys: :unique) {:ok, pid_supervisor} = - @supervisor_module.start_link(name: ChannelSupervisor, strategy: :one_for_one) + Horde.DynamicSupervisor.start_link(name: ChannelSupervisor, strategy: :one_for_one) on_exit(fn -> + Application.delete_env(:channel_sender_ex, :secret_base) + Application.delete_env(:channel_sender_ex, :accept_channel_reply_timeout) + Application.delete_env(:channel_sender_ex, :on_connected_channel_reply_timeout) + :ok = Plug.Cowboy.shutdown(RestController.HTTP) true = Process.exit(pid_registry, :kill) true = Process.exit(pid_supervisor, :kill) diff --git a/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs b/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs index 73d2ef8..6b370ee 100644 --- a/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs +++ b/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs @@ -15,13 +15,25 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do @moduletag :capture_log - @supervisor_module Application.get_env(:channel_sender_ex, :channel_supervisor_module) - @registry_module Application.get_env(:channel_sender_ex, :registry_module) @binary "binary_flow" @json "json_flow" setup_all do IO.puts("Starting Applications for Socket Test") + + Application.put_env(:channel_sender_ex, + :accept_channel_reply_timeout, + 1000) + + Application.put_env(:channel_sender_ex, + :on_connected_channel_reply_timeout, + 2000) + + Application.put_env(:channel_sender_ex, :secret_base, { + "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc", + "socket auth" + }) + {:ok, _} = Application.ensure_all_started(:cowboy) {:ok, _} = Application.ensure_all_started(:gun) {:ok, _} = Application.ensure_all_started(:plug_crypto) @@ -34,12 +46,16 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do event_name: "event.example" } - {:ok, pid_registry} = @registry_module.start_link(name: ChannelRegistry, keys: :unique) + {:ok, pid_registry} = Horde.Registry.start_link(name: ChannelRegistry, keys: :unique) {:ok, pid_supervisor} = - @supervisor_module.start_link(name: ChannelSupervisor, strategy: :one_for_one) + Horde.DynamicSupervisor.start_link(name: ChannelSupervisor, strategy: :one_for_one) + on_exit(fn -> + Application.delete_env(:channel_sender_ex, :accept_channel_reply_timeout) + Application.delete_env(:channel_sender_ex, :on_connected_channel_reply_timeout) + Application.delete_env(:channel_sender_ex, :secret_base) true = Process.exit(pid_registry, :normal) true = Process.exit(pid_supervisor, :normal) IO.puts("Supervisor and Registry was terminated") @@ -63,7 +79,7 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do test "Should connect to socket", %{port: port, channel: channel} do conn = connect(port, channel) - assert_receive {:gun_upgrade, ^conn, stream, ["websocket"], _headers} + assert_receive {:gun_upgrade, ^conn, stream, ["websocket"], _headers}, 300 :gun.close(conn) end @@ -351,4 +367,5 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do defp decode_message({:binary, data}) do IO.inspect(BinaryEncoder.decode_message(data)) end + end