diff --git a/agones/src/sidecar.rs b/agones/src/sidecar.rs index 97c50463c4..7aa54781c5 100644 --- a/agones/src/sidecar.rs +++ b/agones/src/sidecar.rs @@ -77,10 +77,8 @@ filters: on_write: DO_NOTHING bytes: c2lkZWNhcg== # sidecar clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:7654 + - endpoints: + - address: 127.0.0.1:7654 "#; let config_map = config_maps diff --git a/benches/throughput.rs b/benches/throughput.rs index 8bf8e0d016..7e32f89ca4 100644 --- a/benches/throughput.rs +++ b/benches/throughput.rs @@ -29,7 +29,7 @@ fn run_quilkin(port: u16, endpoint: SocketAddr) { let runtime = tokio::runtime::Runtime::new().unwrap(); let config = Arc::new(quilkin::Config::default()); config.clusters.modify(|clusters| { - clusters.insert_default(vec![quilkin::endpoint::Endpoint::new(endpoint.into())]) + clusters.insert_default([quilkin::endpoint::Endpoint::new(endpoint.into())].into()) }); let proxy = quilkin::cli::Proxy { diff --git a/build.rs b/build.rs index 6a700161bc..c308464e61 100644 --- a/build.rs +++ b/build.rs @@ -33,6 +33,7 @@ fn main() -> Result<(), Box> { "proto/data-plane-api/envoy/type/metadata/v3/metadata.proto", "proto/data-plane-api/envoy/type/tracing/v3/custom_tag.proto", "proto/quilkin/relay/v1alpha1/relay.proto", + "proto/quilkin/config/v1alpha1/config.proto", "proto/quilkin/filters/capture/v1alpha1/capture.proto", "proto/quilkin/filters/compress/v1alpha1/compress.proto", "proto/quilkin/filters/concatenate_bytes/v1alpha1/concatenate_bytes.proto", diff --git a/docs/src/deployment/quickstarts/agones-xonotic-xds.md b/docs/src/deployment/quickstarts/agones-xonotic-xds.md index 88a24c347f..fbf4cac28d 100644 --- a/docs/src/deployment/quickstarts/agones-xonotic-xds.md +++ b/docs/src/deployment/quickstarts/agones-xonotic-xds.md @@ -5,12 +5,12 @@ ## 1. Overview -In this quickstart, we'll be setting up an example [Xonotic](https://xonotic.org/) [Agones](https://agones.dev/) +In this quickstart, we'll be setting up an example [Xonotic](https://xonotic.org/) [Agones](https://agones.dev/) Fleet, that will only be accessible through Quilkin, via utilising the [TokenRouter] -Filter to provide routing and access control to the Allocated `GameServer` instances. +Filter to provide routing and access control to the Allocated `GameServer` instances. -To do this, we'll take advantage of the Quilkin [Agones xDS Provider](../../services/xds/providers/agones.md) to provide -an out-of-the-box control plane for integration between Agones and [Quilkin's xDS configuration API](../../services/xds.md) with +To do this, we'll take advantage of the Quilkin [Agones xDS Provider](../../services/xds/providers/agones.md) to provide +an out-of-the-box control plane for integration between Agones and [Quilkin's xDS configuration API](../../services/xds.md) with minimal effort. ## 2. Install Quilkin Agones xDS Provider @@ -26,11 +26,11 @@ kubectl apply -f https://raw.githubusercontent.com/googleforgames/quilkin/{{GITH This applies several resources to your cluster: -1. A [ConfigMap] with a [Capture] and [TokenRouter] Filter set up to route packets to Endpoints, to be the base +1. A [ConfigMap] with a [Capture] and [TokenRouter] Filter set up to route packets to Endpoints, to be the base configuration for all the Quilkin proxies. -2. Appropriate [RBAC](https://kubernetes.io/docs/reference/access-authn-authz/rbac/) permissions for the +2. Appropriate [RBAC](https://kubernetes.io/docs/reference/access-authn-authz/rbac/) permissions for the `quilkin manage agones` process to inspect Agones resources. -3. A matching [Deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) that runs the +3. A matching [Deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) that runs the `quilkin manage process` xDS control plane and a [Service](https://kubernetes.io/docs/concepts/services-networking/service/) that the Quilkin proxies can connect to, to get their Filter and Endpoint configuration from. @@ -44,7 +44,7 @@ quilkin-manage-agones-54b787654-9dbvp 1/1 Running 0 76s ``` We can now run `kubectl get service quilkin-manage-agones` and see the -service that is generated in front of the above Deployment for our Quilkin proxies to connect to and receive their +service that is generated in front of the above Deployment for our Quilkin proxies to connect to and receive their configuration information from. ```shell @@ -55,20 +55,20 @@ quilkin-manage-agones ClusterIP 10.104.2.72 80/TCP 1m23s ## 3. Install Quilkin Proxy Pool -To install the Quilkin Proxy pool which connects to the above xDS provider, we can create a Deployment of Quilkin +To install the Quilkin Proxy pool which connects to the above xDS provider, we can create a Deployment of Quilkin proxy instances that point to the aforementioned Service, like so: ```shell kubectl apply -f https://raw.githubusercontent.com/googleforgames/quilkin/{{GITHUB_REF_NAME}}/examples/agones-xonotic-xds/proxy-pool.yaml ``` -This will set up three instances of Quilkin running as `quilkin proxy --management-server http://quilkin-manage-agones:80` +This will set up three instances of Quilkin running as `quilkin proxy --management-server http://quilkin-manage-agones:80` all connected to the `quilkin-manage-agones` service. Now we can run `kubectl get pods` until we see that the Pods for the proxy Deployment is up and running. ```shell -$ kubectl get pods +$ kubectl get pods NAME READY STATUS RESTARTS AGE quilkin-manage-agones-54b787654-9dbvp 1/1 Running 0 5m7s quilkin-proxies-78965c446d-dqvjg 1/1 Running 0 6s @@ -76,34 +76,34 @@ quilkin-proxies-78965c446d-fr6zs 1/1 Running 0 6s quilkin-proxies-78965c446d-m4rr7 1/1 Running 0 6s ``` -Let's take this one step further, and check the configuration of the proxies that should have come from the `quilkin +Let's take this one step further, and check the configuration of the proxies that should have come from the `quilkin manage agones` instance. In another terminal, run: `kubectl port-forward deployments/quilkin-proxies 8000`, to port forward the [admin endpoint](../admin.md) locally, which we can then query. -Go back to your original terminal and run `curl -s http://localhost:8000/config` +Go back to your original terminal and run `curl -s http://localhost:8000/config` > If you have [jq](https://stedolan.github.io/jq/) installed, run `curl -s http://localhost:8000/config | jq` for a > nicely formatted JSON output. ```shell $ curl -s http://localhost:8000/config -{"admin":{"address":"0.0.0.0:8000"},"clusters":{},"filters":[{"name":"quilkin.filters.capture.v1alpha1.Capture","config":{"metadataKey":"quilkin.dev/capture","suffix":{"size":3,"remove":true}}},{"name":"quilkin.filters.token_router.v1alpha1.TokenRouter","config":null}],"id":"quilkin-proxies-78965c446d-dqvjg","management_servers":[{"address":"http://quilkin-manage-agones:80"}],"port":7000,"version":"v1alpha1","maxmind_db":null}% +{"admin":{"address":"0.0.0.0:8000"},"clusters":{},"filters":[{"name":"quilkin.filters.capture.v1alpha1.Capture","config":{"metadataKey":"quilkin.dev/capture","suffix":{"size":3,"remove":true}}},{"name":"quilkin.filters.token_router.v1alpha1.TokenRouter","config":null}],"id":"quilkin-proxies-78965c446d-dqvjg","management_servers":[{"address":"http://quilkin-manage-agones:80"}],"port":7000,"version":"v1alpha1","maxmind_db":null}% ``` -This shows us the current configuration of the proxies coming from the xDS server created via `quilkin manage -agones`. The most interesting part that we see here, is that we have a matching set of -[Filters](../../services/proxy/filters.md) that are found in the `ConfigMap` in the +This shows us the current configuration of the proxies coming from the xDS server created via `quilkin manage +agones`. The most interesting part that we see here, is that we have a matching set of +[Filters](../../services/proxy/filters.md) that are found in the `ConfigMap` in the [xds-control-plane.yaml](https://github.com/googleforgames/quilkin/blob/{{GITHUB_REF_NAME}}/examples/agones-xonotic-xds/xds-control-plane.yaml) we installed earlier. ## 4. Create the Agones Fleet -Now we will create an [Agones Fleet](https://agones.dev/site/docs/reference/fleet/) to spin up all our Xonotic +Now we will create an [Agones Fleet](https://agones.dev/site/docs/reference/fleet/) to spin up all our Xonotic game servers. -Thankfully, Agones Fleets require no specific configuration to work with Quilkin proxies, so this yaml is a +Thankfully, Agones Fleets require no specific configuration to work with Quilkin proxies, so this yaml is a [standard Agones Fleet configuration](https://github.com/googleforgames/quilkin/blob/{{GITHUB_REF_NAME}}/examples/agones-xonotic-xds/fleet.yaml) ```shell @@ -126,18 +126,18 @@ To let the Quilkin xDS provider know what token will route to which `GameServer` `quilkin.dev/tokens` annotation to an allocated `GameServer`, with the token content as its value. > This token would normally get generated by some kind of player authentication service and passed to the client -> via the matchmaking service - but for demonstrative purposes, we've hardcoded it into the example +> via the matchmaking service - but for demonstrative purposes, we've hardcoded it into the example > `GameServerAllocation`. -Since you can add annotations to `GameServers` at -[allocation time](https://agones.dev/site/docs/reference/gameserverallocation/), we can both allocate a `GameServer` +Since you can add annotations to `GameServers` at +[allocation time](https://agones.dev/site/docs/reference/gameserverallocation/), we can both allocate a `GameServer` and apply the annotation at the same time! ```shell kubectl create -f https://raw.githubusercontent.com/googleforgames/quilkin/{{GITHUB_REF_NAME}}/examples/agones-xonotic-xds/gameserverallocation.yaml ``` -If we check our `GameServers` now, we should see that one of them has moved to the `Allocated` state, marking it as +If we check our `GameServers` now, we should see that one of them has moved to the `Allocated` state, marking it as having players playing on it, and therefore it is protected by Agones until the game session ends. ```shell @@ -150,7 +150,7 @@ xonotic-d7rfx-sn5d6 Ready 34.168.170.51 7036 gke-agones-default-534a > Don't do this more than once, as then multiple allocated `GameServers` will have the same routing token! -If we `kubectl describe gameserver ` and have a look at the annotations section, we +If we `kubectl describe gameserver ` and have a look at the annotations section, we should see something similar to this: ```shell @@ -168,7 +168,7 @@ Kind: GameServer ... ``` -Where we can see that there is now an annotation of `quilkin.dev/tokens` with the base64 encoded version of `456` as +Where we can see that there is now an annotation of `quilkin.dev/tokens` with the base64 encoded version of `456` as our authentication and routing token ("NDU2"). > You should use something more cryptographically random than `456` in your application. @@ -177,22 +177,22 @@ Let's run `curl -s http://localhost:8000/config` again, so we can see what has c ```shell $ curl -s http://localhost:8000/config -{"admin":{"address":"0.0.0.0:8000"},"clusters":{"default":{"localities":[{"locality":null,"endpoints":[{"address":"34.168.170.51:7226","metadata":{"quilkin.dev":{"tokens":["NDU2"]}}}]}]}},"filters":[{"name":"quilkin.filters.capture.v1alpha1.Capture","config":{"metadataKey":"quilkin.dev/capture","suffix":{"size":3,"remove":true}}},{"name":"quilkin.filters.token_router.v1alpha1.TokenRouter","config":null}],"id":"quilkin-proxies-78965c446d-tfgsj","management_servers":[{"address":"http://quilkin-manage-agones:80"}],"port":7000,"version":"v1alpha1","maxmind_db":null}% +{"admin":{"address":"0.0.0.0:8000"},"clusters": [{"locality":null,"endpoints":[{"address":"34.168.170.51:7226","metadata":{"quilkin.dev":{"tokens":["NDU2"]}}}]}],"filters":[{"name":"quilkin.filters.capture.v1alpha1.Capture","config":{"metadataKey":"quilkin.dev/capture","suffix":{"size":3,"remove":true}}},{"name":"quilkin.filters.token_router.v1alpha1.TokenRouter","config":null}],"id":"quilkin-proxies-78965c446d-tfgsj","management_servers":[{"address":"http://quilkin-manage-agones:80"}],"port":7000,"version":"v1alpha1","maxmind_db":null}% ``` -Looking under `clusters` > `localities` > `endpoints` we can see an address and token that matches up with the +Looking under `clusters` > `endpoints` we can see an address and token that matches up with the `GameServer` record we created above! -The xDS process saw that allocated `GameServer`, turned it into a Quilkin `Endpoint` and applied the set routing -token appropriately -- without you having to write a line of xDS compliant code! +The xDS process saw that allocated `GameServer`, turned it into a Quilkin `Endpoint` and applied the set routing +token appropriately -- without you having to write a line of xDS compliant code! ## Connecting Client Side Instead of connecting to Xonotic or an Agones `GameServer` directly, we'll want to grab the IP and exposed port of -the `Service` that fronts all our Quilkin proxies and connect to that instead -- but we'll have to append our +the `Service` that fronts all our Quilkin proxies and connect to that instead -- but we'll have to append our routing token `456` from before, to ensure our traffic gets routed to the correct Xonotic `GameServer` address. -Run `kubectl get service quilkin-proxies` to get the `EXTERNAL-IP` of the Service you created. +Run `kubectl get service quilkin-proxies` to get the `EXTERNAL-IP` of the Service you created. ```shell $ kubectl get service quilkin-proxies @@ -201,14 +201,14 @@ quilkin-proxies LoadBalancer 10.109.0.12 35.246.94.14 7000:30174/UDP ``` We have a [Quilkin config yaml](https://github.com/googleforgames/quilkin/blob/{{GITHUB_REF_NAME}}/examples/agones-xonotic-xds/client-token.yaml) -file all ready for you, that is configured to append the routing token `456` to each -packet that passes through it, via the power of a +file all ready for you, that is configured to append the routing token `456` to each +packet that passes through it, via the power of a [ConcatenateBytes](../../services/proxy/filters/concatenate_bytes.md) Filter. Download `client-token.yaml` locally, so you can edit it: ```shell -curl https://raw.githubusercontent.com/googleforgames/quilkin/{{GITHUB_REF_NAME}}/examples/agones-xonotic-xds/client-token.yaml --output client-token.yaml +curl https://raw.githubusercontent.com/googleforgames/quilkin/{{GITHUB_REF_NAME}}/examples/agones-xonotic-xds/client-token.yaml --output client-token.yaml ``` We then take the EXTERNAL-IP and port from the `quilkin-proxies` service, and replace the`${LOADBALANCER_IP}` diff --git a/docs/src/services/proxy.md b/docs/src/services/proxy.md index 16f4d0b11d..89635895f9 100644 --- a/docs/src/services/proxy.md +++ b/docs/src/services/proxy.md @@ -8,7 +8,7 @@ "Proxy" is the primary Quilkin service, which acts as a non-transparent UDP proxy. -To view all the options for the `proxy` subcommand, run: +To view all the options for the `proxy` subcommand, run: ```shell $ quilkin proxy --help @@ -17,51 +17,49 @@ $ quilkin proxy --help ## Endpoints -An Endpoint represents an address that Quilkin forwards packets to that it has received from the +An Endpoint represents an address that Quilkin forwards packets to that it has received from the source port. -It is represented by an IP address and port. An Endpoint can optionally be associated with an arbitrary set of +It is represented by an IP address and port. An Endpoint can optionally be associated with an arbitrary set of [metadata](#endpoint-metadata) as well. ## Proxy Filters Filters are the way for a Quilkin proxy to intercept UDP packet traffic from the source and [Endpoints][Endpoint] in either direction, and be able to inspect, -manipulate, and route the packets as desired. +manipulate, and route the packets as desired. -See [Filters] for a deeper dive into Filters, as well as the list of build in Filters that come with +See [Filters] for a deeper dive into Filters, as well as the list of build in Filters that come with Quilkin. ## Endpoint Metadata Endpoint metadata is an arbitrary set of key value pairs that are associated with an Endpoint. -These are visible to Filters when processing packets and can be used to provide more context about endpoints (e.g +These are visible to Filters when processing packets and can be used to provide more context about endpoints (e.g whether or not to route a packet to an endpoint). Keys must be of type string otherwise the configuration is rejected. Metadata associated with an endpoint contain arbitrary key value pairs which [Filters] can consult when processing packets (e.g they can contain information that determine whether or not to route a particular packet to an endpoint). ### Specialist Endpoint Metadata -Access tokens that can be associated with an endpoint are simply a special piece of metadata well known to Quilkin +Access tokens that can be associated with an endpoint are simply a special piece of metadata well known to Quilkin and utilised by the built-in [TokenRouter] filter to route packets. -Such well known values are placed within an object in the endpoint metadata, under the special key `quilkin.dev`. +Such well known values are placed within an object in the endpoint metadata, under the special key `quilkin.dev`. Currently, only the `tokens` key is in use. As an example, the following shows the configuration for an endpoint with its metadata: ```yaml clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:26000 - metadata: - canary: false - quilkin.dev: # This object is extracted by Quilkin and is usually reserved for built-in features - tokens: - - MXg3aWp5Ng== # base64 for 1x7ijy6 - - OGdqM3YyaQ== # base64 for 8gj3v2i + - endpoints: + - address: 127.0.0.1:26000 + metadata: + canary: false + quilkin.dev: # This object is extracted by Quilkin and is usually reserved for built-in features + tokens: + - MXg3aWp5Ng== # base64 for 1x7ijy6 + - OGdqM3YyaQ== # base64 for 8gj3v2i ``` An endpoint's metadata can be specified alongside the endpoint in [static configuration][file-configuration] or using the [xDS endpoint metadata][xds-endpoint-metadata] field when using [dynamic configuration][dynamic-configuration-doc] via xDS. @@ -74,17 +72,17 @@ Quilkin uses the "Session" concept to track traffic flowing through the proxy be Session serves the same purpose, and can be thought of as a lightweight version of a `TCP` session in that, while a TCP session requires a protocol to establish and teardown: -- A Quilkin session is automatically created upon receiving the first packet from a client via the [Local Port] to be +- A Quilkin session is automatically created upon receiving the first packet from a client via the [Local Port] to be sent to an upstream [Endpoint]. -- The session is automatically deleted after a period of inactivity (where no packet was sent between either +- The session is automatically deleted after a period of inactivity (where no packet was sent between either party) - currently 60 seconds. -A session is identified by the 4-tuple `(client IP, client Port, server IP, server Port)` where the client is the -downstream endpoint which initiated the communication with Quilkin and the server is one of the upstream Endpoints +A session is identified by the 4-tuple `(client IP, client Port, server IP, server Port)` where the client is the +downstream endpoint which initiated the communication with Quilkin and the server is one of the upstream Endpoints that Quilkin proxies traffic to. -Sessions are established *after* the filter chain completes. The destination Endpoint of a packet is determined by -the [filter chain][Filters], so a Session can only be created after filter chain completion. For example, if the +Sessions are established *after* the filter chain completes. The destination Endpoint of a packet is determined by +the [filter chain][Filters], so a Session can only be created after filter chain completion. For example, if the filter chain drops all packets, then no session will ever be created. [Endpoint]: #endpoints diff --git a/docs/src/services/proxy/configuration.md b/docs/src/services/proxy/configuration.md index e871915a6a..e29ca35f44 100644 --- a/docs/src/services/proxy/configuration.md +++ b/docs/src/services/proxy/configuration.md @@ -22,14 +22,14 @@ endpoint configuration to specify two endpoints with `token` metadata attached t {{#include ../../../../examples/proxy.yaml:17:100}} ``` -This is a great use of a static configuration file, as we only get a singular `--to` endpoint address via the +This is a great use of a static configuration file, as we only get a singular `--to` endpoint address via the command line arguments. We can also configure [Filters](./filters.md) via the configuration file. See that section for documentation. ## Dynamic Configuration -If you need to dynamically change either Filters and/or Endpoints at runtime, see the [Control Plane](../xds.md) +If you need to dynamically change either Filters and/or Endpoints at runtime, see the [Control Plane](../xds.md) documentation on the configuration API surface, and built in dynamic management providers. ## Json Schema @@ -66,58 +66,33 @@ properties: items: '$ref': {} # Refer to the Filter documentation for a filter configuration schema. clusters: - type: object + type: array description: | - grouping of clusters, each with a key for a name - additionalProperties: + grouping of endpoints, per cluster. + items: type: object - description: | - An individual cluster properties: - localities: + endpoints: type: array description: | - grouping of endpoints, per cluster. + A list of upstream endpoints to forward packets to. items: type: object + description: | + An upstream endpoint properties: - endpoints: - type: array + address: + type: string description: | - A list of upstream endpoints to forward packets to. - items: + Socket address of the endpoint. This must be of the ´IP:Port` form e.g `192.168.1.1:7001` + metadata: type: object description: | - An upstream endpoint - properties: - address: - type: string - description: | - Socket address of the endpoint. This must be of the ´IP:Port` form e.g `192.168.1.1:7001` - metadata: - type: object - description: | - Arbitrary key value pairs that is associated with the endpoint. - These are visible to Filters when processing packets and can be used to provide more context about endpoints (e.g whether or not to route a packet to an endpoint). - Keys must be of type string otherwise the configuration is rejected. - required: - - address - management_servers: - type: array - description: | - A list of XDS management servers to fetch configuration from. - Multiple servers can be provided for redundancy for the proxy to - fall back to upon error. - items: - type: object - description: | - Configuration for a management server. - properties: - address: - type: string - description: | - Address of the management server. This must have the `http(s)` scheme prefix. - Example: `http://example.com` + Arbitrary key value pairs that is associated with the endpoint. + These are visible to Filters when processing packets and can be used to provide more context about endpoints (e.g whether or not to route a packet to an endpoint). + Keys must be of type string otherwise the configuration is rejected. + required: + - address ``` [examples]: https://github.com/googleforgames/quilkin/blob/{{GITHUB_REF_NAME}}/examples diff --git a/docs/src/services/proxy/filters.md b/docs/src/services/proxy/filters.md index 1f8a7b5091..9f8f6a8e37 100644 --- a/docs/src/services/proxy/filters.md +++ b/docs/src/services/proxy/filters.md @@ -50,10 +50,8 @@ filters: max_packets: 10 period: 1 clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:7001 + - endpoints: + - address: 127.0.0.1:7001 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); # assert_eq!(config.filters.load().len(), 2); diff --git a/docs/src/services/proxy/filters/capture.md b/docs/src/services/proxy/filters/capture.md index 907ea2ac7c..a21a72ac90 100644 --- a/docs/src/services/proxy/filters/capture.md +++ b/docs/src/services/proxy/filters/capture.md @@ -41,10 +41,8 @@ filters: size: 3 remove: false clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:7001 + - endpoints: + - address: 127.0.0.1:7001 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); # assert_eq!(config.filters.load().len(), 1); diff --git a/docs/src/services/proxy/filters/compress.md b/docs/src/services/proxy/filters/compress.md index 59d7439e44..e478906011 100644 --- a/docs/src/services/proxy/filters/compress.md +++ b/docs/src/services/proxy/filters/compress.md @@ -19,10 +19,8 @@ filters: on_write: DECOMPRESS mode: SNAPPY clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:7001 + - endpoints: + - address: 127.0.0.1:7001 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); # assert_eq!(config.filters.load().len(), 1); diff --git a/docs/src/services/proxy/filters/concatenate_bytes.md b/docs/src/services/proxy/filters/concatenate_bytes.md index 9b918854bf..76bc61eaeb 100644 --- a/docs/src/services/proxy/filters/concatenate_bytes.md +++ b/docs/src/services/proxy/filters/concatenate_bytes.md @@ -19,10 +19,8 @@ filters: on_write: DO_NOTHING bytes: MXg3aWp5Ng== clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:7001 + - endpoints: + - address: 127.0.0.1:7001 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); # assert_eq!(config.filters.load().len(), 1); diff --git a/docs/src/services/proxy/filters/debug.md b/docs/src/services/proxy/filters/debug.md index e2eead229d..8ae5b04f93 100644 --- a/docs/src/services/proxy/filters/debug.md +++ b/docs/src/services/proxy/filters/debug.md @@ -18,10 +18,8 @@ filters: config: id: debug-1 clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:7001 + - endpoints: + - address: 127.0.0.1:7001 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); # assert_eq!(config.filters.load().len(), 1); diff --git a/docs/src/services/proxy/filters/firewall.md b/docs/src/services/proxy/filters/firewall.md index 996be96ea2..9b814f2c7d 100644 --- a/docs/src/services/proxy/filters/firewall.md +++ b/docs/src/services/proxy/filters/firewall.md @@ -27,10 +27,8 @@ filters: ports: - 7000 clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:7001 + - endpoints: + - address: 127.0.0.1:7001 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); # assert_eq!(config.filters.load().len(), 1); diff --git a/docs/src/services/proxy/filters/load_balancer.md b/docs/src/services/proxy/filters/load_balancer.md index 3942050317..c40db0e834 100644 --- a/docs/src/services/proxy/filters/load_balancer.md +++ b/docs/src/services/proxy/filters/load_balancer.md @@ -18,10 +18,8 @@ filters: config: policy: ROUND_ROBIN clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:7001 + - endpoints: + - address: 127.0.0.1:7001 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); # assert_eq!(config.filters.load().len(), 1); diff --git a/docs/src/services/proxy/filters/local_rate_limit.md b/docs/src/services/proxy/filters/local_rate_limit.md index 761030fb65..1827d9b052 100644 --- a/docs/src/services/proxy/filters/local_rate_limit.md +++ b/docs/src/services/proxy/filters/local_rate_limit.md @@ -22,10 +22,8 @@ filters: max_packets: 1000 period: 1 clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:7001 + - endpoints: + - address: 127.0.0.1:7001 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); # assert_eq!(config.filters.load().len(), 1); diff --git a/docs/src/services/proxy/filters/match.md b/docs/src/services/proxy/filters/match.md index 43f10979a5..f81fc22f78 100644 --- a/docs/src/services/proxy/filters/match.md +++ b/docs/src/services/proxy/filters/match.md @@ -15,11 +15,9 @@ quilkin.filters.match.v1alpha1.Match # let yaml = " version: v1alpha1 clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:26000 - - address: 127.0.0.1:26001 + - endpoints: + - address: 127.0.0.1:26000 + - address: 127.0.0.1:26001 filters: - name: quilkin.filters.capture.v1alpha1.Capture config: diff --git a/docs/src/services/proxy/filters/timestamp.md b/docs/src/services/proxy/filters/timestamp.md index 59f4b26946..5481e28ca6 100644 --- a/docs/src/services/proxy/filters/timestamp.md +++ b/docs/src/services/proxy/filters/timestamp.md @@ -24,10 +24,8 @@ filters: config: metadataKey: example.com/session_duration clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:26000 + - endpoints: + - address: 127.0.0.1:26000 # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); ``` diff --git a/docs/src/services/proxy/filters/token_router.md b/docs/src/services/proxy/filters/token_router.md index de5fb23271..ed56428eef 100644 --- a/docs/src/services/proxy/filters/token_router.md +++ b/docs/src/services/proxy/filters/token_router.md @@ -20,20 +20,18 @@ filters: config: metadataKey: myapp.com/myownkey clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:26000 - metadata: - quilkin.dev: - tokens: - - MXg3aWp5Ng== # Authentication is provided by these ids, and matched against - - OGdqM3YyaQ== # the value stored in Filter dynamic metadata - - address: 127.0.0.1:26001 - metadata: - quilkin.dev: - tokens: - - bmt1eTcweA== + - endpoints: + - address: 127.0.0.1:26000 + metadata: + quilkin.dev: + tokens: + - MXg3aWp5Ng== # Authentication is provided by these ids, and matched against + - OGdqM3YyaQ== # the value stored in Filter dynamic metadata + - address: 127.0.0.1:26001 + metadata: + quilkin.dev: + tokens: + - bmt1eTcweA== # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); # assert_eq!(config.filters.load().len(), 1); @@ -84,20 +82,18 @@ filters: remove: true - name: quilkin.filters.token_router.v1alpha1.TokenRouter clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:26000 - metadata: - quilkin.dev: - tokens: - - MXg3aWp5Ng== # Authentication is provided by these ids, and matched against - - OGdqM3YyaQ== # the value stored in Filter dynamic metadata - - address: 127.0.0.1:26001 - metadata: - quilkin.dev: - tokens: - - bmt1eTcweA== + - endpoints: + - address: 127.0.0.1:26000 + metadata: + quilkin.dev: + tokens: + - MXg3aWp5Ng== # Authentication is provided by these ids, and matched against + - OGdqM3YyaQ== # the value stored in Filter dynamic metadata + - address: 127.0.0.1:26001 + metadata: + quilkin.dev: + tokens: + - bmt1eTcweA== # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); # assert_eq!(config.filters.load().len(), 2); diff --git a/docs/src/services/proxy/filters/writing_custom_filters.md b/docs/src/services/proxy/filters/writing_custom_filters.md index ab6782602f..7e584e8e8d 100644 --- a/docs/src/services/proxy/filters/writing_custom_filters.md +++ b/docs/src/services/proxy/filters/writing_custom_filters.md @@ -105,10 +105,8 @@ version: v1alpha1 filters: - name: greet.v1 clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:4321 + - endpoints: + - address: 127.0.0.1:4321 ``` Next we to setup our network of services, for this example we're going to use diff --git a/docs/src/services/relay.md b/docs/src/services/relay.md index 182d2d7965..5ebeabff92 100644 --- a/docs/src/services/relay.md +++ b/docs/src/services/relay.md @@ -37,10 +37,8 @@ example works with any configuration provider. # quilkin.yaml version: v1alpha1 clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:8888 + - endpoints: + - address: 127.0.0.1:8888 ``` To start the relay, run the `relay` command: @@ -91,10 +89,8 @@ set of data that the proxies can query using xDS discovery requests. # quilkin2.yaml version: v1alpha1 clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:9999 + - endpoints: + - address: 127.0.0.1:9999 ``` ``` diff --git a/docs/src/services/xds/providers/filesystem.md b/docs/src/services/xds/providers/filesystem.md index 350c1389de..cb51b1f328 100644 --- a/docs/src/services/xds/providers/filesystem.md +++ b/docs/src/services/xds/providers/filesystem.md @@ -25,14 +25,12 @@ filters: config: id: hello clusters: - cluster-a: - localities: - - endpoints: - - address: 123.0.0.1:29 - metadata: - 'quilkin.dev': - tokens: - - 'MXg3aWp5Ng==' + - endpoints: + - address: 123.0.0.1:29 + metadata: + 'quilkin.dev': + tokens: + - 'MXg3aWp5Ng==' # "; # let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); # assert_eq!(config.filters.load().len(), 1); diff --git a/examples/agones-xonotic-sidecar/client-compress.yaml b/examples/agones-xonotic-sidecar/client-compress.yaml index c9ae4a4c27..f930d8b8a2 100644 --- a/examples/agones-xonotic-sidecar/client-compress.yaml +++ b/examples/agones-xonotic-sidecar/client-compress.yaml @@ -22,7 +22,5 @@ filters: on_write: DECOMPRESS mode: SNAPPY clusters: - default: - localities: - - endpoints: - - address: ${GAMESERVER_IP}:${GAMESERVER_PORT} + - endpoints: + - address: ${GAMESERVER_IP}:${GAMESERVER_PORT} diff --git a/examples/agones-xonotic-sidecar/sidecar-compress.yaml b/examples/agones-xonotic-sidecar/sidecar-compress.yaml index 87cd6b275e..3ec4073e12 100644 --- a/examples/agones-xonotic-sidecar/sidecar-compress.yaml +++ b/examples/agones-xonotic-sidecar/sidecar-compress.yaml @@ -28,10 +28,8 @@ data: on_write: COMPRESS mode: SNAPPY clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:26000 + - endpoints: + - address: 127.0.0.1:26000 --- apiVersion: "agones.dev/v1" kind: Fleet diff --git a/examples/agones-xonotic-xds/client-token.yaml b/examples/agones-xonotic-xds/client-token.yaml index abf1b9ab30..1b99fc536a 100644 --- a/examples/agones-xonotic-xds/client-token.yaml +++ b/examples/agones-xonotic-xds/client-token.yaml @@ -22,7 +22,5 @@ filters: on_write: DO_NOTHING bytes: NDU2 # 456 clusters: - default: - localities: - - endpoints: - - address: ${LOADBALANCER_IP}:7777 + - endpoints: + - address: ${LOADBALANCER_IP}:7777 diff --git a/examples/iperf3/proxy.yaml b/examples/iperf3/proxy.yaml index b060ef78a4..9b0f3abc63 100644 --- a/examples/iperf3/proxy.yaml +++ b/examples/iperf3/proxy.yaml @@ -21,7 +21,5 @@ version: v1alpha1 id: iperf3 clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:8001 + - endpoints: + - address: 127.0.0.1:8001 diff --git a/examples/proxy.yaml b/examples/proxy.yaml index 946fedbe49..be96a222b7 100644 --- a/examples/proxy.yaml +++ b/examples/proxy.yaml @@ -21,17 +21,15 @@ version: v1alpha1 id: my-proxy # An identifier for the proxy instance. clusters: # grouping of clusters - default: - localities: # grouping of endpoints within a cluster - - endpoints: # array of potential endpoints to send on traffic to - - address: 127.0.0.1:26000 - metadata: # Metadata associated with the endpoint - quilkin.dev: - tokens: - - MXg3aWp5Ng== # the connection byte array to route to, encoded as base64 (string value: 1x7ijy6) - - OGdqM3YyaQ== # (string value: 8gj3v2i) - - address: 127.0.0.1:26001 - metadata: # Metadata associated with the endpoint - quilkin.dev: - tokens: - - bmt1eTcweA== # (string value: nkuy70x) + - endpoints: # array of potential endpoints to send on traffic to + - address: 127.0.0.1:26000 + metadata: # Metadata associated with the endpoint + quilkin.dev: + tokens: + - MXg3aWp5Ng== # the connection byte array to route to, encoded as base64 (string value: 1x7ijy6) + - OGdqM3YyaQ== # (string value: 8gj3v2i) + - address: 127.0.0.1:26001 + metadata: # Metadata associated with the endpoint + quilkin.dev: + tokens: + - bmt1eTcweA== # (string value: nkuy70x) diff --git a/proto/quilkin/config/v1alpha1/config.proto b/proto/quilkin/config/v1alpha1/config.proto new file mode 100644 index 0000000000..651db8144d --- /dev/null +++ b/proto/quilkin/config/v1alpha1/config.proto @@ -0,0 +1,41 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; + +import "google/protobuf/struct.proto"; + +package quilkin.config.v1alpha1; + +message ClusterMap { + repeated Cluster clusters = 1; +} + +message Cluster { + Locality locality = 1; + repeated Endpoint endpoints = 2; +} + +message Locality { + string region = 1; + string zone = 2; + string sub_zone = 3; +} + +message Endpoint { + string host = 1; + uint32 port = 2; + google.protobuf.Struct metadata = 3; +} diff --git a/src/admin.rs b/src/admin.rs index 3672c2e51f..c18afe4126 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -149,12 +149,10 @@ mod tests { let response = super::check_proxy_readiness(&config); assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); - let cluster = crate::cluster::Cluster::new_default(vec![vec![Endpoint::new( - (std::net::Ipv4Addr::LOCALHOST, 25999).into(), - )] - .into()]); - - config.clusters.write().insert(cluster); + config + .clusters + .write() + .insert_default([Endpoint::new((std::net::Ipv4Addr::LOCALHOST, 25999).into())].into()); let response = super::check_proxy_readiness(&config); assert_eq!(response.status(), StatusCode::OK); diff --git a/src/cli.rs b/src/cli.rs index 1ac73b0fe7..6f5037e90c 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -237,7 +237,7 @@ mod tests { use crate::{ config::{Filter, Providers}, - endpoint::{Endpoint, LocalityEndpoints}, + endpoint::Endpoint, filters::{Capture, StaticFilter, TokenRouter}, }; @@ -281,16 +281,15 @@ mod tests { let endpoints_file = tempfile::NamedTempFile::new().unwrap(); let config = Config::default(); std::fs::write(endpoints_file.path(), { - config - .clusters - .write() - .default_cluster_mut() - .insert(LocalityEndpoints::from(vec![Endpoint::with_metadata( + config.clusters.write().insert_default( + [Endpoint::with_metadata( (std::net::Ipv4Addr::LOCALHOST, server_port).into(), crate::endpoint::Metadata { tokens: vec!["abc".into()].into_iter().collect(), }, - )])); + )] + .into(), + ); serde_yaml::to_string(&config).unwrap() }) .unwrap(); @@ -315,15 +314,15 @@ mod tests { quiet: true, admin_address: Some((Ipv4Addr::LOCALHOST, control_plane_admin_port).into()), config: <_>::default(), - command: Commands::Manage(Manage { + command: Commands::Agent(Agent { relay: vec!["http://localhost:7900".parse().unwrap()], - port: 7801, region: None, sub_zone: None, zone: None, - provider: Providers::File { + qcmp_port: crate::test_utils::available_addr().await.port(), + provider: Some(Providers::File { path: endpoints_file.path().to_path_buf(), - }, + }), }), }; @@ -356,16 +355,15 @@ mod tests { tracing::info!(?token, "writing new config"); std::fs::write(endpoints_file.path(), { - config - .clusters - .write() - .default_cluster_mut() - .insert(LocalityEndpoints::from(vec![Endpoint::with_metadata( + config.clusters.write().insert_default( + [Endpoint::with_metadata( (std::net::Ipv4Addr::LOCALHOST, server_port).into(), crate::endpoint::Metadata { tokens: vec![token.clone()].into_iter().collect(), }, - )])); + )] + .into(), + ); serde_yaml::to_string(&config).unwrap() }) .unwrap(); diff --git a/src/cli/manage.rs b/src/cli/manage.rs index 73a217071c..22b226bd03 100644 --- a/src/cli/manage.rs +++ b/src/cli/manage.rs @@ -61,7 +61,7 @@ impl Manage { if let Some(locality) = &locality { config .clusters - .modify(|map| map.update_unlocated_endpoints(locality)); + .modify(|map| map.update_unlocated_endpoints(locality.clone())); } let provider_task = self.provider.spawn(config.clone(), locality.clone()); diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs index a067e9e659..590bbe5d41 100644 --- a/src/cli/proxy.rs +++ b/src/cli/proxy.rs @@ -87,7 +87,14 @@ impl Proxy { if !self.to.is_empty() { config.clusters.modify(|clusters| { - clusters.default_cluster_mut().localities = vec![self.to.clone().into()].into(); + clusters.insert( + None, + self.to + .iter() + .cloned() + .map(crate::endpoint::Endpoint::from) + .collect(), + ); }); } @@ -109,9 +116,7 @@ impl Proxy { let mut stream = client.xds_client_stream(config.clone()); tokio::time::sleep(std::time::Duration::from_nanos(1)).await; - stream - .discovery_request(ResourceType::Endpoint, &[]) - .await?; + stream.discovery_request(ResourceType::Cluster, &[]).await?; tokio::time::sleep(std::time::Duration::from_nanos(1)).await; stream .discovery_request(ResourceType::Listener, &[]) @@ -199,10 +204,13 @@ mod tests { let config = Arc::new(crate::Config::default()); config.clusters.modify(|clusters| { - clusters.insert_default(vec![ - Endpoint::new(endpoint1.socket.local_addr().unwrap().into()), - Endpoint::new(endpoint2.socket.local_addr().unwrap().into()), - ]) + clusters.insert_default( + [ + Endpoint::new(endpoint1.socket.local_addr().unwrap().into()), + Endpoint::new(endpoint2.socket.local_addr().unwrap().into()), + ] + .into(), + ); }); t.run_server(config, proxy, None); @@ -242,9 +250,9 @@ mod tests { }; let config = Arc::new(Config::default()); config.clusters.modify(|clusters| { - clusters.insert_default(vec![Endpoint::new( - endpoint.socket.local_addr().unwrap().into(), - )]) + clusters.insert_default( + [Endpoint::new(endpoint.socket.local_addr().unwrap().into())].into(), + ); }); t.run_server(config, proxy, None); @@ -281,9 +289,9 @@ mod tests { .unwrap(), ); config.clusters.modify(|clusters| { - clusters.insert_default(vec![Endpoint::new( - endpoint.socket.local_addr().unwrap().into(), - )]) + clusters.insert_default( + [Endpoint::new(endpoint.socket.local_addr().unwrap().into())].into(), + ); }); t.run_server( config, @@ -320,7 +328,7 @@ mod tests { let msg = "hello"; let config = Arc::new(Config::default()); config.clusters.modify(|clusters| { - clusters.insert_default(vec![endpoint.socket.local_addr().unwrap()]) + clusters.insert_default([endpoint.socket.local_addr().unwrap().into()].into()) }); // we'll test a single DownstreamReceiveWorkerConfig @@ -358,7 +366,12 @@ mod tests { let config = Arc::new(crate::Config::default()); config.clusters.modify(|clusters| { - clusters.insert_default(vec![endpoint.socket.local_addr().unwrap()]) + clusters.insert_default( + [crate::endpoint::Endpoint::from( + endpoint.socket.local_addr().unwrap(), + )] + .into(), + ) }); proxy.run_recv_from(&config, <_>::default()).unwrap(); diff --git a/src/cluster.rs b/src/cluster.rs index 6fe0f4ccd0..09be9719eb 100644 --- a/src/cluster.rs +++ b/src/cluster.rs @@ -14,18 +14,19 @@ * limitations under the License. */ -use std::collections::HashMap; +use std::collections::BTreeSet; use dashmap::DashMap; use once_cell::sync::Lazy; -use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use crate::endpoint::{Endpoint, EndpointAddress, Locality, LocalityEndpoints, LocalitySet}; +use crate::endpoint::{Endpoint, Locality}; -const DEFAULT_CLUSTER_NAME: &str = "default"; const SUBSYSTEM: &str = "cluster"; +crate::include_proto!("quilkin.config.v1alpha1"); +pub(crate) use self::quilkin::config::v1alpha1 as proto; + pub(crate) fn active_clusters() -> &'static prometheus::IntGauge { static ACTIVE_CLUSTERS: Lazy = Lazy::new(|| { crate::metrics::register( @@ -56,175 +57,111 @@ pub(crate) fn active_endpoints() -> &'static prometheus::IntGauge { &ACTIVE_ENDPOINTS } -#[derive(Clone, Default, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] -pub struct Cluster { - #[serde(skip, default = "default_cluster_name")] - pub name: String, - pub localities: LocalitySet, -} - -impl Cluster { - /// Creates a new `Cluster` called `name` containing `localities`. - pub fn new(name: impl Into, localities: impl Into) -> Self { - Self { - name: name.into(), - localities: localities.into(), - } - } - - /// Creates a new `Cluster` called `"default"` containing `endpoints`. - pub fn new_default(endpoints: impl Into) -> Self { - Self::new("default", endpoints) - } +/// Represents a full snapshot of all clusters. +#[derive(Clone, Default, Debug)] +pub struct ClusterMap(DashMap, BTreeSet>); - /// Adds a new set of endpoints to the cluster. - pub fn insert(&mut self, endpoints: impl Into) { - self.localities.insert(endpoints.into()); - } +type DashMapRef<'inner> = dashmap::mapref::one::Ref<'inner, Option, BTreeSet>; +type DashMapRefMut<'inner> = + dashmap::mapref::one::RefMut<'inner, Option, BTreeSet>; - pub fn update_locality(&mut self, locality: &Locality) { - if let Some(endpoints) = self.localities.remove(&None) { - self.localities - .insert(endpoints.with_locality(locality.clone())); - } +impl ClusterMap { + pub fn new_default(cluster: BTreeSet) -> Self { + let this = Self::default(); + this.insert_default(cluster); + this } - /// Provides a flat iterator over the list of endpoints. - pub fn endpoints(&self) -> impl Iterator + '_ { - self.localities - .iter() - .flat_map(|locality| locality.endpoints.iter()) + pub fn insert( + &self, + locality: Option, + cluster: BTreeSet, + ) -> Option> { + self.0.insert(locality, cluster) } - pub fn merge(&mut self, cluster: &Self) { - self.localities.merge(&cluster.localities); + pub fn len(&self) -> usize { + self.0.len() } -} - -fn default_cluster_name() -> String { - DEFAULT_CLUSTER_NAME.into() -} - -/// Represents a full snapshot of all clusters. -#[derive(Clone, Default, Debug, Serialize)] -pub struct ClusterMap(DashMap); -type DashMapRef<'inner> = dashmap::mapref::one::Ref<'inner, String, Cluster>; -type DashMapRefMut<'inner> = dashmap::mapref::one::RefMut<'inner, String, Cluster>; - -impl ClusterMap { - /// Creates a new `Cluster` called `name` containing `endpoints`. - pub fn new_with_default_cluster(localities: impl Into) -> Self { - Self::from_iter([Cluster::new_default(vec![localities.into()])]) + pub fn is_empty(&self) -> bool { + self.0.is_empty() } - pub fn insert(&self, cluster: Cluster) -> Option { - self.0.insert(cluster.name.clone(), cluster) - } - - pub fn get(&self, key: &str) -> Option { + pub fn get(&self, key: &Option) -> Option { self.0.get(key) } - pub fn get_mut(&self, key: &str) -> Option { + pub fn get_mut(&self, key: &Option) -> Option { self.0.get_mut(key) } pub fn get_default(&self) -> Option { - self.get(DEFAULT_CLUSTER_NAME) + self.get(&None) } pub fn get_default_mut(&self) -> Option { - self.get_mut(DEFAULT_CLUSTER_NAME) + self.get_mut(&None) } - pub fn remove_endpoint(&self, endpoint: &Endpoint) -> Option<()> { - self.0.iter_mut().find_map(|mut cluster| { - for le in cluster.localities.iter_mut() { - if let Some(endpoint) = le - .endpoints - .iter() - .find(|rhs| endpoint.address == rhs.address) - .cloned() - { - le.endpoints.remove(&endpoint); - } + pub fn insert_default(&self, endpoints: BTreeSet) { + self.insert(None, endpoints); + } + + pub fn remove_endpoint(&self, endpoint: &Endpoint) -> bool { + for mut entry in self.0.iter_mut() { + if entry.value_mut().remove(endpoint) { + return true; } + } - None - }) + false } - pub fn remove_endpoint_if(&self, closure: impl Fn(&Endpoint) -> bool) -> Option<()> { - self.0.iter_mut().find_map(|mut cluster| { - cluster.localities.iter_mut().find_map(|le| { - le.endpoints - .iter() - .find(|endpoint| (closure)(endpoint)) - .cloned() - .and_then(|endpoint| le.endpoints.remove(&endpoint).then_some(())) - }) - }) - } + pub fn remove_endpoint_if(&self, closure: impl Fn(&Endpoint) -> bool) -> bool { + for mut entry in self.0.iter_mut() { + let set = entry.value_mut(); + if let Some(endpoint) = set.iter().find(|endpoint| (closure)(endpoint)).cloned() { + return set.remove(&endpoint); + } + } - pub fn insert_default(&self, cluster: impl Into) { - self.0.insert( - DEFAULT_CLUSTER_NAME.into(), - Cluster::new_default(vec![cluster.into()]), - ); + false } - pub fn iter(&self) -> dashmap::iter::Iter { + pub fn iter(&self) -> dashmap::iter::Iter, BTreeSet> { self.0.iter() } - pub fn entry(&self, key: String) -> dashmap::mapref::entry::Entry { + pub fn entry( + &self, + key: Option, + ) -> dashmap::mapref::entry::Entry, BTreeSet> { self.0.entry(key) } - pub fn default_entry(&self, key: String) -> DashMapRefMut { - let mut entry = self.entry(key.clone()).or_default(); - entry.name.is_empty().then(|| entry.name = key); - entry + pub fn default_entry(&self) -> DashMapRefMut { + self.entry(None).or_default() } - pub fn default_cluster_mut(&self) -> DashMapRefMut { - self.default_entry(DEFAULT_CLUSTER_NAME.into()) - } - - /// Updates the locality of any endpoints which have no locality in any - /// clusters to `locality`. - pub fn update_unlocated_endpoints(&self, locality: &Locality) { - for mut entry in self.0.iter_mut() { - entry.update_locality(locality); - } - } - - pub fn localities(&self) -> impl Iterator + '_ { + pub fn endpoints(&self) -> impl Iterator + '_ { self.0 .iter() - .flat_map(|entry| entry.value().localities.clone().into_iter()) - } - - pub fn endpoints(&self) -> impl Iterator + '_ { - self.localities().flat_map(|locality| locality.endpoints) + .flat_map(|entry| entry.value().iter().cloned().collect::>()) } - pub fn merge(&self, map: Self) { - for cluster in map.iter() { - let span = tracing::info_span!("applied_cluster", cluster = cluster.name,); - let _entered = span.enter(); - - let cluster = cluster.value(); - self.default_entry(cluster.name.clone()).merge(cluster); + pub fn update_unlocated_endpoints(&self, locality: Locality) { + if let Some((_, set)) = self.0.remove(&None) { + self.0.insert(Some(locality), set); } } - pub fn contains_only_unique_endpoints(&self) -> bool { - self.endpoints() - .collect::>() - .len() - == self.endpoints().count() + pub fn merge(&self, locality: Option, endpoints: BTreeSet) { + if let Some(mut set) = self.get_mut(&locality) { + *set = endpoints; + } else { + self.insert(locality, endpoints); + } } } @@ -241,16 +178,31 @@ impl PartialEq for ClusterMap { } } +#[derive(Default, Debug, Deserialize, Serialize, PartialEq, Clone, Eq, schemars::JsonSchema)] +pub(crate) struct EndpointWithLocality { + pub endpoints: BTreeSet, + pub locality: Option, +} + +impl From<(Option, BTreeSet)> for EndpointWithLocality { + fn from((locality, endpoints): (Option, BTreeSet)) -> Self { + Self { + locality, + endpoints, + } + } +} + impl schemars::JsonSchema for ClusterMap { fn schema_name() -> String { - >::schema_name() + >::schema_name() } fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { - >::json_schema(gen) + >::json_schema(gen) } fn is_referenceable() -> bool { - >::is_referenceable() + >::is_referenceable() } } @@ -259,154 +211,75 @@ impl<'de> Deserialize<'de> for ClusterMap { where D: serde::Deserializer<'de>, { - let map = DashMap::::deserialize(deserializer)?; - - for mut entry in map.iter_mut() { - entry.name = entry.key().clone(); + let vec = Vec::::deserialize(deserializer)?; + if vec + .iter() + .map(|le| &le.locality) + .collect::>() + .len() + != vec.len() + { + return Err(serde::de::Error::custom( + "duplicate localities found in cluster map", + )); } - Ok(Self(map)) - } -} - -impl From> for ClusterMap { - fn from(value: DashMap) -> Self { - Self(value) + Ok(Self(DashMap::from_iter(vec.into_iter().map( + |EndpointWithLocality { + locality, + endpoints, + }| (locality, endpoints), + )))) } } -impl From for ClusterMap { - fn from(value: Cluster) -> Self { - Self::from([value]) - } -} - -impl From<[Cluster; N]> for ClusterMap { - fn from(value: [Cluster; N]) -> Self { - Self::from_iter(value) - } -} - -impl FromIterator for ClusterMap { - fn from_iter(iter: T) -> Self +impl Serialize for ClusterMap { + fn serialize(&self, ser: S) -> Result where - T: IntoIterator, + S: serde::Serializer, { - Self( - iter.into_iter() - .map(|cluster| (cluster.name.clone(), cluster)) - .collect(), - ) - } -} - -impl From<[(String, Cluster); N]> for ClusterMap { - fn from(value: [(String, Cluster); N]) -> Self { - Self::from_iter(value) + self.0 + .iter() + .map(|entry| EndpointWithLocality::from((entry.key().clone(), entry.value().clone()))) + .collect::>() + .serialize(ser) } } -impl FromIterator<(String, Cluster)> for ClusterMap { - fn from_iter(iter: T) -> Self - where - T: IntoIterator, - { - Self(iter.into_iter().collect()) +impl From, BTreeSet>> for ClusterMap { + fn from(value: DashMap, BTreeSet>) -> Self { + Self(value) } } -impl From for crate::xds::config::endpoint::v3::ClusterLoadAssignment { - fn from(cluster: Cluster) -> Self { +impl From<(Option, BTreeSet)> for proto::Cluster { + fn from((locality, endpoints): (Option, BTreeSet)) -> Self { Self { - cluster_name: cluster.name, - endpoints: cluster.localities.into_iter().map(From::from).collect(), - ..Self::default() + locality: locality.map(From::from), + endpoints: endpoints.iter().map(From::from).collect(), } } } -impl From<&'_ Cluster> for crate::xds::config::cluster::v3::Cluster { - fn from(cluster: &Cluster) -> Self { +impl From<(&Option, &BTreeSet)> for proto::Cluster { + fn from((locality, endpoints): (&Option, &BTreeSet)) -> Self { Self { - name: cluster.name.clone(), - load_assignment: Some(cluster.into()), - ..Self::default() + locality: locality.clone().map(From::from), + endpoints: endpoints.iter().map(From::from).collect(), } } } -impl From<&'_ Cluster> for crate::xds::config::endpoint::v3::ClusterLoadAssignment { - fn from(cluster: &Cluster) -> Self { +impl From<&'_ Endpoint> for proto::Endpoint { + fn from(endpoint: &Endpoint) -> Self { Self { - cluster_name: cluster.name.clone(), - endpoints: cluster.localities.iter().cloned().map(From::from).collect(), - ..Self::default() + host: endpoint.address.host.to_string(), + port: endpoint.address.port.into(), + metadata: Some((&endpoint.metadata).into()), } } } -impl TryFrom for Cluster { - type Error = eyre::Error; - - fn try_from( - mut cla: crate::xds::config::endpoint::v3::ClusterLoadAssignment, - ) -> Result { - use crate::xds::config::endpoint::v3::lb_endpoint; - - let localities = cla - .endpoints - .into_iter() - .map(|locality| { - let endpoints = locality - .lb_endpoints - .into_iter() - .map(|endpoint| { - let metadata = endpoint.metadata; - let endpoint = match endpoint.host_identifier { - Some(lb_endpoint::HostIdentifier::Endpoint(endpoint)) => Ok(endpoint), - Some(lb_endpoint::HostIdentifier::EndpointName(name_reference)) => { - match cla.named_endpoints.remove(&name_reference) { - Some(endpoint) => Ok(endpoint), - None => Err(eyre::eyre!( - "no endpoint found name reference {}", - name_reference - )), - } - } - None => Err(eyre::eyre!("no host found for endpoint")), - }?; - - // Extract the endpoint's address. - let address: EndpointAddress = endpoint - .address - .and_then(|address| address.address) - .ok_or_else(|| eyre::eyre!("No address provided."))? - .try_into()?; - - let endpoint = Endpoint::with_metadata( - address, - metadata - .map(crate::metadata::MetadataView::try_from) - .transpose()? - .unwrap_or_default(), - ); - Ok(endpoint) - }) - .collect::>()?; - - let locality = locality.locality.map(From::from); - - Ok(LocalityEndpoints::new(endpoints).with_locality(locality)) - }) - .collect::>()?; - - Ok(Cluster { - name: cla.cluster_name, - localities, - }) - } -} - #[cfg(test)] mod tests { use std::net::Ipv4Addr; @@ -419,50 +292,36 @@ mod tests { let de1 = Locality::region("de-1"); let mut endpoint = Endpoint::new((Ipv4Addr::LOCALHOST, 7777).into()); + let cluster1 = ClusterMap::default(); - let mut cluster1 = Cluster::new_default(vec![LocalityEndpoints::from(( - endpoint.clone(), - nl1.clone(), - ))]); - - let cluster2 = Cluster::new_default(vec![LocalityEndpoints::from(( - endpoint.clone(), - de1.clone(), - ))]); + cluster1.insert(Some(nl1.clone()), [endpoint.clone()].into()); + cluster1.merge(Some(de1.clone()), [endpoint.clone()].into()); - cluster1.merge(&cluster2); - - assert_eq!(cluster1.localities[&Some(nl1.clone())].endpoints.len(), 1); - assert!(cluster1.localities[&Some(nl1.clone())] - .endpoints + assert_eq!(cluster1.get(&Some(nl1.clone())).unwrap().len(), 1); + assert!(cluster1 + .get(&Some(nl1.clone())) + .unwrap() .contains(&endpoint)); - assert_eq!(cluster1.localities[&Some(de1.clone())].endpoints.len(), 1); - assert!(cluster1.localities[&Some(de1.clone())] - .endpoints + assert_eq!(cluster1.get(&Some(de1.clone())).unwrap().len(), 1); + assert!(cluster1 + .get(&Some(de1.clone())) + .unwrap() .contains(&endpoint)); endpoint.address.port = 8080; - let cluster3 = Cluster::new_default(vec![LocalityEndpoints::from(( - endpoint.clone(), - de1.clone(), - ))]); - cluster1.merge(&cluster3); + cluster1.merge(Some(de1.clone()), [endpoint.clone()].into()); - assert_eq!(cluster1.localities[&Some(nl1.clone())].endpoints.len(), 1); - assert_eq!(cluster1.localities[&Some(de1.clone())].endpoints.len(), 1); - assert!(cluster1.localities[&Some(de1.clone())] - .endpoints + assert_eq!(cluster1.get(&Some(nl1.clone())).unwrap().len(), 1); + assert_eq!(cluster1.get(&Some(de1.clone())).unwrap().len(), 1); + assert!(cluster1 + .get(&Some(de1.clone())) + .unwrap() .contains(&endpoint)); - let cluster4 = Cluster::new_default(vec![LocalityEndpoints { - locality: Some(de1.clone()), - endpoints: <_>::default(), - }]); - - cluster1.merge(&cluster4); + cluster1.merge(Some(de1.clone()), <_>::default()); - assert_eq!(cluster1.localities[&Some(nl1)].endpoints.len(), 1); - assert!(cluster1.localities[&Some(de1)].endpoints.is_empty()); + assert_eq!(cluster1.get(&Some(nl1.clone())).unwrap().len(), 1); + assert!(cluster1.get(&Some(de1.clone())).unwrap().is_empty()); } } diff --git a/src/config.rs b/src/config.rs index b524703455..43853dfb67 100644 --- a/src/config.rs +++ b/src/config.rs @@ -30,12 +30,11 @@ mod slot; pub mod watch; use crate::{ - cluster::{Cluster, ClusterMap}, + cluster::ClusterMap, filters::prelude::*, xds::{ - config::{endpoint::v3::ClusterLoadAssignment, listener::v3::Listener}, - service::discovery::v3::DiscoveryResponse, - Resource, ResourceType, + config::listener::v3::Listener, service::discovery::v3::DiscoveryResponse, Resource, + ResourceType, }, }; @@ -89,16 +88,16 @@ impl Config { replace_if_present!(filters, id); - if let Some(new_clusters) = map - .get("clusters") - .map(|value| serde_json::from_value(value.clone())) - .transpose()? - { - tracing::debug!(?new_clusters, old_clusters=?self.clusters, "merging new clusters"); + if let Some(value) = map.get("clusters").cloned() { + tracing::debug!(%value, "replacing clusters"); + let value: ClusterMap = serde_json::from_value(value)?; self.clusters.modify(|clusters| { - clusters.merge(new_clusters); + for cluster in value.iter() { + clusters.merge(cluster.key().clone(), cluster.value().clone()); + } + if let Some(locality) = locality { - clusters.update_unlocated_endpoints(&locality); + clusters.update_unlocated_endpoints(locality); } }); } @@ -116,14 +115,6 @@ impl Config { ) -> Result { let mut resources = Vec::new(); match resource_type { - ResourceType::Endpoint => { - for entry in self.clusters.read().iter() { - resources.push( - resource_type - .encode_to_any(&ClusterLoadAssignment::try_from(entry.value())?)?, - ); - } - } ResourceType::Listener => { resources.push(resource_type.encode_to_any(&Listener { filter_chains: vec![(&*self.filters.load()).try_into()?], @@ -131,31 +122,28 @@ impl Config { })?); } ResourceType::Cluster => { - let clusters: Vec<_> = if names.is_empty() { - self.clusters - .read() - .iter() - .map(|entry| entry.value().clone()) - .collect() + if names.is_empty() { + for cluster in self.clusters.read().iter() { + resources.push(resource_type.encode_to_any( + &crate::cluster::proto::Cluster::try_from(( + cluster.key(), + cluster.value(), + ))?, + )?); + } } else { - names - .iter() - .filter_map(|name| { - self.clusters - .read() - .get(name) - .map(|entry| entry.value().clone()) - }) - .collect() + for locality in names.iter().filter_map(|name| name.parse().ok()) { + if let Some(cluster) = self.clusters.read().get(&Some(locality)) { + resources.push(resource_type.encode_to_any( + &crate::cluster::proto::Cluster::try_from(( + cluster.key(), + cluster.value(), + ))?, + )?); + } + } }; - - for cluster in clusters { - resources.push(resource_type.encode_to_any( - &crate::xds::config::cluster::v3::Cluster::try_from(&cluster)?, - )?); - } } - resource => return Err(eyre::eyre!("Unsupported resource {}", resource.type_url())), }; Ok(DiscoveryResponse { @@ -169,18 +157,7 @@ impl Config { pub fn apply(&self, response: &Resource) -> crate::Result<()> { tracing::trace!(resource=?response, "applying resource"); - let apply_cluster = |cluster: Cluster| { - self.clusters - .write() - .default_entry(cluster.name.clone()) - .merge(&cluster); - }; - match response { - Resource::Endpoint(cla) => { - let cluster = Cluster::try_from(*cla.clone()).unwrap(); - (apply_cluster)(cluster) - } Resource::Listener(listener) => { let chain = listener .filter_chains @@ -193,12 +170,15 @@ impl Config { self.filters.store(Arc::new(chain.try_into()?)); } Resource::Cluster(cluster) => { - cluster - .load_assignment - .clone() - .map(Cluster::try_from) - .transpose()? - .map(apply_cluster); + self.clusters.write().merge( + cluster.locality.clone().map(From::from), + cluster + .endpoints + .iter() + .cloned() + .map(crate::endpoint::Endpoint::try_from) + .collect::>()?, + ); } } @@ -209,7 +189,7 @@ impl Config { pub fn apply_metrics(&self) { let clusters = self.clusters.read(); - crate::cluster::active_clusters().set(clusters.localities().count() as i64); + crate::cluster::active_clusters().set(clusters.len() as i64); crate::cluster::active_endpoints().set(clusters.endpoints().count() as i64); } } @@ -338,7 +318,7 @@ mod tests { fn deserialise_client() { let config = Config::default(); config.clusters.modify(|clusters| { - clusters.insert_default(vec![Endpoint::new("127.0.0.1:25999".parse().unwrap())]) + clusters.insert_default([Endpoint::new("127.0.0.1:25999".parse().unwrap())].into()) }); let _ = serde_yaml::to_string(&config).unwrap(); @@ -348,10 +328,13 @@ mod tests { fn deserialise_server() { let config = Config::default(); config.clusters.modify(|clusters| { - clusters.insert_default(vec![ - Endpoint::new("127.0.0.1:26000".parse().unwrap()), - Endpoint::new("127.0.0.1:26001".parse().unwrap()), - ]) + clusters.insert_default( + [ + Endpoint::new("127.0.0.1:26000".parse().unwrap()), + Endpoint::new("127.0.0.1:26001".parse().unwrap()), + ] + .into(), + ) }); let _ = serde_yaml::to_string(&config).unwrap(); @@ -361,7 +344,7 @@ mod tests { fn parse_default_values() { let config: Config = serde_json::from_value(json!({ "version": "v1alpha1", - "clusters":{} + "clusters":[] })) .unwrap(); @@ -384,24 +367,20 @@ id: server-proxy fn parse_client() { let config: Config = serde_json::from_value(json!({ "version": "v1alpha1", - "clusters":{ - "default":{ - "localities": [{ - "endpoints": [{ - "address": "127.0.0.1:25999" - }], - }] - } - } + "clusters": [{ + "endpoints": [{ + "address": "127.0.0.1:25999" + }], + }] })) .unwrap(); let value = config.clusters.read(); assert_eq!( &*value, - &ClusterMap::new_with_default_cluster(vec![Endpoint::new( - (std::net::Ipv4Addr::LOCALHOST, 25999).into(), - )]) + &ClusterMap::new_default( + [Endpoint::new((std::net::Ipv4Addr::LOCALHOST, 25999).into(),)].into() + ) ) } @@ -409,53 +388,52 @@ id: server-proxy fn parse_server() { let config: Config = serde_json::from_value(json!({ "version": "v1alpha1", - "clusters":{ - "default":{ - "localities": [{ - "endpoints": [ - { - "address" : "127.0.0.1:26000", - "metadata": { - "quilkin.dev": { - "tokens": ["MXg3aWp5Ng==", "OGdqM3YyaQ=="], - } - } - }, - { - "address" : "127.0.0.1:26001", - "metadata": { - "quilkin.dev": { - "tokens": ["bmt1eTcweA=="], - } - } + "clusters": [{ + "endpoints": [ + { + "address" : "127.0.0.1:26000", + "metadata": { + "quilkin.dev": { + "tokens": ["MXg3aWp5Ng==", "OGdqM3YyaQ=="], } - ], - }] - } - } + } + }, + { + "address" : "127.0.0.1:26001", + "metadata": { + "quilkin.dev": { + "tokens": ["bmt1eTcweA=="], + } + } + } + ], + }] })) .unwrap_or_default(); let value = config.clusters.read(); assert_eq!( &*value, - &ClusterMap::new_with_default_cluster(vec![ - Endpoint::with_metadata( - "127.0.0.1:26000".parse().unwrap(), - Metadata { - tokens: vec!["1x7ijy6", "8gj3v2i"] - .into_iter() - .map(From::from) - .collect(), - }, - ), - Endpoint::with_metadata( - "127.0.0.1:26001".parse().unwrap(), - Metadata { - tokens: vec!["nkuy70x"].into_iter().map(From::from).collect(), - }, - ), - ]) + &ClusterMap::new_default( + [ + Endpoint::with_metadata( + "127.0.0.1:26000".parse().unwrap(), + Metadata { + tokens: vec!["1x7ijy6", "8gj3v2i"] + .into_iter() + .map(From::from) + .collect(), + }, + ), + Endpoint::with_metadata( + "127.0.0.1:26001".parse().unwrap(), + Metadata { + tokens: vec!["nkuy70x"].into_iter().map(From::from).collect(), + }, + ), + ] + .into() + ) ); } @@ -466,10 +444,8 @@ id: server-proxy version: v1alpha1 foo: bar clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:7001 + - endpoints: + - address: 127.0.0.1:7001 ", " # proxy @@ -478,10 +454,8 @@ foo: bar id: client-proxy port: 7000 clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:7001 + - endpoints: + - address: 127.0.0.1:7001 ", " # admin @@ -494,12 +468,10 @@ admin: # static.endpoints version: v1alpha1 clusters: - default: - localities: - - endpoints: - - address: 127.0.0.1:7001 - connection_ids: - - Mxg3aWp5Ng== + - endpoints: + - address: 127.0.0.1:7001 + connection_ids: + - Mxg3aWp5Ng== ", " # static.filters diff --git a/src/config/providers/k8s.rs b/src/config/providers/k8s.rs index 983e4041bb..74c4d09caf 100644 --- a/src/config/providers/k8s.rs +++ b/src/config/providers/k8s.rs @@ -1,6 +1,6 @@ pub mod agones; -use std::sync::Arc; +use std::{collections::BTreeSet, sync::Arc}; use futures::Stream; use k8s_openapi::api::core::v1::ConfigMap; @@ -8,7 +8,7 @@ use kube::runtime::watcher::Event; use agones::GameServer; -use crate::endpoint::{Endpoint, Locality, LocalityEndpoints}; +use crate::endpoint::{Endpoint, Locality}; pub fn update_filters_from_configmap( client: kube::Client, @@ -109,23 +109,16 @@ pub fn update_endpoints_from_gameservers( } }; tracing::trace!(endpoint=%serde_json::to_value(&endpoint).unwrap(), "Adding endpoint"); - match &locality { - Some(locality) => config - .clusters - .write() - .default_cluster_mut() - .insert((endpoint, locality.clone())), - None => config - .clusters - .write() - .default_cluster_mut() - .insert(endpoint), - }; + config.clusters.write() + .entry(locality.clone()) + .or_default() + .value_mut() + .insert(endpoint); tracing::trace!(clusters=%serde_json::to_value(&config.clusters).unwrap(), "current clusters"); } Event::Restarted(servers) => { - let servers: Vec<_> = servers + let servers: BTreeSet<_> = servers .into_iter() .filter(GameServer::is_allocated) .map(Endpoint::try_from) @@ -139,9 +132,13 @@ pub fn update_endpoints_from_gameservers( } }) .collect(); - let endpoints = LocalityEndpoints::from((servers, locality.clone())); - tracing::trace!(?endpoints, "Restarting with endpoints"); - config.clusters.write().insert_default(endpoints); + + tracing::trace!( + endpoints=%serde_json::to_value(servers.clone()).unwrap(), + "Restarting with endpoints" + ); + + config.clusters.write().merge(locality.clone(), servers); } Event::Deleted(server) => { @@ -153,7 +150,7 @@ pub fn update_endpoints_from_gameservers( }) }; - if found.is_none() { + if !found { tracing::warn!( endpoint=%serde_json::to_value(server.endpoint()).unwrap(), name=%serde_json::to_value(server.metadata.name).unwrap(), diff --git a/src/config/providers/k8s/agones.rs b/src/config/providers/k8s/agones.rs index 80db134b15..59e1ebd778 100644 --- a/src/config/providers/k8s/agones.rs +++ b/src/config/providers/k8s/agones.rs @@ -299,18 +299,6 @@ impl TryFrom for Endpoint { } } -impl TryFrom> for crate::endpoint::LocalityEndpoints { - type Error = tonic::Status; - - fn try_from(servers: Vec) -> Result { - Ok(servers - .into_iter() - .map(Endpoint::try_from) - .collect::, _>>()? - .into()) - } -} - #[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] pub struct Health { /// Whether health checking is disabled or not diff --git a/src/config/watch/fs.rs b/src/config/watch/fs.rs index 4505c8cffe..4d699cc1b1 100644 --- a/src/config/watch/fs.rs +++ b/src/config/watch/fs.rs @@ -87,14 +87,15 @@ mod tests { tokio::time::sleep(std::time::Duration::from_millis(100)).await; source.clusters.modify(|clusters| { - clusters - .default_cluster_mut() - .insert(crate::endpoint::Endpoint::with_metadata( + clusters.insert_default( + [crate::endpoint::Endpoint::with_metadata( (std::net::Ipv4Addr::LOCALHOST, 4321).into(), crate::endpoint::Metadata { tokens: <_>::from([Vec::from(*b"1x7ijy6")]), }, - )); + )] + .into(), + ); }); tokio::time::sleep(std::time::Duration::from_millis(100)).await; diff --git a/src/endpoint.rs b/src/endpoint.rs index 9bb447725a..c681d0336a 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -21,12 +21,7 @@ mod locality; use serde::{Deserialize, Serialize}; -use crate::xds::config::endpoint::v3::{lb_endpoint::HostIdentifier, Endpoint as EnvoyEndpoint}; - -pub use self::{ - address::EndpointAddress, - locality::{Locality, LocalityEndpoints, LocalitySet}, -}; +pub use self::{address::EndpointAddress, locality::Locality}; pub type EndpointMetadata = crate::metadata::MetadataView; @@ -80,38 +75,32 @@ impl std::str::FromStr for Endpoint { } } -impl From for crate::xds::config::endpoint::v3::LbEndpoint { +impl From for crate::cluster::proto::Endpoint { fn from(endpoint: Endpoint) -> Self { Self { - host_identifier: Some(HostIdentifier::Endpoint(EnvoyEndpoint { - address: Some(endpoint.address.into()), - ..<_>::default() - })), + host: endpoint.address.host.to_string(), + port: endpoint.address.port.into(), metadata: Some(endpoint.metadata.into()), - ..<_>::default() } } } -impl TryFrom for Endpoint { +impl TryFrom for Endpoint { type Error = eyre::Error; - fn try_from( - endpoint: crate::xds::config::endpoint::v3::LbEndpoint, - ) -> Result { - let address = match endpoint.host_identifier { - Some(HostIdentifier::Endpoint(endpoint)) => EndpointAddress::try_from(endpoint)?, - _ => return Err(eyre::eyre!("Endpoint host identifier not supported")), - }; + fn try_from(endpoint: crate::cluster::proto::Endpoint) -> Result { + let host: address::AddressKind = endpoint.host.parse()?; + if endpoint.port > u16::MAX as u32 { + return Err(eyre::eyre!("invalid endpoint port")); + } Ok(Self { - address, + address: (host, endpoint.port as u16).into(), metadata: endpoint .metadata - .map(crate::metadata::MetadataView::try_from) + .map(TryFrom::try_from) .transpose()? .unwrap_or_default(), - ..<_>::default() }) } } @@ -140,6 +129,12 @@ impl PartialOrd for Endpoint { } } +impl std::hash::Hash for Endpoint { + fn hash(&self, state: &mut H) { + self.address.hash(state); + } +} + /// Metadata specific to endpoints. #[derive( Default, Debug, Deserialize, Serialize, PartialEq, Clone, PartialOrd, Eq, schemars::JsonSchema, @@ -153,6 +148,15 @@ pub struct Metadata { pub tokens: base64_set::Set, } +impl From for crate::metadata::MetadataView { + fn from(metadata: Metadata) -> Self { + Self { + known: metadata, + ..<_>::default() + } + } +} + impl From for prost_types::Struct { fn from(metadata: Metadata) -> Self { let tokens = prost_types::Value { diff --git a/src/endpoint/address.rs b/src/endpoint/address.rs index b23088b672..2c6fc1f3ab 100644 --- a/src/endpoint/address.rs +++ b/src/endpoint/address.rs @@ -206,6 +206,12 @@ impl From<(String, u16)> for EndpointAddress { } } +impl From<(AddressKind, u16)> for EndpointAddress { + fn from((host, port): (AddressKind, u16)) -> Self { + Self { host, port } + } +} + impl From for EnvoySocketAddress { fn from(address: EndpointAddress) -> Self { use crate::xds::config::core::v3::socket_address::{PortSpecifier, Protocol}; diff --git a/src/endpoint/locality.rs b/src/endpoint/locality.rs index e2bfb553ef..6c211c256f 100644 --- a/src/endpoint/locality.rs +++ b/src/endpoint/locality.rs @@ -14,13 +14,8 @@ * limitations under the License. */ -use std::collections::BTreeSet; - use serde::{Deserialize, Serialize}; -use super::Endpoint; -use crate::xds::config::endpoint::v3::LocalityLbEndpoints; - /// The location of an [`Endpoint`]. #[derive( Clone, @@ -94,80 +89,40 @@ impl Locality { } } -/// A set of endpoints optionally grouped by a [`Locality`]. -#[derive( - Clone, - Default, - Debug, - Eq, - PartialEq, - PartialOrd, - Ord, - Deserialize, - Serialize, - schemars::JsonSchema, -)] -pub struct LocalityEndpoints { - pub locality: Option, - pub endpoints: BTreeSet, -} - -impl LocalityEndpoints { - /// Creates a new set of endpoints with no [`Locality`]. - pub fn new(endpoints: BTreeSet) -> Self { - Self::from(endpoints) - } - - /// Adds a [`Locality`] to the set of endpoints. - pub fn with_locality(mut self, locality: impl Into>) -> Self { - self.locality = locality.into(); - self - } - - /// Removes an endpoint. - pub fn remove(&mut self, endpoint: &Endpoint) { - self.endpoints.remove(endpoint); - } -} - -impl From for LocalityEndpoints { - fn from(endpoint: Endpoint) -> Self { - Self { - endpoints: [endpoint].into_iter().collect(), - ..Self::default() - } +impl std::fmt::Display for Locality { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.colon_separated_string().fmt(f) } } -impl From> for LocalityEndpoints { - fn from(endpoints: Vec) -> Self { - Self { - endpoints: endpoints.into_iter().collect(), - ..Self::default() - } - } -} +impl std::str::FromStr for Locality { + type Err = eyre::Error; -impl From> for LocalityEndpoints { - fn from(endpoints: Vec) -> Self { - Self { - endpoints: endpoints.into_iter().map(From::from).collect(), - ..Self::default() - } - } -} + fn from_str(input: &str) -> Result { + let vec: Vec<_> = input.split(':').collect(); -impl From> for LocalityEndpoints { - fn from(endpoints: BTreeSet) -> Self { - Self { - endpoints, - ..Self::default() - } + Ok(match vec.len() { + 1 => Self { + region: vec[0].into(), + ..<_>::default() + }, + 2 => Self { + region: vec[0].into(), + zone: vec[1].into(), + ..<_>::default() + }, + 3 => Self { + region: vec[0].into(), + zone: vec[1].into(), + sub_zone: vec[2].into(), + }, + _ => return Err(eyre::eyre!("invalid locality identifier")), + }) } } -impl From for Locality { - fn from(value: crate::xds::config::core::v3::Locality) -> Self { +impl From for Locality { + fn from(value: crate::cluster::proto::Locality) -> Self { Self { region: value.region, zone: value.zone, @@ -176,7 +131,7 @@ impl From for Locality { } } -impl From for crate::xds::config::core::v3::Locality { +impl From for crate::cluster::proto::Locality { fn from(value: Locality) -> Self { Self { region: value.region, @@ -185,203 +140,3 @@ impl From for crate::xds::config::core::v3::Locality { } } } - -impl TryFrom for LocalityEndpoints { - type Error = >::Error; - fn try_from(value: LocalityLbEndpoints) -> Result { - Ok(Self { - endpoints: value - .lb_endpoints - .into_iter() - .map(TryFrom::try_from) - .collect::>()?, - locality: value.locality.map(From::from), - }) - } -} - -impl From<(Vec, Option)> for LocalityEndpoints { - fn from((endpoints, locality): (Vec, Option)) -> Self { - Self::from(endpoints).with_locality(locality) - } -} - -impl From<(Endpoint, Locality)> for LocalityEndpoints { - fn from((endpoint, locality): (Endpoint, Locality)) -> Self { - Self::from(endpoint).with_locality(locality) - } -} - -impl From<(Endpoint, Option)> for LocalityEndpoints { - fn from((endpoint, locality): (Endpoint, Option)) -> Self { - Self::from(endpoint).with_locality(locality) - } -} - -impl From for LocalityLbEndpoints { - fn from(value: LocalityEndpoints) -> Self { - Self { - lb_endpoints: value.endpoints.into_iter().map(From::from).collect(), - locality: value.locality.map(From::from), - ..Self::default() - } - } -} - -/// Set around [`LocalityEndpoints`] to ensure that all unique localities are -/// different entries. Any duplicate localities provided are merged. -#[derive(Clone, Default, Debug, Eq, PartialEq)] -pub struct LocalitySet(std::collections::HashMap, LocalityEndpoints>); - -impl LocalitySet { - /// Creates a new set from the provided localities. - pub fn new(set: Vec) -> Self { - Self::from_iter(set) - } - - /// Inserts a new locality of endpoints. - pub fn insert(&mut self, mut locality: LocalityEndpoints) { - let entry = self.0.entry(locality.locality.clone()).or_default(); - entry.locality = locality.locality; - entry.endpoints.append(&mut locality.endpoints); - } - - /// Returns a reference to endpoints associated with the specified locality. - pub fn get(&self, key: &Option) -> Option<&LocalityEndpoints> { - self.0.get(key) - } - - /// Returns a mutable reference to endpoints associated with the specified locality. - pub fn get_mut(&mut self, key: &Option) -> Option<&mut LocalityEndpoints> { - self.0.get_mut(key) - } - - /// Removes the specified locality or all endpoints with no locality. - pub fn remove(&mut self, key: &Option) -> Option { - self.0.remove(key) - } - - /// Removes all localities. - pub fn clear(&mut self) { - self.0.clear(); - } - - /// Returns an iterator over the set of localities. - pub fn iter(&self) -> impl Iterator + '_ { - self.0.values() - } - - /// Returns a mutable iterator over the set of localities. - pub fn iter_mut(&mut self) -> impl Iterator + '_ { - self.0.values_mut() - } - - pub fn merge(&mut self, cluster: &Self) { - for (key, value) in &cluster.0 { - if tracing::enabled!(tracing::Level::INFO) { - let span = tracing::info_span!( - "applied_locality", - locality = &*key - .as_ref() - .map(|locality| locality.colon_separated_string()) - .unwrap_or_else(|| String::from("")) - ); - - let _entered = span.enter(); - if value.endpoints.is_empty() { - tracing::info!("removing all endpoints"); - } else if let Some(original_endpoints) = self.0.get(key) { - for endpoint in &original_endpoints.endpoints { - if !value.endpoints.contains(endpoint) { - tracing::info!(%endpoint.address, ?endpoint.metadata.known.tokens, "removing endpoint"); - } - } - } - - for endpoint in &value.endpoints { - tracing::info!(%endpoint.address, ?endpoint.metadata.known.tokens, "applying endpoint"); - } - } - - let entry = self.0.entry(key.clone()).or_default(); - *entry = value.clone(); - } - } -} - -impl Serialize for LocalitySet { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - self.0.values().collect::>().serialize(serializer) - } -} - -impl<'de> Deserialize<'de> for LocalitySet { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - >::deserialize(deserializer).map(Self::new) - } -} - -impl schemars::JsonSchema for LocalitySet { - fn schema_name() -> String { - >::schema_name() - } - fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { - >::json_schema(gen) - } - - fn is_referenceable() -> bool { - >::is_referenceable() - } -} - -impl std::ops::Index<&Option> for LocalitySet { - type Output = LocalityEndpoints; - - fn index(&self, index: &Option) -> &Self::Output { - self.get(index).unwrap() - } -} - -impl std::ops::IndexMut<&Option> for LocalitySet { - fn index_mut(&mut self, index: &Option) -> &mut Self::Output { - self.get_mut(index).unwrap() - } -} - -impl From for LocalitySet -where - T: Into>, -{ - fn from(value: T) -> Self { - Self::new(value.into()) - } -} - -impl FromIterator for LocalitySet { - fn from_iter>(iter: I) -> Self { - let mut map = Self(<_>::default()); - - for locality in iter { - map.insert(locality); - } - - map - } -} - -impl IntoIterator for LocalitySet { - type Item = LocalityEndpoints; - type IntoIter = std::vec::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - // Have to convert to vec to avoid `Values`'s lifetime parameter. - // Remove once GAT's are stable. - self.0.into_values().collect::>().into_iter() - } -} diff --git a/src/metadata.rs b/src/metadata.rs index 6aba492621..d89795fef0 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -23,8 +23,6 @@ pub mod build { use std::{collections::HashMap, convert::TryFrom}; -use crate::xds::config::core::v3::Metadata as ProtoMetadata; - pub use symbol::{Key, Reference, Symbol}; /// Shared state between [`Filter`][crate::filters::Filter]s during processing for a single packet. @@ -243,67 +241,67 @@ impl MetadataView { } } -// This impl means that any `T` that we can try convert from a protobuf struct -// at run-time can be constructed statically without going through -// conversion first. -impl From for MetadataView -where - T: TryFrom + Default, -{ - fn from(known: T) -> Self { - Self { - known, - unknown: <_>::default(), - } +impl + Default> From> for prost_types::Struct { + fn from(metadata: MetadataView) -> Self { + let mut prost_struct = prost_types::Struct::default(); + prost_struct.fields.insert( + String::from("quilkin.dev"), + crate::prost::value_from_struct(metadata.known.into()), + ); + + prost_struct.fields.extend( + metadata + .unknown + .into_iter() + .map(|(k, v)| (k, crate::prost::from_json(v))), + ); + + prost_struct } } -impl + Default> From> for ProtoMetadata { - fn from(metadata: MetadataView) -> Self { - let mut filter_metadata = HashMap::new(); - filter_metadata.insert(String::from("quilkin.dev"), metadata.known.into()); - filter_metadata.extend( +impl + Default + Clone> From<&'_ MetadataView> + for prost_types::Struct +{ + fn from(metadata: &MetadataView) -> Self { + let mut prost_struct = prost_types::Struct::default(); + prost_struct.fields.insert( + String::from("quilkin.dev"), + crate::prost::value_from_struct(metadata.known.clone().into()), + ); + + prost_struct.fields.extend( metadata .unknown - .into_iter() - .filter_map(|(k, v)| crate::prost::struct_from_json(v).map(|v| (k, v))), + .iter() + .map(|(k, v)| (k.clone(), crate::prost::from_json(v.clone()))), ); - Self { - filter_metadata, - ..<_>::default() - } + prost_struct } } -impl TryFrom for MetadataView +impl TryFrom for MetadataView where + E: Send + Sync + std::error::Error + 'static, T: TryFrom + Default, { - type Error = E; + type Error = eyre::Error; - fn try_from(mut value: ProtoMetadata) -> Result { + fn try_from(mut value: prost_types::Struct) -> Result { let known = value - .filter_metadata + .fields .remove(KEY) - .map(T::try_from) + .map(|value| match value.kind { + Some(prost_types::value::Kind::StructValue(value)) => { + T::try_from(value).map_err(From::from) + } + _ => Err(eyre::eyre!("wrong type for known metadata")), + }) .transpose()? .unwrap_or_default(); - let value = prost_types::value::Kind::StructValue(prost_types::Struct { - fields: value - .filter_metadata - .into_iter() - .map(|(k, v)| { - ( - k, - prost_types::Value { - kind: Some(prost_types::value::Kind::StructValue(v)), - }, - ) - }) - .collect(), - }); + let value = prost_types::value::Kind::StructValue(value); Ok(Self { known, diff --git a/src/prost.rs b/src/prost.rs index 7344b5516f..3fd47159c1 100644 --- a/src/prost.rs +++ b/src/prost.rs @@ -55,12 +55,9 @@ pub fn value_from_kind(kind: Kind) -> Value { } } -pub fn struct_from_json(value: Value) -> Option { - match from_json(value) { - prost_types::Value { - kind: Some(Kind::StructValue(r#struct)), - } => Some(r#struct), - _ => None, +pub fn value_from_struct(value: prost_types::Struct) -> prost_types::Value { + prost_types::Value { + kind: Some(prost_types::value::Kind::StructValue(value)), } } diff --git a/src/test_utils.rs b/src/test_utils.rs index 21023d2361..b94d1a1d94 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -24,9 +24,8 @@ use tokio::{ use tracing_subscriber::EnvFilter; use crate::{ - cluster::Cluster, config::Config, - endpoint::{Endpoint, EndpointAddress, LocalityEndpoints}, + endpoint::{Endpoint, EndpointAddress}, filters::{prelude::*, FilterRegistry}, metadata::Value, }; @@ -295,19 +294,14 @@ pub async fn create_socket() -> UdpSocket { pub fn config_with_dummy_endpoint() -> Config { let config = Config::default(); - config.clusters.write().insert(Cluster { - name: "default".into(), - localities: vec![LocalityEndpoints { - locality: None, - endpoints: [Endpoint { - address: (std::net::Ipv4Addr::LOCALHOST, 8080).into(), - ..<_>::default() - }] - .into(), + config.clusters.write().insert( + None, + [Endpoint { + address: (std::net::Ipv4Addr::LOCALHOST, 8080).into(), + ..<_>::default() }] - .into_iter() - .collect(), - }); + .into(), + ); config } diff --git a/src/xds.rs b/src/xds.rs index 542cd2c63e..e1292b0132 100644 --- a/src/xds.rs +++ b/src/xds.rs @@ -160,17 +160,14 @@ mod tests { addr.metadata.known.tokens.insert(token.into()); addr }; - let localities = crate::endpoint::LocalityEndpoints::from(address.clone()); + let clusters = crate::cluster::ClusterMap::default(); + clusters.insert_default([address].into()); let xds_port = crate::test_utils::available_addr().await.port(); let xds_config: Arc = serde_json::from_value(serde_json::json!({ "version": "v1alpha1", "id": "test-proxy", - "clusters": { - "default": { - "localities": [localities] - } - }, + "clusters": clusters, })) .map(Arc::new) .unwrap(); @@ -312,8 +309,8 @@ mod tests { let local_addr: crate::endpoint::EndpointAddress = socket.local_addr().unwrap().into(); config.clusters.modify(|clusters| { - let mut cluster = clusters.default_cluster_mut(); - cluster.localities.clear(); + let mut cluster = clusters.default_entry(); + cluster.clear(); cluster.insert(Endpoint::new(local_addr.clone())); }); tokio::time::sleep(std::time::Duration::from_millis(500)).await; @@ -348,7 +345,7 @@ mod tests { .read() .get_default() .unwrap() - .endpoints() + .iter() .next() .unwrap() .address diff --git a/src/xds/resource.rs b/src/xds/resource.rs index b0690d028d..235dd4bc46 100644 --- a/src/xds/resource.rs +++ b/src/xds/resource.rs @@ -16,9 +16,7 @@ use prost::Message; -use crate::xds::config::{ - cluster::v3::Cluster, endpoint::v3::ClusterLoadAssignment, listener::v3::Listener, -}; +use crate::xds::config::listener::v3::Listener; pub type ResourceMap = enum_map::EnumMap; @@ -34,38 +32,32 @@ macro_rules! type_urls { type_urls! { "type.googleapis.com": { - CLUSTER_TYPE = "envoy.config.cluster.v3.Cluster", - ENDPOINT_TYPE = "envoy.config.endpoint.v3.ClusterLoadAssignment", - EXTENSION_CONFIG_TYPE = "envoy.config.core.v3.TypedExtensionConfig", + CLUSTER_TYPE = "quilkin.config.v1alpha1.Cluster", LISTENER_TYPE = "envoy.config.listener.v3.Listener", - ROUTE_TYPE = "envoy.config.route.v3.RouteConfiguration", - RUNTIME_TYPE = "envoy.service.runtime.v3.Runtime", - SCOPED_ROUTE_TYPE = "envoy.config.route.v3.ScopedRouteConfiguration", - SECRET_TYPE = "envoy.extensions.transport_sockets.tls.v3.Secret", - VIRTUAL_HOST_TYPE = "envoy.config.route.v3.VirtualHost", } } #[derive(Clone, Debug)] pub enum Resource { - Cluster(Box), - Endpoint(Box), + Cluster(Box), Listener(Box), } impl Resource { - pub fn name(&self) -> &str { + pub fn name(&self) -> String { match self { - Self::Endpoint(endpoint) => &endpoint.cluster_name, - Self::Cluster(cluster) => &cluster.name, - Self::Listener(listener) => &listener.name, + Self::Cluster(cluster) => cluster + .locality + .clone() + .map(|locality| crate::endpoint::Locality::from(locality).to_string()) + .unwrap_or_default(), + Self::Listener(listener) => listener.name.to_string(), } } pub fn resource_type(&self) -> ResourceType { match self { Self::Cluster(_) => ResourceType::Cluster, - Self::Endpoint(_) => ResourceType::Endpoint, Self::Listener(_) => ResourceType::Listener, } } @@ -81,7 +73,6 @@ impl TryFrom for Resource { fn try_from(any: prost_types::Any) -> Result { Ok(match &*any.type_url { CLUSTER_TYPE => Resource::Cluster(<_>::decode(&*any.value)?), - ENDPOINT_TYPE => Resource::Endpoint(<_>::decode(&*any.value)?), LISTENER_TYPE => Resource::Listener(<_>::decode(&*any.value)?), url => return Err(UnknownResourceType(url.into()).into()), }) @@ -91,41 +82,17 @@ impl TryFrom for Resource { #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, enum_map::Enum)] pub enum ResourceType { Cluster, - Endpoint, - ExtensionConfig, Listener, - Route, - Runtime, - ScopedRoute, - Secret, - VirtualHost, } impl ResourceType { - pub const VARIANTS: &'static [Self] = &[ - Self::Cluster, - Self::Endpoint, - Self::ExtensionConfig, - Self::Listener, - Self::Route, - Self::Runtime, - Self::ScopedRoute, - Self::Secret, - Self::VirtualHost, - ]; + pub const VARIANTS: &'static [Self] = &[Self::Cluster, Self::Listener]; /// Returns the corresponding type URL for the response type. pub const fn type_url(&self) -> &'static str { match self { Self::Cluster => CLUSTER_TYPE, - Self::Endpoint => ENDPOINT_TYPE, - Self::ExtensionConfig => EXTENSION_CONFIG_TYPE, Self::Listener => LISTENER_TYPE, - Self::Route => ROUTE_TYPE, - Self::Runtime => RUNTIME_TYPE, - Self::ScopedRoute => SCOPED_ROUTE_TYPE, - Self::Secret => SECRET_TYPE, - Self::VirtualHost => VIRTUAL_HOST_TYPE, } } @@ -152,14 +119,7 @@ impl TryFrom<&'_ str> for ResourceType { fn try_from(url: &str) -> Result { Ok(match url { CLUSTER_TYPE => Self::Cluster, - ENDPOINT_TYPE => Self::Endpoint, - EXTENSION_CONFIG_TYPE => Self::ExtensionConfig, LISTENER_TYPE => Self::Listener, - ROUTE_TYPE => Self::Route, - RUNTIME_TYPE => Self::Runtime, - SCOPED_ROUTE_TYPE => Self::ScopedRoute, - SECRET_TYPE => Self::Secret, - VIRTUAL_HOST_TYPE => Self::VirtualHost, unknown => return Err(UnknownResourceType(unknown.to_owned())), }) } diff --git a/src/xds/server.rs b/src/xds/server.rs index b4afb6bcc2..6d08851c0d 100644 --- a/src/xds/server.rs +++ b/src/xds/server.rs @@ -107,7 +107,6 @@ impl ControlPlane { if let Err(error) = watcher.changed().await { tracing::error!(%error, "error watching changes"); } - this.push_update(ResourceType::Endpoint); this.push_update(ResourceType::Cluster); } } @@ -371,7 +370,7 @@ mod tests { #[tokio::test] async fn valid_response() { - const RESOURCE: ResourceType = ResourceType::Endpoint; + const RESOURCE: ResourceType = ResourceType::Cluster; const LISTENER_TYPE: ResourceType = ResourceType::Listener; let mut response = DiscoveryResponse { diff --git a/tests/capture.rs b/tests/capture.rs index 46d4e5494f..e69041844c 100644 --- a/tests/capture.rs +++ b/tests/capture.rs @@ -62,15 +62,18 @@ async fn token_router() { ); server_config.clusters.modify(|clusters| { - clusters.insert_default(vec![Endpoint::with_metadata( - echo.clone(), - serde_json::from_value::>(serde_json::json!({ - "quilkin.dev": { - "tokens": ["YWJj"] - } - })) - .unwrap(), - )]) + clusters.insert_default( + [Endpoint::with_metadata( + echo.clone(), + serde_json::from_value::>(serde_json::json!({ + "quilkin.dev": { + "tokens": ["YWJj"] + } + })) + .unwrap(), + )] + .into(), + ) }); t.run_server(server_config, server_proxy, None); diff --git a/tests/compress.rs b/tests/compress.rs index d4f0e71004..1a0f98227c 100644 --- a/tests/compress.rs +++ b/tests/compress.rs @@ -38,7 +38,7 @@ on_write: COMPRESS let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .clusters - .modify(|clusters| clusters.insert_default(vec![Endpoint::new(echo.clone())])); + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); server_config.filters.store( quilkin::filters::FilterChain::try_from(vec![Filter { name: Compress::factory().name().into(), @@ -64,7 +64,7 @@ on_write: DECOMPRESS let client_config = std::sync::Arc::new(quilkin::Config::default()); client_config .clusters - .modify(|clusters| clusters.insert_default(vec![Endpoint::new(server_addr.into())])); + .modify(|clusters| clusters.insert_default([Endpoint::new(server_addr.into())].into())); client_config.filters.store( quilkin::filters::FilterChain::try_from(vec![Filter { name: Compress::factory().name().into(), diff --git a/tests/concatenate_bytes.rs b/tests/concatenate_bytes.rs index dfa5424774..6f09dccd38 100644 --- a/tests/concatenate_bytes.rs +++ b/tests/concatenate_bytes.rs @@ -42,7 +42,7 @@ bytes: YWJj #abc let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .clusters - .modify(|clusters| clusters.insert_default(vec![Endpoint::new(echo.clone())])); + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); server_config.filters.store( quilkin::filters::FilterChain::try_from(vec![Filter { name: ConcatenateBytes::factory().name().into(), diff --git a/tests/filter_order.rs b/tests/filter_order.rs index 3d4ccbfb78..b6a3b67f0b 100644 --- a/tests/filter_order.rs +++ b/tests/filter_order.rs @@ -61,7 +61,7 @@ on_write: DECOMPRESS let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .clusters - .modify(|clusters| clusters.insert_default(vec![Endpoint::new(echo.clone())])); + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); server_config.filters.store( quilkin::filters::FilterChain::try_from(vec![ Filter { diff --git a/tests/filters.rs b/tests/filters.rs index 3a5fcfc45b..ea74e2f2ce 100644 --- a/tests/filters.rs +++ b/tests/filters.rs @@ -54,7 +54,7 @@ async fn test_filter() { server_config .clusters - .modify(|clusters| clusters.insert_default(vec![Endpoint::new(echo.clone())])); + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); t.run_server(server_config, server_proxy, None); @@ -66,9 +66,12 @@ async fn test_filter() { }; let client_config = std::sync::Arc::new(quilkin::Config::default()); client_config.clusters.modify(|clusters| { - clusters.insert_default(vec![Endpoint::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port).into(), - )]) + clusters.insert_default( + [Endpoint::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port).into(), + )] + .into(), + ) }); client_config.filters.store( quilkin::filters::FilterChain::try_from(vec![Filter { @@ -131,7 +134,7 @@ async fn debug_filter() { }; server_config .clusters - .modify(|clusters| clusters.insert_default(vec![Endpoint::new(echo.clone())])); + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); server_config.filters.store( quilkin::filters::FilterChain::try_from(vec![Filter { name: factory.name().into(), @@ -152,9 +155,12 @@ async fn debug_filter() { }; let client_config = std::sync::Arc::new(quilkin::Config::default()); client_config.clusters.modify(|clusters| { - clusters.insert_default(vec![Endpoint::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port).into(), - )]) + clusters.insert_default( + [Endpoint::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port).into(), + )] + .into(), + ) }); client_config.filters.store( quilkin::filters::FilterChain::try_from(vec![Filter { diff --git a/tests/firewall.rs b/tests/firewall.rs index 756e73adc9..61ec79cb57 100644 --- a/tests/firewall.rs +++ b/tests/firewall.rs @@ -123,7 +123,7 @@ async fn test(t: &mut TestHelper, server_port: u16, yaml: &str) -> oneshot::Rece server_config .clusters - .modify(|clusters| clusters.insert_default(vec![Endpoint::new(echo.clone())])); + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); t.run_server(server_config, server_proxy, None); diff --git a/tests/health.rs b/tests/health.rs index 78107c046c..83ceb1af52 100644 --- a/tests/health.rs +++ b/tests/health.rs @@ -33,7 +33,7 @@ async fn health_server() { }; let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config.clusters.modify(|clusters| { - clusters.insert_default(vec!["127.0.0.1:0".parse::().unwrap()]) + clusters.insert_default(["127.0.0.1:0".parse::().unwrap()].into()) }); t.run_server( server_config, diff --git a/tests/load_balancer.rs b/tests/load_balancer.rs index b23ac24ee7..a6962689d9 100644 --- a/tests/load_balancer.rs +++ b/tests/load_balancer.rs @@ -49,13 +49,7 @@ policy: ROUND_ROBIN let server_port = 12346; let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config.clusters.modify(|clusters| { - clusters.insert_default( - echo_addresses - .iter() - .cloned() - .map(Endpoint::new) - .collect::>(), - ) + clusters.insert_default(echo_addresses.iter().cloned().map(Endpoint::new).collect()) }); server_config.filters.store( quilkin::filters::FilterChain::try_from(vec![Filter { diff --git a/tests/local_rate_limit.rs b/tests/local_rate_limit.rs index a935e02f4d..51fa0a14c2 100644 --- a/tests/local_rate_limit.rs +++ b/tests/local_rate_limit.rs @@ -43,7 +43,7 @@ period: 1 let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .clusters - .modify(|clusters| clusters.insert_default(vec![Endpoint::new(echo.clone())])); + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); server_config.filters.store( quilkin::filters::FilterChain::try_from(vec![Filter { name: LocalRateLimit::factory().name().into(), diff --git a/tests/match.rs b/tests/match.rs index 584c4f3cc6..67404a1bc7 100644 --- a/tests/match.rs +++ b/tests/match.rs @@ -65,7 +65,7 @@ on_read: let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .clusters - .modify(|clusters| clusters.insert_default(vec![Endpoint::new(echo.clone())])); + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); server_config.filters.store( quilkin::filters::FilterChain::try_from(vec![ Filter { diff --git a/tests/metrics.rs b/tests/metrics.rs index 1ada884baf..76b53ff558 100644 --- a/tests/metrics.rs +++ b/tests/metrics.rs @@ -35,7 +35,7 @@ async fn metrics_server() { let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config .clusters - .modify(|clusters| clusters.insert_default(vec![Endpoint::new(echo.clone())])); + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); t.run_server( server_config, server_proxy, @@ -51,7 +51,7 @@ async fn metrics_server() { let client_config = std::sync::Arc::new(quilkin::Config::default()); client_config .clusters - .modify(|clusters| clusters.insert_default(vec![Endpoint::new(server_addr.into())])); + .modify(|clusters| clusters.insert_default([Endpoint::new(server_addr.into())].into())); t.run_server(client_config, client_proxy, None); // let's send the packet diff --git a/tests/no_filter.rs b/tests/no_filter.rs index 74a10f1dcd..ee0ea2bd7f 100644 --- a/tests/no_filter.rs +++ b/tests/no_filter.rs @@ -36,10 +36,13 @@ async fn echo() { }; let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config.clusters.modify(|clusters| { - clusters.insert_default(vec![ - Endpoint::new(server1.clone()), - Endpoint::new(server2.clone()), - ]) + clusters.insert_default( + [ + Endpoint::new(server1.clone()), + Endpoint::new(server2.clone()), + ] + .into(), + ) }); t.run_server(server_config, server_proxy, None); diff --git a/tests/token_router.rs b/tests/token_router.rs index d4907dd0e1..4b646c8f03 100644 --- a/tests/token_router.rs +++ b/tests/token_router.rs @@ -51,10 +51,13 @@ quilkin.dev: }; let server_config = std::sync::Arc::new(quilkin::Config::default()); server_config.clusters.modify(|clusters| { - clusters.insert_default(vec![Endpoint::with_metadata( - echo.clone(), - serde_yaml::from_str::>(endpoint_metadata).unwrap(), - )]) + clusters.insert_default( + [Endpoint::with_metadata( + echo.clone(), + serde_yaml::from_str::>(endpoint_metadata).unwrap(), + )] + .into(), + ) }); server_config.filters.store(