Skip to content

Commit

Permalink
Merge pull request #2 from Argonus/support-cached-endpoints
Browse files Browse the repository at this point in the history
Add resource version parameter for kubernates strategy.
  • Loading branch information
Argonus authored Dec 2, 2024
2 parents 103931e + 38d7b5d commit cac5d3c
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

- Add `kubernates_resource_version` option to Kubernetes strategy

## 3.4.1

- Use new cypher names
- Allow Epmd strategy to reconnect after connection failures
- Detect Self Signed Certificate Authority for Kubernetes Strategy
Expand Down
18 changes: 16 additions & 2 deletions lib/strategy/kubernetes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Cluster.Strategy.Kubernetes do
- `:kubernetes_selector`
- `:kubernetes_service_name`
- `:kubernetes_ip_lookup_mode`
- `:kubernetes_resource_version`
- `:mode`
## Getting `<basename>`
Expand Down Expand Up @@ -70,6 +71,12 @@ defmodule Cluster.Strategy.Kubernetes do
Then, this strategy will fetch the IP of all pods with that label and attempt to connect.
### `kubernetes_resource_version` option
When setting this value, this strategy will use given resource version value to fetch k8s resources,
where each modification of the resource increments the resource version.
If set to `0` kubernetes will use cached version, that may be outdated.
### `:mode` option
Expand Down Expand Up @@ -361,6 +368,7 @@ defmodule Cluster.Strategy.Kubernetes do
service_name = Keyword.get(config, :kubernetes_service_name)
selector = Keyword.fetch!(config, :kubernetes_selector)
ip_lookup_mode = Keyword.get(config, :kubernetes_ip_lookup_mode, :endpoints)
resource_version = Keyword.get(config, :kubernetes_resource_version, nil)

master_name = Keyword.get(config, :kubernetes_master, @kubernetes_master)
cluster_domain = System.get_env("CLUSTER_DOMAIN", "#{cluster_name}.local")
Expand All @@ -382,10 +390,16 @@ defmodule Cluster.Strategy.Kubernetes do
app_name != nil and selector != nil ->
selector = URI.encode(selector)

resource_version_param =
if is_nil(resource_version), do: "", else: "&resourceVersion=#{resource_version}"

path =
case ip_lookup_mode do
:endpoints -> "api/v1/namespaces/#{namespace}/endpoints?labelSelector=#{selector}"
:pods -> "api/v1/namespaces/#{namespace}/pods?labelSelector=#{selector}"
:endpoints ->
"api/v1/namespaces/#{namespace}/endpoints?labelSelector=#{selector}#{resource_version_param}"

:pods ->
"api/v1/namespaces/#{namespace}/pods?labelSelector=#{selector}#{resource_version_param}"
end

