Skip to content

Commit

Permalink
sedeve-kit, action_driver
Browse files Browse the repository at this point in the history
  • Loading branch information
ybbh committed Mar 3, 2024
1 parent dc986d0 commit d75ed19
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 30 deletions.
3 changes: 2 additions & 1 deletion src/player/automata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ struct __ActionDriver {
impl __ActionDriver {
fn new(
client_id:NID,
server_id: NID, server_addr: SocketAddr,
server_id: NID,
server_addr: SocketAddr,
) -> Res<Self> {
let r_build = Builder::new_current_thread()
.enable_all()
Expand Down
121 changes: 95 additions & 26 deletions src/player/dtm_client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
Expand All @@ -15,29 +16,37 @@ use scupt_util::node_id::NID;
use scupt_util::res::Res;
use tokio::runtime::Runtime;
use tokio::select;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Sender;
use tokio::sync::mpsc::{unbounded_channel};
use tokio::sync::oneshot::Sender as AsyncOneshotSender;
use tokio::task::LocalSet;
use tokio::time::sleep;
use tokio::sync::mpsc::UnboundedSender as AsyncSender;
use tokio::sync::mpsc::UnboundedReceiver as AsyncReceiver;
use std::sync::mpsc::Sender as SyncSender;

use crate::player::async_action_driver::AsyncActionDriver;
use crate::player::async_action_driver_impl::AsyncActionDriverImpl;
use crate::player::dtm_client_handler::DTMClientHandler;
use crate::player::msg_ctrl::MessageControl;
use crate::player::sync_action_driver::SyncActionDriver;
use crate::player::sync_action_driver_impl::SyncActionDriverImpl;

type DTMClientNode = Node<
MessageControl,
DTMClientHandler
>;


struct _ClientContext {
node_id:NID,
dtm_server_node_id: NID,
dtm_server_addr: SocketAddr,
node: DTMClientNode,
// sender/receiver would redirect send message to message loop task
sender: UnboundedSender<(Message<MessageControl>, Sender<Message<MessageControl>>)>,
receiver: StdMutex<Option<UnboundedReceiver<(Message<MessageControl>, Sender<Message<MessageControl>>)>>>,
async_sender: AsyncSender<(Message<MessageControl>, AsyncOneshotSender<Message<MessageControl>>)>,
async_receiver: StdMutex<Option<AsyncReceiver<(Message<MessageControl>, AsyncOneshotSender<Message<MessageControl>>)>>>,
sync_sender: AsyncSender<(Message<MessageControl>, SyncSender<Message<MessageControl>>)>,
sync_receiver: StdMutex<Option<AsyncReceiver<(Message<MessageControl>, SyncSender<Message<MessageControl>>)>>>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -65,29 +74,45 @@ impl DTMClient {
false,
stop_notify,
)?;
let (sender, receiver) = unbounded_channel();
let (async_sender, async_receiver) = unbounded_channel();
let (sync_sender, sync_receiver) = unbounded_channel();

Ok(Self {
context: Arc::new(_ClientContext {
node_id: client_id,
dtm_server_node_id: server_id,
dtm_server_addr: server_addr,
node,
sender,
receiver:StdMutex::new(Some(receiver))
async_sender,
async_receiver:StdMutex::new(Some(async_receiver)),
sync_sender,
sync_receiver: StdMutex::new(Some(sync_receiver)),
}),
})
}


fn sender(&self) -> Res<UnboundedSender<(Message<MessageControl>, Sender<Message<MessageControl>>)>> {
Ok(self.context.sender.clone())
fn async_sender(&self) -> Res<AsyncSender<(Message<MessageControl>, AsyncOneshotSender<Message<MessageControl>>)>> {
Ok(self.context.async_sender.clone())
}

fn sync_sender(&self) -> Res<AsyncSender<(Message<MessageControl>, SyncSender<Message<MessageControl>>)>> {
Ok(self.context.sync_sender.clone())
}

pub fn new_async_driver(&self) -> Res<Arc<dyn AsyncActionDriver>> {
let driver = Arc::new(AsyncActionDriverImpl::new(
self.context.node_id,
self.context.dtm_server_node_id,
self.sender()?));
self.async_sender()?));
Ok(driver)
}

pub fn new_sync_driver(&self) -> Res<Arc<dyn SyncActionDriver>> {
let driver = Arc::new(SyncActionDriverImpl::new(
self.context.node_id,
self.context.dtm_server_node_id,
self.sync_sender()?));
Ok(driver)
}

Expand Down Expand Up @@ -137,42 +162,62 @@ impl _ClientContext {
}

async fn dtm_client_message_loop(&self) -> Res<()> {
let mut r = {
let mut receiver1 = {
let mut opt_r = None;
let mut g = self.receiver.lock().unwrap();
let mut g = self.async_receiver.lock().unwrap();
std::mem::swap(&mut (*g), &mut opt_r);
opt_r.unwrap()
};

let endpoint = self.connect_to_dtm_player().await?;
let resp_senders = HashMap::new();
let mut receiver2 = {
let mut opt_r = None;
let mut g = self.sync_receiver.lock().unwrap();
std::mem::swap(&mut (*g), &mut opt_r);
opt_r.unwrap()
};
let endpoint1 = self.connect_to_dtm_player().await?;
let endpoint2 = self.connect_to_dtm_player().await?;
let resp_senders1 = HashMap::new();
let resp_senders2 = HashMap::new();

loop {
self.handle_message(&endpoint, &mut r, &resp_senders).await?;
self.handle_message(
&endpoint1, &mut receiver1, &resp_senders1,
&endpoint2, &mut receiver2, &resp_senders2).await?;
}
}


async fn handle_message(
&self,
endpoint:&Endpoint,
incoming:&mut UnboundedReceiver<(Message<MessageControl>, Sender<Message<MessageControl>>)>,
resp_senders : &HashMap<String, Sender<Message<MessageControl>>>,
endpoint1:&Endpoint,
incoming1:&mut AsyncReceiver<(Message<MessageControl>, AsyncOneshotSender<Message<MessageControl>>)>,
resp_senders1: &HashMap<String, AsyncOneshotSender<Message<MessageControl>>>,
endpoint2:&Endpoint,
incoming2:&mut AsyncReceiver<(Message<MessageControl>, SyncSender<Message<MessageControl>>)>,
resp_senders2 : &HashMap<String, SyncSender<Message<MessageControl>>>,
) -> Res<()> {
select! {
r1 = self.handle_recv_response(endpoint, resp_senders) => {
r1 = self.handle_recv_response_async(endpoint1, resp_senders1) => {
r1
},
r2 = self.handle_incoming_request(endpoint, incoming, resp_senders) => {
r2 = self.handle_incoming_request(endpoint1, incoming1, resp_senders1) => {
r2
}
r3 = self.handle_recv_response_sync(endpoint2, resp_senders2) => {
r3
},
r4 = self.handle_incoming_request(endpoint2, incoming2, resp_senders2) => {
r4
}
}
}

async fn handle_recv_response(
async fn handle_recv_response<S>(
&self,
endpoint:&Endpoint,
resp_senders : &HashMap<String, Sender<Message<MessageControl>>>,
) -> Res<()> {
resp_senders : &HashMap<String, S>,
) -> Res<(NID, NID, MessageControl, S)> {
let r_m = endpoint.recv::<MessageControl>().await;
let (from, to, m) = match r_m {
Ok(m) => {
Expand All @@ -189,16 +234,40 @@ impl _ClientContext {
}
None => { panic!("error, no such message, id:{}", id); }
};

Ok((from, to, m, sender))
}

async fn handle_recv_response_async(
&self,
endpoint:&Endpoint,
resp_senders : &HashMap<String, AsyncOneshotSender<Message<MessageControl>>>,
) -> Res<()> {
let (from, to, m, sender) =
self.handle_recv_response(endpoint, resp_senders).await?;
let mm = Message::new(m, to, from);
let _ = sender.send(mm);
Ok(())
}


async fn handle_recv_response_sync(
&self,
endpoint:&Endpoint,
resp_senders : &HashMap<String, SyncSender<Message<MessageControl>>>,
) -> Res<()> {
let (from, to, m, sender) =
self.handle_recv_response(endpoint, resp_senders).await?;
let mm = Message::new(m, to, from);
let _ = sender.send(mm);
Ok(())
}

async fn handle_incoming_request(
async fn handle_incoming_request<S:Debug>(
&self,
endpoint:&Endpoint,
incoming:&mut UnboundedReceiver<(Message<MessageControl>, Sender<Message<MessageControl>>)>,
resp_senders : &HashMap<String, Sender<Message<MessageControl>>>,
incoming:&mut AsyncReceiver<(Message<MessageControl>, S)>,
resp_senders : &HashMap<String, S>,
) -> Res<()> {
let opt_in = incoming.recv().await;
match opt_in {
Expand Down
1 change: 1 addition & 0 deletions src/player/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ mod test_dtm_player;

mod action_waiter;
mod sync_action_driver;
mod sync_action_driver_impl;


6 changes: 3 additions & 3 deletions src/player/sync_action_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use scupt_util::res::Res;

use crate::action::action_type::ActionType;

// TODO, SyncActionDriver implement

pub trait SyncActionDriver: Sync + Send {
async fn begin_action(&self, action_type:ActionType,source:NID, dest:NID, action: String) -> Res<()>;
fn begin_action(&self, action_type:ActionType,source:NID, dest:NID, action: String) -> Res<()>;

async fn end_action(&self, action_type:ActionType, source:NID, dest:NID, action: String) -> Res<()>;
fn end_action(&self, action_type:ActionType, source:NID, dest:NID, action: String) -> Res<()>;
}
98 changes: 98 additions & 0 deletions src/player/sync_action_driver_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::sync::Arc;

use scupt_util::error_type::ET;
use scupt_util::message::Message;
use scupt_util::node_id::NID;
use scupt_util::res::Res;
use std::sync::mpsc::{channel, Sender};
use tokio::sync::mpsc::UnboundedSender;

use tracing::trace;
use uuid::Uuid;
use scupt_util::serde_json_string::SerdeJsonString;

use crate::action::action_serde_json_value::ActionSerdeJsonValue;
use crate::action::action_type::ActionType;
use crate::player::msg_ctrl::MessageControl;
use crate::player::sync_action_driver::SyncActionDriver;

pub struct SyncActionDriverImplInner {
node_id:NID,
server_id:NID,
sender: UnboundedSender<(Message<MessageControl>, Sender<Message<MessageControl>>)>,
}

pub struct SyncActionDriverImpl {
inner: Arc<SyncActionDriverImplInner>,
}

impl SyncActionDriverImpl {
pub fn new(node_id:NID, server_node_id:NID, sender:UnboundedSender<(Message<MessageControl>, Sender<Message<MessageControl>>)>) -> Self {
Self {
inner: Arc::new(SyncActionDriverImplInner::new(node_id, server_node_id, sender)),
}
}
}


impl SyncActionDriver for SyncActionDriverImpl {
fn begin_action(&self, action_type:ActionType, source:NID, dest:NID, payload_json:String) -> Res<()> {
let action = ActionSerdeJsonValue::from_json(action_type, source, dest, payload_json)?.to_serde_json_string();
self.inner.async_begin_action(action)
}

fn end_action(&self, action_type:ActionType, source:NID, dest:NID, payload_json:String) -> Res<()> {
let action = ActionSerdeJsonValue::from_json(action_type, source, dest, payload_json)?.to_serde_json_string();
self.inner.async_end_action(action)
}
}

impl SyncActionDriverImplInner {
fn new(node_id:NID, server_id:NID,
sender:UnboundedSender<(Message<MessageControl>, Sender<Message<MessageControl>>)>
) -> Self {
Self {
node_id,
server_id,
sender,
}
}

fn async_begin_action(&self, action: SerdeJsonString) -> Res<()> {
self.async_send_action(action, true)
}

fn async_end_action(&self, action: SerdeJsonString) -> Res<()> {
self.async_send_action(action, false)
}

fn async_send_action(&self, action: SerdeJsonString, begin_action: bool) -> Res<()> {
let uuid = Uuid::new_v4();
let m = MessageControl::ActionReq {
id: uuid.to_string(),
action,
begin: begin_action,
};
let req = Message::new(m, self.node_id, self.server_id);

// send request
let (req_sender, resp_receiver) = channel();
match self.sender.send((req, req_sender)) {
Ok(()) => {}
Err(e) => { return Err(ET::SenderError(e.to_string())); }
};

let response:MessageControl = match resp_receiver.recv() {
Ok(m) => { m.payload() }
Err(e) => { return Err(ET::RecvError(e.to_string())); }
};

trace!("receive response {:?}", response);
match response {
MessageControl::ActionACK { .. } => { }
_ => { panic!("not possible") }
}
Ok(())
}
}

0 comments on commit d75ed19

Please sign in to comment.