Skip to content

Commit

Permalink
feat: support asyncio, bump version=0.3.6-ALPHA
Browse files Browse the repository at this point in the history
  • Loading branch information
CherishCai committed Mar 16, 2024
1 parent c350c1b commit 51293a4
Show file tree
Hide file tree
Showing 10 changed files with 652 additions and 82 deletions.
11 changes: 7 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "nacos-sdk-rust-binding-py"
version = "0.3.5"
version = "0.3.6-ALPHA"
edition = "2021"
license = "Apache-2.0"
publish = false
Expand All @@ -18,10 +18,13 @@ crate-type = ["cdylib"]
doc = false

[dependencies]
pyo3 = "0.18"
pyo3 = "0.20"
pyo3-asyncio = { version = "0.20", features = ["tokio-runtime"] }
# for block api
futures = { version = "0.3", default-features = false, features = [] }

nacos-sdk = { version = "0.3.5", features = ["default"] }
#nacos-sdk = { git = "https://github.com/nacos-group/nacos-sdk-rust.git", features = ["default"] }
nacos-sdk = { version = "0.3.5", features = ["async"] }
#nacos-sdk = { git = "https://github.com/nacos-group/nacos-sdk-rust.git", features = ["async"] }

tracing-subscriber = { version = "0.3", features = ["default"] }
#tracing-subscriber = { version = "0.3", features = ["env-filter", "local-time"] } # occur `<unknown time>`
Expand Down
66 changes: 66 additions & 0 deletions examples/async_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/usr/bin/python3

import asyncio
import nacos_sdk_rust_binding_py as nacos

client_options = nacos.ClientOptions("0.0.0.0:8848", "love", "simple_app_py", "nacos", "nacos")

# 一般大部分情况下,应用下仅需一个客户端,而且需要长期持有直至应用停止。
# 因为它内部会初始化与服务端的长链接,后续的数据交互及服务变更等订阅,都是实时地通过长链接告知客户端的。
config_client = nacos.AsyncNacosConfigClient(client_options)


# 自定义配置监听的函数,接受的参数为 `nacos.NacosConfigResponse`
def listen_config(config_resp: nacos.NacosConfigResponse):
print(f"listen_config,config_resp={str(config_resp)}")
print(f"listen_config,config_resp.content={config_resp.content}")


async def main():
await asyncio.sleep(1)

data_id = "todo-dataid"
group = "LOVE"
publish_content = "test-content"

# 添加配置监听(对目标 data_id, group 配置变化的监听)
await config_client.add_listener(data_id, group, listen_config)

# 推送配置
await config_client.publish_config(data_id, group, publish_content)

await asyncio.sleep(1)

# 获取配置,返回值为 `nacos.NacosConfigResponse`
config_content_resp = await config_client.get_config_resp(data_id, group)

# 获取配置,返回值为 content: String
get_config_content = await config_client.get_config(data_id, group)

assert get_config_content == publish_content
assert config_content_resp.content == publish_content

print(f"get_config_content={get_config_content}")
print(f"config_content_resp={str(config_content_resp)},resp_content={config_content_resp.content}")

await asyncio.sleep(1)

# 推送配置,使配置监听函数被调用
await config_client.publish_config(data_id, group, "publish_content for listen_config")

# 等待一段时间供用户查看 Nacos 服务器上被监听的配置
await asyncio.sleep(300)

# 删除配置
await config_client.remove_config(data_id, group)

# 获取已删除的配置,会抛出异常
try:
get_config_content_removed = await config_client.get_config(data_id, group)
except RuntimeError:
print("config already be removed.")

await asyncio.sleep(10)

# 运行主任务
asyncio.run(main())
55 changes: 55 additions & 0 deletions examples/async_naming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#!/usr/bin/python3

import asyncio
import nacos_sdk_rust_binding_py as nacos

client_options = nacos.ClientOptions("0.0.0.0:8848", "love", "simple_app_py", "nacos", "nacos")

# 一般大部分情况下,应用下仅需一个客户端,而且需要长期持有直至应用停止。
# 因为它内部会初始化与服务端的长链接,后续的数据交互及服务变更等订阅,都是实时地通过长链接告知客户端的。
naming_client = nacos.AsyncNacosNamingClient(client_options)


# 自定义服务订阅函数,接受的参数为 `nacos.NacosConfigResponse`
def subscribe_instances(instances: [nacos.NacosServiceInstance]):
print(f"subscribe_instances,instances={str(instances)}")
for ins in instances:
print(f"subscribe_instances,instances[x].ip={ins.ip}")


async def main():
await asyncio.sleep(1)

service_name = "todo-service-name"
group = "dev"
service_instance = nacos.NacosServiceInstance("127.0.0.1", 8080)

# 添加服务订阅(对目标 service_name, group 的服务实例变化的监听)
await naming_client.subscribe(service_name, group, None, subscribe_instances)

await asyncio.sleep(1)

# 注册服务实例
await naming_client.register_instance(service_name, group, service_instance)

await asyncio.sleep(1)

# 获取服务实例列表
get_instances = await naming_client.get_all_instances(service_name, group)

assert len(get_instances) > 0
assert get_instances[0].ip == service_instance.ip

print(f"get_instances={str(get_instances)}")
for i in get_instances:
print(f"get_instances[x].ip={i.ip}")

# 批量注册服务实例,可使前面的配置监听函数被调用
service_instance2 = nacos.NacosServiceInstance("127.0.0.2", 8080)
await naming_client.batch_register_instance(service_name, group, [service_instance, service_instance2])

# 等待一段时间
await asyncio.sleep(300)

# 运行主任务
asyncio.run(main())
2 changes: 2 additions & 0 deletions examples/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

