Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Append MetricA logging to transmission log #15

Merged
merged 1 commit into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 61 additions & 33 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::{
time::Duration,
};

use crate::external::MetricA;
use crate::util::TcpInfo;
use crate::TransmissionType;
use crate::external::ClientMetrics;
Expand Down Expand Up @@ -89,6 +90,7 @@ pub struct TcpStatsLog {
pub set_initital_cwnd: Option<u32>,
pub set_upper_cwnd: Option<u32>,
pub set_direct_cwnd: Option<u32>,
pub set_used_metric: Option<MetricA>,
pub tcp_info: TcpInfo,
}

Expand Down Expand Up @@ -200,56 +202,68 @@ fn patch_initial_cwnd(
socket_file_descriptor: i32,
client_metrics: &Option<Arc<Mutex<ClientMetrics>>>,
client_addr: &str,
) -> Result<Option<u32>> {
) -> Result<(Option<u32>, Option<MetricA>)> {
let mut metric_option: Option<MetricA> = None;
let initial_cwnd: u32 = match initial_cwnd_dyn {
DynamicValue::Dynamic => {
let latest_metric: MetricTypes =
unpack_latest_rate(&client_metrics.clone().unwrap(), client_addr)?;
match latest_metric {
MetricTypes::A(a_metric) => metric_option = Some(a_metric)
}
let rate: u64 = latest_metric.get_rate();
rate_to_cwnd(socket_file_descriptor, rate)?
}
DynamicValue::Fixed(fixed_cwnd) => fixed_cwnd,
};
sockopt_patch_cwnd(socket_file_descriptor, initial_cwnd, TCP_SET_INIT_CWND)?;
Ok(Some(initial_cwnd))
Ok((Some(initial_cwnd), metric_option))
}

fn patch_upper_cwnd(
upper_cwnd_dyn: DynamicValue<u32>,
socket_file_descriptor: i32,
client_metrics: &Option<Arc<Mutex<ClientMetrics>>>,
client_addr: &str,
) -> Result<Option<u32>> {
) -> Result<(Option<u32>, Option<MetricA>)> {
let mut metric_option: Option<MetricA> = None;
let upper_cwnd: u32 = match upper_cwnd_dyn {
DynamicValue::Dynamic => {
let latest_metric: MetricTypes =
unpack_latest_rate(&client_metrics.clone().unwrap(), client_addr)?;
match latest_metric {
MetricTypes::A(a_metric) => metric_option = Some(a_metric)
}
let rate: u64 = latest_metric.get_rate();
rate_to_cwnd(socket_file_descriptor, rate)?
}
DynamicValue::Fixed(fixed_cwnd) => fixed_cwnd,
};
sockopt_patch_cwnd(socket_file_descriptor, upper_cwnd, TCP_SET_UPPER_CWND)?;
Ok(Some(upper_cwnd))
Ok((Some(upper_cwnd), metric_option))
}

fn patch_direct_cwnd(
direct_cwnd_dyn: DynamicValue<u32>,
socket_file_descriptor: i32,
client_metrics: &Option<Arc<Mutex<ClientMetrics>>>,
client_addr: &str,
) -> Result<Option<u32>> {
) -> Result<(Option<u32>, Option<MetricA>)> {
let mut metric_option: Option<MetricA> = None;
let direct_cwnd: u32 = match direct_cwnd_dyn {
DynamicValue::Dynamic => {
let latest_metric: MetricTypes =
unpack_latest_rate(&client_metrics.clone().unwrap(), client_addr)?;
match latest_metric {
MetricTypes::A(a_metric) => metric_option = Some(a_metric)
}
let rate: u64 = latest_metric.get_rate();
rate_to_cwnd(socket_file_descriptor, rate)?
}
DynamicValue::Fixed(fixed_cwnd) => fixed_cwnd,
};
sockopt_patch_cwnd(socket_file_descriptor, direct_cwnd, TCP_SET_DIRECT_CWND)?;
Ok(Some(direct_cwnd))
Ok((Some(direct_cwnd), metric_option))
}

