Skip to content

Commit

Permalink
Reimplement for Multi-thread and remove write_set
Browse files Browse the repository at this point in the history
  • Loading branch information
davidBar-On committed Dec 10, 2023
1 parent 9970208 commit dbe8d68
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 68 deletions.
1 change: 0 additions & 1 deletion src/iperf.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ struct iperf_test
/* Select related parameters */
int max_fd;
fd_set read_set; /* set of read sockets */
fd_set write_set; /* set of write sockets */

/* Interval related members */
int omitting;
Expand Down
74 changes: 46 additions & 28 deletions src/iperf_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1678,9 +1678,6 @@ iperf_parse_arguments(struct iperf_test *test, int argc, char **argv)
} else if (test->role == 'c' && (test->server_skew_threshold != 0)){
i_errno = IESERVERONLY;
return -1;
} else if (test->role == 'c' && rcv_timeout_flag && test->mode == SENDER){
i_errno = IERVRSONLYRCVTIMEOUT;
return -1;
} else if (test->role == 's' && (server_rsa_private_key || test->server_authorized_users) &&
!(server_rsa_private_key && test->server_authorized_users)) {
i_errno = IESETSERVERAUTH;
Expand Down Expand Up @@ -1837,13 +1834,28 @@ void iperf_close_logfile(struct iperf_test *test)
int
iperf_set_send_state(struct iperf_test *test, signed char state)
{
int l;
const char c = 0;

if (test->debug_level >= DEBUG_LEVEL_INFO)
fprintf(stderr, "Setting and sending new test state %d (changed from %d).\n", state, test->state);

if (test->ctrl_sck >= 0) {
test->state = state;
if (Nwrite(test->ctrl_sck, (char*) &state, sizeof(state), Ptcp) < 0) {
i_errno = IESENDMESSAGE;
return -1;
}

if (state == IPERF_DONE || state == CLIENT_TERMINATE) {
// Send additional bytes to complete the sent size to JSON length prefix,
// in case the other size is waiting for JSON.
l = sizeof(uint32_t) - sizeof(state);
while (l-- > 0)
Nwrite(test->ctrl_sck, &c, 1, Ptcp);
}
}

return 0;
}

Expand Down Expand Up @@ -2366,6 +2378,9 @@ get_parameters(struct iperf_test *test)
static int
send_results(struct iperf_test *test)
{
if (test->debug_level >= DEBUG_LEVEL_INFO)
fprintf(stderr, "Sending results.\n");

int r = 0;
cJSON *j;
cJSON *j_streams;
Expand Down Expand Up @@ -2467,6 +2482,7 @@ send_results(struct iperf_test *test)
}
cJSON_Delete(j);
}

return r;
}

Expand All @@ -2475,6 +2491,9 @@ send_results(struct iperf_test *test)
static int
get_results(struct iperf_test *test)
{
if (test->debug_level >= DEBUG_LEVEL_INFO)
fprintf(stderr, "Getting results.\n");

int r = 0;
cJSON *j;
cJSON *j_cpu_util_total;
Expand Down Expand Up @@ -3057,14 +3076,7 @@ iperf_free_test(struct iperf_test *test)
free(test->remote_congestion_used);
if (test->timestamp_format)
free(test->timestamp_format);
if (test->omit_timer != NULL)
tmr_cancel(test->omit_timer);
if (test->timer != NULL)
tmr_cancel(test->timer);
if (test->stats_timer != NULL)
tmr_cancel(test->stats_timer);
if (test->reporter_timer != NULL)
tmr_cancel(test->reporter_timer);
iperf_cancel_test_timers(test);

/* Free protocol list */
while (!SLIST_EMPTY(&test->protocols)) {
Expand Down Expand Up @@ -3144,22 +3156,9 @@ iperf_reset_test(struct iperf_test *test)
SLIST_REMOVE_HEAD(&test->streams, streams);
iperf_free_stream(sp);
}
if (test->omit_timer != NULL) {
tmr_cancel(test->omit_timer);
test->omit_timer = NULL;
}
if (test->timer != NULL) {
tmr_cancel(test->timer);
test->timer = NULL;
}
if (test->stats_timer != NULL) {
tmr_cancel(test->stats_timer);
test->stats_timer = NULL;
}
if (test->reporter_timer != NULL) {
tmr_cancel(test->reporter_timer);
test->reporter_timer = NULL;
}

iperf_cancel_test_timers(test);

test->done = 0;

SLIST_INIT(&test->streams);
Expand Down Expand Up @@ -3203,7 +3202,6 @@ iperf_reset_test(struct iperf_test *test)
test->no_delay = 0;

FD_ZERO(&test->read_set);
FD_ZERO(&test->write_set);

test->num_streams = 1;
test->settings->socket_bufsize = 0;
Expand Down Expand Up @@ -3289,6 +3287,26 @@ iperf_reset_stats(struct iperf_test *test)
}
}

