Skip to content

Commit

Permalink
Obey clippy
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Sep 15, 2023
1 parent dbc470a commit 9eb11f5
Show file tree
Hide file tree
Showing 33 changed files with 83 additions and 76 deletions.
2 changes: 1 addition & 1 deletion src/codec/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl Codec for Csv {
}?;

let mut fields = vec![];
for field in record.iter() {
for field in &record {
fields.push(Value::String(Cow::from(field.to_string())));
}

Expand Down
6 changes: 3 additions & 3 deletions src/codec/influx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ mod tests {
#[test]
pub fn encode_mixed_bag() {
let s: Value = literal!({
"measurement": r#"wea,\ ther"#,
"measurement": r"wea,\ ther",
"tags": {},
"fields": {"temp=erature": 82.0, r#"too\ \\\"hot""#: true},
"timestamp": 1_465_839_830_100_400_200_i64
Expand Down Expand Up @@ -215,7 +215,7 @@ mod tests {
literal!({
"measurement": "weather",
"tags": {"location": "us-midwest"},
"fields": {"temperature_str": r#"too hot\cold"#},
"fields": {"temperature_str": r"too hot\cold"},
"timestamp": 1_465_839_830_100_400_200_i64
}),
"case 7"
Expand All @@ -226,7 +226,7 @@ mod tests {
literal!({
"measurement": "weather",
"tags": {"location": "us-midwest"},
"fields": {"temperature_str": r#"too hot\\cold"#},
"fields": {"temperature_str": r"too hot\\cold"},
"timestamp": 1_465_839_830_100_400_204_i64
}),
"case 8"
Expand Down
2 changes: 1 addition & 1 deletion src/codec/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ where
))
})?;
let mut elem = String::with_capacity(16);
for (id, params) in sd.iter() {
for (id, params) in sd {
elem.push('[');
elem.push_str(id);
let params = params.as_array().ok_or_else(|| {
Expand Down
4 changes: 3 additions & 1 deletion src/codec/tremor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl Tremor {
#[inline]
fn write_object<E: ByteOrder>(o: &Object, w: &mut impl Write) -> Result<()> {
Self::write_type_and_len::<E>(Self::OBJECT, o.len(), w)?;
for (k, v) in o.iter() {
for (k, v) in o {
w.write_u64::<E>(k.len() as u64)?;
w.write_all(k.as_bytes())?;
Tremor::encode_::<E>(v, w)?;
Expand Down Expand Up @@ -383,6 +383,8 @@ mod test {

use proptest::prelude::*;

// ALLOW: This is a test
#[allow(clippy::arc_with_non_send_sync)]
fn arb_tremor_value() -> BoxedStrategy<Value<'static>> {
let leaf = prop_oneof![
Just(Value::Static(StaticNode::Null)),
Expand Down
4 changes: 2 additions & 2 deletions src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ async fn connector_task(
connector_addr.send_sink(SinkMsg::Pause).await?;

connector_state = State::Paused;
quiescence_beacon.pause();
quiescence_beacon.pause()?;

info!("{ctx} Paused.");
}
Expand All @@ -757,7 +757,7 @@ async fn connector_task(
info!("{ctx} Resuming...");
ctx.swallow_err(connector.on_resume(&ctx).await, "Error during on_resume");
connector_state = State::Running;
quiescence_beacon.resume();
quiescence_beacon.resume()?;

connector_addr.send_source(SourceMsg::Resume)?;
connector_addr.send_sink(SinkMsg::Resume).await?;
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/impls/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ impl ClickhouseSink {
.as_object()
.ok_or_else(|| Error::from(ErrorKind::ExpectedObjectEvent(input.value_type())))?;

for (column_name, expected_type) in columns.iter() {
for (column_name, expected_type) in columns {
// If the value is not present, then we can replace it by null.
const NULL: &Value = &Value::const_null();
let cell = object.get(column_name.as_str()).unwrap_or(NULL);
Expand Down
4 changes: 2 additions & 2 deletions src/connectors/impls/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ impl Sink for ElasticSink {
let default_index = self.config.index.clone();
let task_ctx = ctx.clone();
task::spawn(async move {
let r: Result<Value> = (|| async {
let r: Result<Value> = async {
// build bulk request (we can't do that in a separate function)
let mut ops = BulkOperations::new();
// per request options - extract from event metadata (ignoring batched)
Expand All @@ -669,7 +669,7 @@ impl Sink for ElasticSink {
.and_then(Response::error_for_status_code)?;
let value = response.json::<StaticValue>().await?;
Ok(value.into_value())
})()
}
.await;
match r {
Err(e) => {
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/impls/gcl/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl Config {
let mut labels = HashMap::new();

if let Some(has_meta) = meta.get_object("labels") {
for (k, v) in has_meta.iter() {
for (k, v) in has_meta {
labels.insert(k.to_string(), v.to_string());
}
}
Expand Down
1 change: 0 additions & 1 deletion src/connectors/impls/gcs/resumable_upload_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ impl<TClient: HttpClientTrait, TBackoffStrategy: BackoffStrategy + Send + Sync>
})
.await?;
let status = response.status();
if status.is_server_error() {}
let mut data: Vec<u8> = Vec::new();
while let Some(chunk) = response.data().await.transpose()? {
data.extend_from_slice(&chunk);
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/impls/kafka/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl Sink for KafkaProducerSink {
}
if let Some(headers_obj) = kafka_meta.get_object("headers") {
let mut headers = OwnedHeaders::new_with_capacity(headers_obj.len());
for (k, v) in headers_obj.iter() {
for (k, v) in headers_obj {
// supporting string or bytes as headers value
if let Some(v_bytes) = v.as_bytes() {
headers = headers.insert(Header {
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/impls/otel/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ pub(crate) fn instrumentation_library_metrics_to_pb(
.as_array()
.ok_or("Invalid json mapping for InstrumentationLibraryMetrics")?;
let mut pb = Vec::with_capacity(data.len());
for data in data.iter() {
for data in data {
let mut metrics = Vec::new();
if let Some(data) = data.get_array("metrics") {
for metric in data {
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/tests/clickhouse/more_complex_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ async fn test() -> Result<()> {
g: i32 = -32_000,
h: i64 = -33_000,
i: &str = "hello",
j: DateTime<Tz> = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp_opt(1_634_400_000, 0).expect("valid timestamp literal"), Utc),
j: DateTime<Tz> = DateTime::<Utc>::from_naive_utc_and_offset(NaiveDateTime::from_timestamp_opt(1_634_400_000, 0).expect("valid timestamp literal"), Utc),
// k: DateTime<Tz> = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(1_634_400_000, 0), Utc),
// l: DateTime<Tz> = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(1_634_400_000, 0), Utc),
// m: DateTime<Tz> = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(1_634_400_000, 0), Utc),
Expand Down
6 changes: 3 additions & 3 deletions src/connectors/tests/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async fn fake_server_dispatch(
}

impl TestHttpServer {
async fn new(raw_url: String) -> Result<Self> {
fn new(raw_url: String) -> Self {
let mut instance = TestHttpServer { acceptor: None };
instance.acceptor = Some(spawn(async move {
let url: Url<HttpDefaults> = Url::parse(&raw_url)?;
Expand Down Expand Up @@ -118,7 +118,7 @@ impl TestHttpServer {
};
Ok(())
}));
Ok(instance)
instance
}

fn stop(&mut self) {
Expand Down Expand Up @@ -153,7 +153,7 @@ async fn rtt(
let defn = literal!({
"config": config,
});
let mut fake = TestHttpServer::new(url.clone()).await?;
let mut fake = TestHttpServer::new(url.clone());
let mut harness = ConnectorHarness::new(
function_name!(),
&http_impl::client::Builder::default(),
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/tests/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl EchoServer {
Ok(())
}

pub(crate) async fn run(&mut self) -> Result<()> {
pub(crate) fn run(&mut self) -> Result<()> {
let server_run = self.i_shall_run.clone();
let mut conns = vec![];
let addr = self.addr.clone();
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/tests/tcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn tcp_client_test(use_tls: bool) -> Result<()> {

// simple echo server
let mut echo_server = EchoServer::new(server_addr.clone(), use_tls);
echo_server.run().await?;
echo_server.run()?;
let tls_config = if use_tls {
literal!({
"cafile": "./tests/localhost.cert",
Expand Down
14 changes: 6 additions & 8 deletions src/connectors/tests/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl TestClient<WebSocket<MaybeTlsStream<std::net::TcpStream>>> {

fn send(&mut self, data: &str) -> Result<()> {
self.client
.write_message(Message::Text(data.into()))
.send(Message::Text(data.into()))
.chain_err(|| "Failed to send to ws server")
}

Expand All @@ -154,7 +154,7 @@ impl TestClient<WebSocket<MaybeTlsStream<std::net::TcpStream>>> {
}

fn expect(&mut self) -> Result<ExpectMessage> {
match self.client.read_message() {
match self.client.read() {
Ok(Message::Text(data)) => Ok(ExpectMessage::Text(data)),
Ok(Message::Binary(data)) => Ok(ExpectMessage::Binary(data)),
Ok(other) => Ok(ExpectMessage::Unexpected(other)),
Expand All @@ -169,7 +169,7 @@ impl TestClient<WebSocket<MaybeTlsStream<std::net::TcpStream>>> {
reason: "WS Test client closing.".into(),
}))?;
// finish closing handshake
self.client.write_pending()?;
self.client.flush()?;
info!("WS test client closed.");
Ok(())
}
Expand Down Expand Up @@ -215,7 +215,7 @@ impl TestServer {
}
}

async fn start(&mut self) -> Result<()> {
fn start(&mut self) {
let endpoint = self.endpoint.clone();
let tx = self.tx.clone();
let stopped = self.stopped.clone();
Expand All @@ -235,8 +235,6 @@ impl TestServer {
}
info!("Test Server stopped.");
});

Ok(())
}

fn stop(&mut self) {
Expand Down Expand Up @@ -366,7 +364,7 @@ async fn ws_client_binary_routing() -> Result<()> {

let free_port = find_free_tcp_port().await?;
let mut ts = TestServer::new("127.0.0.1", free_port);
ts.start().await?;
ts.start();

let defn = literal!({
"codec": "json",
Expand Down Expand Up @@ -415,7 +413,7 @@ async fn ws_client_text_routing() -> Result<()> {

let free_port = find_free_tcp_port().await?;
let mut ts = TestServer::new("127.0.0.1", free_port);
ts.start().await?;
ts.start();

let defn = literal!({
"codec": "json",
Expand Down
28 changes: 14 additions & 14 deletions src/connectors/utils/quiescence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::errors::Result;
use event_listener::Event;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -88,36 +89,35 @@ impl QuiescenceBeacon {
}

/// pause both reading and writing
pub fn pause(&mut self) {
if self
.0
pub fn pause(&mut self) -> Result<()> {
self.0
.state
.compare_exchange(
Inner::RUNNING,
Inner::PAUSED,
Ordering::AcqRel,
Ordering::Relaxed,
)
.is_err()
{}
.map_err(|_| "failed to pause")?;
Ok(())
}

/// Resume both reading and writing.
///
/// Has no effect if not currently paused.
pub fn resume(&mut self) {
if self
.0
pub fn resume(&mut self) -> Result<()> {
self.0
.state
.compare_exchange(
Inner::PAUSED,
Inner::RUNNING,
Ordering::AcqRel,
Ordering::Relaxed,
)
.is_err()
{}
.map_err(|_| "failed to resume")?;

self.0.resume_event.notify(Self::MAX_LISTENERS); // we might have been paused, so notify here
Ok(())
}

/// notify consumers of this beacon that reading and writing should be stopped
Expand All @@ -142,7 +142,7 @@ mod tests {
assert!(beacon.continue_reading().await);
assert!(beacon.continue_writing().await);

ctrl_beacon.pause();
ctrl_beacon.pause()?;

let timeout_ms = Duration::from_millis(50);
let read_future = beacon.continue_reading();
Expand All @@ -154,7 +154,7 @@ mod tests {
// no progress for writing while being paused
assert_eq!(futures::poll!(write_future.as_mut()), Poll::Pending);

ctrl_beacon.resume();
ctrl_beacon.resume()?;

// future created during pause will be picked up and completed after resume only
assert_eq!(futures::poll!(read_future.as_mut()), Poll::Ready(true));
Expand All @@ -168,7 +168,7 @@ mod tests {
assert!(timeout(timeout_ms, beacon.continue_writing()).await?);

// a resume after stopping reading has no effect
ctrl_beacon.resume();
ctrl_beacon.resume()?;
// don't continue reading when stopped reading
assert!(!timeout(timeout_ms, beacon.continue_reading()).await?);
// writing is still fine
Expand All @@ -181,7 +181,7 @@ mod tests {
assert!(!timeout(timeout_ms, beacon.continue_writing()).await?);

// a resume after a full stop has no effect
ctrl_beacon.resume();
ctrl_beacon.resume()?;
// no reading upon full stop
assert!(!timeout(timeout_ms, beacon.continue_reading()).await?);
// no writing upon full stop
Expand Down
13 changes: 6 additions & 7 deletions src/system/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,10 @@ impl Flow {

let addr = spawn_task(
flow_alias.clone(),
pipelines,
&pipelines,
connectors,
&flow.defn.connections,
)
.await?;
);

addr.send(Msg::Start).await?;

Expand Down Expand Up @@ -394,12 +393,12 @@ async fn link(

/// task handling flow instance control plane
#[allow(clippy::too_many_lines)]
async fn spawn_task(
fn spawn_task(
id: Alias,
pipelines: HashMap<String, pipeline::Addr>,
pipelines: &HashMap<String, pipeline::Addr>,
connectors: HashMap<String, connectors::Addr>,
links: &[ConnectStmt],
) -> Result<Addr> {
) -> Addr {
#[derive(Debug)]
/// wrapper for all possible messages handled by the flow task
enum MsgWrapper {
Expand Down Expand Up @@ -713,7 +712,7 @@ async fn spawn_task(
info!("{prefix} Stopped.");
Result::Ok(())
});
Ok(addr)
addr
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 9eb11f5

Please sign in to comment.