Skip to content

Commit

Permalink
Merge branch 'master' into Forward-Ordering-key
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun authored Sep 11, 2023
2 parents 311d402 + 2faad19 commit b19bd99
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 40 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ jobs:
- name: Build
run: cargo build --features protobuf-src
- name: Check Clippy
run: cargo clippy --tests --all-features -- -D warnings
run: |
cargo clippy --tests --features telemetry,protobuf-src -- -D warnings
cargo clippy --tests --no-default-features --features compression,tokio-rustls-runtime,async-std-rustls-runtime,auth-oauth2,telemetry,protobuf-src -- -D warnings
- name: Install nightly rustfmt
run: rustup toolchain install nightly --component rustfmt
- name: Check format
Expand Down
9 changes: 0 additions & 9 deletions CONTRIBUTING.md

This file was deleted.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ bit-vec = "^0.6.3"
futures = "^0.3.28"
futures-io = "^0.3.28"
native-tls = { version = "^0.2.11", optional = true }
rustls = { version = "^0.21.5", optional = true }
rustls = { version = "^0.21.6", optional = true }
webpki-roots = { version = "^0.25.1", optional = true }
pem = "^3.0.0"
tokio = { version = "^1.29.1", features = ["rt", "net", "time"], optional = true }
Expand Down Expand Up @@ -66,7 +66,7 @@ prost-build = "^0.11.9"
protobuf-src = { version = "1.1.0", optional = true }

[features]
default = [ "compression", "tokio-runtime", "async-std-runtime", "auth-oauth2"]
default = [ "compression", "tokio-runtime", "async-std-runtime", "auth-oauth2" ]
compression = [ "lz4", "flate2", "zstd", "snap" ]
tokio-runtime = [ "tokio", "tokio-util", "native-tls", "tokio-native-tls" ]
tokio-rustls-runtime = ["tokio", "tokio-util", "tokio-rustls", "rustls", "webpki-roots" ]
Expand Down
38 changes: 23 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
## pulsar-rs: Future-based Rust client for [Apache Pulsar](https://pulsar.apache.org/)
# pulsar-rs: Future-based Rust client for [Apache Pulsar](https://pulsar.apache.org/)