void
iperf_cancel_test_timers(struct iperf_test *test)
{
if (test->stats_timer != NULL) {
tmr_cancel(test->stats_timer);
test->stats_timer = NULL;
}
if (test->reporter_timer != NULL) {
tmr_cancel(test->reporter_timer);
test->reporter_timer = NULL;
}
if (test->omit_timer != NULL) {
tmr_cancel(test->omit_timer);
test->omit_timer = NULL;
}
if (test->timer != NULL) {
tmr_cancel(test->timer);
test->timer = NULL;
}
}

/**************************************************************************/

Expand Down
8 changes: 7 additions & 1 deletion src/iperf_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,12 @@ int iperf_defaults(struct iperf_test * testp);
*/
void iperf_free_test(struct iperf_test * testp);

/**
* iperf_cancel_test_timers -- cancel test timers
*
*/
void iperf_cancel_test_timers(struct iperf_test * testp);

/**
* iperf_new_stream -- return a net iperf_stream with default values
*
Expand Down Expand Up @@ -414,7 +420,7 @@ enum {
IESKEWTHRESHOLD = 29, // Invalid value specified as skew threshold
IEIDLETIMEOUT = 30, // Invalid value specified as idle state timeout
IERCVTIMEOUT = 31, // Illegal message receive timeout
IERVRSONLYRCVTIMEOUT = 32, // Client receive timeout is valid only in reverse mode
// [DELETED] IERVRSONLYRCVTIMEOUT = 32, // Client receive timeout is valid only in reverse mode
IESNDTIMEOUT = 33, // Illegal message send timeout
IEUDPFILETRANSFER = 34, // Cannot transfer file using UDP
IESERVERAUTHUSERS = 35, // Cannot access authorized users file
Expand Down
35 changes: 26 additions & 9 deletions src/iperf_client_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,6 @@ iperf_connect(struct iperf_test *test)
return -1;
}
FD_ZERO(&test->read_set);
FD_ZERO(&test->write_set);

make_cookie(test->cookie);

Expand Down Expand Up @@ -499,6 +498,9 @@ iperf_connect(struct iperf_test *test)
int
iperf_client_end(struct iperf_test *test)
{
if (test->debug_level >= DEBUG_LEVEL_INFO)
fprintf(stderr, "Ending client test.\n");

if (NULL == test)
{
iperf_err(NULL, "No test\n");
Expand Down Expand Up @@ -533,7 +535,7 @@ iperf_run_client(struct iperf_test * test)
{
int startup;
int result = 0;
fd_set read_set, write_set;
fd_set read_set;
struct iperf_time now;
struct timeval* timeout = NULL;
struct iperf_stream *sp;
Expand All @@ -544,6 +546,7 @@ iperf_run_client(struct iperf_test * test)
int64_t t_usecs;
int64_t timeout_us;
int64_t rcv_timeout_us;
int64_t rcv_timeout_value_in_us;
int i_errno_save;

if (NULL == test)
Expand Down Expand Up @@ -580,8 +583,9 @@ iperf_run_client(struct iperf_test * test)

/* Begin calculating CPU utilization */
cpu_util(NULL);
rcv_timeout_value_in_us = (test->settings->rcv_timeout.secs * SEC_TO_US) + test->settings->rcv_timeout.usecs;
if (test->mode != SENDER)
rcv_timeout_us = (test->settings->rcv_timeout.secs * SEC_TO_US) + test->settings->rcv_timeout.usecs;
rcv_timeout_us = rcv_timeout_value_in_us;
else
rcv_timeout_us = 0;

