Skip to content

Commit

Permalink
fix client receive max handling
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Aug 12, 2020
1 parent 9c70793 commit d66542c
Showing 1 changed file with 30 additions and 32 deletions.
62 changes: 30 additions & 32 deletions src/v5/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use bytes::Bytes;
use bytestring::ByteString;
use futures::future::Future;
use ntex::channel::{mpsc, oneshot};
use ntex::util::counter::Counter;

use super::codec as mqtt;
use crate::types::QoS;
Expand All @@ -15,9 +14,9 @@ pub struct MqttSink(Rc<RefCell<MqttSinkInner>>);
pub(crate) struct MqttSinkInner {
idx: u16,
cap: usize,
queue: VecDeque<(u16, oneshot::Sender<()>)>,
sink: Option<mpsc::Sender<mqtt::Packet>>,
inflight: CheckInFlight,
queue: VecDeque<(u16, oneshot::Sender<()>)>,
waiters: VecDeque<oneshot::Sender<()>>,
}

impl Clone for MqttSink {
Expand All @@ -33,14 +32,14 @@ impl MqttSink {
cap: max_receive,
sink: Some(sink),
queue: VecDeque::new(),
inflight: CheckInFlight(Counter::new(max_receive)),
waiters: VecDeque::new(),
})))
}

/// Get client receive credit
pub fn credit(&self) -> usize {
let inner = self.0.borrow();
inner.cap - inner.inflight.0.total()
inner.cap - inner.queue.len()
}

/// Close mqtt connection with default Disconnect message
Expand Down Expand Up @@ -79,7 +78,9 @@ impl MqttSink {
}

pub(crate) fn complete_publish_qos1(&self, packet_id: NonZeroU16) -> bool {
if let Some((idx, tx)) = self.0.borrow_mut().queue.pop_front() {
let mut inner = self.0.borrow_mut();

if let Some((idx, tx)) = inner.queue.pop_front() {
if idx != packet_id.get() {
log::trace!(
"MQTT protocol error, packet_id order does not match, expected {}, got: {}",
Expand All @@ -89,6 +90,12 @@ impl MqttSink {
} else {
log::trace!("Ack publish packet with id: {}", packet_id);
let _ = tx.send(());

while let Some(tx) = inner.waiters.pop_front() {
if tx.send(()).is_ok() {
break;
}
}
return true;
}
} else {
Expand Down Expand Up @@ -152,16 +159,25 @@ impl<'a> PublishBuilder<'a> {
/// Send publish packet with QoS 1
pub fn send_at_least_once(&mut self) -> impl Future<Output = Result<(), ()>> {
if let Some(mut packet) = self.packet.take() {
let sink = self.sink.clone();
let mut inflight = self.sink.0.borrow().inflight.clone();
let sink = self.sink.0.clone();

async move {
// receive max
(&mut inflight).await;
let guard = inflight.0.get();
let mut inner = sink.borrow_mut();
if inner.sink.is_some() {
// handle client receive maximum
if inner.cap - inner.queue.len() == 0 {
let (tx, rx) = oneshot::channel();
inner.waiters.push_back(tx);

drop(inner);
if rx.await.is_err() {
return Err(());
}

inner = sink.borrow_mut();
}

let mut inner = sink.0.borrow_mut();
let result = if inner.sink.is_some() {
// send publish to client
let (tx, rx) = oneshot::channel();

inner.idx += 1;
Expand All @@ -187,28 +203,10 @@ impl<'a> PublishBuilder<'a> {
}
} else {
Err(())
};

drop(guard);
result
}
}
} else {
panic!("PublishBuilder can be used only once.");
}
}
}

#[derive(Clone)]
struct CheckInFlight(Counter);

impl Future for CheckInFlight {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.0.available(cx) {
Poll::Ready(())
} else {
Poll::Pending
}
}
}

0 comments on commit d66542c

Please sign in to comment.