Skip to content

Commit

Permalink
use default_locator if SEDP message have no locator
Browse files Browse the repository at this point in the history
  • Loading branch information
tomiy-0x62 committed Feb 19, 2025
1 parent b7b51f6 commit 3da3e7f
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 77 deletions.
85 changes: 35 additions & 50 deletions src/discovery/structure/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,17 @@ pub struct SDPBuiltinData {
pub vendor_id: Option<VendorId>,
pub expects_inline_qos: Option<bool>,
pub available_builtin_endpoint: Option<BitFlags<BuiltinEndpoint>>, // parameter_id: ParameterId::PID_BUILTIN_ENDPOINT_SET
pub metarraffic_unicast_locator_list: Option<Vec<Locator>>,
pub metarraffic_multicast_locator_list: Option<Vec<Locator>>,
pub default_multicast_locator_list: Option<Vec<Locator>>,
pub default_unicast_locator_list: Option<Vec<Locator>>,
pub metarraffic_unicast_locator_list: Vec<Locator>,
pub metarraffic_multicast_locator_list: Vec<Locator>,
pub default_multicast_locator_list: Vec<Locator>,
pub default_unicast_locator_list: Vec<Locator>,
pub manual_liveliness_count: Option<Count>,
pub lease_duration: Option<Duration>,

// {Reader/Writer}Proxy
pub remote_guid: Option<GUID>,
pub unicast_locator_list: Option<Vec<Locator>>,
pub multicast_locator_list: Option<Vec<Locator>>,
pub unicast_locator_list: Vec<Locator>,
pub multicast_locator_list: Vec<Locator>,
// expects_inline_qos // only ReaderProxy
pub data_max_size_serialized: Option<i32>, // only ReaderProxy

Expand Down Expand Up @@ -101,15 +101,15 @@ impl SDPBuiltinData {
vendor_id: Option<VendorId>,
expects_inline_qos: Option<bool>,
available_builtin_endpoint: Option<BitFlags<BuiltinEndpoint>>,
metarraffic_unicast_locator_list: Option<Vec<Locator>>,
metarraffic_multicast_locator_list: Option<Vec<Locator>>,
default_unicast_locator_list: Option<Vec<Locator>>,
default_multicast_locator_list: Option<Vec<Locator>>,
metarraffic_unicast_locator_list: Vec<Locator>,
metarraffic_multicast_locator_list: Vec<Locator>,
default_unicast_locator_list: Vec<Locator>,
default_multicast_locator_list: Vec<Locator>,
manual_liveliness_count: Option<Count>,
lease_duration: Option<Duration>,
remote_guid: Option<GUID>,
unicast_locator_list: Option<Vec<Locator>>,
multicast_locator_list: Option<Vec<Locator>>,
unicast_locator_list: Vec<Locator>,
multicast_locator_list: Vec<Locator>,
data_max_size_serialized: Option<i32>,
type_name: Option<String>,
topic_name: Option<String>,
Expand Down Expand Up @@ -181,10 +181,10 @@ impl SDPBuiltinData {
let vendor_id = self.vendor_id?;
let expects_inline_qos = self.expects_inline_qos.unwrap_or(false);
let available_builtin_endpoint = self.available_builtin_endpoint?;
let metarraffic_unicast_locator_list = self.metarraffic_unicast_locator_list.take()?;
let metarraffic_multicast_locator_list = self.metarraffic_multicast_locator_list.take()?;
let default_unicast_locator_list = self.default_unicast_locator_list.take()?;
let default_multicast_locator_list = self.default_multicast_locator_list.take()?;
let metarraffic_unicast_locator_list = self.metarraffic_unicast_locator_list.clone();
let metarraffic_multicast_locator_list = self.metarraffic_multicast_locator_list.clone();
let default_unicast_locator_list = self.default_unicast_locator_list.clone();
let default_multicast_locator_list = self.default_multicast_locator_list.clone();
let manual_liveliness_count = self.manual_liveliness_count;
let lease_duration = self.lease_duration.unwrap_or(Duration {
seconds: 100,
Expand Down Expand Up @@ -223,11 +223,13 @@ impl SDPBuiltinData {
pub fn gen_readerpoxy(
&mut self,
history_cache: Arc<RwLock<HistoryCache>>,
default_unicast_locator_list: Vec<Locator>,
default_multicast_locator_list: Vec<Locator>,
) -> Option<ReaderProxy> {
let remote_guid = self.remote_guid?;
let expects_inline_qos = self.expects_inline_qos.unwrap_or(false);
let unicast_locator_list = self.unicast_locator_list.take()?;
let multicast_locator_list = self.multicast_locator_list.take()?;
let unicast_locator_list = self.unicast_locator_list.clone();
let multicast_locator_list = self.multicast_locator_list.clone();
let dr_qos_builder = DataReaderQosBuilder::new();
let qos = dr_qos_builder
.durability(self.durability.unwrap_or_default())
Expand All @@ -248,6 +250,8 @@ impl SDPBuiltinData {
expects_inline_qos,
unicast_locator_list,
multicast_locator_list,
default_unicast_locator_list,
default_multicast_locator_list,
qos,
history_cache,
))
Expand All @@ -256,6 +260,8 @@ impl SDPBuiltinData {
pub fn gen_writerproxy(
&mut self,
history_cache: Arc<RwLock<HistoryCache>>,
default_unicast_locator_list: Vec<Locator>,
default_multicast_locator_list: Vec<Locator>,
) -> Option<WriterProxy> {
let remote_guid = match self.remote_guid {
Some(rg) => rg,
Expand All @@ -267,26 +273,8 @@ impl SDPBuiltinData {
return None;
}
};
let unicast_locator_list = match self.unicast_locator_list.take() {
Some(ull) => ull,
None => {
eprintln!(
"<{}>: couldn't gen WriterProxy, not found unicast_locator_list",
"SDPBuiltinData: Err".red()
);
return None;
}
};
let multicast_locator_list = match self.multicast_locator_list.take() {
Some(mll) => mll,
None => {
eprintln!(
"<{}>: couldn't gen WriterProxy, not found multicast_locator_list",
"SDPBuiltinData: Error".red()
);
return None;
}
};
let unicast_locator_list = self.unicast_locator_list.clone();
let multicast_locator_list = self.multicast_locator_list.clone();
let data_max_size_serialized = self.data_max_size_serialized.unwrap_or(0); // TODO: Which default value should I set?
let dw_qos_builder = DataWriterQosBuilder::new();
let qos = dw_qos_builder
Expand All @@ -306,6 +294,8 @@ impl SDPBuiltinData {
remote_guid,
unicast_locator_list,
multicast_locator_list,
default_unicast_locator_list,
default_multicast_locator_list,
data_max_size_serialized,
qos,
history_cache,
Expand Down Expand Up @@ -470,15 +460,15 @@ impl<'de> Deserialize<'de> for SDPBuiltinData {
let mut vendor_id: Option<VendorId> = None;
let mut expects_inline_qos: Option<bool> = None;
let mut available_builtin_endpoint: Option<BitFlags<BuiltinEndpoint>> = None;
let mut metarraffic_unicast_locator_list: Option<Vec<Locator>> = Some(Vec::new());
let mut metarraffic_multicast_locator_list: Option<Vec<Locator>> = Some(Vec::new());
let mut default_unicast_locator_list: Option<Vec<Locator>> = Some(Vec::new());
let mut default_multicast_locator_list: Option<Vec<Locator>> = Some(Vec::new());
let mut metarraffic_unicast_locator_list: Vec<Locator> = Vec::new();
let mut metarraffic_multicast_locator_list: Vec<Locator> = Vec::new();
let mut default_unicast_locator_list: Vec<Locator> = Vec::new();
let mut default_multicast_locator_list: Vec<Locator> = Vec::new();
let mut manual_liveliness_count: Option<Count> = None;
let mut lease_duration: Option<Duration> = None;
let mut remote_guid: Option<GUID> = None;
let mut unicast_locator_list: Option<Vec<Locator>> = Some(Vec::new());
let mut multicast_locator_list: Option<Vec<Locator>> = Some(Vec::new());
let mut unicast_locator_list: Vec<Locator> = Vec::new();
let mut multicast_locator_list: Vec<Locator> = Vec::new();
let mut data_max_size_serialized: Option<i32> = None;
let mut type_name: Option<String> = None;
let mut topic_name: Option<String> = None;
Expand Down Expand Up @@ -520,12 +510,7 @@ impl<'de> Deserialize<'de> for SDPBuiltinData {
let address: [u8; 16] = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(0, &self))?;
match &mut $ll {
Some(v) => {
v.push(Locator::new(kind, port, address));
}
None => unreachable!(),
}
$ll.push(Locator::new(kind, port, address));
}};
}
loop {
Expand Down
48 changes: 40 additions & 8 deletions src/message/message_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,15 +410,29 @@ impl MessageReceiver {
"<{}>: successed for deserialize sedp(w)",
"MessageReceiver: Info".green()
);
let writer_proxy =
match deserialized.gen_writerproxy(Arc::new(RwLock::new(HistoryCache::new()))) {
let writer_proxy = if let Some(participant_data) =
disc_db.read_participant_data(self.source_guid_prefix)
{
let default_unicast_locator_list = participant_data.default_unicast_locator_list;
let default_multicast_locator_list =
participant_data.default_multicast_locator_list;
match deserialized.gen_writerproxy(
Arc::new(RwLock::new(HistoryCache::new())),
default_unicast_locator_list,
default_multicast_locator_list,
) {
Some(wp) => wp,
None => {
return Err(MessageError(
"failed generate writer_proxy form received DATA(w)".to_string(),
));
}
};
}
} else {
return Err(MessageError(
"received sedp(w) from unknown participant".to_string(),
));
};
let (topic_name, data_type) = match deserialized.topic_info() {
Some((tn, dt)) => (tn, dt),
None => {
Expand All @@ -433,10 +447,12 @@ impl MessageReceiver {
"<{}>: matched writer add to reader",
"MessageReceiver: Info".green()
);
reader.matched_writer_add(
reader.matched_writer_add_with_default_locator(
writer_proxy.remote_writer_guid,
writer_proxy.unicast_locator_list.clone(),
writer_proxy.multicast_locator_list.clone(),
writer_proxy.default_unicast_locator_list.clone(),
writer_proxy.default_multicast_locator_list.clone(),
writer_proxy.data_max_size_serialized,
writer_proxy.qos.clone(),
);
Expand Down Expand Up @@ -479,15 +495,29 @@ impl MessageReceiver {
"<{}>: successed for deserialize sedp(r)",
"MessageReceiver: Info".green()
);
let reader_proxy =
match deserialized.gen_readerpoxy(Arc::new(RwLock::new(HistoryCache::new()))) {
let reader_proxy = if let Some(participant_data) =
disc_db.read_participant_data(self.source_guid_prefix)
{
let default_unicast_locator_list = participant_data.default_unicast_locator_list;
let default_multicast_locator_list =
participant_data.default_multicast_locator_list;
match deserialized.gen_readerpoxy(
Arc::new(RwLock::new(HistoryCache::new())),
default_unicast_locator_list,
default_multicast_locator_list,
) {
Some(rp) => rp,
None => {
return Err(MessageError(
"failed generate reader_proxy form received DATA(r)".to_string(),
));
}
};
}
} else {
return Err(MessageError(
"received sedp(r) from unknown participant".to_string(),
));
};
let (topic_name, data_type) = match deserialized.topic_info() {
Some((tn, dt)) => (tn, dt),
None => {
Expand All @@ -502,11 +532,13 @@ impl MessageReceiver {
"<{}>: matched reader add to writer",
"MessageReceiver: Info".green()
);
writer.matched_reader_add(
writer.matched_reader_add_with_default_locator(
reader_proxy.remote_reader_guid,
reader_proxy.expects_inline_qos,
reader_proxy.unicast_locator_list.clone(),
reader_proxy.multicast_locator_list.clone(),
reader_proxy.default_unicast_locator_list.clone(),
reader_proxy.default_multicast_locator_list.clone(),
reader_proxy.qos.clone(),
)
}
Expand Down
35 changes: 31 additions & 4 deletions src/rtps/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ impl Reader {
self.expectsinline_qos,
self.unicast_locator_list.clone(),
self.multicast_locator_list.clone(),
Vec::new(),
Vec::new(),
self.qos.clone(),
Arc::new(RwLock::new(HistoryCache::new())),
);
Expand All @@ -111,11 +113,11 @@ impl Reader {
for (eid, reader) in self.matched_writers.iter() {
eprintln!("\t\treader guid: {:?}", eid);
eprintln!("\tunicast locators");
for loc in &reader.unicast_locator_list {
for loc in reader.get_unicast_locator_list() {
eprintln!("\t\t{:?}", loc)
}
eprintln!("\tmulticast locators");
for loc in &reader.multicast_locator_list {
for loc in reader.get_multicast_locator_list() {
eprintln!("\t\t{:?}", loc)
}
}
Expand Down Expand Up @@ -202,6 +204,28 @@ impl Reader {
multicast_locator_list: Vec<Locator>,
data_max_size_serialized: i32,
qos: DataWriterQosPolicies,
) {
self.matched_writer_add_with_default_locator(
remote_writer_guid,
unicast_locator_list,
multicast_locator_list,
Vec::new(),
Vec::new(),
data_max_size_serialized,
qos,
);
}

#[allow(clippy::too_many_arguments)]
pub fn matched_writer_add_with_default_locator(
&mut self,
remote_writer_guid: GUID,
unicast_locator_list: Vec<Locator>,
multicast_locator_list: Vec<Locator>,
default_unicast_locator_list: Vec<Locator>,
default_multicast_locator_list: Vec<Locator>,
data_max_size_serialized: i32,
qos: DataWriterQosPolicies,
) {
eprintln!(
"<{}>: add matched Writer which has {:?}",
Expand All @@ -227,6 +251,8 @@ impl Reader {
remote_writer_guid,
unicast_locator_list,
multicast_locator_list,
default_unicast_locator_list,
default_multicast_locator_list,
data_max_size_serialized,
qos,
self.reader_cache.clone(),
Expand All @@ -246,6 +272,7 @@ impl Reader {
))
.expect("couldn't send reader_state_notifier");
}

pub fn is_writer_match(&self, topic_name: &str, data_type: &str) -> bool {
self.topic.name() == topic_name && self.topic.type_desc() == data_type
}
Expand Down Expand Up @@ -400,7 +427,7 @@ impl Reader {
let message_buf = message
.write_to_vec_with_ctx(self.endianness)
.expect("couldn't serialize message");
for uni_loc in &writer_proxy.unicast_locator_list {
for uni_loc in writer_proxy.get_unicast_locator_list() {
if uni_loc.kind == Locator::KIND_UDPV4 {
let port = uni_loc.port;
let addr = uni_loc.address;
Expand All @@ -421,7 +448,7 @@ impl Reader {
}
}
// if there is Participant on same host, umber_dds need to send acknack to multicast
for mul_loc in &writer_proxy.multicast_locator_list {
for mul_loc in writer_proxy.get_multicast_locator_list() {
if mul_loc.kind == Locator::KIND_UDPV4 {
let port = mul_loc.port;
let addr = mul_loc.address;
Expand Down
Loading

0 comments on commit 3da3e7f

Please sign in to comment.