Skip to content

Commit

Permalink
Reconfigure respect --sequencer and --nodeset if provider is not …
Browse files Browse the repository at this point in the history
…supplied (#2721)

* Reconfigure respect `--sequencer` and `--nodeset` if provider is not supplied

Summary:
Respect both the sequencer and nodeset flag if provider is not supplied
as long as the loglet is already using replicated provider.

Also make sure there is noway you can go back to local or in-memory provider
once you go replicated
  • Loading branch information
muhamadazmy authored Feb 13, 2025
1 parent 8385eae commit 45bf7c0
Showing 1 changed file with 58 additions and 42 deletions.
100 changes: 58 additions & 42 deletions tools/restatectl/src/commands/log/reconfigure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@

use std::num::NonZeroU32;

use anyhow::Context;
use anyhow::{bail, Context};
use cling::prelude::*;
use tonic::codec::CompressionEncoding;

use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient;
use restate_admin::cluster_controller::protobuf::{ChainExtension, SealAndExtendChainRequest};
use restate_cli_util::{c_eprintln, c_println};
use restate_types::logs::metadata::{ProviderKind, SegmentIndex};
use restate_types::logs::metadata::{ProviderKind, Segment, SegmentIndex};
use restate_types::logs::{LogId, LogletId};
use restate_types::nodes_config::Role;
use restate_types::protobuf::common::Version;
Expand Down Expand Up @@ -58,31 +58,63 @@ pub struct ReconfigureOpts {
}

async fn reconfigure(connection: &ConnectionInfo, opts: &ReconfigureOpts) -> anyhow::Result<()> {
let extension = match opts.provider {
Some(provider) => {
let params = match provider {
ProviderKind::Local => rand::random::<u64>().to_string(),
#[cfg(any(test, feature = "memory-loglet"))]
ProviderKind::InMemory => rand::random::<u64>().to_string(),
#[cfg(feature = "replicated-loglet")]
ProviderKind::Replicated => replicated_loglet_params(connection, opts).await?,
};

Some(ChainExtension {
provider: provider.to_string(),
segment_index: opts.segment_index,
params,
})
let logs = connection.get_logs().await?;

let log_id = LogId::from(opts.log_id);
let chain = logs
.chain(&log_id)
.with_context(|| format!("Unknown log id '{log_id}'"))?;

let tail_segment = chain.tail();

let tail_index = opts
.segment_index
.map(SegmentIndex::from)
.unwrap_or(chain.tail_index());

let next_loglet_id = LogletId::new(log_id, tail_index.next());

let provider = opts.provider.unwrap_or(tail_segment.config.kind);

let params = match (provider, tail_segment.config.kind) {
// we can always go to replicated loglet
(ProviderKind::Replicated, _) => {
replicated_loglet_params(opts, next_loglet_id, &tail_segment)?
}
// but never back to anything else
(_, ProviderKind::Replicated) => {
bail!(
"Switching back to {} provider kind is not supported",
provider
);
}
(ProviderKind::Local, _) => {
if opts.sequencer.is_some() || !opts.nodeset.is_empty() {
bail!("'sequencer' or 'nodeset' are only allowed with 'replicated' provider");
}
u64::from(next_loglet_id).to_string()
}
None => None,
#[cfg(any(test, feature = "memory-loglet"))]
(ProviderKind::InMemory, _) => {
if opts.sequencer.is_some() || !opts.nodeset.is_empty() {
bail!("'sequencer' or 'nodeset' are only allowed with 'replicated' provider");
}
u64::from(next_loglet_id).to_string()
}
};

let extension = ChainExtension {
provider: provider.to_string(),
segment_index: Some(tail_index.into()),
params,
};

let request = SealAndExtendChainRequest {
log_id: opts.log_id,
min_version: Some(Version {
value: opts.min_version.get(),
}),
extension,
extension: Some(extension),
};

let response = connection
Expand Down Expand Up @@ -126,30 +158,14 @@ async fn reconfigure(connection: &ConnectionInfo, opts: &ReconfigureOpts) -> any
Ok(())
}

async fn replicated_loglet_params(
connection: &ConnectionInfo,
fn replicated_loglet_params(
opts: &ReconfigureOpts,
loglet_id: LogletId,
tail: &Segment<'_>,
) -> anyhow::Result<String> {
let logs = connection.get_logs().await?;

let log_id = LogId::from(opts.log_id);
let chain = logs
.chain(&log_id)
.with_context(|| format!("Unknown log id '{log_id}'"))?;

let tail_index = opts
.segment_index
.map(SegmentIndex::from)
.unwrap_or(chain.tail_index());

let loglet_id = LogletId::new(log_id, tail_index.next());

let tail_segment = chain.tail();

let params = if tail_segment.config.kind == ProviderKind::Replicated {
let last_params =
ReplicatedLogletParams::deserialize_from(tail_segment.config.params.as_bytes())
.context("Last segment params in chain is invalid")?;
let params = if tail.config.kind == ProviderKind::Replicated {
let last_params = ReplicatedLogletParams::deserialize_from(tail.config.params.as_bytes())
.context("Last segment params in chain is invalid")?;

ReplicatedLogletParams {
loglet_id,
Expand All @@ -172,7 +188,7 @@ async fn replicated_loglet_params(
} else {
NodeSet::from_iter(opts.nodeset.iter().cloned())
},
replication: opts.replication.clone().context("Missing replication-factor. Replication factor is required if last segment is not of replicated type")?,
replication: opts.replication.clone().context("Missing replication. Replication factor is required if last segment is not of replicated type")?,
sequencer: opts.sequencer.context("Missing sequencer. Sequencer is required if last segment is not of replicated type")?,
}
};
Expand Down

0 comments on commit 45bf7c0

Please sign in to comment.