Expand All @@ -591,12 +595,17 @@ iperf_run_client(struct iperf_test * test)
startup = 1;
while (test->state != IPERF_DONE) {
memcpy(&read_set, &test->read_set, sizeof(fd_set));
memcpy(&write_set, &test->write_set, sizeof(fd_set));
iperf_time_now(&now);
timeout = tmr_timeout(&now);

// In reverse active mode client ensures data is received
if (test->state == TEST_RUNNING && rcv_timeout_us > 0) {
// In reverse/bidir active mode client ensures data is received.
// Same for receiving control messages at the end of the test.
if ( (rcv_timeout_us > 0 && test->state == TEST_RUNNING)
|| (rcv_timeout_value_in_us > 0
&& (test->state == TEST_END
|| test->state == EXCHANGE_RESULTS
|| test->state == DISPLAY_RESULTS)) )
{
timeout_us = -1;
if (timeout != NULL) {
used_timeout.tv_sec = timeout->tv_sec;
Expand All @@ -621,16 +630,22 @@ iperf_run_client(struct iperf_test * test)

result = select(test->max_fd + 1,
&read_set,
(test->state == TEST_RUNNING && !test->reverse) ? &write_set : NULL,
NULL,
NULL,
timeout);
#else
result = select(test->max_fd + 1, &read_set, &write_set, NULL, timeout);
result = select(test->max_fd + 1, &read_set, NULL, NULL, timeout);
#endif // __vxworks or __VXWORKS__
if (result < 0 && errno != EINTR) {
i_errno = IESELECT;
goto cleanup_and_fail;
} else if (result == 0 && test->state == TEST_RUNNING && rcv_timeout_us > 0) {
} else if ( result == 0 &&
((rcv_timeout_us > 0 && test->state == TEST_RUNNING)
|| (rcv_timeout_value_in_us > 0
&& (test->state == TEST_END
|| test->state == EXCHANGE_RESULTS
|| test->state == DISPLAY_RESULTS))) )
{
/*
* If nothing was received in non-reverse running state
* then probably something got stuck - either client,
Expand Down Expand Up @@ -748,6 +763,8 @@ iperf_run_client(struct iperf_test * test)
test->done = 1;
cpu_util(test->cpu_util);
test->stats_callback(test);
// Timers not needed at test end and may interrupt with select() receive timeout
iperf_cancel_test_timers(test);
if (iperf_set_send_state(test, TEST_END) != 0)
goto cleanup_and_fail;
}
Expand Down
4 changes: 0 additions & 4 deletions src/iperf_error.c
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,6 @@ iperf_strerror(int int_errno)
case IEUDPFILETRANSFER:
snprintf(errstr, len, "cannot transfer file using UDP");
break;
case IERVRSONLYRCVTIMEOUT:
snprintf(errstr, len, "client receive timeout is valid only in receiving mode");
perr = 1;
break;
case IEDAEMON:
snprintf(errstr, len, "unable to become a daemon");
perr = 1;
Expand Down
34 changes: 10 additions & 24 deletions src/iperf_server_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ iperf_server_listen(struct iperf_test *test)
}

FD_ZERO(&test->read_set);
FD_ZERO(&test->write_set);
FD_SET(test->listener, &test->read_set);
if (test->listener > test->max_fd) test->max_fd = test->listener;

Expand Down Expand Up @@ -238,7 +237,6 @@ iperf_handle_message_server(struct iperf_test *test)
test->stats_callback(test);
SLIST_FOREACH(sp, &test->streams, streams) {
FD_CLR(sp->socket, &test->read_set);
FD_CLR(sp->socket, &test->write_set);
close(sp->socket);
}
test->reporter_callback(test);
Expand Down Expand Up @@ -268,8 +266,8 @@ iperf_handle_message_server(struct iperf_test *test)
iperf_err(test, "the client has terminated");
SLIST_FOREACH(sp, &test->streams, streams) {
FD_CLR(sp->socket, &test->read_set);
FD_CLR(sp->socket, &test->write_set);
close(sp->socket);
sp->socket = -1;
}
test->state = IPERF_DONE;
break;
Expand All @@ -296,6 +294,7 @@ server_timer_proc(TimerClientData client_data, struct iperf_time *nowP)
sp = SLIST_FIRST(&test->streams);
SLIST_REMOVE_HEAD(&test->streams, streams);
close(sp->socket);
sp->socket = -1;
iperf_free_stream(sp);
}
close(test->ctrl_sck);
Expand Down Expand Up @@ -446,7 +445,6 @@ cleanup_server(struct iperf_test *test)
SLIST_FOREACH(sp, &test->streams, streams) {
if (sp->socket > -1) {
FD_CLR(sp->socket, &test->read_set);
FD_CLR(sp->socket, &test->write_set);
close(sp->socket);
sp->socket = -1;
}
Expand All @@ -465,28 +463,13 @@ cleanup_server(struct iperf_test *test)
close(test->prot_listener);
test->prot_listener = -1;
}

/* Cancel any remaining timers. */
if (test->stats_timer != NULL) {
tmr_cancel(test->stats_timer);
test->stats_timer = NULL;
}
if (test->reporter_timer != NULL) {
tmr_cancel(test->reporter_timer);
test->reporter_timer = NULL;
}
if (test->omit_timer != NULL) {
tmr_cancel(test->omit_timer);
test->omit_timer = NULL;
}

iperf_cancel_test_timers(test); /* Cancel any remaining timers. */

if (test->congestion_used != NULL) {
free(test->congestion_used);
test->congestion_used = NULL;
}
if (test->timer != NULL) {
tmr_cancel(test->timer);
test->timer = NULL;
}
}


Expand Down Expand Up @@ -562,7 +545,6 @@ iperf_run_server(struct iperf_test *test)
}

memcpy(&read_set, &test->read_set, sizeof(fd_set));
memcpy(&write_set, &test->write_set, sizeof(fd_set));

iperf_time_now(&now);
timeout = tmr_timeout(&now);
Expand All @@ -574,7 +556,11 @@ iperf_run_server(struct iperf_test *test)
used_timeout.tv_usec = 0;
timeout = &used_timeout;
}
} else if (test->mode != SENDER) { // In non-reverse active mode server ensures data is received
} else if (test->mode != SENDER // In non-reverse active mode server ensures data is received.
|| test->state == TEST_END // Same for receiving control messages at the end of the test.
|| test->state == EXCHANGE_RESULTS
|| test->state == DISPLAY_RESULTS)
{
timeout_us = -1;
if (timeout != NULL) {
used_timeout.tv_sec = timeout->tv_sec;
Expand Down
2 changes: 1 addition & 1 deletion src/iperf_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ iperf_tcp_recv(struct iperf_stream *sp)
}
else {
if (sp->test->debug)
printf("Late receive, state = %d\n", sp->test->state);
printf("Late receive, state = %d, bytes received = %d\n", sp->test->state, r);
}

return r;
Expand Down

0 comments on commit dbe8d68

Please sign in to comment.