From 8c314dd63742af00728fb1ec0e25f5b647eb2b33 Mon Sep 17 00:00:00 2001 From: Phil Date: Thu, 19 Sep 2024 12:55:44 -0400 Subject: [PATCH] agent: use new pub_id when evolving materializations Ensure that the materialization controller will always use a new publication id when applying onIncompatibleSchemaChange actions. Previously, it would re-use the publication id of the one that had failed. That doesn't work if the failed publication was a "touch" publication. Applying `onIncompatibleSchemaChange` actions modifies the model, and so the publication id must be greater than `last_pub_id`. Generating a new id ensures that is always the case. --- .../agent/src/controllers/materialization.rs | 3 +- .../src/controllers/publication_status.rs | 53 ++++++------------- 2 files changed, 18 insertions(+), 38 deletions(-) diff --git a/crates/agent/src/controllers/materialization.rs b/crates/agent/src/controllers/materialization.rs index 9036d35b0a..4c38a2acdb 100644 --- a/crates/agent/src/controllers/materialization.rs +++ b/crates/agent/src/controllers/materialization.rs @@ -89,7 +89,6 @@ impl MaterializationStatus { if result.status.has_incompatible_collections() { let PublicationResult { - pub_id: publication_id, built, mut detail, mut draft, @@ -103,6 +102,8 @@ impl MaterializationStatus { .with_maybe_retry(backoff_publication_failure(state.failures)) .context("applying evolution actions")?; + // Always use a new pub_id since we're modifying the model + let publication_id = control_plane.next_pub_id(); let new_result = control_plane .publish(publication_id, detail, state.logs_token, draft) .await diff --git a/crates/agent/src/controllers/publication_status.rs b/crates/agent/src/controllers/publication_status.rs index 74ac7817ad..f3aa06446d 100644 --- a/crates/agent/src/controllers/publication_status.rs +++ b/crates/agent/src/controllers/publication_status.rs @@ -114,6 +114,13 @@ fn count_schema(_: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Sch .unwrap() } +fn is_touch_pub(draft: &tables::DraftCatalog) -> bool { + draft.tests.iter().all(|r| r.is_touch) + && draft.collections.iter().all(|r| r.is_touch) + && draft.captures.iter().all(|r| r.is_touch) + && draft.materializations.iter().all(|r| r.is_touch) +} + impl PublicationInfo { pub fn is_success(&self) -> bool { // TODO: should EmptyDraft be considered successful? @@ -121,14 +128,7 @@ impl PublicationInfo { } pub fn observed(publication: &PublicationResult) -> Self { - let is_touch = publication.draft.tests.iter().all(|r| r.is_touch) - && publication.draft.collections.iter().all(|r| r.is_touch) - && publication.draft.captures.iter().all(|r| r.is_touch) - && publication - .draft - .materializations - .iter() - .all(|r| r.is_touch); + let is_touch = is_touch_pub(&publication.draft); PublicationInfo { id: publication.pub_id, created: Some(publication.started_at), @@ -156,7 +156,6 @@ impl PublicationInfo { /// Represents a draft that is pending publication #[derive(Debug)] pub struct PendingPublication { - pub is_touch: bool, /// The draft to be published pub draft: tables::DraftCatalog, /// Reasons for updating the draft, which will be joined together to become @@ -175,7 +174,6 @@ impl PartialEq for PendingPublication { impl PendingPublication { pub fn new() -> Self { PendingPublication { - is_touch: false, draft: tables::DraftCatalog::default(), details: Vec::new(), } @@ -189,8 +187,9 @@ impl PendingPublication { tracing::info!("starting touch"); let new_hash = new_dependency_hash.unwrap_or("None"); let old_hash = state.live_dependency_hash.as_deref().unwrap_or("None"); - self.details.push(format!("in response to change in dependencies, prev hash: {old_hash}, new hash: {new_hash}")); - + self.details.push(format!( + "in response to change in dependencies, prev hash: {old_hash}, new hash: {new_hash}" + )); let model = state .live_spec .as_ref() @@ -205,7 +204,7 @@ impl PendingPublication { scope, Some(state.last_pub_id), Some(&model.to_raw_value()), - true, + true, // is_touch ) .unwrap(); } @@ -237,7 +236,6 @@ impl PendingPublication { } pub fn update_pending_draft(&mut self, detail: impl Into) -> &mut tables::DraftCatalog { - self.is_touch = false; self.details.push(detail.into()); &mut self.draft } @@ -248,33 +246,14 @@ impl PendingPublication { status: &mut PublicationStatus, control_plane: &mut C, ) -> anyhow::Result { - let pub_id = if self.is_touch { - debug_assert!( - self.draft.captures.iter().all(|c| c.is_touch), - "all drafted specs must have is_touch: true for touch pub" - ); - debug_assert!( - self.draft.collections.iter().all(|c| c.is_touch), - "all drafted specs must have is_touch: true for touch pub" - ); - debug_assert!( - self.draft.materializations.iter().all(|c| c.is_touch), - "all drafted specs must have is_touch: true for touch pub" - ); - debug_assert!( - self.draft.tests.iter().all(|c| c.is_touch), - "all drafted specs must have is_touch: true for touch pub" - ); - + let is_touch = is_touch_pub(&self.draft); + let pub_id = if is_touch { state.last_pub_id } else { control_plane.next_pub_id() }; - let PendingPublication { - is_touch, - draft, - details, - } = std::mem::replace(self, PendingPublication::new()); + let PendingPublication { draft, details } = + std::mem::replace(self, PendingPublication::new()); let detail = details.join(", "); let result = control_plane