Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap actor message handling to avoid panicking #477

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 62 additions & 57 deletions src/cch/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,73 +138,27 @@ pub struct CchState {
orders_db: CchOrdersDb,
}

#[ractor::async_trait]
impl Actor for CchActor {
type Msg = CchMessage;
type State = CchState;
type Arguments = ();

async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
_config: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let lnd_rpc_url: Uri = self.config.lnd_rpc_url.clone().try_into()?;
let cert = match self.config.resolve_lnd_cert_path() {
Some(path) => Some(
tokio::fs::read(&path)
.await
.with_context(|| format!("read cert file {}", path.display()))?,
),
None => None,
};
let macaroon = match self.config.resolve_lnd_macaroon_path() {
Some(path) => Some(
tokio::fs::read(&path)
.await
.with_context(|| format!("read macaroon file {}", path.display()))?,
),
None => None,
};
let lnd_connection = LndConnectionInfo {
uri: lnd_rpc_url,
cert,
macaroon,
};

let payments_tracker =
LndPaymentsTracker::new(myself.clone(), lnd_connection.clone(), self.token.clone());
self.tracker
.spawn(async move { payments_tracker.run().await });

Ok(CchState {
lnd_connection,
orders_db: Default::default(),
})
}

async fn handle(
impl CchActor {
async fn handle_actor_message(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
myself: ActorRef<<Self as Actor>::Msg>,
message: <Self as Actor>::Msg,
state: &mut <Self as Actor>::State,
) {
match message {
CchMessage::SendBTC(send_btc, port) => {
let result = self.send_btc(state, send_btc).await;
if !port.is_closed() {
// ignore error
let _ = port.send(result);
}
Ok(())
}
CchMessage::ReceiveBTC(receive_btc, port) => {
let result = self.receive_btc(myself, state, receive_btc).await;
if !port.is_closed() {
// ignore error
let _ = port.send(result);
}
Ok(())
}
CchMessage::GetReceiveBTCOrder(payment_hash, port) => {
let result = state
Expand All @@ -216,21 +170,18 @@ impl Actor for CchActor {
// ignore error
let _ = port.send(result);
}
Ok(())
}
CchMessage::SettleSendBTCOrder(event) => {
tracing::debug!("settle_send_btc_order {:?}", event);
if let Err(err) = self.settle_send_btc_order(state, event).await {
tracing::error!("settle_send_btc_order failed: {}", err);
}
Ok(())
}
CchMessage::SettleReceiveBTCOrder(event) => {
tracing::debug!("settle_receive_btc_order {:?}", event);
if let Err(err) = self.settle_receive_btc_order(state, event).await {
tracing::error!("settle_receive_btc_order failed: {}", err);
}
Ok(())
}
CchMessage::PendingReceivedTlcNotification(tlc_notification) => {
if let Err(err) = self
Expand All @@ -239,7 +190,6 @@ impl Actor for CchActor {
{
tracing::error!("handle_pending_received_tlc_notification failed: {}", err);
}
Ok(())
}
CchMessage::SettledTlcNotification(tlc_notification) => {
if let Err(err) = self
Expand All @@ -248,12 +198,67 @@ impl Actor for CchActor {
{
tracing::error!("handle_settled_tlc_notification failed: {}", err);
}
Ok(())
}
}
}
}

#[ractor::async_trait]
impl Actor for CchActor {
type Msg = CchMessage;
type State = CchState;
type Arguments = ();

async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
_config: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let lnd_rpc_url: Uri = self.config.lnd_rpc_url.clone().try_into()?;
let cert = match self.config.resolve_lnd_cert_path() {
Some(path) => Some(
tokio::fs::read(&path)
.await
.with_context(|| format!("read cert file {}", path.display()))?,
),
None => None,
};
let macaroon = match self.config.resolve_lnd_macaroon_path() {
Some(path) => Some(
tokio::fs::read(&path)
.await
.with_context(|| format!("read macaroon file {}", path.display()))?,
),
None => None,
};
let lnd_connection = LndConnectionInfo {
uri: lnd_rpc_url,
cert,
macaroon,
};

let payments_tracker =
LndPaymentsTracker::new(myself.clone(), lnd_connection.clone(), self.token.clone());
self.tracker
.spawn(async move { payments_tracker.run().await });

Ok(CchState {
lnd_connection,
orders_db: Default::default(),
})
}

async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
self.handle_actor_message(myself, message, state).await;
Ok(())
}
}

impl CchActor {
pub fn new(
config: CchConfig,
Expand Down
71 changes: 41 additions & 30 deletions src/ckb/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,36 +74,13 @@ pub enum CkbChainMessage {
),
}

#[ractor::async_trait]
impl Actor for CkbChainActor {
type Msg = CkbChainMessage;
type State = CkbChainState;
type Arguments = CkbConfig;

async fn pre_start(
impl CkbChainActor {
async fn handle_actor_message(
&self,
_myself: ActorRef<Self::Msg>,
config: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let secret_key = config.read_secret_key()?;
let secp = secp256k1::Secp256k1::new();
let pub_key = secret_key.public_key(&secp);
let pub_key_hash = ckb_hash::blake2b_256(pub_key.serialize());
let funding_source_lock_script =
get_script_by_contract(Contract::Secp256k1Lock, &pub_key_hash[0..20]);
Ok(CkbChainState {
config,
secret_key,
funding_source_lock_script,
})
}

async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
myself: ActorRef<<Self as Actor>::Msg>,
message: <Self as Actor>::Msg,
state: &mut <Self as Actor>::State,
) {
use CkbChainMessage::{Fund, SendTx, Sign, TraceTx};
match message {
Fund(tx, request, reply_port) => {
Expand Down Expand Up @@ -251,7 +228,7 @@ impl Actor for CkbChainActor {
// ignore error
let _ = reply_port.send(status);
}
return Ok(());
return;
}
None => sleep(Duration::from_secs(5)).await,
}
Expand All @@ -272,6 +249,40 @@ impl Actor for CkbChainActor {
});
}
}
}
}

#[ractor::async_trait]
impl Actor for CkbChainActor {
type Msg = CkbChainMessage;
type State = CkbChainState;
type Arguments = CkbConfig;

async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
config: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let secret_key = config.read_secret_key()?;
let secp = secp256k1::Secp256k1::new();
let pub_key = secret_key.public_key(&secp);
let pub_key_hash = ckb_hash::blake2b_256(pub_key.serialize());
let funding_source_lock_script =
get_script_by_contract(Contract::Secp256k1Lock, &pub_key_hash[0..20]);
Ok(CkbChainState {
config,
secret_key,
funding_source_lock_script,
})
}

async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
self.handle_actor_message(myself, message, state).await;
Ok(())
}
}
Expand Down
Loading
Loading