Skip to content

Commit

Permalink
agent: use new pub_id when evolving materializations
Browse files Browse the repository at this point in the history
Ensure that the materialization controller will always use a new
publication id when applying onIncompatibleSchemaChange actions.
Previously, it would re-use the publication id of the one that had
failed. That doesn't work if the failed publication was a "touch"
publication. Applying `onIncompatibleSchemaChange` actions modifies the
model, and so the publication id must be greater than `last_pub_id`.
Generating a new id ensures that is always the case.
  • Loading branch information
psFried committed Sep 19, 2024
1 parent c90b6fd commit 8c314dd
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 38 deletions.
3 changes: 2 additions & 1 deletion crates/agent/src/controllers/materialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ impl MaterializationStatus {

if result.status.has_incompatible_collections() {
let PublicationResult {
pub_id: publication_id,
built,
mut detail,
mut draft,
Expand All @@ -103,6 +102,8 @@ impl MaterializationStatus {
.with_maybe_retry(backoff_publication_failure(state.failures))
.context("applying evolution actions")?;

// Always use a new pub_id since we're modifying the model
let publication_id = control_plane.next_pub_id();
let new_result = control_plane
.publish(publication_id, detail, state.logs_token, draft)
.await
Expand Down
53 changes: 16 additions & 37 deletions crates/agent/src/controllers/publication_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,21 @@ fn count_schema(_: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Sch
.unwrap()
}

fn is_touch_pub(draft: &tables::DraftCatalog) -> bool {
draft.tests.iter().all(|r| r.is_touch)
&& draft.collections.iter().all(|r| r.is_touch)
&& draft.captures.iter().all(|r| r.is_touch)
&& draft.materializations.iter().all(|r| r.is_touch)
}

impl PublicationInfo {
pub fn is_success(&self) -> bool {
// TODO: should EmptyDraft be considered successful?
self.result.as_ref().is_some_and(|s| s.is_success())
}

pub fn observed(publication: &PublicationResult) -> Self {
let is_touch = publication.draft.tests.iter().all(|r| r.is_touch)
&& publication.draft.collections.iter().all(|r| r.is_touch)
&& publication.draft.captures.iter().all(|r| r.is_touch)
&& publication
.draft
.materializations
.iter()
.all(|r| r.is_touch);
let is_touch = is_touch_pub(&publication.draft);
PublicationInfo {
id: publication.pub_id,
created: Some(publication.started_at),
Expand Down Expand Up @@ -156,7 +156,6 @@ impl PublicationInfo {
/// Represents a draft that is pending publication
#[derive(Debug)]
pub struct PendingPublication {
pub is_touch: bool,
/// The draft to be published
pub draft: tables::DraftCatalog,
/// Reasons for updating the draft, which will be joined together to become
Expand All @@ -175,7 +174,6 @@ impl PartialEq for PendingPublication {
impl PendingPublication {
pub fn new() -> Self {
PendingPublication {
is_touch: false,
draft: tables::DraftCatalog::default(),
details: Vec::new(),
}
Expand All @@ -189,8 +187,9 @@ impl PendingPublication {
tracing::info!("starting touch");
let new_hash = new_dependency_hash.unwrap_or("None");
let old_hash = state.live_dependency_hash.as_deref().unwrap_or("None");
self.details.push(format!("in response to change in dependencies, prev hash: {old_hash}, new hash: {new_hash}"));

self.details.push(format!(
"in response to change in dependencies, prev hash: {old_hash}, new hash: {new_hash}"
));
let model = state
.live_spec
.as_ref()
Expand All @@ -205,7 +204,7 @@ impl PendingPublication {
scope,
Some(state.last_pub_id),
Some(&model.to_raw_value()),
true,
true, // is_touch
)
.unwrap();
}
Expand Down Expand Up @@ -237,7 +236,6 @@ impl PendingPublication {
}

pub fn update_pending_draft(&mut self, detail: impl Into<String>) -> &mut tables::DraftCatalog {
self.is_touch = false;
self.details.push(detail.into());
&mut self.draft
}
Expand All @@ -248,33 +246,14 @@ impl PendingPublication {
status: &mut PublicationStatus,
control_plane: &mut C,
) -> anyhow::Result<PublicationResult> {
let pub_id = if self.is_touch {
debug_assert!(
self.draft.captures.iter().all(|c| c.is_touch),
"all drafted specs must have is_touch: true for touch pub"
);
debug_assert!(
self.draft.collections.iter().all(|c| c.is_touch),
"all drafted specs must have is_touch: true for touch pub"
);
debug_assert!(
self.draft.materializations.iter().all(|c| c.is_touch),
"all drafted specs must have is_touch: true for touch pub"
);
debug_assert!(
self.draft.tests.iter().all(|c| c.is_touch),
"all drafted specs must have is_touch: true for touch pub"
);

let is_touch = is_touch_pub(&self.draft);
let pub_id = if is_touch {
state.last_pub_id
} else {
control_plane.next_pub_id()
};
let PendingPublication {
is_touch,
draft,
details,
} = std::mem::replace(self, PendingPublication::new());
let PendingPublication { draft, details } =
std::mem::replace(self, PendingPublication::new());

let detail = details.join(", ");
let result = control_plane
Expand Down

0 comments on commit 8c314dd

Please sign in to comment.