diff --git a/test/libsinsp_e2e/tcp_client_server.cpp b/test/libsinsp_e2e/tcp_client_server.cpp index 8558070a82..ad119864c4 100644 --- a/test/libsinsp_e2e/tcp_client_server.cpp +++ b/test/libsinsp_e2e/tcp_client_server.cpp @@ -38,6 +38,7 @@ limitations under the License. #include #include +#include #include #include @@ -71,7 +72,6 @@ void runtest(iotype iot, subprocess server_proc(helper_exe, {"tcp_server", iot_s.c_str(), - "false", stringify_bool(use_shutdown), stringify_bool(use_accept4), ntransactions_s.c_str(), @@ -83,15 +83,14 @@ void runtest(iotype iot, server_in_addr.s_addr = get_server_address(); char* server_address = inet_ntoa(server_in_addr); std::string sport; - subprocess test_proc(helper_exe, - {"tcp_client", - server_address, - iot_s.c_str(), - payload, - stringify_bool(false), - ntransactions_s, - stringify_bool(exit_no_close)}, - false); + subprocess client_proc(helper_exe, + {"tcp_client", + server_address, + iot_s.c_str(), + payload, + ntransactions_s, + stringify_bool(exit_no_close)}, + false); // // FILTER // @@ -100,7 +99,8 @@ void runtest(iotype iot, if(tinfo && tinfo->m_exe == helper_exe) { if(tinfo->m_pid == server_pid) { return server_started_filter(evt); - } else if(tinfo->m_pid == client_pid) { + } + if(tinfo->m_pid == client_pid) { return client_started_filter(evt); } } @@ -117,12 +117,12 @@ void runtest(iotype iot, server_proc.wait_for_start(); server_pid = server_proc.get_pid(); - test_proc.start(); - test_proc.wait_for_start(); - client_pid = test_proc.get_pid(); + client_proc.start(); + client_proc.wait_for_start(); + client_pid = client_proc.get_pid(); - // We use a random call to tee to signal that we're done - tee(-1, -1, 0, 0); + server_proc.wait(); + client_proc.wait(); }; std::function log_param = [](const callback_param& param) { @@ -326,14 +326,15 @@ TEST_F(sys_call_test, tcp_client_server_readv_writev_http_snaplen) { } TEST_F(sys_call_test, tcp_client_server_with_connection_before_capturing_starts) { - std::thread server_thread; - std::thread client_thread; - tcp_server server(SENDRECEIVE, true); + tcp_server server(SENDRECEIVE); uint32_t server_ip_address = get_server_address(); - tcp_client client(server_ip_address, SENDRECEIVE, default_payload, true); + tcp_client client(server_ip_address, SENDRECEIVE, default_payload); int state = 0; + ASSERT_TRUE(server.init()); + ASSERT_TRUE(client.init()); + // // FILTER // @@ -345,10 +346,14 @@ TEST_F(sys_call_test, tcp_client_server_with_connection_before_capturing_starts) // INITIALIZATION // run_callback_t test = [&](sinsp* inspector) { - server.signal_continue(); - client.signal_continue(); - server_thread.join(); - client_thread.join(); + auto future_srv = std::async(&tcp_server::run, &server); + auto future_client = std::async(&tcp_client::run, &client); + if(future_client.get() != 0) { + server.shutdown_server(); + future_srv.wait(); + } else { + ASSERT_EQ(future_srv.get(), 0); + } }; // @@ -361,11 +366,6 @@ TEST_F(sys_call_test, tcp_client_server_with_connection_before_capturing_starts) } }; - server_thread = std::thread(&tcp_server::run, &server); - client_thread = std::thread(&tcp_client::run, &client); - server.wait_till_ready(); - client.wait_till_ready(); - ASSERT_NO_FATAL_FAILURE({ event_capture::run(test, callback, diff --git a/test/libsinsp_e2e/tcp_client_server.h b/test/libsinsp_e2e/tcp_client_server.h index 4a4ae96a1a..78b6af75d8 100644 --- a/test/libsinsp_e2e/tcp_client_server.h +++ b/test/libsinsp_e2e/tcp_client_server.h @@ -31,11 +31,7 @@ limitations under the License. #include #include -#include #include -#include -#include -#include #ifndef HELPER_32 #include @@ -47,59 +43,29 @@ limitations under the License. typedef enum iotype { READWRITE, SENDRECEIVE, READVWRITEV } iotype; -class std_event { -public: - void set() { - std::lock_guard lock(m_mutex); - m_is_set = true; - m_cond.notify_one(); - } - void wait() { - std::unique_lock lock(m_mutex); - if(m_is_set) { - return; - } else { - m_cond.wait(lock, [this]() { return m_is_set; }); - } - } - -private: - std::mutex m_mutex; - std::condition_variable m_cond; - bool m_is_set{false}; -}; - class tcp_server { public: - tcp_server(iotype iot, - bool wait_for_signal_to_continue = false, - bool use_shutdown = false, - bool use_accept4 = false, - uint32_t ntransactions = 1, - bool exit_no_close = false) { + explicit tcp_server(iotype iot, + bool use_shutdown = false, + bool use_accept4 = false, + uint32_t ntransactions = 1, + bool exit_no_close = false) { + m_tid = -1; m_iot = iot; - m_wait_for_signal_to_continue = wait_for_signal_to_continue; m_use_shutdown = use_shutdown; m_use_accept4 = use_accept4; m_ntransactions = ntransactions; m_exit_no_close = exit_no_close; } - void run() { - int servSock; - int clntSock; + bool init() { struct sockaddr_in server_address; - struct sockaddr_in client_address; - unsigned int client_len; - uint32_t j; - int port = (m_exit_no_close) ? SERVER_PORT + 1 : SERVER_PORT; - - m_tid = syscall(SYS_gettid); + const int port = (m_exit_no_close) ? SERVER_PORT + 1 : SERVER_PORT; /* Create socket for incoming connections */ - if((servSock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { + if((m_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { perror("socket() failed"); - return; + return false; } /* Construct local address structure */ @@ -109,132 +75,124 @@ class tcp_server { server_address.sin_port = htons(port); /* Local port */ int yes = 1; - if(setsockopt(servSock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) { -#ifdef FAIL - FAIL() << "setsockopt() failed"; -#endif + if(setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) { + perror("setsockopt() failed"); + return false; } /* Bind to the local address */ - if(::bind(servSock, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { -#ifdef FAIL - FAIL() << "bind() failed"; -#endif - return; + if(::bind(m_socket, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { + perror("bind() failed"); + return false; } /* Mark the socket so it will listen for incoming connections */ - if(listen(servSock, 1) < 0) { - close(servSock); -#ifdef FAIL - FAIL() << "listen() failed"; -#endif - return; + if(listen(m_socket, 1) < 0) { + perror("listen() failed"); + return false; } std::cout << "SERVER UP" << std::endl; + return true; + } + + int run() { + int error = 0; + m_tid = syscall(SYS_gettid); + struct sockaddr_in client_address; + socklen_t client_len; do { /* Set the size of the in-out parameter */ client_len = sizeof(client_address); - signal_ready(); /* Wait for a client to connect */ if(m_use_accept4) { - if((clntSock = - accept4(servSock, (struct sockaddr*)&client_address, &client_len, 0)) < + if((m_cl_socket = + accept4(m_socket, (struct sockaddr*)&client_address, &client_len, 0)) < 0) { - close(servSock); -#ifdef FAIL - FAIL() << "accept() failed"; -#endif + perror("accept() failed"); + error++; break; } } else { - if((clntSock = accept(servSock, (struct sockaddr*)&client_address, &client_len)) < - 0) { - close(servSock); -#ifdef FAIL - FAIL() << "accept() failed"; -#endif + if((m_cl_socket = + accept(m_socket, (struct sockaddr*)&client_address, &client_len)) < 0) { + perror("accept() failed"); + error++; break; } } /* clntSock is connected to a client! */ - wait_for_continue(); char echoBuffer[1024]; /* Buffer for echo string */ int recvMsgSize; /* Size of received message */ - for(j = 0; j < m_ntransactions; j++) { + for(uint32_t j = 0; j < m_ntransactions; j++) { if(m_iot == SENDRECEIVE) { - if((recvMsgSize = recv(clntSock, echoBuffer, sizeof(echoBuffer), 0)) < 0) { -#ifdef FAIL - FAIL() << "recv() failed"; -#endif + if((recvMsgSize = recv(m_cl_socket, echoBuffer, sizeof(echoBuffer), 0)) < 0) { + perror("recv() failed"); + error++; break; } - if(send(clntSock, echoBuffer, recvMsgSize, 0) != recvMsgSize) { -#ifdef FAIL - FAIL() << "send() failed"; -#endif + if(send(m_cl_socket, echoBuffer, recvMsgSize, 0) != recvMsgSize) { + perror("send() failed"); + error++; break; } } else if(m_iot == READWRITE || m_iot == READVWRITEV) { - if((recvMsgSize = read(clntSock, echoBuffer, sizeof(echoBuffer))) < 0) { -#ifdef FAIL - FAIL() << "recv() failed"; -#endif + if((recvMsgSize = read(m_cl_socket, echoBuffer, sizeof(echoBuffer))) < 0) { + perror("recv() failed"); + error++; break; } - if(write(clntSock, echoBuffer, recvMsgSize) != recvMsgSize) { -#ifdef FAIL - FAIL() << "send() failed"; -#endif + if(write(m_cl_socket, echoBuffer, recvMsgSize) != recvMsgSize) { + perror("send() failed"); + error++; break; } } } + } while(0); - if(m_exit_no_close) { - return; - } - + if(error) { + // Close the server socket so that client will be notified if(m_use_shutdown) { -#ifdef ASSERT_EQ - ASSERT_EQ(0, shutdown(clntSock, SHUT_WR)); -#endif + shutdown(m_socket, SHUT_RDWR); } else { - close(clntSock); /* Close client socket */ + close(m_socket); } - break; - } while(0); - - if(m_use_shutdown) { -#ifdef ASSERT_EQ - ASSERT_EQ(0, shutdown(servSock, SHUT_RDWR)); -#endif } else { - close(servSock); + if(!m_exit_no_close) { + if(m_use_shutdown) { + if(m_cl_socket != -1) { + shutdown(m_cl_socket, SHUT_WR); + } + if(m_socket != -1) { + shutdown(m_socket, SHUT_RDWR); + } + } else { + if(m_cl_socket != -1) { + close(m_cl_socket); + } + if(m_socket != -1) { + close(m_socket); + } + } + } } + return error; } - void wait_till_ready() { m_ready.wait(); } - - void signal_continue() { m_continue.set(); } + int64_t get_tid() const { return m_tid; } - int64_t get_tid() { return m_tid; } - -private: - void signal_ready() { m_ready.set(); } - - void wait_for_continue() { - if(m_wait_for_signal_to_continue) { - m_continue.wait(); + void shutdown_server() { + if(m_socket != -1) { + shutdown(m_socket, SHUT_RDWR); } } - std_event m_ready; - std_event m_continue; - bool m_wait_for_signal_to_continue; +private: + int m_socket = -1; + int m_cl_socket = -1; int64_t m_tid; iotype m_iot; bool m_use_shutdown; @@ -248,34 +206,24 @@ class tcp_client { tcp_client(uint32_t server_ip_address, iotype iot, const std::string& payload = "0123456789QWERTYUIOPASDFGHJKLZXCVBNM", - bool on_thread = false, uint32_t ntransactions = 1, bool exit_no_close = false) { + m_tid = -1; m_server_ip_address = server_ip_address; m_iot = iot; m_payload = payload; - m_on_thread = on_thread; m_ntransactions = ntransactions; m_exit_no_close = exit_no_close; } - void run() { - int sock; + bool init() { struct sockaddr_in server_address; - char buffer[m_payload.size() + 1]; - int bytes_received; - uint32_t j; - int port = (m_exit_no_close) ? SERVER_PORT + 1 : SERVER_PORT; - - m_tid = syscall(SYS_gettid); + const int port = (m_exit_no_close) ? SERVER_PORT + 1 : SERVER_PORT; /* Create a reliable, stream socket using TCP */ - if((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { -#ifdef FAIL - FAIL() << "socket() failed"; -#endif - signal_ready(); - return; + if((m_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { + perror("socket() failed"); + return false; } /* Construct the server address structure */ @@ -285,62 +233,63 @@ class tcp_client { server_address.sin_port = htons(port); /* Server port */ /* Establish the connection to the server */ - if(connect(sock, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { -#ifdef FAIL - FAIL() << "connect() failed"; -#endif - signal_ready(); - return; + if(connect(m_socket, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { + perror("connect() failed"); + return false; } - signal_ready(); - wait_for_continue(); - for(j = 0; j < m_ntransactions; j++) { + std::cout << "CLIENT UP" << std::endl; + return true; + } + + int run() { + m_tid = syscall(SYS_gettid); + char buffer[m_payload.size() + 1]; + int bytes_received; + int error = 0; + + for(uint32_t j = 0; j < m_ntransactions; j++) { /* Send the string to the server */ if(m_iot == SENDRECEIVE) { - if(send(sock, m_payload.c_str(), m_payload.length(), 0) != + if(send(m_socket, m_payload.c_str(), m_payload.length(), 0) != (ssize_t)m_payload.length()) { - close(sock); -#ifdef FAIL - FAIL() << "send() sent a different number of bytes than expected"; -#endif - return; + perror("send() sent a different number of bytes than expected"); + error++; + break; } - if((bytes_received = recv(sock, buffer, m_payload.length(), 0)) <= 0) { - close(sock); -#ifdef FAIL - FAIL() << "recv() failed or connection closed prematurely"; -#endif - return; + if((bytes_received = recv(m_socket, buffer, m_payload.length(), 0)) <= 0) { + perror("recv() failed or connection closed prematurely"); + error++; + break; } buffer[bytes_received] = '\0'; /* Terminate the string! */ -#ifdef ASSERT_STREQ - ASSERT_STREQ(m_payload.c_str(), buffer); -#endif + if(strcmp(m_payload.c_str(), buffer) != 0) { + perror("SENDRECEIVE buffer mismatch"); + error++; + break; + } } else if(m_iot == READWRITE) { - if(write(sock, m_payload.c_str(), m_payload.length()) != + if(write(m_socket, m_payload.c_str(), m_payload.length()) != (ssize_t)m_payload.length()) { - close(sock); -#ifdef FAIL - FAIL() << "send() sent a different number of bytes than expected"; -#endif - return; + perror("send() sent a different number of bytes than expected"); + error++; + break; } - if((bytes_received = read(sock, buffer, m_payload.length())) <= 0) { - close(sock); -#ifdef FAIL - FAIL() << "recv() failed or connection closed prematurely"; -#endif - return; + if((bytes_received = read(m_socket, buffer, m_payload.length())) <= 0) { + perror("recv() failed or connection closed prematurely"); + error++; + break; } buffer[bytes_received] = '\0'; /* Terminate the string! */ -#ifdef ASSERT_STREQ - ASSERT_STREQ(m_payload.c_str(), buffer); -#endif + if(strcmp(m_payload.c_str(), buffer) != 0) { + perror("READWRITE buffer mismatch"); + error++; + break; + } } else if(m_iot == READVWRITEV) { int wv_count; char msg1[m_payload.length() / 3 + 1]; @@ -366,52 +315,33 @@ class tcp_client { wv[2].iov_len = m_payload.length() / 3; wv_count = 3; - if(writev(sock, wv, wv_count) != (ssize_t)m_payload.length()) { - close(sock); -#ifdef FAIL - FAIL() << "send() sent a different number of bytes than expected"; -#endif - return; + if(writev(m_socket, wv, wv_count) != (ssize_t)m_payload.length()) { + perror("send() sent a different number of bytes than expected"); + error++; + break; } - if((bytes_received = readv(sock, wv, wv_count)) <= 0) { - close(sock); -#ifdef FAIL - FAIL() << "recv() failed or connection closed prematurely"; -#endif - return; + if((bytes_received = readv(m_socket, wv, wv_count)) <= 0) { + perror("recv() failed or connection closed prematurely"); + error++; + break; } } } - if(m_exit_no_close) { - return; + if((!m_exit_no_close || error) && m_socket != -1) { + close(m_socket); } - - close(sock); + return 0; } - void wait_till_ready() { m_ready.wait(); } - - void signal_continue() { m_continue.set(); } - - int64_t get_tid() { return m_tid; } + int64_t get_tid() const { return m_tid; } private: - void signal_ready() { m_ready.set(); } - - void wait_for_continue() { - if(m_on_thread) { - m_continue.wait(); - } - } - + int m_socket = -1; uint32_t m_server_ip_address; iotype m_iot; - std_event m_ready; - std_event m_continue; int64_t m_tid; - bool m_on_thread; uint32_t m_ntransactions; bool m_exit_no_close; std::string m_payload; diff --git a/test/libsinsp_e2e/tcp_client_server_ipv4_mapped.cpp b/test/libsinsp_e2e/tcp_client_server_ipv4_mapped.cpp index a7648ba48e..5362250f0a 100644 --- a/test/libsinsp_e2e/tcp_client_server_ipv4_mapped.cpp +++ b/test/libsinsp_e2e/tcp_client_server_ipv4_mapped.cpp @@ -38,6 +38,7 @@ limitations under the License. #include #include +#include #include #include @@ -51,36 +52,28 @@ limitations under the License. class tcp_server_ipv4m { public: - tcp_server_ipv4m(iotype iot, - bool wait_for_signal_to_continue = false, - bool use_shutdown = false, - bool use_accept4 = false, - uint32_t ntransactions = 1, - bool exit_no_close = false) { + explicit tcp_server_ipv4m(iotype iot, + bool use_shutdown = false, + bool use_accept4 = false, + uint32_t ntransactions = 1, + bool exit_no_close = false) { + m_tid = -1; m_iot = iot; - m_wait_for_signal_to_continue = wait_for_signal_to_continue; m_use_shutdown = use_shutdown; m_use_accept4 = use_accept4; m_ntransactions = ntransactions; m_exit_no_close = exit_no_close; } - void run() { - int servSock; - int clntSock; + bool init() { struct sockaddr_in6 server_address; - struct sockaddr_in6 client_address; - unsigned int client_len; - uint32_t j; - - int port = (m_exit_no_close) ? SERVER_PORT + 2 : SERVER_PORT; - m_tid = syscall(SYS_gettid); + const int port = (m_exit_no_close) ? SERVER_PORT + 2 : SERVER_PORT; /* Create socket for incoming connections */ - if((servSock = socket(AF_INET6, SOCK_STREAM, 0)) < 0) { + if((m_socket = socket(AF_INET6, SOCK_STREAM, 0)) < 0) { perror("socket() failed"); - return; + return false; } /* Construct local address structure */ @@ -90,110 +83,125 @@ class tcp_server_ipv4m { server_address.sin6_addr = in6addr_any; int yes = 1; - if(setsockopt(servSock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) { - FAIL() << "setsockopt() failed"; + if(setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) { + perror("setsockopt() failed"); + return false; } /* Bind to the local address */ - if(::bind(servSock, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { + if(::bind(m_socket, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { perror("bind() failed"); - FAIL(); - return; + return false; } /* Mark the socket so it will listen for incoming connections */ - if(listen(servSock, 1) < 0) { - close(servSock); - FAIL() << "listen() failed"; - return; + if(listen(m_socket, 1) < 0) { + perror("listen() failed"); + return false; } + + std::cout << "SERVER UP" << std::endl; + return true; + } + + int run() { + struct sockaddr_in6 client_address; + unsigned int client_len; + m_tid = syscall(SYS_gettid); + int error = 0; do { /* Set the size of the in-out parameter */ client_len = sizeof(client_address); - signal_ready(); /* Wait for a client to connect */ if(m_use_accept4) { - if((clntSock = - accept4(servSock, (struct sockaddr*)&client_address, &client_len, 0)) < + if((m_cl_socket = + accept4(m_socket, (struct sockaddr*)&client_address, &client_len, 0)) < 0) { - close(servSock); - FAIL() << "accept() failed"; + perror("accept() failed"); + error++; break; } } else { - if((clntSock = accept(servSock, (struct sockaddr*)&client_address, &client_len)) < - 0) { - close(servSock); - FAIL() << "accept() failed"; + if((m_cl_socket = + accept(m_socket, (struct sockaddr*)&client_address, &client_len)) < 0) { + perror("accept() failed"); + error++; break; } } - /* clntSock is connected to a client! */ - wait_for_continue(); + /* m_cl_socket is connected to a client! */ char echoBuffer[BUFFER_LENGTH]; /* Buffer for echo string */ int recvMsgSize; /* Size of received message */ - for(j = 0; j < m_ntransactions; j++) { + for(uint32_t j = 0; j < m_ntransactions; j++) { if(m_iot == SENDRECEIVE) { - if((recvMsgSize = recv(clntSock, echoBuffer, BUFFER_LENGTH, 0)) < 0) { - FAIL() << "recv() failed"; + if((recvMsgSize = recv(m_cl_socket, echoBuffer, BUFFER_LENGTH, 0)) < 0) { + perror("recv() failed"); + error++; break; } - if(send(clntSock, echoBuffer, recvMsgSize, 0) != recvMsgSize) { - FAIL() << "send() failed"; + if(send(m_cl_socket, echoBuffer, recvMsgSize, 0) != recvMsgSize) { + perror("send() failed"); + error++; break; } } else if(m_iot == READWRITE || m_iot == READVWRITEV) { - if((recvMsgSize = read(clntSock, echoBuffer, BUFFER_LENGTH)) < 0) { - FAIL() << "recv() failed"; + if((recvMsgSize = read(m_cl_socket, echoBuffer, BUFFER_LENGTH)) < 0) { + perror("recv() failed"); + error++; break; } - if(write(clntSock, echoBuffer, recvMsgSize) != recvMsgSize) { - FAIL() << "send() failed"; + if(write(m_cl_socket, echoBuffer, recvMsgSize) != recvMsgSize) { + perror("send() failed"); + error++; break; } } } + } while(0); - if(m_exit_no_close) { - return; - } - + if(error) { + // Close the server socket so that client will be notified if(m_use_shutdown) { - ASSERT_EQ(0, shutdown(clntSock, SHUT_WR)); + shutdown(m_socket, SHUT_RDWR); } else { - close(clntSock); /* Close client socket */ + close(m_socket); } - break; - } while(0); - - if(m_use_shutdown) { - ASSERT_EQ(0, shutdown(servSock, SHUT_RDWR)); } else { - close(servSock); + if(!m_exit_no_close) { + if(m_use_shutdown) { + if(m_cl_socket != -1) { + shutdown(m_cl_socket, SHUT_WR); + } + if(m_socket != -1) { + shutdown(m_socket, SHUT_RDWR); + } + } else { + if(m_cl_socket != -1) { + close(m_cl_socket); + } + if(m_socket != -1) { + close(m_socket); + } + } + } } + return error; } - void wait_till_ready() { m_ready.wait(); } - - void signal_continue() { m_continue.set(); } - - int64_t get_tid() { return m_tid; } - -private: - void signal_ready() { m_ready.set(); } + int64_t get_tid() const { return m_tid; } - void wait_for_continue() { - if(m_wait_for_signal_to_continue) { - m_continue.wait(); + void shutdown_server() { + if(m_socket != -1) { + shutdown(m_socket, SHUT_RDWR); } } - std_event m_ready; - std_event m_continue; - bool m_wait_for_signal_to_continue; +private: + int m_socket = -1; + int m_cl_socket = -1; int64_t m_tid; iotype m_iot; bool m_use_shutdown; @@ -206,31 +214,23 @@ class tcp_client_ipv4m { public: tcp_client_ipv4m(uint32_t server_ip_address, iotype iot, - bool on_thread = false, uint32_t ntransactions = 1, bool exit_no_close = false) { + m_tid = -1; m_server_ip_address = server_ip_address; m_iot = iot; - m_on_thread = on_thread; m_ntransactions = ntransactions; m_exit_no_close = exit_no_close; } - void run() { - int sock; + bool init() { struct sockaddr_in server_address; - char buffer[BUFFER_LENGTH]; - int payload_length; - int bytes_received; - uint32_t j; - int port = (m_exit_no_close) ? SERVER_PORT + 2 : SERVER_PORT; - - m_tid = syscall(SYS_gettid); + const int port = (m_exit_no_close) ? SERVER_PORT + 2 : SERVER_PORT; /* Create a reliable, stream socket using TCP */ - if((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { - FAIL() << "socket() failed"; - return; + if((m_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { + perror("socket() failed"); + return false; } /* Construct the server address structure */ @@ -240,47 +240,66 @@ class tcp_client_ipv4m { server_address.sin_port = htons(port); /* Server port */ /* Establish the connection to the server */ - if(connect(sock, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { + if(connect(m_socket, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { perror("connect() failed"); - FAIL(); - return; + return false; } - signal_ready(); - wait_for_continue(); + + std::cout << "CLIENT UP" << std::endl; + return true; + } + + int run() { + char buffer[BUFFER_LENGTH]; + int payload_length; + int bytes_received; + uint32_t j; + int error = 0; + + m_tid = syscall(SYS_gettid); + payload_length = strlen(PAYLOAD); /* Determine input length */ for(j = 0; j < m_ntransactions; j++) { /* Send the string to the server */ if(m_iot == SENDRECEIVE) { - if(send(sock, PAYLOAD, payload_length, 0) != payload_length) { - close(sock); - FAIL() << "send() sent a different number of bytes than expected"; - return; + if(send(m_socket, PAYLOAD, payload_length, 0) != payload_length) { + perror("send() sent a different number of bytes than expected"); + error++; + break; } - if((bytes_received = recv(sock, buffer, BUFFER_LENGTH - 1, 0)) <= 0) { - close(sock); - FAIL() << "recv() failed or connection closed prematurely"; - return; + if((bytes_received = recv(m_socket, buffer, BUFFER_LENGTH - 1, 0)) <= 0) { + perror("recv() failed or connection closed prematurely"); + error++; + break; } buffer[bytes_received] = '\0'; /* Terminate the string! */ - ASSERT_STREQ(PAYLOAD, buffer); + if(strcmp(PAYLOAD, buffer) != 0) { + perror("SENDRECEIVE buffer mismatch, expected " PAYLOAD); + error++; + break; + } } else if(m_iot == READWRITE) { - if(write(sock, PAYLOAD, payload_length) != payload_length) { - close(sock); - FAIL() << "send() sent a different number of bytes than expected"; - return; + if(write(m_socket, PAYLOAD, payload_length) != payload_length) { + perror("send() sent a different number of bytes than expected"); + error++; + break; } - if((bytes_received = read(sock, buffer, BUFFER_LENGTH - 1)) <= 0) { - close(sock); - FAIL() << "recv() failed or connection closed prematurely"; - return; + if((bytes_received = read(m_socket, buffer, BUFFER_LENGTH - 1)) <= 0) { + perror("recv() failed or connection closed prematurely"); + error++; + break; } buffer[bytes_received] = '\0'; /* Terminate the string! */ - ASSERT_STREQ(PAYLOAD, buffer); + if(strcmp(PAYLOAD, buffer) != 0) { + perror("READWRITE buffer mismatch, expected " PAYLOAD); + error++; + break; + } } else if(m_iot == READVWRITEV) { std::string ps(PAYLOAD); int wv_count; @@ -305,48 +324,32 @@ class tcp_client_ipv4m { wv[2].iov_len = BUFFER_LENGTH / 3; wv_count = 3; - if(writev(sock, wv, wv_count) != payload_length) { - close(sock); - FAIL() << "send() sent a different number of bytes than expected"; - return; + if(writev(m_socket, wv, wv_count) != payload_length) { + perror("send() sent a different number of bytes than expected"); + error++; + break; } - if((bytes_received = readv(sock, wv, wv_count)) <= 0) { - close(sock); - FAIL() << "recv() failed or connection closed prematurely"; - return; + if((bytes_received = readv(m_socket, wv, wv_count)) <= 0) { + perror("recv() failed or connection closed prematurely"); + error++; + break; } } } - - if(m_exit_no_close) { - return; + if((!m_exit_no_close || error) && m_socket != -1) { + close(m_socket); } - - close(sock); + return error; } - void wait_till_ready() { m_ready.wait(); } - - void signal_continue() { m_continue.set(); } - int64_t get_tid() { return m_tid; } private: - void signal_ready() { m_ready.set(); } - - void wait_for_continue() { - if(m_on_thread) { - m_continue.wait(); - } - } - + int m_socket = -1; uint32_t m_server_ip_address; iotype m_iot; - std_event m_ready; - std_event m_continue; int64_t m_tid; - bool m_on_thread; uint32_t m_ntransactions; bool m_exit_no_close; }; @@ -357,18 +360,12 @@ void runtest_ipv4m(iotype iot, uint32_t ntransactions = 1, bool exit_no_close = false) { int callnum = 0; - std::thread server_thread; - std::shared_ptr server = std::make_shared(iot, - false, - use_shutdown, - use_accept4, - ntransactions, - exit_no_close); + tcp_server_ipv4m server(iot, use_shutdown, use_accept4, ntransactions, exit_no_close); uint32_t server_ip_address = get_server_address(); struct in_addr server_in_addr; - server_in_addr.s_addr = get_server_address(); + server_in_addr.s_addr = server_ip_address; char* server_address = inet_ntoa(server_in_addr); std::string sport; @@ -376,11 +373,16 @@ void runtest_ipv4m(iotype iot, int ctid; int tid = -1; + tcp_client_ipv4m client(server_ip_address, iot, ntransactions, exit_no_close); + + ASSERT_TRUE(server.init()); + ASSERT_TRUE(client.init()); + // // FILTER // event_filter_t filter = [&](sinsp_evt* evt) { - return evt->get_tid() == server->get_tid() || evt->get_tid() == tid; + return evt->get_tid() == server.get_tid() || evt->get_tid() == tid; }; // @@ -388,19 +390,20 @@ void runtest_ipv4m(iotype iot, // run_callback_async_t test = [&]() { tid = gettid(); - server_thread = std::thread(&tcp_server_ipv4m::run, server); - server->wait_till_ready(); - tcp_client_ipv4m client(server_ip_address, iot, false, ntransactions, exit_no_close); + // Run the server + auto future_srv = std::async(&tcp_server_ipv4m::run, server); - client.run(); - - ctid = client.get_tid(); - sleep(1); - server_thread.join(); - - // We use a random call to tee to signal that we're done - tee(-1, -1, 0, 0); + // Run the client + if(client.run() != 0) { + server.shutdown_server(); + future_srv.wait(); + } else { + ctid = client.get_tid(); + ASSERT_EQ(future_srv.get(), 0); + // We use a random call to tee to signal that we're done + tee(-1, -1, 0, 0); + } }; // @@ -543,7 +546,7 @@ void runtest_ipv4m(iotype iot, } if((PPME_SYSCALL_CLOSE_X == evt->get_type() || PPME_SOCKET_SHUTDOWN_X == evt->get_type()) && - 0 == state && evt->get_tid() == server->get_tid()) { + 0 == state && evt->get_tid() == server.get_tid()) { if(exit_no_close) { FAIL(); } @@ -555,7 +558,7 @@ void runtest_ipv4m(iotype iot, if(evt->get_type() == PPME_GENERIC_E) { if(std::stoll(evt->get_param_value_str("ID", false)) == PPM_SC_TEE) { sinsp_threadinfo* ti = - param.m_inspector->get_thread_ref(server->get_tid(), false, true).get(); + param.m_inspector->get_thread_ref(server.get_tid(), false, true).get(); ASSERT_NE(ti, nullptr); ti = param.m_inspector->get_thread_ref(ctid, false, true).get(); ASSERT_NE(ti, nullptr); @@ -608,11 +611,9 @@ TEST_F(sys_call_test, tcp_client_server_noclose_ipv4m) { } TEST_F(sys_call_test, tcp_client_server_with_connection_before_capturing_starts_ipv4m) { - std::thread server_thread; - std::thread client_thread; - tcp_server_ipv4m server(SENDRECEIVE, true); + tcp_server_ipv4m server(SENDRECEIVE); uint32_t server_ip_address = get_server_address(); - tcp_client_ipv4m client(server_ip_address, SENDRECEIVE, true); + tcp_client_ipv4m client(server_ip_address, SENDRECEIVE); int state = 0; @@ -623,14 +624,21 @@ TEST_F(sys_call_test, tcp_client_server_with_connection_before_capturing_starts_ return evt->get_tid() == server.get_tid() || evt->get_tid() == client.get_tid(); }; + ASSERT_TRUE(server.init()); + ASSERT_TRUE(client.init()); + // // INITIALIZATION // run_callback_t test = [&](sinsp* inspector) { - server.signal_continue(); - client.signal_continue(); - server_thread.join(); - client_thread.join(); + auto future_srv = std::async(&tcp_server_ipv4m::run, &server); + auto future_client = std::async(&tcp_client_ipv4m::run, &client); + if(future_client.get() != 0) { + server.shutdown_server(); + future_srv.wait(); + } else { + ASSERT_EQ(future_srv.get(), 0); + } }; // @@ -643,11 +651,6 @@ TEST_F(sys_call_test, tcp_client_server_with_connection_before_capturing_starts_ } }; - server_thread = std::thread(&tcp_server_ipv4m::run, &server); - server.wait_till_ready(); - client_thread = std::thread(&tcp_client_ipv4m::run, &client); - client.wait_till_ready(); - ASSERT_NO_FATAL_FAILURE({ event_capture::run(test, callback, filter); }); ASSERT_EQ(1, state); } diff --git a/test/libsinsp_e2e/test_helper.cpp b/test/libsinsp_e2e/test_helper.cpp index 972eafcc7e..8096cd64ba 100644 --- a/test/libsinsp_e2e/test_helper.cpp +++ b/test/libsinsp_e2e/test_helper.cpp @@ -680,9 +680,9 @@ const unordered_map&)>> func_map = { tcp_client client(inet_addr(args.at(0).c_str()), iot, args.at(2), - str_to_bool(args.at(3)), - stoi(args.at(4)), - str_to_bool(args.at(5))); + stoi(args.at(3)), + str_to_bool(args.at(4))); + client.init(); client.run(); }}, {"tcp_server", @@ -692,9 +692,9 @@ const unordered_map&)>> func_map = { tcp_server server(iot, str_to_bool(args.at(1)), str_to_bool(args.at(2)), - str_to_bool(args.at(3)), - stoi(args.at(4)), - str_to_bool(args.at(5))); + stoi(args.at(3)), + str_to_bool(args.at(4))); + server.init(); server.run(); }}, {"pread_pwrite", pread_pwrite},