client_options = nacos.ClientOptions("0.0.0.0:8848", "love", "simple_app_py", "nacos", "nacos")

# 一般大部分情况下,应用下仅需一个客户端,而且需要长期持有直至应用停止。
# 因为它内部会初始化与服务端的长链接,后续的数据交互及服务变更等订阅,都是实时地通过长链接告知客户端的。
config_client = nacos.NacosConfigClient(client_options)

time.sleep(1)
Expand Down
2 changes: 2 additions & 0 deletions examples/naming.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

client_options = nacos.ClientOptions("0.0.0.0:8848", "love", "simple_app_py", "nacos", "nacos")

# 一般大部分情况下,应用下仅需一个客户端,而且需要长期持有直至应用停止。
# 因为它内部会初始化与服务端的长链接,后续的数据交互及服务变更等订阅,都是实时地通过长链接告知客户端的。
naming_client = nacos.NacosNamingClient(client_options)

time.sleep(1)
Expand Down
170 changes: 170 additions & 0 deletions src/async_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#![deny(clippy::all)]

use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::{pyclass, pymethods, PyAny, PyErr, PyResult, Python, ToPyObject};
use pyo3_asyncio::tokio::future_into_py;

use std::sync::Arc;

use crate::config::{transfer_conf_resp, NacosConfigChangeListener};

/// Async Client api of Nacos Config.
#[pyclass(module = "nacos_sdk_rust_binding_py")]
pub struct AsyncNacosConfigClient {
inner: Arc<dyn nacos_sdk::api::config::ConfigService + Send + Sync + 'static>,
}

#[pymethods]
impl AsyncNacosConfigClient {
/// Build a Config Client.
#[new]
pub fn new(client_options: crate::ClientOptions) -> PyResult<Self> {
// print to console or file
let _ = crate::init_logger();

let props = nacos_sdk::api::props::ClientProps::new()
.server_addr(client_options.server_addr)
.namespace(client_options.namespace)
.app_name(
client_options
.app_name
.unwrap_or(nacos_sdk::api::constants::UNKNOWN.to_string()),
);

// need enable_auth_plugin_http with username & password
let is_enable_auth = client_options.username.is_some() && client_options.password.is_some();

let props = if is_enable_auth {
props
.auth_username(client_options.username.unwrap())
.auth_password(client_options.password.unwrap())
} else {
props
};

let config_service_builder = if is_enable_auth {
nacos_sdk::api::config::ConfigServiceBuilder::new(props).enable_auth_plugin_http()
} else {
nacos_sdk::api::config::ConfigServiceBuilder::new(props)
};

let config_service = config_service_builder
.build()
.map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?;

Ok(Self {
inner: Arc::new(config_service),
})
}

/// Get config's content.
/// If it fails, pay attention to err
pub fn get_config<'p>(
&self,
py: Python<'p>,
data_id: String,
group: String,
) -> PyResult<&'p PyAny> {
let this = self.inner.clone();
future_into_py(py, async move {
let config_resp = this
.get_config(data_id, group)
.await
.map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?;
Ok(transfer_conf_resp(config_resp).content)
})
}

/// Get NacosConfigResponse.
/// If it fails, pay attention to err
pub fn get_config_resp<'p>(
&self,
py: Python<'p>,
data_id: String,
group: String,
) -> PyResult<&'p PyAny> {
let this = self.inner.clone();
future_into_py(py, async move {
let config_resp = this
.get_config(data_id, group)
.await
.map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?;
Ok(transfer_conf_resp(config_resp))
})
}

/// Publish config.
/// If it fails, pay attention to err
pub fn publish_config<'p>(
&self,
py: Python<'p>,
data_id: String,
group: String,
content: String,
) -> PyResult<&'p PyAny> {
let this = self.inner.clone();
future_into_py(py, async move {
this.publish_config(data_id, group, content, None)
.await
.map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))
})
}

/// Remove config.
/// If it fails, pay attention to err
pub fn remove_config<'p>(
&self,
py: Python<'p>,
data_id: String,
group: String,
) -> PyResult<&'p PyAny> {
let this = self.inner.clone();
future_into_py(py, async move {
this.remove_config(data_id, group)
.await
.map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))
})
}

/// Add NacosConfigChangeListener callback func, which listen the config change.
/// If it fails, pay attention to err
#[pyo3(signature = (data_id, group, listener))]
pub fn add_listener<'p>(
&self,
py: Python<'p>,
data_id: String,
group: String,
listener: &PyAny, // PyFunction arg: <NacosConfigResponse>
) -> PyResult<&'p PyAny> {
if !listener.is_callable() {
return Err(PyErr::new::<PyValueError, _>(
"Arg `listener` must be a callable",
));
}
let listen_wrap = Arc::new(NacosConfigChangeListener {
func: Arc::new(listener.to_object(py)),
});
let this = self.inner.clone();
future_into_py(py, async move {
this.add_listener(data_id, group, listen_wrap)
.await
.map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?;
Ok(())
})
}

/// Remove NacosConfigChangeListener callback func, but noop....
/// The logic is not implemented internally, and only APIs are provided as compatibility.
/// Users maybe do not need it? Not removing the listener is not a big problem, Sorry!
#[pyo3(signature = (data_id, group, listener))]
#[allow(unused_variables)]
pub fn remove_listener<'p>(
&self,
py: Python<'p>,
data_id: String,
group: String,
listener: &PyAny, // PyFunction arg: <NacosConfigResponse>
) -> PyResult<&'p PyAny> {
future_into_py(py, async { Ok(()) })
}
}
Loading

0 comments on commit 51293a4

Please sign in to comment.