diff --git a/tools/restatectl/src/commands/log/reconfigure.rs b/tools/restatectl/src/commands/log/reconfigure.rs index 56d7bde86..59dec529c 100644 --- a/tools/restatectl/src/commands/log/reconfigure.rs +++ b/tools/restatectl/src/commands/log/reconfigure.rs @@ -10,14 +10,14 @@ use std::num::NonZeroU32; -use anyhow::Context; +use anyhow::{bail, Context}; use cling::prelude::*; use tonic::codec::CompressionEncoding; use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; use restate_admin::cluster_controller::protobuf::{ChainExtension, SealAndExtendChainRequest}; use restate_cli_util::{c_eprintln, c_println}; -use restate_types::logs::metadata::{ProviderKind, SegmentIndex}; +use restate_types::logs::metadata::{ProviderKind, Segment, SegmentIndex}; use restate_types::logs::{LogId, LogletId}; use restate_types::nodes_config::Role; use restate_types::protobuf::common::Version; @@ -58,23 +58,55 @@ pub struct ReconfigureOpts { } async fn reconfigure(connection: &ConnectionInfo, opts: &ReconfigureOpts) -> anyhow::Result<()> { - let extension = match opts.provider { - Some(provider) => { - let params = match provider { - ProviderKind::Local => rand::random::().to_string(), - #[cfg(any(test, feature = "memory-loglet"))] - ProviderKind::InMemory => rand::random::().to_string(), - #[cfg(feature = "replicated-loglet")] - ProviderKind::Replicated => replicated_loglet_params(connection, opts).await?, - }; - - Some(ChainExtension { - provider: provider.to_string(), - segment_index: opts.segment_index, - params, - }) + let logs = connection.get_logs().await?; + + let log_id = LogId::from(opts.log_id); + let chain = logs + .chain(&log_id) + .with_context(|| format!("Unknown log id '{log_id}'"))?; + + let tail_segment = chain.tail(); + + let tail_index = opts + .segment_index + .map(SegmentIndex::from) + .unwrap_or(chain.tail_index()); + + let next_loglet_id = LogletId::new(log_id, tail_index.next()); + + let provider = opts.provider.unwrap_or(tail_segment.config.kind); + + let params = match (provider, tail_segment.config.kind) { + // we can always go to replicated loglet + (ProviderKind::Replicated, _) => { + replicated_loglet_params(opts, next_loglet_id, &tail_segment)? + } + // but never back to anything else + (_, ProviderKind::Replicated) => { + bail!( + "Switching back to {} provider kind is not supported", + provider + ); + } + (ProviderKind::Local, _) => { + if opts.sequencer.is_some() || !opts.nodeset.is_empty() { + bail!("'sequencer' or 'nodeset' are only allowed with 'replicated' provider"); + } + u64::from(next_loglet_id).to_string() } - None => None, + #[cfg(any(test, feature = "memory-loglet"))] + (ProviderKind::InMemory, _) => { + if opts.sequencer.is_some() || !opts.nodeset.is_empty() { + bail!("'sequencer' or 'nodeset' are only allowed with 'replicated' provider"); + } + u64::from(next_loglet_id).to_string() + } + }; + + let extension = ChainExtension { + provider: provider.to_string(), + segment_index: Some(tail_index.into()), + params, }; let request = SealAndExtendChainRequest { @@ -82,7 +114,7 @@ async fn reconfigure(connection: &ConnectionInfo, opts: &ReconfigureOpts) -> any min_version: Some(Version { value: opts.min_version.get(), }), - extension, + extension: Some(extension), }; let response = connection @@ -126,30 +158,14 @@ async fn reconfigure(connection: &ConnectionInfo, opts: &ReconfigureOpts) -> any Ok(()) } -async fn replicated_loglet_params( - connection: &ConnectionInfo, +fn replicated_loglet_params( opts: &ReconfigureOpts, + loglet_id: LogletId, + tail: &Segment<'_>, ) -> anyhow::Result { - let logs = connection.get_logs().await?; - - let log_id = LogId::from(opts.log_id); - let chain = logs - .chain(&log_id) - .with_context(|| format!("Unknown log id '{log_id}'"))?; - - let tail_index = opts - .segment_index - .map(SegmentIndex::from) - .unwrap_or(chain.tail_index()); - - let loglet_id = LogletId::new(log_id, tail_index.next()); - - let tail_segment = chain.tail(); - - let params = if tail_segment.config.kind == ProviderKind::Replicated { - let last_params = - ReplicatedLogletParams::deserialize_from(tail_segment.config.params.as_bytes()) - .context("Last segment params in chain is invalid")?; + let params = if tail.config.kind == ProviderKind::Replicated { + let last_params = ReplicatedLogletParams::deserialize_from(tail.config.params.as_bytes()) + .context("Last segment params in chain is invalid")?; ReplicatedLogletParams { loglet_id, @@ -172,7 +188,7 @@ async fn replicated_loglet_params( } else { NodeSet::from_iter(opts.nodeset.iter().cloned()) }, - replication: opts.replication.clone().context("Missing replication-factor. Replication factor is required if last segment is not of replicated type")?, + replication: opts.replication.clone().context("Missing replication. Replication factor is required if last segment is not of replicated type")?, sequencer: opts.sequencer.context("Missing sequencer. Sequencer is required if last segment is not of replicated type")?, } };