Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: force-delete #173

Merged
merged 5 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 97 additions & 47 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,28 +71,38 @@ async fn reconcile_deleted_resource(
Ok(Action::await_change())
}
Err(kube::Error::Api(err)) if err.code == 404 => {
resource_sync.stop_remote_watches_if_watching(ctx).await;
stop_watches_and_remove_resource_sync_finalizers(resource_sync, name, parent_api, ctx)
.await
}
Err(err) => Err(err.into()),
}
}

let patched_finalizers = resource_sync
.finalizers_clone_or_empty()
.with_item_removed(&FINALIZER.to_string());
async fn stop_watches_and_remove_resource_sync_finalizers(
resource_sync: Arc<ResourceSync>,
name: &str,
parent_api: &Api<ResourceSync>,
ctx: Arc<Context>,
) -> Result<Action> {
resource_sync.stop_remote_watches_if_watching(ctx).await;

// Target has been deleted, remove the finalizer from the ResourceSync
let patch = Merge(json!({
"metadata": {
"finalizers": patched_finalizers,
},
}));
let patched_finalizers = resource_sync
.finalizers_clone_or_empty()
.with_item_removed(&FINALIZER.to_string());

parent_api
.patch(name, &PatchParams::default(), &patch)
.await?;
// Target has been deleted, remove the finalizer from the ResourceSync
let patch = Merge(json!({
"metadata": {
"finalizers": patched_finalizers,
},
}));

// We have removed our finalizer, so nothing more needs to be done
Ok(Action::await_change())
}
Err(err) => Err(err.into()),
}
parent_api
.patch(name, &PatchParams::default(), &patch)
.await?;

// We have removed our finalizer, so nothing more needs to be done
Ok(Action::await_change())
}

async fn add_target_finalizer(
Expand Down Expand Up @@ -197,35 +207,13 @@ async fn reconcile(resource_sync: Arc<ResourceSync>, ctx: Arc<Context>) -> Resul
.ok_or(Error::NameRequired)?;
let parent_api = resource_sync.api(ctx.client.clone());

let result = {
let resource_sync = Arc::clone(&resource_sync);

info!(?name, "running reconciler");

debug!(?resource_sync.spec, "got");
let local_ns = resource_sync.namespace().ok_or(Error::NamespaceRequired)?;

let target_api = resource_sync
.spec
.target
.api_for(ctx.client.clone(), &local_ns)
.await?;
let source_api = resource_sync
.spec
.source
.api_for(ctx.client.clone(), &local_ns)
.await?;

match resource_sync {
resource_sync if resource_sync.has_been_deleted() => {
reconcile_deleted_resource(resource_sync, &name, target_api, &parent_api, ctx).await
}
resource_sync if !resource_sync.has_target_finalizer() => {
add_target_finalizer(resource_sync, &name, &parent_api).await
}
_ => reconcile_normally(resource_sync, &name, source_api, target_api, ctx).await,
}
};
let result = reconcile_helper(
Arc::clone(&resource_sync),
Arc::clone(&ctx),
&name,
&parent_api,
)
.await;

let status = match &result {
Err(err) => {
Expand Down Expand Up @@ -258,6 +246,68 @@ async fn reconcile(resource_sync: Arc<ResourceSync>, ctx: Arc<Context>) -> Resul
result
}

async fn reconcile_helper(
resource_sync: Arc<ResourceSync>,
ctx: Arc<Context>,
name: &String,
parent_api: &Api<ResourceSync>,
) -> Result<Action> {
let resource_sync = Arc::clone(&resource_sync);

info!(?name, "running reconciler");

debug!(?resource_sync.spec, "got");
let local_ns = resource_sync.namespace().ok_or(Error::NamespaceRequired)?;

let (source_api, target_api) =
match source_and_target_apis(&resource_sync, &ctx, local_ns).await {
Ok(apis) => apis,
Err(_)
if resource_sync.has_force_delete_option_enabled()
&& resource_sync.has_been_deleted() =>
{
debug!(?name, "force-deleting ResourceSync");
return stop_watches_and_remove_resource_sync_finalizers(
resource_sync,
name,
parent_api,
ctx,
)
.await;
}
Err(err) => return Err(err),
};

match resource_sync {
resource_sync if resource_sync.has_been_deleted() => {
reconcile_deleted_resource(resource_sync, name, target_api, parent_api, ctx).await
}
resource_sync if !resource_sync.has_target_finalizer() => {
add_target_finalizer(resource_sync, name, parent_api).await
}
_ => reconcile_normally(resource_sync, name, source_api, target_api, ctx).await,
}
}

async fn source_and_target_apis(
resource_sync: &Arc<ResourceSync>,
ctx: &Arc<Context>,
local_ns: String,
) -> Result<(NamespacedApi, NamespacedApi)> {
let target_api = resource_sync
.spec
.target
.api_for(ctx.client.clone(), &local_ns)
.await?;
let source_api = resource_sync
.spec
.source
.api_for(ctx.client.clone(), &local_ns)
.await?;

Ok((source_api, target_api))
}

fn sync_failing_transition_time(status: &Option<ResourceSyncStatus>) -> Time {
let now = Time(Utc::now());

Expand Down
48 changes: 48 additions & 0 deletions src/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,22 @@ use kube::{
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

static FORCE_DELETE_ANNOTATION: &str = "sinker.influxdata.io/force-delete";

impl ResourceSync {
pub fn has_force_delete_option_enabled(&self) -> bool {
self.metadata
.annotations
.as_ref()
.map(|annotations| annotations.get(FORCE_DELETE_ANNOTATION))
.unwrap_or_default()
.cloned()
.unwrap_or_default()
.parse()
.unwrap_or_default()
}
}

#[derive(CustomResource, Debug, Serialize, Deserialize, Default, Clone, JsonSchema)]
#[kube(
group = "sinker.influxdata.io",
Expand Down Expand Up @@ -142,3 +158,35 @@ impl SinkerContainer {
crd
}
}

#[cfg(test)]
mod tests {
use super::*;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use rstest::rstest;
use std::collections::BTreeMap;

#[rstest]
#[case::no_annotations(
ResourceSync{metadata: Default::default(),spec: Default::default(),status: None,}, false
)]
#[case::force_delete_annotation_not_present(
ResourceSync{metadata: ObjectMeta{annotations: Some(BTreeMap::new()), ..Default::default()},spec: Default::default(),status: None,}, false
)]
#[case::force_delete_annotation_is_false(
ResourceSync{metadata: ObjectMeta{annotations: Some(BTreeMap::from([(FORCE_DELETE_ANNOTATION.to_string(), false.to_string())])), ..Default::default()},spec: Default::default(),status: None,}, false
)]
#[case::force_delete_annotation_is_other(
ResourceSync{metadata: ObjectMeta{annotations: Some(BTreeMap::from([(FORCE_DELETE_ANNOTATION.to_string(), "other".to_string())])), ..Default::default()},spec: Default::default(),status: None,}, false
)]
#[case::force_delete_annotation_is_true(
ResourceSync{metadata: ObjectMeta{annotations: Some(BTreeMap::from([(FORCE_DELETE_ANNOTATION.to_string(), true.to_string())])), ..Default::default()},spec: Default::default(),status: None,}, true
)]
#[tokio::test]
async fn test_resource_sync_has_force_delete_option_enabled(
#[case] resource_sync: ResourceSync,
#[case] expected: bool,
) {
assert_eq!(resource_sync.has_force_delete_option_enabled(), expected);
}
}
Loading