Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release fluvio 0.15.2 #358

Merged
merged 1 commit into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ lint: build


resync-ver:
rm -rf versioned_docs/version-0.15.1
rsync -aurv docs/ versioned_docs/version-0.15.1
rm -rf versioned_docs/version-0.15.2
rsync -aurv docs/ versioned_docs/version-0.15.2

sync-ver:
rsync -aurv docs/ versioned_docs/version-0.15.1
rsync -aurv docs/ versioned_docs/version-0.15.2
82 changes: 82 additions & 0 deletions news/this-week-in-fluvio-0071.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
---
title: "This Week in Fluvio #71"
date: 2025-02-06
weight: 20
---
Fluvio is a distributed, programmable streaming platform written in Rust.

---
We released **Fluvio 0.15.2** last week.

## New release
Fluvio **v0.15.2** is now available!

To update you can run `fvm update`

```bash
$ fvm update

info: Updating fluvio stable to version 0.15.2. Current version is 0.15.1.
info: Downloading (1/5): [email protected]
info: Downloading (2/5): [email protected]
info: Downloading (3/5): [email protected]
info: Downloading (4/5): [email protected]
info: Downloading (5/5): [email protected]
done: Installed fluvio version 0.15.2
done: Now using fluvio version 0.15.2

```

If you don't have Fluvio in your machine run:

```
curl -fsS https://hub.infinyon.cloud/install/install.sh | bash
```

If you are enjoying Fluvio please share with your friends!

:::info
Also check out the Stateful Data Flow (SDF) streaming analytics in beta [SDF Examples](https://github.com/infinyon/stateful-dataflows-examples)
:::

## New features
Notable changes in this new version:


- Fixed the error when a record is larger than `max_request_size` parameter.
- Produce config builder is mutable now, allowing a better handling when have advanced conditionals and making it a lot easier to wrapper it.

See the [CHANGELOG] for details

## Good First Issues
We love our open source community contributors. Here are some issues that you could contribute to. All the best.

- [Improve fluvio topic describe with additional information]
- [Different default SPU port]
- [Delete the consumer when delete topic]
- [Remove localhost from fluvio in favor of 127.0.0.1]
- [When a topic is deleted, connected clients should have their connection closed]


---

Get in touch with us on [GitHub Discussions] or join [our Discord channel] and come say hello!

See some of the interesting community projects, examples, and utilities in the [Fluvio Community] GitHub org.


For the full list of changes this week, be sure to check out [our CHANGELOG].

[Fluvio open source]: https://github.com/infinyon/fluvio
[our CHANGELOG]: https://github.com/infinyon/fluvio/blob/master/CHANGELOG.md
[our Discord channel]: https://discordapp.com/invite/bBG2dTz
[GitHub Discussions]: https://github.com/infinyon/fluvio/discussions

[this form]: https://infinyon.com/request/ss-early-access/
[CHANGELOG]: https://github.com/infinyon/fluvio/blob/v0.15.2/CHANGELOG.md
[When a topic is deleted, connected clients should have their connection closed]: https://github.com/infinyon/fluvio/issues/3836
[Delete the consumer when delete topic]: https://github.com/infinyon/fluvio/issues/4308
[Remove localhost from fluvio in favor of 127.0.0.1]: https://github.com/infinyon/fluvio/issues/3866
[Improve fluvio topic describe with additional information]: https://github.com/infinyon/fluvio/issues/3968
[Different default SPU port]: https://github.com/infinyon/fluvio/issues/3739
[Fluvio Community]: https://github.com/fluvio-community
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
edition = "2021"
name = "fluvio-rust-example"
publish = false
version = "0.0.0"

[dependencies]
async-std = {version = "1", features = ["attributes"]}
chrono = "0.4"
flate2 = "1.0.35"
fluvio = "0.24.4"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fluvio = "0.24.4"
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use async_std::stream::StreamExt;

use fluvio::{
consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy},
Fluvio, Offset,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;
const CONSUMER_OFFSET: &str = "consumer-auto";

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.offset_consumer(CONSUMER_OFFSET.to_string())
.offset_strategy(OffsetManagementStrategy::Auto)
.build()
.expect("Failed to build consumer config");


// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await
.expect("Failed to create consumer");
while let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use async_std::stream::StreamExt;

use fluvio::{
consumer::{ConsumerConfigExtBuilder, ConsumerStream, OffsetManagementStrategy},
Fluvio, Offset,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;
const CONSUMER_OFFSET: &str = "consumer-manual";

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.offset_consumer(CONSUMER_OFFSET.to_string())
.offset_strategy(OffsetManagementStrategy::Manual)
.build()
.expect("Failed to build consumer config");


// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.expect("Failed to create consumer");
while let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
stream.offset_commit().expect("offset commit failed");
stream.offset_flush().await.expect("offset flush failed");
}
}
28 changes: 28 additions & 0 deletions versioned_docs/version-0.15.2/_embeds/apis/rust/consumer-simple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use async_std::stream::StreamExt;

