diff --git a/bootstrap/crds/main.tf b/bootstrap/crds/main.tf index ec9d4d6..c2fb93d 100644 --- a/bootstrap/crds/main.tf +++ b/bootstrap/crds/main.tf @@ -8,6 +8,7 @@ resource "kubernetes_manifest" "customresourcedefinition_kupoports_demeter_run" "spec" = { "group" = "demeter.run" "names" = { + "categories" = [] "kind" = "KupoPort" "plural" = "kupoports" "shortNames" = [ @@ -39,6 +40,11 @@ resource "kubernetes_manifest" "customresourcedefinition_kupoports_demeter_run" "name" = "Endpoint URL" "type" = "string" }, + { + "jsonPath" = ".spec.authentication" + "name" = "Authentication" + "type" = "string" + }, { "jsonPath" = ".status.authToken" "name" = "Auth Token" @@ -52,6 +58,13 @@ resource "kubernetes_manifest" "customresourcedefinition_kupoports_demeter_run" "properties" = { "spec" = { "properties" = { + "authentication" = { + "enum" = [ + "none", + "apiKey", + ] + "type" = "string" + } "network" = { "enum" = [ "mainnet", @@ -72,6 +85,7 @@ resource "kubernetes_manifest" "customresourcedefinition_kupoports_demeter_run" } } "required" = [ + "authentication", "network", "operatorVersion", "pruneUtxo", diff --git a/operator/src/controller.rs b/operator/src/controller.rs index 797a6d6..c04013c 100644 --- a/operator/src/controller.rs +++ b/operator/src/controller.rs @@ -1,19 +1,20 @@ -use crate::{ - auth::handle_auth, - gateway::{handle_http_route, handle_reference_grant}, - Error, Metrics, Network, Result, State, -}; use futures::StreamExt; use kube::{ api::ListParams, runtime::{controller::Action, watcher::Config as WatcherConfig, Controller}, - Api, Client, CustomResource, ResourceExt, + Api, Client, CustomResource, }; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::{sync::Arc, time::Duration}; use tracing::{error, info, instrument}; +use crate::{ + auth::handle_auth, + gateway::{handle_http_route, handle_reference_grant}, + Error, Metrics, Network, Result, State, +}; + pub static KUPO_PORT_FINALIZER: &str = "kupoports.demeter.run"; struct Context { @@ -26,6 +27,13 @@ impl Context { } } +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub enum Authentication { + None, + ApiKey, +} + #[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)] #[kube( kind = "KupoPort", @@ -35,15 +43,14 @@ impl Context { namespaced )] #[kube(status = "KupoPortStatus")] -#[kube( - printcolumn = r#" +#[kube(printcolumn = r#" {"name": "Network", "jsonPath": ".spec.network", "type": "string"}, {"name": "Pruned", "jsonPath": ".spec.pruneUtxo", "type": "boolean"}, {"name": "Throughput Tier", "jsonPath":".spec.throughputTier", "type": "string"}, {"name": "Endpoint URL", "jsonPath": ".status.endpointUrl", "type": "string"}, + {"name": "Authentication", "jsonPath": ".spec.authentication", "type": "string"}, {"name": "Auth Token", "jsonPath": ".status.authToken", "type": "string"} - "# -)] + "#)] #[serde(rename_all = "camelCase")] pub struct KupoPortSpec { pub operator_version: String, @@ -51,33 +58,21 @@ pub struct KupoPortSpec { pub prune_utxo: bool, // throughput should be 0, 1, 2 pub throughput_tier: String, + pub authentication: Authentication, } #[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct KupoPortStatus { - #[serde(skip_serializing_if = "Option::is_none")] pub endpoint_url: Option, #[serde(skip_serializing_if = "Option::is_none")] pub auth_token: Option, } -fn build_private_dns_service_name(network: &Network, prune_utxo: bool) -> String { - if prune_utxo { - return format!("kupo-{}-pruned", network); - } - format!("kupo-{}", network) -} - async fn reconcile(crd: Arc, ctx: Arc) -> Result { - let client = ctx.client.clone(); - let namespace = crd.namespace().unwrap(); - - let private_dns_service_name = - build_private_dns_service_name(&crd.spec.network, crd.spec.prune_utxo); - handle_reference_grant(client.clone(), &namespace, &crd, &private_dns_service_name).await?; - handle_http_route(client.clone(), &namespace, &crd, &private_dns_service_name).await?; - handle_auth(client.clone(), &namespace, &crd).await?; + handle_reference_grant(ctx.client.clone(), &crd).await?; + handle_http_route(ctx.client.clone(), &crd).await?; + handle_auth(ctx.client.clone(), &crd).await?; Ok(Action::await_change()) } diff --git a/operator/src/handlers/auth.rs b/operator/src/handlers/auth.rs index 9b234ed..596df5e 100644 --- a/operator/src/handlers/auth.rs +++ b/operator/src/handlers/auth.rs @@ -6,7 +6,7 @@ use base64::{ use bech32::ToBase32; use k8s_openapi::{api::core::v1::Secret, apimachinery::pkg::apis::meta::v1::OwnerReference}; use kube::{ - api::{Patch, PatchParams, PostParams}, + api::{DeleteParams, Patch, PatchParams, PostParams}, core::ObjectMeta, Api, Client, CustomResourceExt, Resource, ResourceExt, }; @@ -16,150 +16,189 @@ use std::collections::BTreeMap; use tracing::info; use crate::{ - create_resource, get_acl_name, get_auth_name, get_config, get_resource, kong_consumer, - kong_plugin, patch_resource, patch_resource_status, Error, KupoPort, KupoPortStatus, + create_resource, delete_resource, get_acl_name, get_auth_name, get_config, get_resource, + kong_consumer, kong_plugin, patch_resource, replace_resource_status, Authentication, Error, + KupoPort, }; -pub async fn handle_auth( - client: Client, - namespace: &str, - resource: &KupoPort, -) -> Result<(), Error> { - handle_auth_secret(client.clone(), namespace, resource).await?; - handle_auth_plugin(client.clone(), namespace, resource).await?; - handle_acl_secret(client.clone(), namespace, resource).await?; - handle_acl_plugin(client.clone(), namespace, resource).await?; - handle_consumer(client.clone(), namespace, resource).await?; +pub async fn handle_auth(client: Client, crd: &KupoPort) -> Result<(), Error> { + handle_auth_secret(client.clone(), crd).await?; + handle_auth_plugin(client.clone(), crd).await?; + + handle_acl_secret(client.clone(), crd).await?; + handle_acl_plugin(client.clone(), crd).await?; + + handle_consumer(client.clone(), crd).await?; + Ok(()) } -async fn handle_auth_secret( - client: Client, - namespace: &str, - resource: &KupoPort, -) -> Result<(), Error> { - let name = get_auth_name(&resource.name_any()); - let api_key = generate_api_key(&name, namespace).await?; +async fn handle_auth_secret(client: Client, crd: &KupoPort) -> Result<(), Error> { + let namespace = crd.namespace().unwrap(); + let name = get_auth_name(&crd.name_any()); let kupo_port = KupoPort::api_resource(); - let api = Api::::namespaced(client.clone(), namespace); - - let secret = auth_secret(&name, &api_key, resource.clone()); + let api = Api::::namespaced(client.clone(), &namespace); let result = api.get_opt(&name).await?; - if result.is_some() { - info!(resource = resource.name_any(), "Updating secret"); - let patch_params = PatchParams::default(); - api.patch(&name, &patch_params, &Patch::Merge(secret)) - .await?; - } else { - info!(resource = resource.name_any(), "Creating secret"); - let post_params = PostParams::default(); - api.create(&post_params, &secret).await?; + let mut status = crd.status.clone().unwrap_or_default(); + + match crd.spec.authentication { + Authentication::ApiKey => { + let api_key = generate_api_key(&name, &namespace).await?; + let secret = build_auth_secret(&name, &api_key, crd.clone()); + + status.auth_token = Some(api_key); + + if result.is_some() { + info!(resource = crd.name_any(), "Updating auth secret"); + let patch_params = PatchParams::default(); + api.patch(&name, &patch_params, &Patch::Merge(secret)) + .await?; + } else { + info!(resource = crd.name_any(), "Creating auth secret"); + let post_params = PostParams::default(); + api.create(&post_params, &secret).await?; + } + } + Authentication::None => { + if result.is_some() { + info!(resource = crd.name_any(), "Deleting auth secret"); + api.delete(&name, &DeleteParams::default()).await?; + } + } } - let status = KupoPortStatus { - auth_token: Some(api_key), - ..Default::default() - }; - - patch_resource_status( + replace_resource_status( client.clone(), - namespace, + &namespace, kupo_port, - &resource.name_any(), + &crd.name_any(), serde_json::to_value(status)?, ) .await?; + Ok(()) } -async fn handle_auth_plugin( - client: Client, - namespace: &str, - resource: &KupoPort, -) -> Result<(), Error> { - let name = get_auth_name(&resource.name_any()); +async fn handle_auth_plugin(client: Client, crd: &KupoPort) -> Result<(), Error> { + let namespace = crd.namespace().unwrap(); + let name = get_auth_name(&crd.name_any()); let kong_plugin = kong_plugin(); - let result = get_resource(client.clone(), namespace, &kong_plugin, &name).await?; - let (metadata, data, raw) = auth_plugin(resource.clone())?; - - if result.is_some() { - info!(resource = resource.name_any(), "Updating auth plugin"); - patch_resource(client.clone(), namespace, kong_plugin, &name, raw).await?; - } else { - info!(resource = resource.name_any(), "Creating auth plugin"); - create_resource(client.clone(), namespace, kong_plugin, metadata, data).await?; + let result = get_resource(client.clone(), &namespace, &kong_plugin, &name).await?; + + match crd.spec.authentication { + Authentication::ApiKey => { + let (metadata, data, raw) = build_auth_plugin(crd.clone())?; + if result.is_some() { + info!(resource = crd.name_any(), "Updating auth plugin"); + patch_resource(client.clone(), &namespace, kong_plugin, &name, raw).await?; + } else { + info!(resource = crd.name_any(), "Creating auth plugin"); + create_resource(client.clone(), &namespace, kong_plugin, metadata, data).await?; + } + } + Authentication::None => { + if result.is_some() { + info!(resource = crd.name_any(), "Deleting auth plugin"); + delete_resource(client.clone(), &namespace, kong_plugin, &name).await?; + } + } } + Ok(()) } -async fn handle_acl_secret( - client: Client, - namespace: &str, - resource: &KupoPort, -) -> Result<(), Error> { - let name = get_acl_name(&resource.name_any()); +async fn handle_acl_secret(client: Client, crd: &KupoPort) -> Result<(), Error> { + let namespace = crd.namespace().unwrap(); + let name = get_acl_name(&crd.name_any()); - let api = Api::::namespaced(client.clone(), namespace); + let api = Api::::namespaced(client.clone(), &namespace); - let secret = acl_secret(&name, resource.clone()); let result = api.get_opt(&name).await?; - if result.is_some() { - println!("Updating acl secret for {}", resource.name_any()); - let patch_params = PatchParams::default(); - api.patch(&name, &patch_params, &Patch::Merge(secret)) - .await?; - } else { - println!("Creating acl secret for {}", resource.name_any()); - let post_params = PostParams::default(); - api.create(&post_params, &secret).await?; + match crd.spec.authentication { + Authentication::ApiKey => { + let secret = build_acl_secret(&name, crd.clone()); + + if result.is_some() { + info!(resource = crd.name_any(), "Updating acl secret"); + let patch_params = PatchParams::default(); + api.patch(&name, &patch_params, &Patch::Merge(secret)) + .await?; + } else { + info!(resource = crd.name_any(), "Creating acl secret"); + let post_params = PostParams::default(); + api.create(&post_params, &secret).await?; + } + } + Authentication::None => { + if result.is_some() { + info!(resource = crd.name_any(), "Deleting acl secret"); + api.delete(&name, &DeleteParams::default()).await?; + } + } } Ok(()) } -async fn handle_acl_plugin( - client: Client, - namespace: &str, - resource: &KupoPort, -) -> Result<(), Error> { - let name = get_acl_name(&resource.name_any()); +async fn handle_acl_plugin(client: Client, crd: &KupoPort) -> Result<(), Error> { + let namespace = crd.namespace().unwrap(); + let name = get_acl_name(&crd.name_any()); let kong_plugin = kong_plugin(); - let result = get_resource(client.clone(), namespace, &kong_plugin, &name).await?; - let (metadata, data, raw) = acl_plugin(resource.clone())?; - - if result.is_some() { - println!("Updating acl plugin for: {}", resource.name_any()); - patch_resource(client.clone(), namespace, kong_plugin, &name, raw).await?; - } else { - println!("Creating acl plugin for: {}", resource.name_any()); - create_resource(client.clone(), namespace, kong_plugin, metadata, data).await?; + let result = get_resource(client.clone(), &namespace, &kong_plugin, &name).await?; + let (metadata, data, raw) = build_acl_plugin(crd.clone())?; + + match crd.spec.authentication { + Authentication::ApiKey => { + if result.is_some() { + info!(resource = crd.name_any(), "Updating acl plugin"); + patch_resource(client.clone(), &namespace, kong_plugin, &name, raw).await?; + } else { + info!(resource = crd.name_any(), "Creating acl plugin"); + create_resource(client.clone(), &namespace, kong_plugin, metadata, data).await?; + } + } + Authentication::None => { + if result.is_some() { + info!(resource = crd.name_any(), "Deleting acl plugin"); + delete_resource(client.clone(), &namespace, kong_plugin, &name).await?; + } + } } + Ok(()) } -async fn handle_consumer( - client: Client, - namespace: &str, - resource: &KupoPort, -) -> Result<(), Error> { - let name = get_auth_name(&resource.name_any()); +async fn handle_consumer(client: Client, crd: &KupoPort) -> Result<(), Error> { + let namespace = crd.namespace().unwrap(); + let name = get_auth_name(&crd.name_any()); let kong_consumer = kong_consumer(); - let result = get_resource(client.clone(), namespace, &kong_consumer, &name).await?; - let (metadata, data, raw) = consumer(resource.clone())?; - - if result.is_some() { - info!(resource = resource.name_any(), "Updating consumer"); - patch_resource(client.clone(), namespace, kong_consumer, &name, raw).await?; - } else { - info!(resource = resource.name_any(), "Creating consumer"); - create_resource(client.clone(), namespace, kong_consumer, metadata, data).await?; + let result = get_resource(client.clone(), &namespace, &kong_consumer, &name).await?; + + match crd.spec.authentication { + Authentication::ApiKey => { + let (metadata, data, raw) = build_consumer(crd.clone())?; + if result.is_some() { + info!(resource = crd.name_any(), "Updating consumer"); + patch_resource(client.clone(), &namespace, kong_consumer, &name, raw).await?; + } else { + info!(resource = crd.name_any(), "Creating consumer"); + create_resource(client.clone(), &namespace, kong_consumer, metadata, data).await?; + } + } + Authentication::None => { + if result.is_some() { + info!(resource = crd.name_any(), "Deleting consumer"); + delete_resource(client.clone(), &namespace, kong_consumer, &name).await?; + } + } } + Ok(()) } @@ -182,7 +221,7 @@ async fn generate_api_key(name: &str, namespace: &str) -> Result Ok(with_bech) } -fn auth_secret(name: &str, api_key: &str, owner: KupoPort) -> Secret { +fn build_auth_secret(name: &str, api_key: &str, owner: KupoPort) -> Secret { let mut string_data = BTreeMap::new(); string_data.insert("key".into(), api_key.into()); @@ -210,7 +249,7 @@ fn auth_secret(name: &str, api_key: &str, owner: KupoPort) -> Secret { } } -fn auth_plugin(owner: KupoPort) -> Result<(ObjectMeta, JsonValue, JsonValue), Error> { +fn build_auth_plugin(owner: KupoPort) -> Result<(ObjectMeta, JsonValue, JsonValue), Error> { let kong_plugin = kong_plugin(); let metadata = ObjectMeta::deserialize(&json!({ @@ -244,7 +283,7 @@ fn auth_plugin(owner: KupoPort) -> Result<(ObjectMeta, JsonValue, JsonValue), Er Ok((metadata, data, raw)) } -fn acl_secret(name: &str, owner: KupoPort) -> Secret { +fn build_acl_secret(name: &str, owner: KupoPort) -> Secret { let mut string_data = BTreeMap::new(); string_data.insert("group".into(), owner.name_any()); @@ -272,7 +311,7 @@ fn acl_secret(name: &str, owner: KupoPort) -> Secret { } } -fn acl_plugin(owner: KupoPort) -> Result<(ObjectMeta, JsonValue, JsonValue), Error> { +fn build_acl_plugin(owner: KupoPort) -> Result<(ObjectMeta, JsonValue, JsonValue), Error> { let kong_plugin = kong_plugin(); let metadata = ObjectMeta::deserialize(&json!({ @@ -305,7 +344,7 @@ fn acl_plugin(owner: KupoPort) -> Result<(ObjectMeta, JsonValue, JsonValue), Err Ok((metadata, data, raw)) } -fn consumer(owner: KupoPort) -> Result<(ObjectMeta, JsonValue, JsonValue), Error> { +fn build_consumer(owner: KupoPort) -> Result<(ObjectMeta, JsonValue, JsonValue), Error> { let kong_consumer = kong_consumer(); let config = get_config(); diff --git a/operator/src/handlers/gateway.rs b/operator/src/handlers/gateway.rs index 59551fa..7d1777e 100644 --- a/operator/src/handlers/gateway.rs +++ b/operator/src/handlers/gateway.rs @@ -6,30 +6,28 @@ use tracing::info; use crate::{ create_resource, get_acl_name, get_auth_name, get_config, get_rate_limit_name, get_resource, http_route, patch_resource, patch_resource_status, reference_grant, Error, KupoPort, - KupoPortStatus, + KupoPortStatus, kupo_service_name, }; -pub async fn handle_http_route( - client: Client, - namespace: &str, - resource: &KupoPort, - private_dns_service_name: &str, -) -> Result<(), Error> { - let name = format!("kupo-{}", resource.name_any()); - let host_name = build_host(&resource.name_any(), &namespace_to_slug(namespace)); +pub async fn handle_http_route(client: Client, crd: &KupoPort) -> Result<(), Error> { + let namespace = crd.namespace().unwrap(); + let kupo_service = kupo_service_name(&crd.spec.network, crd.spec.prune_utxo); + + let name = format!("kupo-{}", crd.name_any()); + let host_name = build_host(&crd.name_any(), &namespace_to_slug(&namespace)); let http_route = http_route(); let kupo_port = KupoPort::api_resource(); - let result = get_resource(client.clone(), namespace, &http_route, &name).await?; + let result = get_resource(client.clone(), &namespace, &http_route, &name).await?; - let (metadata, data, raw) = route(&name, &host_name, resource, private_dns_service_name)?; + let (metadata, data, raw) = route(&name, &host_name, crd, &kupo_service)?; if result.is_some() { - info!(resource = resource.name_any(), "Updating http route"); - patch_resource(client.clone(), namespace, http_route, &name, raw).await?; + info!(resource = crd.name_any(), "Updating http route"); + patch_resource(client.clone(), &namespace, http_route, &name, raw).await?; } else { - info!(resource = resource.name_any(), "Creating http route"); - create_resource(client.clone(), namespace, http_route, metadata, data).await?; + info!(resource = crd.name_any(), "Creating http route"); + create_resource(client.clone(), &namespace, http_route, metadata, data).await?; } let status = KupoPortStatus { @@ -38,31 +36,29 @@ pub async fn handle_http_route( }; patch_resource_status( client.clone(), - namespace, + &namespace, kupo_port, - &resource.name_any(), + &crd.name_any(), serde_json::to_value(status)?, ) .await?; Ok(()) } -pub async fn handle_reference_grant( - client: Client, - namespace: &str, - resource: &KupoPort, - private_dns_service_name: &str, -) -> Result<(), Error> { - let name = format!("{}-{}-http", namespace, resource.name_any()); +pub async fn handle_reference_grant(client: Client, crd: &KupoPort) -> Result<(), Error> { + let namespace = crd.namespace().unwrap(); + let kupo_service = kupo_service_name(&crd.spec.network, crd.spec.prune_utxo); + + let name = format!("{}-{}-http", namespace, crd.name_any()); let reference_grant = reference_grant(); let config = get_config(); let result = get_resource(client.clone(), &config.namespace, &reference_grant, &name).await?; - let (metadata, data, raw) = grant(&name, private_dns_service_name, namespace)?; + let (metadata, data, raw) = grant(&name, &kupo_service, &namespace)?; if result.is_some() { - info!(resource = resource.name_any(), "Updating reference grant"); + info!(resource = crd.name_any(), "Updating reference grant"); patch_resource( client.clone(), &config.namespace, @@ -72,7 +68,7 @@ pub async fn handle_reference_grant( ) .await?; } else { - info!(resource = resource.name_any(), "Creating reference grant"); + info!(resource = crd.name_any(), "Creating reference grant"); // we need to get the deserialized payload create_resource( client.clone(), diff --git a/operator/src/helpers/mod.rs b/operator/src/helpers/mod.rs index 0e70807..3163676 100644 --- a/operator/src/helpers/mod.rs +++ b/operator/src/helpers/mod.rs @@ -1,11 +1,13 @@ use kube::{ - api::{Patch, PatchParams, PostParams}, + api::{DeleteParams, Patch, PatchParams, PostParams}, core::{DynamicObject, ObjectMeta}, discovery::ApiResource, Api, Client, }; use serde_json::json; +use crate::Network; + pub fn http_route() -> ApiResource { ApiResource { group: "gateway.networking.k8s.io".into(), @@ -106,4 +108,41 @@ pub async fn patch_resource_status( Ok(()) } -// Similarly define GRPCRoute, KongPlugin, KongConsumer, UtxoRpcPort structs +pub async fn replace_resource_status( + client: Client, + namespace: &str, + api_resource: ApiResource, + name: &str, + payload: serde_json::Value, +) -> Result<(), kube::Error> { + let api: Api = Api::namespaced_with(client, namespace, &api_resource); + + let status = json!({ "status": payload }); + + let post_params = PostParams::default(); + + api.replace_status(name, &post_params, status.to_string().into_bytes()) + .await?; + + Ok(()) +} + +pub async fn delete_resource( + client: Client, + namespace: &str, + api_resource: ApiResource, + name: &str, +) -> Result<(), kube::Error> { + let api: Api = Api::namespaced_with(client, namespace, &api_resource); + + api.delete(name, &DeleteParams::default()).await?; + + Ok(()) +} + +pub fn kupo_service_name(network: &Network, prune_utxo: bool) -> String { + if prune_utxo { + return format!("kupo-{}-pruned", network); + } + format!("kupo-{}", network) +} diff --git a/operator/yaml/crd.yaml b/operator/yaml/crd.yaml index b837e4b..146d235 100644 --- a/operator/yaml/crd.yaml +++ b/operator/yaml/crd.yaml @@ -26,6 +26,9 @@ spec: - jsonPath: .status.endpointUrl name: Endpoint URL type: string + - jsonPath: .spec.authentication + name: Authentication + type: string - jsonPath: .status.authToken name: Auth Token type: string @@ -36,6 +39,11 @@ spec: properties: spec: properties: + authentication: + enum: + - none + - apiKey + type: string network: enum: - mainnet @@ -50,6 +58,7 @@ spec: throughputTier: type: string required: + - authentication - network - operatorVersion - pruneUtxo