[![crates](https://img.shields.io/crates/v/pulsar.svg)](https://crates.io/crates/pulsar)
[![docs](https://img.shields.io/docsrs/pulsar)](https://docs.rs/pulsar)

[Documentation](https://docs.rs/pulsar)

This is a pure Rust client for Apache Pulsar that does not depend on the C++ Pulsar library. It provides an async/await based API, compatible with [Tokio](https://tokio.rs/) and [async-std](https://async.rs/).

Features:

- URL based (`pulsar://` and `pulsar+ssl://`) connections with DNS lookup
- multi topic consumers (based on a regex or list)
- TLS connection
- configurable executor (Tokio or async-std)
- automatic reconnection with exponential back off
- message batching
- compression with LZ4, zlib, zstd or Snappy (can be deactivated with Cargo features)
- telemetry using [tracing](https://github.com/tokio-rs/tracing) crate (can be activated with Cargo features)
- URL based (`pulsar://` and `pulsar+ssl://`) connections with DNS lookup;
- Multi topic consumers (based on a regex or list);
- TLS connection;
- Configurable executor (Tokio or async-std);
- Automatic reconnection with exponential back off;
- Message batching;
- Compression with LZ4, zlib, zstd or Snappy (can be deactivated with Cargo features);
- Telemetry using [tracing](https://github.com/tokio-rs/tracing) crate (can be activated with Cargo features).

### Getting Started
## Getting Started

Add the following dependencies in your `Cargo.toml`:

Expand All @@ -34,7 +32,7 @@ Try out [examples](examples):
- [consumer](examples/consumer.rs)
- [reader](examples/reader.rs)

### Project Maintainers
## Project Maintainers

- [@CleverAkanoa](https://github.com/CleverAkanoa)
- [@DonghunLouisLee](https://github.com/DonghunLouisLee)
Expand All @@ -45,7 +43,7 @@ Try out [examples](examples):
- [@stearnsc](https://github.com/stearnsc)
- [@tisonkun](https://github.com/tisonkun)

### Contribution
## Contribution

This project welcomes your PR and issues. For example, refactoring, adding features, correcting English, etc.

Expand All @@ -55,8 +53,18 @@ Thanks to all the people who already contributed!
<img src="https://contributors-img.web.app/image?repo=streamnative/pulsar-rs" />
</a>

### License
## License

This library is licensed under the terms of both the MIT license and the Apache License (Version 2.0), and may include packages written by third parties which carry their own copyright notices and license terms.

See [LICENSE-APACHE](LICENSE-APACHE), [LICENSE-MIT](LICENSE-MIT), and [COPYRIGHT](COPYRIGHT) for details.

## History

This project is originally created by [@stearnsc](https://github.com/stearnsc) and others at [Wyyerd](https://github.com/wyyerd) at 2018. Later at 2022, the orginal creators [decided to transfer the repository to StreamNative](https://github.com/streamnative-oss/sn-pulsar-rs/issues/20).

Currently, this project is actively maintained under the StreamNative organization with a diverse [maintainers group](#project-maintainers).

## About StreamNative

Founded in 2019 by the original creators of Apache Pulsar, [StreamNative](https://streamnative.io/) is one of the leading contributors to the open-source Apache Pulsar project. We have helped engineering teams worldwide make the move to Pulsar with [StreamNative Cloud](https://streamnative.io/product), a fully managed service to help teams accelerate time-to-production.
5 changes: 3 additions & 2 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,7 @@ impl<Exe: Executor> Connection<Exe> {
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
#[allow(unused_variables)] // allow_insecure_connection and tls_hostname_verification_enabled are native-tls only
async fn prepare_stream(
connection_id: Uuid,
address: SocketAddr,
Expand Down Expand Up @@ -964,7 +965,7 @@ impl<Exe: Executor> Connection<Exe> {
},
);

root_store.add_server_trust_anchors(trust_anchors.into_iter());
root_store.add_trust_anchors(trust_anchors.into_iter());
let config = rustls::ClientConfig::builder()
.with_safe_default_cipher_suites()
.with_safe_default_kx_groups()
Expand Down Expand Up @@ -1075,7 +1076,7 @@ impl<Exe: Executor> Connection<Exe> {
},
);

root_store.add_server_trust_anchors(trust_anchors.into_iter());
root_store.add_trust_anchors(trust_anchors.into_iter());
let config = rustls::ClientConfig::builder()
.with_safe_default_cipher_suites()
.with_safe_default_kx_groups()
Expand Down
16 changes: 8 additions & 8 deletions src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ impl Executor for TokioExecutor {
}

/// Wrapper for the async-std executor
#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))]
#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))]
#[derive(Clone, Debug)]
pub struct AsyncStdExecutor;

#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))]
#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))]
impl Executor for AsyncStdExecutor {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn spawn(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) -> Result<(), ()> {
Expand Down Expand Up @@ -153,7 +153,7 @@ pub enum JoinHandle<T> {
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
Tokio(tokio::task::JoinHandle<T>),
/// wrapper for async-std's `JoinHandle`
#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))]
#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))]
AsyncStd(async_std::task::JoinHandle<T>),
// here to avoid a compilation error since T is not used
#[cfg(all(
Expand All @@ -176,7 +176,7 @@ impl<T> Future for JoinHandle<T> {
Poll::Pending => Poll::Pending,
Poll::Ready(v) => Poll::Ready(v.ok()),
},
#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))]
#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))]
JoinHandle::AsyncStd(j) => match Pin::new(j).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(v) => Poll::Ready(Some(v)),
Expand All @@ -200,7 +200,7 @@ pub enum Interval {
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
Tokio(tokio::time::Interval),
/// wrapper for async-std's interval
#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))]
#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))]
AsyncStd(async_std::stream::Interval),
#[cfg(all(
not(feature = "tokio-runtime"),
Expand All @@ -226,7 +226,7 @@ impl Stream for Interval {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(Some(())),
},
#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))]
#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))]
Interval::AsyncStd(j) => match Pin::new_unchecked(j).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(v) => Poll::Ready(v),
Expand All @@ -251,7 +251,7 @@ pub enum Delay {
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
Tokio(tokio::time::Sleep),
/// wrapper around async-std's `Delay`
#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))]
#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))]
AsyncStd(Pin<Box<dyn Future<Output = ()> + Send + Sync>>),
}

Expand All @@ -267,7 +267,7 @@ impl Future for Delay {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(()),
},
#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))]
#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))]
Delay::AsyncStd(j) => match Pin::new_unchecked(j).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(()),
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ extern crate prost_derive;
#[macro_use]
extern crate serde;

#[cfg(all(features = "tokio-rustls-runtime", features = "tokio-runtime"))]
#[cfg(all(feature = "tokio-rustls-runtime", feature = "tokio-runtime"))]
compile_error!("You have selected both features \"tokio-rustls-runtime\" and \"tokio-runtime\" which are exclusive, please choose one of them");
#[cfg(all(features = "async-std-rustls-runtime", features = "async-std-runtime"))]
#[cfg(all(feature = "async-std-rustls-runtime", feature = "async-std-runtime"))]
compile_error!("You have selected both features \"async-std-rustls-runtime\" and \"async-std-runtime\" which are exclusive, please choose one of them");

pub use client::{DeserializeMessage, Pulsar, PulsarBuilder, SerializeMessage};
Expand All @@ -172,7 +172,7 @@ pub use connection_manager::{
};
pub use consumer::{Consumer, ConsumerBuilder, ConsumerOptions};
pub use error::Error;
#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))]
#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))]
pub use executor::AsyncStdExecutor;
pub use executor::Executor;
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
Expand Down

0 comments on commit b19bd99

Please sign in to comment.