use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, Offset};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::from_end(1))
.build()
.expect("Failed to build consumer config");

// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await
.expect("Failed to create consumer");
if let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::io::Read;
use std::collections::BTreeMap;
use async_std::stream::StreamExt;
use flate2::{bufread::GzEncoder, Compression};

use fluvio::{Fluvio, Offset, SmartModuleExtraParams};
use fluvio::consumer::{
ConsumerConfigExtBuilder,
SmartModuleInvocation,
SmartModuleInvocationWasm,
SmartModuleKind,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Build smartmodule invocation from wasm file
let sm_invocation = build_smartmodule_from_file(
SmartModuleKind::Map,
"regex_text.wasm",
r#"[{"replace": {"regex": "secret", "with": "****"}}]"#,
);

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.smartmodule(vec![sm_invocation])
.build()
.expect("Failed to build consumer config");

// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.expect("Failed to create consumer");
if let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}

// Create a smartmodule invocation from a wasm file
fn build_smartmodule_from_file(
kind: SmartModuleKind,
file_path: &str,
spec: &str
) -> SmartModuleInvocation {
// Read smartmodule wasm file
let raw_buffer = std::fs::read(file_path).expect("wasm file is missing");
let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
let mut buffer = Vec::with_capacity(raw_buffer.len());
encoder.read_to_end(&mut buffer).expect("failed to read encoded wasm file");

// Create smartmodule invocation with params
let mut param_tree = BTreeMap::<String,String>::new();
param_tree.insert("spec".to_owned(), spec.to_owned());
let params = SmartModuleExtraParams::new(param_tree, None);

// Return smartmodule invocation
SmartModuleInvocation {
wasm: SmartModuleInvocationWasm::AdHoc(buffer),
kind: kind,
params: params,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::collections::BTreeMap;
use async_std::stream::StreamExt;

use fluvio::{Fluvio, Offset, SmartModuleExtraParams};
use fluvio::consumer::{
ConsumerConfigExtBuilder,
SmartModuleInvocation,
SmartModuleInvocationWasm,
SmartModuleKind,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Build smartmodule invocation from wasm file
let sm_invocation = build_smartmodule_from_name(
SmartModuleKind::Map,
"fluvio/[email protected]",
r#"[{"replace": {"regex": "secret", "with": "****"}}]"#,
);

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.smartmodule(vec![sm_invocation])
.build()
.expect("Failed to build consumer config");

// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.expect("Failed to create consumer");
if let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}

// Create a smartmodule invocation using smartmodule name
fn build_smartmodule_from_name(
kind: SmartModuleKind,
smartmodule_name: &str,
spec: &str
) -> SmartModuleInvocation {
// Create smartmodule invocation with params
let mut param_tree = BTreeMap::<String,String>::new();
param_tree.insert("spec".to_owned(), spec.to_owned());
let params = SmartModuleExtraParams::new(param_tree, None);

// Return smartmodule invocation
SmartModuleInvocation {
wasm: SmartModuleInvocationWasm::Predefined(smartmodule_name.to_string()),
kind: kind,
params: params,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use fluvio::metadata::topic::TopicSpec;
use fluvio::Fluvio;

const TOPIC_NAME: &str = "hello-rust";
const PARTITIONS: u32 = 1;
const REPLICAS: u32 = 1;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Create a topic
let admin = fluvio.admin().await;
let topic_spec = TopicSpec::new_computed(PARTITIONS, REPLICAS, None);
let _topic_create = admin
.create(TOPIC_NAME.to_string(), false, topic_spec)
.await;

// List topics
let topics = admin.all::<TopicSpec>().await.expect("Failed to list topics");
let topic_names = topics.iter().map(|topic| topic.name.clone()).collect::<Vec<String>>();

println!("Topics:\n - {}", topic_names.join("\n - "));
}
15 changes: 15 additions & 0 deletions versioned_docs/version-0.15.2/_embeds/apis/rust/producer-kv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
const TOPIC_NAME: &str = "hello-rust";

#[async_std::main]
async fn main() {
// Create key and value
let key = "Hello";
let value = "Fluvio";

// create producer & send key/value
let producer = fluvio::producer(TOPIC_NAME).await.expect("Failed to create producer");
producer.send(key, value).await.expect("Failed to send record");
producer.flush().await.expect("Failed to flush");

println!("Sent [{}] {}", key, value);
}
Loading