headers = [{~c"authorization", ~c"Bearer #{token}"}]
Expand Down
32 changes: 32 additions & 0 deletions test/fixtures/vcr_cassettes/kubernetes.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,38 @@
"type": "ok"
}
},
{
"request": {
"body": "",
"headers": {
"authorization": "***"
},
"method": "get",
"options": {
"httpc_options": [],
"http_options": {
"ssl": "[verify: :verify_none]"
}
},
"request_body": "",
"url": "https://cluster.localhost./api/v1/namespaces/__libcluster_test/endpoints?labelSelector=app=test_selector&resourceVersion=0"
},
"response": {
"binary": false,
"body": "{\"kind\":\"EndpointsList\",\"apiVersion\":\"v1\",\"metadata\":{\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"resourceVersion\":\"17042410\"},\"items\":[{\"metadata\":{\"name\":\"development-development\",\"namespace\":\"airatel-service-localization\",\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"uid\":\"7e3faf1e-0294-11e8-bcad-42010a9c01cc\",\"resourceVersion\":\"17037787\",\"creationTimestamp\":\"2018-01-26T12:29:03Z\",\"labels\":{\"app\":\"development\",\"chart\":\"CHART_PLACEHOLDER\"}},\"subsets\":[{\"addresses\":[{\"hostname\":\"my-hostname-0\",\"ip\":\"10.48.33.136\",\"nodeName\":\"gke-jshmrtn-cluster-default-pool-a61da41f-db9x\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"airatel-service-localization\",\"name\":\"development-4292695165-mgq9f\",\"uid\":\"eb0f3e80-0295-11e8-bcad-42010a9c01cc\",\"resourceVersion\":\"17037783\"}}],\"ports\":[{\"name\":\"web\",\"port\":8443,\"protocol\":\"TCP\"}]}]}]}\n",
"headers": {
"date": "Fri, 26 Jan 2018 13:18:46 GMT",
"content-length": "877",
"content-type": "application/json"
},
"status_code": [
"HTTP/1.1",
200,
"OK"
],
"type": "ok"
}
},
{
"request": {
"body": "",
Expand Down
32 changes: 32 additions & 0 deletions test/fixtures/vcr_cassettes/kubernetes_pods.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,37 @@
],
"type": "ok"
}
},
{
"request": {
"body": "",
"headers": {
"authorization": "***"
},
"method": "get",
"options": {
"httpc_options": [],
"http_options": {
"ssl": "[verify: :verify_none]"
}
},
"request_body": "",
"url": "https://cluster.localhost./api/v1/namespaces/__libcluster_test/pods?labelSelector=app=test_selector&resourceVersion=0"
},
"response": {
"binary": false,
"body": "{\"kind\":\"PodList\",\"apiVersion\":\"v1\",\"metadata\":{\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"resourceVersion\":\"17042410\"},\"items\":[{\"metadata\":{\"name\":\"development-development\",\"namespace\":\"airatel-service-localization\",\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"uid\":\"7e3faf1e-0294-11e8-bcad-42010a9c01cc\",\"resourceVersion\":\"17037787\",\"creationTimestamp\":\"2018-01-26T12:29:03Z\",\"labels\":{\"app\":\"development\",\"chart\":\"CHART_PLACEHOLDER\"}},\"spec\": { \"hostname\": \"my-hostname-0\" },\"status\":{\"podIP\": \"10.48.33.136\"}}]}\n",
"headers": {
"date": "Fri, 26 Jan 2018 13:18:46 GMT",
"content-length": "877",
"content-type": "application/json"
},
"status_code": [
"HTTP/1.1",
200,
"OK"
],
"type": "ok"
}
}
]
55 changes: 55 additions & 0 deletions test/kubernetes_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,33 @@ defmodule Cluster.Strategy.KubernetesTest do
end
end

test "works with resource version" do
use_cassette "kubernetes", custom: true do
capture_log(fn ->
start_supervised!({Kubernetes,
[
%Cluster.Strategy.State{
topology: :name,
config: [
kubernetes_node_basename: "test_basename",
kubernetes_selector: "app=test_selector",
kubernetes_resource_version: 0,
# If you want to run the test freshly, you'll need to create a DNS Entry
kubernetes_master: "cluster.localhost.",
kubernetes_service_account_path:
Path.join([__DIR__, "fixtures", "kubernetes", "service_account"])
],
connect: {Nodes, :connect, [self()]},
disconnect: {Nodes, :disconnect, [self()]},
list_nodes: {Nodes, :list_nodes, [[]]}
}
]})

assert_receive {:connect, _}, 5_000
end)
end
end

test "works with dns and cluster_name" do
use_cassette "kubernetes", custom: true do
capture_log(fn ->
Expand Down Expand Up @@ -201,6 +228,34 @@ defmodule Cluster.Strategy.KubernetesTest do
end
end

test "works with pods and resource version" do
use_cassette "kubernetes_pods", custom: true do
capture_log(fn ->
start_supervised!({Kubernetes,
[
%Cluster.Strategy.State{
topology: :name,
config: [
kubernetes_node_basename: "test_basename",
kubernetes_selector: "app=test_selector",
# If you want to run the test freshly, you'll need to create a DNS Entry
kubernetes_master: "cluster.localhost.",
kubernetes_ip_lookup_mode: :pods,
kubernetes_resource_version: 0,
kubernetes_service_account_path:
Path.join([__DIR__, "fixtures", "kubernetes", "service_account"])
],
connect: {Nodes, :connect, [self()]},
disconnect: {Nodes, :disconnect, [self()]},
list_nodes: {Nodes, :list_nodes, [[]]}
}
]})

assert_receive {:connect, :"[email protected]"}, 5_000
end)
end
end

test "works with pods and dns" do
use_cassette "kubernetes_pods", custom: true do
capture_log(fn ->
Expand Down

0 comments on commit cac5d3c

Please sign in to comment.