Skip to content

Commit

Permalink
Merge pull request #80 from wenig/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
wenig authored Nov 13, 2023
2 parents c4bd11c + 679ffeb commit a3c134b
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 55 deletions.
13 changes: 8 additions & 5 deletions CITATION.cff
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ authors:
- family-names: Wenig
given-names: Phillip
orcid: https://orcid.org/0000-0002-8942-4322
title: "Actix-Telepathy"
version: 0.5.3
# doi: ...
date-released: 2022-08-09
url: "https://github.com/wenig/actix-telepathy"
- family-names: Papenbrock
given-names: Thorsten
orcid: https://orcid.org/0000-0002-4019-8221
title: Actix-Telepathy
doi: 10.1145/3623506.3623575
booktitle: Proceedings of the 10th ACM SIGPLAN International Workshop on Reactive and Event-Based Languages and Systems
year: 2023
url: https://github.com/wenig/actix-telepathy
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "actix-telepathy"
version = "0.5.4"
version = "0.5.5"
authors = ["wenig <[email protected]>"]
edition = "2018"
license = "Apache-2.0"
Expand Down Expand Up @@ -32,7 +32,7 @@ rayon = "1.5.0"
futures-sink = "0.3.21"

[dependencies]
actix_telepathy_derive = { version = "0.3.2", optional = true }
actix_telepathy_derive = { version = "0.3.3", optional = true }

