Skip to content

Commit

Permalink
Fix some ci errors
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Feb 21, 2023
1 parent 3bc661c commit 491a903
Show file tree
Hide file tree
Showing 12 changed files with 56 additions and 94 deletions.
2 changes: 1 addition & 1 deletion .github/checks/copyright.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ count=0

for file in $(find . -name '*.rs' | grep -v '/target')
do
if ! grep 'Copyright \(....-\)\?202[12], The Tremor Team' "$file" > /dev/null
if ! grep 'Copyright \(....-\)\?202[123], The Tremor Team' "$file" > /dev/null
then
echo "##[error] Copyright missing in $file"
count=$((count + 1))
Expand Down
2 changes: 1 addition & 1 deletion .github/checks/safety.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ EOF



files=$(find . -name '*.rs' | grep -v -f .checkignore | grep -v 'test.rs$')
files=$(find . -name '*.rs' | grep -v -f .checkignore | grep -v 'test.rs$' | grep -v '/test/')

while getopts hamuiprebldxcft opt; do
case $opt in
Expand Down
13 changes: 0 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ url = "2.3"
value-trait = "0.5"
zstd = "*"
axum = "0.6"
axum-macros = "0.3"

# blaster / blackhole
hdrhistogram = "7"
Expand Down
16 changes: 15 additions & 1 deletion src/raft.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023, The Tremor Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod api;
pub mod archive;
pub mod network;
Expand Down Expand Up @@ -43,7 +57,7 @@ pub enum ClusterError {
Config(ConfigError),
Client(Error),
JoinError(JoinError),
// FIXME: this is a horrible hack
// TODO: this is a horrible hack
Runtime(Mutex<crate::Error>),
}