fn patch_upper_cwnd_if_new_metric(
Expand All @@ -258,23 +272,27 @@ fn patch_upper_cwnd_if_new_metric(
client_metrics: &Option<Arc<Mutex<ClientMetrics>>>,
client_addr: &str,
last_metric_timestamp_us: &mut u64,
) -> Result<Option<u32>> {
let upper_cwnd: u32 = match upper_cwnd_dyn {
) -> Result<(Option<u32>, Option<MetricA>)> {
let mut metric_option: Option<MetricA> = None;
let upper_cwnd: u32 = match upper_cwnd_dyn {
DynamicValue::Dynamic => {
let latest_metric: MetricTypes =
unpack_latest_rate(&client_metrics.clone().unwrap(), client_addr)?;
match latest_metric {
MetricTypes::A(a_metric) => metric_option = Some(a_metric)
}
if latest_metric.get_timestamp_us() > *last_metric_timestamp_us {
*last_metric_timestamp_us = latest_metric.get_timestamp_us();
let rate: u64 = latest_metric.get_rate();
rate_to_cwnd(socket_file_descriptor, rate)?
} else {
return Ok(None)
return Ok((None, metric_option))
}
}
DynamicValue::Fixed(fixed_cwnd) => fixed_cwnd,
};
sockopt_patch_cwnd(socket_file_descriptor, upper_cwnd, TCP_SET_UPPER_CWND)?;
Ok(Some(upper_cwnd))
Ok((Some(upper_cwnd), metric_option))

}

Expand All @@ -284,23 +302,27 @@ fn patch_direct_cwnd_if_new_metric(
client_metrics: &Option<Arc<Mutex<ClientMetrics>>>,
client_addr: &str,
last_metric_timestamp_us: &mut u64,
) -> Result<Option<u32>> {
) -> Result<(Option<u32>, Option<MetricA>)> {
let mut metric_option: Option<MetricA> = None;
let direct_cwnd: u32 = match direct_cwnd_dyn {
DynamicValue::Dynamic => {
let latest_metric: MetricTypes =
unpack_latest_rate(&client_metrics.clone().unwrap(), client_addr)?;
match latest_metric {
MetricTypes::A(a_metric) => metric_option = Some(a_metric)
}
if latest_metric.get_timestamp_us() > *last_metric_timestamp_us {
*last_metric_timestamp_us = latest_metric.get_timestamp_us();
let rate: u64 = latest_metric.get_rate();
rate_to_cwnd(socket_file_descriptor, rate)?
} else {
return Ok(None)
return Ok((None, metric_option))
}
}
DynamicValue::Fixed(fixed_cwnd) => fixed_cwnd,
};
sockopt_patch_cwnd(socket_file_descriptor, direct_cwnd, TCP_SET_DIRECT_CWND)?;
Ok(Some(direct_cwnd))
Ok((Some(direct_cwnd), metric_option))
}

pub fn handle_client(mut client_args: ClientArgs) -> Result<()> {
Expand Down Expand Up @@ -351,26 +373,26 @@ pub fn handle_client(mut client_args: ClientArgs) -> Result<()> {
wait_until_client_metric(&client_metrics.clone().unwrap(), &client_addr);
}

let initial_cwnd_option = if let Some(initial_cwnd_dyn) = set_initial_cwnd.clone() {
patch_initial_cwnd(initial_cwnd_dyn, socket_file_descriptor, &client_metrics, &client_addr)?
} else {
None
};
let upper_cwnd_option = if let Some(upper_cwnd_dyn) = set_upper_bound_cwnd.clone() {
patch_upper_cwnd(upper_cwnd_dyn, socket_file_descriptor, &client_metrics, &client_addr)?
} else {
None
};
let direct_cwnd_option = if let Some(direct_cwnd_dyn) = set_direct_cwnd.clone() {
patch_direct_cwnd(direct_cwnd_dyn, socket_file_descriptor, &client_metrics, &client_addr)?
} else {
None
};
let mut metric_option: Option<MetricA> = None;
let mut initial_cwnd_option: Option<u32> = None;
let mut upper_cwnd_option: Option<u32> = None;
let mut direct_cwnd_option: Option<u32> = None;

if let Some(initial_cwnd_dyn) = set_initial_cwnd.clone() {
(initial_cwnd_option, metric_option) = patch_initial_cwnd(initial_cwnd_dyn, socket_file_descriptor, &client_metrics, &client_addr)?
}
if let Some(upper_cwnd_dyn) = set_upper_bound_cwnd.clone() {
(upper_cwnd_option, metric_option) = patch_upper_cwnd(upper_cwnd_dyn, socket_file_descriptor, &client_metrics, &client_addr)?
}
if let Some(direct_cwnd_dyn) = set_direct_cwnd.clone() {
(direct_cwnd_option, metric_option) = patch_direct_cwnd(direct_cwnd_dyn, socket_file_descriptor, &client_metrics, &client_addr)?
}
append_tcp_info_to_stats_log(socket_file_descriptor,
&mut timedata,
initial_cwnd_option,
upper_cwnd_option,
direct_cwnd_option)?;
direct_cwnd_option,
metric_option)?;


let min_rtt_us: u64 = sockopt_get_tcp_info(socket_file_descriptor)?.tcpi_rtt as u64;
Expand Down Expand Up @@ -400,12 +422,13 @@ pub fn handle_client(mut client_args: ClientArgs) -> Result<()> {
&mut timedata,
None,
None,
None,
None)?;
last_logging_timestamp_us = now_us;
}
if transmission_type.is_l2b() {
if let Some(upper_cwnd_dyn) = set_upper_bound_cwnd.clone() {
let upper_cwnd_option = patch_upper_cwnd_if_new_metric(upper_cwnd_dyn,
let (upper_cwnd_option, metric_option) = patch_upper_cwnd_if_new_metric(upper_cwnd_dyn,
socket_file_descriptor,
&client_metrics,
&client_addr,
Expand All @@ -415,11 +438,12 @@ pub fn handle_client(mut client_args: ClientArgs) -> Result<()> {
&mut timedata,
None,
upper_cwnd_option,
None)?;
None,
metric_option)?;
}
}
if let Some(direct_cwnd_dyn) = set_direct_cwnd.clone() {
let direct_cwnd_option = patch_direct_cwnd_if_new_metric(direct_cwnd_dyn,
let (direct_cwnd_option, metric_option) = patch_direct_cwnd_if_new_metric(direct_cwnd_dyn,
socket_file_descriptor,
&client_metrics,
&client_addr,
Expand All @@ -429,7 +453,8 @@ pub fn handle_client(mut client_args: ClientArgs) -> Result<()> {
&mut timedata,
None,
None,
direct_cwnd_option)?;
direct_cwnd_option,
metric_option)?;
}
}
}
Expand All @@ -440,6 +465,7 @@ pub fn handle_client(mut client_args: ClientArgs) -> Result<()> {
&mut timedata,
None,
None,
None,
None)?;

finish_transmission(&mut joined_stream)?;
Expand Down Expand Up @@ -568,6 +594,7 @@ fn append_tcp_info_to_stats_log(
initial_cwnd_option: Option<u32>,
upper_cwnd_option: Option<u32>,
direct_cwnd_option: Option<u32>,
metric_option: Option<MetricA>,
) -> Result<()> {
let latest_tcp_info = sockopt_get_tcp_info(socket_file_descriptor)?;

Expand All @@ -583,6 +610,7 @@ fn append_tcp_info_to_stats_log(
set_initital_cwnd: initial_cwnd_option,
set_upper_cwnd: upper_cwnd_option,
set_direct_cwnd: direct_cwnd_option,
set_used_metric: metric_option,
tcp_info: latest_tcp_info,
});
Ok(())
Expand Down
17 changes: 10 additions & 7 deletions src/external.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::{anyhow, Result};
use bus::BusReader;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::UdpSocket;
use std::sync::mpsc::{SyncSender, TryRecvError};
Expand All @@ -18,7 +19,7 @@ pub const METRIC_INITIAL: [u8; 4] = [0x11, 0x21, 0x12, 0x22];
pub const METRIC_TYPE_INDEX: usize = 4;
pub const METRIC_TYPE_START: usize = 5;
pub const METRIC_TYPE_A: u8 = 1;
pub const METRIC_TYPE_A_SIZE: usize = 48;
pub const METRIC_TYPE_A_SIZE: usize = 72;
pub const METRIC_TYPE_B: u8 = 2;

pub struct ExternalInterfaceArgs {
Expand All @@ -35,10 +36,12 @@ pub enum MetricTypes {
}

#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
pub struct MetricA {
/// Timestamp when the metric was calculated
timestamp_us: u64,
/// Fair (0), less fair (1) , Greedy (2)
fair_share_type: u8,
/// Fair share send rate [bits/subframe] = [bits/ms]
fair_share_send_rate: u64,
/// Timestamp of the latest DCI used to calculate the metric
Expand All @@ -47,12 +50,12 @@ pub struct MetricA {
oldest_dci_timestamp_us: u64,
/// Number of DCIs used to calculate the metric
nof_dci: u16,
/// Number of phy-layer re-transmissions
nof_re_tx: u16,
/// Flag, signalling whether phy_rate was averagerd over all RNTIs or just our UE RNTI
flag_phy_rate_all_rnti: u8,
/// Average bit per PRB (either over all RNTIs or just the UE RNTI)
/// Ratio of no TBS PRBs to PRBs with TBS (~reTx ratio)
no_tbs_prb_ratio: f64,
/// Average bit per PRB
phy_rate: u64,
/// Flag, signalling whether phy_rate was static (0) averagerd over all RNTIs (1) or just our UE RNTI (2)
phy_rate_mode: u8,
}

#[derive(Clone, Debug, PartialEq, Default)]
Expand Down
Loading