log = "0.4"
env_logger = "0.10"
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[![crates.io](https://img.shields.io/crates/v/actix-telepathy?label=latest)](https://crates.io/crates/actix-telepathy)
![Tests on main](https://github.com/wenig/actix-telepathy/workflows/Rust/badge.svg)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Dependency Status](https://deps.rs/crate/actix-telepathy/0.5.4/status.svg)](https://deps.rs/crate/actix-telepathy/0.5.4)
[![Dependency Status](https://deps.rs/crate/actix-telepathy/0.5.5/status.svg)](https://deps.rs/crate/actix-telepathy/0.5.5)
![Downloads](https://img.shields.io/crates/d/actix-telepathy.svg)

# Actix Telepathy
Expand Down Expand Up @@ -29,7 +29,7 @@ So far, we only support single seed nodes. Connecting to different seed nodes ca
```toml
[dependencies]
actix = "0.13.1"
actix-telepathy = "0.5.4"
actix-telepathy = "0.5.5"
```

### main.rs
Expand Down
2 changes: 1 addition & 1 deletion actix-telepathy-derive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "actix_telepathy_derive"
version = "0.3.2"
version = "0.3.3"
authors = ["wenig <[email protected]>"]
edition = "2018"
license = "Apache-2.0"
Expand Down
54 changes: 13 additions & 41 deletions actix-telepathy-derive/src/remote_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ use proc_macro::TokenStream;
use quote::quote;
use serde_derive::{Deserialize, Serialize};
use std::fs::File;
use syn::parse::Parser;
use syn::{parse_macro_input, DeriveInput, Result};
type AttributeArgs = syn::punctuated::Punctuated<syn::Meta, syn::Token![,]>;

const TELEPATHY_CONFIG_FILE: &str = "telepathy.yaml";
const WITH_SOURCE: &str = "with_source";
Expand Down Expand Up @@ -47,7 +45,7 @@ pub fn remote_message_macro(input: TokenStream) -> TokenStream {
Some(source) => {
let attr = source.clone().unwrap();
quote! {
self.#attr.network_interface = Some(addr);
self.#attr.node.network_interface = Some(addr);
}
}
None => quote! {},
Expand Down Expand Up @@ -99,44 +97,18 @@ fn get_with_source_attr(ast: &DeriveInput) -> Result<Vec<Option<syn::Type>>> {
});

match attr {
Some(a) => {
if let syn::Meta::List(ref list) = a {
let parser = AttributeArgs::parse_terminated;
let args = match parser.parse2(list.tokens.clone()) {
Ok(args) => args,
Err(_) => {
return Err(syn::Error::new_spanned(
a,
format!(
"The correct syntax is #[{}(Message, Message, ...)]",
WITH_SOURCE
),
))
}
};
Ok(args.iter().map(|m| meta_item_to_struct(m).ok()).collect())
} else {
Err(syn::Error::new_spanned(
a,
format!(
"The correct syntax is #[{}(Message, Message, ...)]",
WITH_SOURCE
),
))
}
}
None => Ok(vec![]),
}
}

fn meta_item_to_struct(meta_item: &syn::Meta) -> syn::Result<syn::Type> {
match meta_item {
syn::Meta::Path(ref path) => match path.get_ident() {
Some(ident) => syn::parse_str::<syn::Type>(&ident.to_string())
.map_err(|_| syn::Error::new_spanned(ident, "Expect Message")),
None => Err(syn::Error::new_spanned(path, "Expect Message")),
Some(a) => match a {
syn::Meta::Path(path) => match path.get_ident() {
Some(ident) => syn::parse_str::<syn::Type>(&ident.to_string())
.map(|ty| vec![Some(ty)])
.map_err(|_| syn::Error::new_spanned(ident, "Expect type")),
None => Err(syn::Error::new_spanned(path, "Expect type")),
},
_ => Err(syn::Error::new_spanned(
a,
format!("The correct syntax is #[{}(<RemoteAddr>)]", WITH_SOURCE),
)),
},
syn::Meta::NameValue(val) => Err(syn::Error::new_spanned(val, "Expect Message")),
meta => Err(syn::Error::new_spanned(meta, "Expect type")),
None => Ok(vec![]),
}
}
3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,7 @@
//! Now, every new member receives a `MyMessage` from every [ClusterListener](./trait.ClusterListener.html) in the cluster.
//!
//! Before we could use the [RemoteAddr](./struct.RemoteAddr.html), we had to make sure, that it is pointing to the correct [RemoteActor](./trait.RemoteActor.html), which is `MyActor` in that case.
//! Therefore, we had to call `change_id` on the [RemoteAddr](./struct.RemoteAddr.html). A [RemoteAddr](./struct.RemoteAddr.html) points to a specific actor on a remote machine.
//! Per default, this is the [NetworkInterface](./struct.NetworkInterface.html) actor.
//! Therefore, we had to call `get_remote_addr` on the [Node](./struct.Node.html). A [RemoteAddr](./struct.RemoteAddr.html) points to a specific actor on a remote machine.
#[cfg(feature = "derive")]
pub use actix_telepathy_derive::*;
Expand Down
9 changes: 9 additions & 0 deletions src/remote/addr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ impl RemoteAddr {
}
}

impl Default for RemoteAddr {
fn default() -> Self {
RemoteAddr {
node: Node::default(),
id: AddrRepresentation::Key("Default".to_string()),
}
}
}

impl Clone for RemoteAddr {
fn clone(&self) -> Self {
RemoteAddr::new(self.node.clone(), self.id.clone())
Expand Down
9 changes: 9 additions & 0 deletions src/remote/addr/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ impl Node {
}
}

impl Default for Node {
fn default() -> Self {
Node {
socket_addr: "0.1.2.3:8080".parse().unwrap(),
network_interface: None,
}
}
}

impl Clone for Node {
fn clone(&self) -> Self {
Node::new(self.socket_addr, self.network_interface.clone())
Expand Down
123 changes: 121 additions & 2 deletions src/remote/addr/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{AddrRepresentation, AddrRequest, AddrResolver, AddrResponse};
use actix::prelude::*;
use actix_broker::BrokerSubscribe;
use actix_telepathy_derive::{RemoteActor, RemoteMessage};
use core::panic;
use port_scanner::request_open_port;
use rayon::iter::IntoParallelRefIterator;
use rayon::iter::ParallelIterator;
Expand All @@ -14,7 +15,10 @@ use std::time::Duration;
use tokio::time::sleep;

#[derive(RemoteMessage, Serialize, Deserialize)]
struct TestMessage {}
#[with_source(source)]
struct TestMessage {
source: RemoteAddr,
}

#[derive(RemoteActor)]
#[remote_messages(TestMessage)]
Expand Down Expand Up @@ -61,7 +65,9 @@ async fn addr_resolver_registers_and_resolves_addr() {
ta.clone().recipient(),
identifier.clone(),
));
ta.do_send(TestMessage {});
ta.do_send(TestMessage {
source: RemoteAddr::default(),
});
sleep(Duration::from_secs(1)).await;
assert_eq!(
(*(identifiers.lock().unwrap())).get(0).unwrap(),
Expand Down Expand Up @@ -188,3 +194,116 @@ impl Handler<ClusterLog> for OwnListenerGossipIntroduction {
}
}
}

// ----------------------------------------------

struct OwnListenerGossipIntroduction2 {
pub returned: Arc<Mutex<Option<RemoteAddr>>>,
}
impl ClusterListener for OwnListenerGossipIntroduction2 {}
impl Supervised for OwnListenerGossipIntroduction2 {}

impl Actor for OwnListenerGossipIntroduction2 {
type Context = Context<Self>;

fn started(&mut self, ctx: &mut Context<Self>) {
self.subscribe_system_async::<ClusterLog>(ctx);
}
}

impl Handler<ClusterLog> for OwnListenerGossipIntroduction2 {
type Result = ();

fn handle(&mut self, msg: ClusterLog, _ctx: &mut Context<Self>) -> Self::Result {
match msg {
ClusterLog::NewMember(node) => {
let remote_addr = node.get_remote_addr(TestActor2::ACTOR_ID.to_string());
(*(self.returned.lock().unwrap())) = Some(remote_addr);
}
_ => (),
}
}
}

#[derive(RemoteActor)]
#[remote_messages(TestMessage)]
struct TestActor2 {
pub returned: Arc<Mutex<Option<RemoteAddr>>>,
}

impl Actor for TestActor2 {
type Context = Context<Self>;

fn started(&mut self, ctx: &mut Context<Self>) {
self.register(ctx.address().recipient());
}
}

impl Handler<TestMessage> for TestActor2 {
type Result = ();

fn handle(&mut self, msg: TestMessage, _ctx: &mut Context<Self>) -> Self::Result {
self.returned.lock().unwrap().replace(msg.source);
}
}

struct TestParams2 {
ip: SocketAddr,
seeds: Vec<SocketAddr>,
}

#[test]
#[ignore] //github workflows don't get the timing right
fn remote_addr_filled_in_with_source() {
let ip1: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000))
.parse()
.unwrap();
let ip2: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000))
.parse()
.unwrap();

let arr = [
TestParams2 {
ip: ip1.clone(),
seeds: vec![],
},
TestParams2 {
ip: ip2.clone(),
seeds: vec![ip1.clone()],
},
];
arr.par_iter()
.for_each(|p| build_cluster_2(p.ip, p.seeds.clone()));
}

#[actix_rt::main]
async fn build_cluster_2(own_ip: SocketAddr, other_ip: Vec<SocketAddr>) {
let _cluster = Cluster::new(own_ip, other_ip);
let returned_received: Arc<Mutex<Option<RemoteAddr>>> = Arc::new(Mutex::new(None));
let _test_actor_addr = TestActor2 {
returned: returned_received.clone(),
}
.start();
let returned: Arc<Mutex<Option<RemoteAddr>>> = Arc::new(Mutex::new(None));
let _listener = OwnListenerGossipIntroduction2 {
returned: returned.clone(),
}
.start();
sleep(Duration::from_millis(200)).await;
let guard = returned.lock().unwrap();
let remote_addr = guard.as_ref().expect("Something should be returned");
let mut own_remote_addr = remote_addr.clone();
own_remote_addr.node.socket_addr = own_ip;
remote_addr.do_send(TestMessage {
source: own_remote_addr,
});
sleep(Duration::from_millis(200)).await;
assert_eq!(
(returned_received.lock().unwrap())
.as_ref()
.unwrap()
.node
.socket_addr,
remote_addr.node.socket_addr
);
}

0 comments on commit a3c134b

Please sign in to comment.