Expand Down
12 changes: 10 additions & 2 deletions src/raft/api/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,15 @@ impl Tremor {
}
}
Ok(result)
} else {
let err = resp.error_for_status_ref().expect_err("FIXME");
} else if let Err(err) = resp.error_for_status_ref() {
error!(
"Received {} with body: {}",
resp.status(),
resp.text().await?
);
Err(Error::HTTP(err))
} else {
Err("Heisenerror, not error nor success".into())
}
}
}
Expand Down Expand Up @@ -426,12 +427,14 @@ Snapshot:
#[derive(Debug)]
pub enum Error {
HTTP(reqwest::Error),
Other(String),
}

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::HTTP(e) => e.fmt(f),
Self::Other(e) => e.fmt(f),
}
}
}
Expand All @@ -443,3 +446,8 @@ impl From<reqwest::Error> for Error {
Self::HTTP(e)
}
}
impl<'s> From<&'s str> for Error {
fn from(e: &'s str) -> Self {
Self::Other(e.into())
}
}
2 changes: 1 addition & 1 deletion src/raft/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl Running {
.filter_map(|r| future::ready(r.ok()))
.map(server::BaseChannel::with_defaults)
// Limit channels to 1 per IP.
// FIXME .max_channels_per_key(1, |t| t.transport().peer_addr().unwrap().ip())
// TODO .max_channels_per_key(1, |t| t.transport().peer_addr().unwrap().ip())
// serve is generated by the service attribute. It takes as input any type implementing
// the generated World trait.
.map(|channel| {
Expand Down
7 changes: 5 additions & 2 deletions src/raft/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ fn id_to_bin(id: u64) -> Result<Vec<u8>, Error> {
}

fn bin_to_id(buf: &[u8]) -> Result<u64, Error> {
Ok((&buf[0..8]).read_u64::<BigEndian>()?)
Ok(buf
.get(0..8)
.ok_or_else(|| Error::Other(format!("Invalid buffer length: {}", buf.len()).into()))?
.read_u64::<BigEndian>()?)
}

#[derive(Debug)]
Expand All @@ -204,7 +207,7 @@ pub enum Error {
RocksDB(rocksdb::Error),
Io(std::io::Error),
Storage(openraft::StorageError),
// FIXME: this is horrid, aaaaaahhhhh!
// TODO: this is horrid, aaaaaahhhhh!
Tremor(Mutex<RuntimeError>),
TremorScript(Mutex<tremor_script::errors::Error>),
MissingApp(AppId),
Expand Down
2 changes: 0 additions & 2 deletions src/raft/store/statemachine/apps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,13 +411,11 @@ impl AppsStateMachine {
.map_err(store_w_err)?;

// deploy the flow but don't start it yet
dbg!("DEPLOY");
self.world
.deploy_flow(app_id.clone(), &deploy)
.await
.map_err(sm_w_err)?;
// change the flow state to the intended state
dbg!("START");
self.world
.change_flow_state(instance, intended_state)
.await
Expand Down
1 change: 0 additions & 1 deletion src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ impl Runtime {
app_id: AppId,
flow: &ast::DeployFlow<'static>,
) -> Result<FlowInstanceId> {
// FIXME: return a FlowInstanceId here
let (tx, rx) = oneshot::channel();
self.flows
.send(flow_supervisor::Msg::DeployFlow {
Expand Down
90 changes: 23 additions & 67 deletions tremor-cli/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ async fn handle_signals(

impl Cluster {
/// Run the cluster command
#[allow(clippy::too_many_lines)] // FIXME
pub(crate) async fn run(self) -> Result<()> {
match self.command {
// rm -r temp/test-db*; cargo run -p tremor-cli -- cluster bootstrap --db-dir temp/test-db1 --api 127.0.0.1:8001 --rpc 127.0.0.1:9001
Expand Down Expand Up @@ -232,11 +231,10 @@ impl Cluster {
}

impl AppsCommands {
#[allow(clippy::too_many_lines)] // FIXME
pub(crate) async fn run(self, api: &str) -> Result<()> {
let client = Client::new(api)?;
match self {
AppsCommands::List { json } => {
let client = Client::new(api)?;
let r = client.list().await.map_err(|e| format!("error: {e}"))?;
if json {
println!("{}", serde_json::to_string_pretty(&r)?);
Expand All @@ -247,106 +245,64 @@ impl AppsCommands {
}
}
AppsCommands::Install { file } => {
let client = Client::new(api)?;
let mut file = file::open(&file).await?;
let mut buf = Vec::new();
file.read_to_end(&mut buf).await?;
let res = client
.install(&buf)
.await
.map_err(|e| format!("error: {e}"));

match res {
match client.install(&buf).await {
Ok(app_id) => println!("Application `{app_id}` successfully installed",),
Err(e) => eprintln!("Application failed to install: {e:?}"),
Err(e) => eprintln!("Application failed to install: {e}"),
}
}
AppsCommands::Uninstall { app } => {
let client = Client::new(api)?;
let app_id = AppId(app);
let res = client
.uninstall_app(&app_id)
.await
.map_err(|e| format!("error: {e}"));

if let Err(e) = res {
eprintln!("Application `{app_id}` failed to be uninstalled: {e:?}");
} else {
println!("App `{app_id}` successfully uninstalled",);
match client.uninstall_app(&app_id).await {
Ok(app_id) => println!("App `{app_id}` successfully uninstalled",),
Err(e) => eprintln!("Application `{app_id}` failed to be uninstalled: {e}"),
}
}

AppsCommands::Start {
app,
flow,
instance,
config,
paused,
} => {
let client = Client::new(api)?;
let config: HashMap<String, OwnedValue> = if let Some(config) = config {
serde_json::from_str(&config)?
} else {
HashMap::default()
};
let flow =
flow.map_or_else(|| FlowDefinitionId("main".to_string()), FlowDefinitionId);
let config: HashMap<String, OwnedValue> =
config.map_or_else(|| Ok(HashMap::new()), |c| serde_json::from_str(&c))?;
let flow = flow.map_or_else(|| FlowDefinitionId::from("main"), FlowDefinitionId);
let app_id = AppId(app);
let instance_id = FlowInstanceId::new(app_id, instance);
let running = !paused;
let res = client
.start(&flow, &instance_id, config, running)
.await
.map_err(|e| format!("error: {e}"));

if let Err(e) = res {
eprintln!("Instance `{instance_id}` failed to start: {e:?}");
} else {
println!("Instance `{instance_id}` successfully started",);
match client.start(&flow, &instance_id, config, !paused).await {
Ok(instance_id) => println!("Instance `{instance_id}` successfully started",),
Err(e) => eprintln!("Instance `{instance_id}` failed to start: {e}"),
}
}

AppsCommands::Stop { app, instance } => {
let client = Client::new(api)?;
let instance_id = FlowInstanceId::new(app, instance);
let res = client
.stop_instance(&instance_id)
.await
.map_err(|e| format!("error: {e}"));
if let Err(e) = res {
eprintln!("Instance `{instance_id}` failed to stop: {e:?}");
} else {
println!("Instance `{instance_id}` stopped",);
match client.stop_instance(&instance_id).await {
Ok(instance_id) => println!("Instance `{instance_id}` stopped"),
Err(e) => eprintln!("Instance `{instance_id}` failed to stop: {e}"),
}
}
AppsCommands::Pause { app, instance } => {
let client = Client::new(api)?;
let app_id = AppId(app);
let instance_id = FlowInstanceId::new(app_id, instance);
let res = client
match client
.change_instance_state(&instance_id, TremorInstanceState::Pause)
.await
.map_err(|e| format!("error: {e}"));

if let Err(e) = res {
eprintln!("Instance `{instance_id}` failed to pause: {e:?}");
} else {
println!("Instance `{instance_id}` successfully paused",);
{
Ok(instance_id) => println!("Instance `{instance_id}` successfully paused"),
Err(e) => eprintln!("Instance `{instance_id}` failed to pause: {e}"),
}
}
AppsCommands::Resume { app, instance } => {
let client = Client::new(api)?;
let instance_id = FlowInstanceId::new(app, instance);

let res = client
match client
.change_instance_state(&instance_id, TremorInstanceState::Resume)
.await
.map_err(|e| format!("error: {e}"));

if let Err(e) = res {
eprintln!("Instance `{instance_id}` failed to resume: {e:?}");
} else {
println!("Instance `{instance_id}` successfully resumed",);
{
Ok(instance_id) => println!("Instance `{instance_id}` successfully resumed"),
Err(e) => eprintln!("Instance `{instance_id}` failed to resume: {e}"),
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions tremor-script/src/arena.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ lazy_static::lazy_static! {
};
}

/// FIXME: for cluster undeploy we need to be able to delete parts of the arena

/// Memory arena for source to get static lifeimtes

#[derive(Debug)]
Expand Down

0 comments on commit 491a903

Please sign in to comment.