Skip to content

Commit

Permalink
test(udp): add most of the unit tests for udp socket
Browse files Browse the repository at this point in the history
Signed-off-by: Max SCHMELLER <[email protected]>
  • Loading branch information
mojomex committed Nov 22, 2024
1 parent dabdb09 commit df40395
Show file tree
Hide file tree
Showing 3 changed files with 277 additions and 6 deletions.
21 changes: 15 additions & 6 deletions nebula_hw_interfaces/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ cmake_minimum_required(VERSION 3.14)
project(nebula_hw_interfaces)

# Default to C++17
if (NOT CMAKE_CXX_STANDARD)
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 17)
endif ()
endif()

if (CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic -Wunused-function)
endif ()
endif()

find_package(ament_cmake_auto REQUIRED)
find_package(boost_tcp_driver)
Expand Down Expand Up @@ -53,7 +53,6 @@ target_link_libraries(nebula_hw_interfaces_velodyne PUBLIC
${boost_tcp_driver_LIBRARIES}
${boost_udp_driver_LIBRARIES}
${velodyne_msgs_TARGETS}

)
target_include_directories(nebula_hw_interfaces_velodyne PUBLIC
${boost_udp_driver_INCLUDE_DIRS}
Expand All @@ -68,7 +67,6 @@ target_link_libraries(nebula_hw_interfaces_robosense PUBLIC
${boost_tcp_driver_LIBRARIES}
${boost_udp_driver_LIBRARIES}
${robosense_msgs_TARGETS}

)
target_include_directories(nebula_hw_interfaces_robosense PUBLIC
${boost_udp_driver_INCLUDE_DIRS}
Expand Down Expand Up @@ -100,6 +98,17 @@ install(DIRECTORY include/ DESTINATION include/${PROJECT_NAME})
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()

find_package(ament_cmake_gtest REQUIRED)

ament_add_gtest(test_udp
test/common/test_udp.cpp
)

target_include_directories(test_udp PUBLIC
${nebula_common_INCLUDE_DIRS}
include
test)
endif()

ament_export_include_directories("include/${PROJECT_NAME}")
Expand Down
184 changes: 184 additions & 0 deletions nebula_hw_interfaces/test/common/test_udp.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2024 TIER IV, Inc.

#include "common/test_udp/utils.hpp"
#include "nebula_common/util/expected.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp"

#include <gtest/gtest.h>
#include <gtest/internal/gtest-internal.h>

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <fstream>
#include <optional>
#include <string>
#include <vector>

namespace nebula::drivers::connections
{

using std::chrono_literals::operator""ms;

static const char localhost_ip[] = "127.0.0.1";
static const char broadcast_ip[] = "255.255.255.255";
static const char any_ip[] = "0.0.0.0";
static const char multicast_group[] = "230.1.2.3";

static const char sender_ip[] = "192.168.201";
static const uint16_t sender_port = 7373;
static const uint16_t host_port = 6262;

static const std::chrono::duration send_receive_timeout = 100ms;

UdpSocket::callback_t empty_cb()
{
return [](const auto &, const auto &) {};
}

util::expected<uint64_t, std::string> read_sys_param(const std::string & param_fqn)
{
std::string path = "/proc/sys/" + param_fqn;
std::replace(path.begin(), path.end(), '.', '/');
std::ifstream ifs{path};
if (!ifs) return "could not read " + param_fqn;

size_t param{};
if (!(ifs >> param)) return param_fqn + " has unrecognized format";
return param;
}

TEST(test_udp, test_basic_lifecycle)
{
ASSERT_NO_THROW(
UdpSocket().init(localhost_ip, host_port).bind().subscribe(empty_cb()).unsubscribe());
}

TEST(test_udp, test_special_addresses_bind)
{
ASSERT_NO_THROW(UdpSocket().init(broadcast_ip, host_port).bind());
ASSERT_NO_THROW(UdpSocket().init(any_ip, host_port).bind());
}

TEST(test_udp, test_wildcard_multicast_join_error)
{
ASSERT_THROW(
UdpSocket().init(broadcast_ip, host_port).join_multicast_group(multicast_group).bind(),
SocketError);
}

TEST(test_udp, test_buffer_resize)
{
auto rmem_max_maybe = read_sys_param("net.core.rmem_max");
if (!rmem_max_maybe.has_value()) GTEST_SKIP() << rmem_max_maybe.error();
size_t rmem_max = rmem_max_maybe.value();

// Setting buffer sizes up to and including rmem_max shall succeed
ASSERT_NO_THROW(
UdpSocket().init(localhost_ip, host_port).set_socket_buffer_size(rmem_max).bind());

// Linux only supports sizes up to INT32_MAX
ASSERT_THROW(
UdpSocket()
.init(localhost_ip, host_port)
.set_socket_buffer_size(static_cast<size_t>(INT32_MAX) + 1)
.bind(),
UsageError);
}

TEST(test_udp, test_correct_usage_is_enforced)
{
// These functions require other functions (e.g. `init()`) to be called beforehand
ASSERT_THROW(UdpSocket().bind(), UsageError);
ASSERT_THROW(UdpSocket().join_multicast_group(multicast_group), UsageError);
ASSERT_THROW(UdpSocket().subscribe(empty_cb()), UsageError);

// The following functions can be called in any order, any number of times
ASSERT_NO_THROW(UdpSocket().limit_to_sender(sender_ip, sender_port));
ASSERT_NO_THROW(UdpSocket().init(localhost_ip, host_port));
ASSERT_NO_THROW(UdpSocket()
.limit_to_sender(sender_ip, sender_port)
.init(localhost_ip, host_port)
.limit_to_sender(sender_ip, sender_port)
.init(localhost_ip, host_port)
.bind());

// Sockets cannot be re-bound
ASSERT_THROW(UdpSocket().init(localhost_ip, host_port).bind().bind(), UsageError);

ASSERT_THROW(
UdpSocket().init(localhost_ip, host_port).bind().init(localhost_ip, host_port), UsageError);
ASSERT_NO_THROW(
UdpSocket().init(localhost_ip, host_port).bind().limit_to_sender(sender_ip, sender_port));

// Only bound sockets can be subscribed
ASSERT_THROW(UdpSocket().init(localhost_ip, host_port).subscribe(empty_cb()), UsageError);
ASSERT_NO_THROW(UdpSocket().init(localhost_ip, host_port).bind().subscribe(empty_cb()));

// Only one callback can exist
ASSERT_THROW(
UdpSocket().init(localhost_ip, host_port).bind().subscribe(empty_cb()).subscribe(empty_cb()),
UsageError);

// But un- and re-subscribing shall be supported
ASSERT_NO_THROW(UdpSocket()
.init(localhost_ip, host_port)
.bind()
.subscribe(empty_cb())
.unsubscribe()
.subscribe(empty_cb()));

// Unsubscribing on a non-subscribed socket shall also be supported
ASSERT_NO_THROW(UdpSocket().unsubscribe());
}

TEST(test_udp, test_receiving)
{
const std::vector<uint8_t> payload{1, 2, 3};
UdpSocket sock{};
sock.init(localhost_ip, host_port).bind();

auto err_no_opt = udp_send(localhost_ip, host_port, payload);
if (err_no_opt.has_value()) GTEST_SKIP() << strerror(err_no_opt.value());

auto result_opt = receive_once(sock, send_receive_timeout);

ASSERT_TRUE(result_opt.has_value());
auto const & [recv_payload, metadata] = result_opt.value();
ASSERT_EQ(recv_payload, payload);
ASSERT_FALSE(metadata.truncated);
ASSERT_EQ(metadata.drops_since_last_receive, 0);

// TODO(mojomex): currently cannot test timestamping on loopback interface (no timestamp produced)
}

TEST(test_udp, test_receiving_oversized)
{
const size_t mtu = 1500;
std::vector<uint8_t> payload;
payload.resize(mtu + 1);
UdpSocket sock{};
sock.init(localhost_ip, host_port).set_mtu(mtu).bind();

auto err_no_opt = udp_send(localhost_ip, host_port, payload);
if (err_no_opt.has_value()) GTEST_SKIP() << strerror(err_no_opt.value());

auto result_opt = receive_once(sock, send_receive_timeout);

ASSERT_TRUE(result_opt.has_value());
auto const & [recv_payload, metadata] = result_opt.value();
ASSERT_TRUE(std::equal(recv_payload.begin(), recv_payload.end(), payload.begin()));
ASSERT_TRUE(metadata.truncated);
ASSERT_EQ(metadata.drops_since_last_receive, 0);
}

} // namespace nebula::drivers::connections

int main(int argc, char * argv[])
{
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
};
78 changes: 78 additions & 0 deletions nebula_hw_interfaces/test/common/test_udp/utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 TIER IV, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp"

#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <optional>
#include <vector>

std::optional<int> udp_send(
const char * to_ip, uint16_t to_port, const std::vector<uint8_t> & bytes)
{
int sock_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (sock_fd < 0) return errno;

int enable = 1;
int result = setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (result < 0) return errno;

sockaddr_in receiver_addr{};
memset(&receiver_addr, 0, sizeof(receiver_addr));
receiver_addr.sin_family = AF_INET;
receiver_addr.sin_port = htons(to_port);
receiver_addr.sin_addr.s_addr = inet_addr(to_ip);

result = sendto(
sock_fd, bytes.data(), bytes.size(), 0, reinterpret_cast<const sockaddr *>(&receiver_addr),
sizeof(receiver_addr));
if (result < 0) return errno;
result = close(sock_fd);
if (result < 0) return errno;
return {};
}

template<typename _T, typename _R>
std::optional<
std::pair<std::vector<uint8_t>, nebula::drivers::connections::UdpSocket::ReceiveMetadata>>
receive_once(
nebula::drivers::connections::UdpSocket & sock, std::chrono::duration<_T, _R> timeout)
{
std::condition_variable cv_received_result;
std::mutex mtx_result;
std::optional<
std::pair<std::vector<uint8_t>, nebula::drivers::connections::UdpSocket::ReceiveMetadata>>
result;

sock.subscribe([&](const auto & data, const auto & metadata) {
std::lock_guard lock(mtx_result);
result.emplace(data, metadata);
cv_received_result.notify_one();
});

std::unique_lock lock(mtx_result);
cv_received_result.wait_for(lock, timeout, [&result]() { return result.has_value(); });
sock.unsubscribe();
return result;
}

0 comments on commit df40395

Please sign in to comment.