Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-2152 Remove own server and client socket implementations #1599

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake/BuildTests.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ endfunction()
enable_testing(test)

SET(TEST_BASE_LIB test_base)
set(TEST_BASE_SOURCES "TestBase.cpp" "RandomServerSocket.cpp" "StatefulProcessor.cpp" "WriteToFlowFileTestProcessor.cpp" "ReadFromFlowFileTestProcessor.cpp" "DummyProcessor.cpp")
set(TEST_BASE_SOURCES "TestBase.cpp" "StatefulProcessor.cpp" "WriteToFlowFileTestProcessor.cpp" "ReadFromFlowFileTestProcessor.cpp" "DummyProcessor.cpp")
list(TRANSFORM TEST_BASE_SOURCES PREPEND "${TEST_DIR}/")
add_library(${TEST_BASE_LIB} STATIC "${TEST_BASE_SOURCES}")
target_link_libraries(${TEST_BASE_LIB} core-minifi)
Expand Down
131 changes: 27 additions & 104 deletions controller/Controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,79 +29,9 @@

namespace org::apache::nifi::minifi::controller {

namespace {

class ClientConnection {
public:
explicit ClientConnection(const ControllerSocketData& socket_data) {
if (socket_data.ssl_context_service) {
connectTcpSocketOverSsl(socket_data);
} else {
connectTcpSocket(socket_data);
}
}

[[nodiscard]] io::BaseStream* getStream() const {
return stream_.get();
}

private:
void connectTcpSocketOverSsl(const ControllerSocketData& socket_data) {
auto ssl_context = utils::net::getSslContext(*socket_data.ssl_context_service);
asio::ssl::stream<asio::ip::tcp::socket> socket(io_context_, ssl_context);

asio::ip::tcp::resolver resolver(io_context_);
asio::error_code err;
asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(socket_data.host, std::to_string(socket_data.port), err);
if (err) {
logger_->log_error("Resolving host '%s' on port '%s' failed with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
return;
}

asio::connect(socket.lowest_layer(), endpoints, err);
if (err) {
logger_->log_error("Connecting to host '%s' on port '%s' failed with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
return;
}
socket.handshake(asio::ssl::stream_base::client, err);
if (err) {
logger_->log_error("SSL handshake failed while connecting to host '%s' on port '%s' with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
return;
}
stream_ = std::make_unique<io::AsioStream<asio::ssl::stream<asio::ip::tcp::socket>>>(std::move(socket));
}

void connectTcpSocket(const ControllerSocketData& socket_data) {
asio::ip::tcp::socket socket(io_context_);

asio::ip::tcp::resolver resolver(io_context_);
asio::error_code err;
asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(socket_data.host, std::to_string(socket_data.port));
if (err) {
logger_->log_error("Resolving host '%s' on port '%s' failed with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
return;
}

asio::connect(socket, endpoints, err);
if (err) {
logger_->log_error("Connecting to host '%s' on port '%s' failed with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
return;
}
stream_ = std::make_unique<io::AsioStream<asio::ip::tcp::socket>>(std::move(socket));
}

asio::io_context io_context_;
std::unique_ptr<io::BaseStream> stream_;
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ClientConnection>::getLogger()};
};

} // namespace


bool sendSingleCommand(const ControllerSocketData& socket_data, uint8_t op, const std::string& value) {
ClientConnection connection(socket_data);
auto connection_stream = connection.getStream();
if (!connection_stream) {
bool sendSingleCommand(const utils::net::SocketData& socket_data, uint8_t op, const std::string& value) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
io::BufferStream buffer;
Expand All @@ -110,22 +40,21 @@ bool sendSingleCommand(const ControllerSocketData& socket_data, uint8_t op, cons
return connection_stream->write(buffer.getBuffer()) == buffer.size();
}

bool stopComponent(const ControllerSocketData& socket_data, const std::string& component) {
bool stopComponent(const utils::net::SocketData& socket_data, const std::string& component) {
return sendSingleCommand(socket_data, static_cast<uint8_t>(c2::Operation::stop), component);
}

bool startComponent(const ControllerSocketData& socket_data, const std::string& component) {
bool startComponent(const utils::net::SocketData& socket_data, const std::string& component) {
return sendSingleCommand(socket_data, static_cast<uint8_t>(c2::Operation::start), component);
}

bool clearConnection(const ControllerSocketData& socket_data, const std::string& connection) {
bool clearConnection(const utils::net::SocketData& socket_data, const std::string& connection) {
return sendSingleCommand(socket_data, static_cast<uint8_t>(c2::Operation::clear), connection);
}

bool updateFlow(const ControllerSocketData& socket_data, std::ostream &out, const std::string& file) {
ClientConnection connection(socket_data);
auto connection_stream = connection.getStream();
if (!connection_stream) {
bool updateFlow(const utils::net::SocketData& socket_data, std::ostream &out, const std::string& file) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
auto op = static_cast<uint8_t>(c2::Operation::update);
Expand All @@ -152,10 +81,9 @@ bool updateFlow(const ControllerSocketData& socket_data, std::ostream &out, cons
return true;
}

bool getFullConnections(const ControllerSocketData& socket_data, std::ostream &out) {
ClientConnection connection(socket_data);
auto connection_stream = connection.getStream();
if (!connection_stream) {
bool getFullConnections(const utils::net::SocketData& socket_data, std::ostream &out) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
auto op = static_cast<uint8_t>(c2::Operation::describe);
Expand All @@ -181,10 +109,9 @@ bool getFullConnections(const ControllerSocketData& socket_data, std::ostream &o
return true;
}

bool getConnectionSize(const ControllerSocketData& socket_data, std::ostream &out, const std::string& connection) {
ClientConnection client_connection(socket_data);
auto connection_stream = client_connection.getStream();
if (!connection_stream) {
bool getConnectionSize(const utils::net::SocketData& socket_data, std::ostream &out, const std::string& connection) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
auto op = static_cast<uint8_t>(c2::Operation::describe);
Expand All @@ -206,10 +133,9 @@ bool getConnectionSize(const ControllerSocketData& socket_data, std::ostream &ou
return true;
}

bool listComponents(const ControllerSocketData& socket_data, std::ostream &out, bool show_header) {
ClientConnection connection(socket_data);
auto connection_stream = connection.getStream();
if (!connection_stream) {
bool listComponents(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
io::BufferStream buffer;
Expand All @@ -235,10 +161,9 @@ bool listComponents(const ControllerSocketData& socket_data, std::ostream &out,
return true;
}

bool listConnections(const ControllerSocketData& socket_data, std::ostream &out, bool show_header) {
ClientConnection connection(socket_data);
auto connection_stream = connection.getStream();
if (!connection_stream) {
bool listConnections(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
io::BufferStream buffer;
Expand All @@ -262,10 +187,9 @@ bool listConnections(const ControllerSocketData& socket_data, std::ostream &out,
return true;
}

bool printManifest(const ControllerSocketData& socket_data, std::ostream &out) {
ClientConnection connection(socket_data);
auto connection_stream = connection.getStream();
if (!connection_stream) {
bool printManifest(const utils::net::SocketData& socket_data, std::ostream &out) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
io::BufferStream buffer;
Expand All @@ -282,10 +206,9 @@ bool printManifest(const ControllerSocketData& socket_data, std::ostream &out) {
return true;
}

bool getJstacks(const ControllerSocketData& socket_data, std::ostream &out) {
ClientConnection connection(socket_data);
auto connection_stream = connection.getStream();
if (!connection_stream) {
bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
io::BufferStream buffer;
Expand Down
30 changes: 12 additions & 18 deletions controller/Controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,20 @@
#include <memory>
#include <string>

#include "controllers/SSLContextService.h"
#include "utils/net/AsioSocketUtils.h"

namespace org::apache::nifi::minifi::controller {

struct ControllerSocketData {
std::string host = "localhost";
int port = -1;
std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service;
};

bool sendSingleCommand(const ControllerSocketData& socket_data, uint8_t op, const std::string& value);
bool stopComponent(const ControllerSocketData& socket_data, const std::string& component);
bool startComponent(const ControllerSocketData& socket_data, const std::string& component);
bool clearConnection(const ControllerSocketData& socket_data, const std::string& connection);
bool updateFlow(const ControllerSocketData& socket_data, std::ostream &out, const std::string& file);
bool getFullConnections(const ControllerSocketData& socket_data, std::ostream &out);
bool getConnectionSize(const ControllerSocketData& socket_data, std::ostream &out, const std::string& connection);
bool listComponents(const ControllerSocketData& socket_data, std::ostream &out, bool show_header = true);
bool listConnections(const ControllerSocketData& socket_data, std::ostream &out, bool show_header = true);
bool printManifest(const ControllerSocketData& socket_data, std::ostream &out);
bool getJstacks(const ControllerSocketData& socket_data, std::ostream &out);
bool sendSingleCommand(const utils::net::SocketData& socket_data, uint8_t op, const std::string& value);
bool stopComponent(const utils::net::SocketData& socket_data, const std::string& component);
bool startComponent(const utils::net::SocketData& socket_data, const std::string& component);
bool clearConnection(const utils::net::SocketData& socket_data, const std::string& connection);
bool updateFlow(const utils::net::SocketData& socket_data, std::ostream &out, const std::string& file);
bool getFullConnections(const utils::net::SocketData& socket_data, std::ostream &out);
bool getConnectionSize(const utils::net::SocketData& socket_data, std::ostream &out, const std::string& connection);
bool listComponents(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header = true);
bool listConnections(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header = true);
bool printManifest(const utils::net::SocketData& socket_data, std::ostream &out);
bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out);

} // namespace org::apache::nifi::minifi::controller
6 changes: 1 addition & 5 deletions controller/MiNiFiController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "c2/ControllerSocketProtocol.h"
#include "core/controller/ControllerService.h"
#include "core/extension/ExtensionManager.h"
#include "io/StreamFactory.h"
#include "core/ConfigurationFactory.h"
#include "Exception.h"

Expand All @@ -39,12 +38,10 @@ std::shared_ptr<minifi::core::controller::ControllerService> getControllerServic
minifi::core::extension::ExtensionManager::get().initialize(configuration);

configuration->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name);
const auto stream_factory = minifi::io::StreamFactory::getInstance(configuration);
auto flow_configuration = minifi::core::createFlowConfiguration(
minifi::core::ConfigurationContext{
.flow_file_repo = nullptr,
.content_repo = nullptr,
.stream_factory = stream_factory,
.configuration = configuration,
.path = configuration->get(minifi::Configure::nifi_flow_configuration_file)},
nifi_configuration_class_name);
Expand Down Expand Up @@ -104,14 +101,13 @@ int main(int argc, char **argv) {
log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
minifi::core::logging::LoggerConfiguration::getConfiguration().initialize(log_properties);

minifi::controller::ControllerSocketData socket_data;
minifi::utils::net::SocketData socket_data;
try {
socket_data.ssl_context_service = getSSLContextService(configuration);
} catch(const minifi::Exception& ex) {
logger->log_error(ex.what());
exit(1);
}
auto stream_factory_ = minifi::io::StreamFactory::getInstance(configuration);

cxxopts::Options options("MiNiFiController", "MiNiFi local agent controller");
options.positional_help("[optional args]").show_positional_help();
Expand Down
2 changes: 1 addition & 1 deletion controller/tests/ControllerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class ControllerTestFixture {
std::unique_ptr<minifi::c2::ControllerSocketProtocol> controller_socket_protocol_;
std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
std::unique_ptr<TestControllerServiceProvider> controller_service_provider_;
minifi::controller::ControllerSocketData controller_socket_data_;
minifi::utils::net::SocketData controller_socket_data_;
};

TEST_CASE_METHOD(ControllerTestFixture, "Test listComponents", "[controllerTests]") {
Expand Down
1 change: 0 additions & 1 deletion extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
#include "FlowController.h"
#include "properties/Configure.h"
#include "unit/ProvenanceTestHelper.h"
#include "io/StreamFactory.h"
#include "CivetServer.h"
#include "RemoteProcessorGroupPort.h"
#include "core/ConfigurableComponent.h"
Expand Down
5 changes: 2 additions & 3 deletions extensions/coap/tests/CoapIntegrationBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ class CoapIntegrationBase : public IntegrationBase {

std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);
std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
auto yaml_ptr = std::make_shared<core::YamlConfiguration>(core::ConfigurationContext{test_repo, content_repo, stream_factory, configuration, test_file_location});
auto yaml_ptr = std::make_shared<core::YamlConfiguration>(core::ConfigurationContext{test_repo, content_repo, configuration, test_file_location});

core::YamlConfiguration yaml_config({test_repo, content_repo, stream_factory, configuration, test_file_location});
core::YamlConfiguration yaml_config({test_repo, content_repo, configuration, test_file_location});

std::shared_ptr<core::ProcessGroup> pg{ yaml_config.getRoot() };

Expand Down
1 change: 0 additions & 1 deletion extensions/http-curl/processors/InvokeHTTP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "core/Relationship.h"
#include "core/Resource.h"
#include "io/BufferStream.h"
#include "io/StreamFactory.h"
#include "ResourceClaim.h"
#include "utils/gsl.h"
#include "utils/ProcessorConfigUtils.h"
Expand Down
6 changes: 2 additions & 4 deletions extensions/http-curl/tests/C2PauseResumeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "core/yaml/YamlConfiguration.h"
#include "FlowController.h"
#include "properties/Configure.h"
#include "io/StreamFactory.h"
#include "integration/IntegrationBase.h"

class VerifyC2PauseResume : public VerifyC2Base {
Expand Down Expand Up @@ -128,18 +127,17 @@ int main(int argc, char **argv) {
configuration->set(minifi::Configure::nifi_default_directory, args.key_dir);
configuration->set(minifi::Configure::nifi_flow_configuration_file, args.test_file);

std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);

auto yaml_ptr = std::make_shared<core::YamlConfiguration>(core::ConfigurationContext{test_repo, content_repo, stream_factory, configuration, args.test_file});
auto yaml_ptr = std::make_shared<core::YamlConfiguration>(core::ConfigurationContext{test_repo, content_repo, configuration, args.test_file});

std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repo_metric_sources{test_repo, test_flow_repo, content_repo};
auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration, repo_metric_sources, yaml_ptr);
std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration,
std::move(yaml_ptr), content_repo, std::move(metrics_publisher_store));

core::YamlConfiguration yaml_config({test_repo, content_repo, stream_factory, configuration, args.test_file});
core::YamlConfiguration yaml_config({test_repo, content_repo, configuration, args.test_file});

auto root = yaml_config.getRoot();
const auto proc = root->findProcessorByName("invoke");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,17 @@ int main(int argc, char **argv) {
configuration->set(minifi::Configure::nifi_security_client_pass_phrase, passphrase);
configuration->set(minifi::Configure::nifi_default_directory, args.key_dir);

std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);
std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::make_unique<core::YamlConfiguration>(
core::ConfigurationContext{test_repo, content_repo, stream_factory, configuration, args.test_file});
core::ConfigurationContext{test_repo, content_repo, configuration, args.test_file});

const auto controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo);

disabled = false;
std::shared_ptr<core::controller::ControllerServiceMap> map = std::make_shared<core::controller::ControllerServiceMap>();

core::YamlConfiguration yaml_config({test_repo, content_repo, stream_factory, configuration, args.test_file});
core::YamlConfiguration yaml_config({test_repo, content_repo, configuration, args.test_file});

auto pg = yaml_config.getRoot();

Expand Down
3 changes: 2 additions & 1 deletion extensions/http-curl/tests/HTTPHandlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "range/v3/algorithm/contains.hpp"
#include "range/v3/view/filter.hpp"
#include "range/v3/view/view.hpp"
#include "utils/net/DNS.h"

static std::atomic<int> transaction_id;
static std::atomic<int> transaction_id_output;
Expand Down Expand Up @@ -99,7 +100,7 @@ class PeerResponder : public ServerAwareHandler {

bool handleGet(CivetServer* /*server*/, struct mg_connection *conn) override {
#ifdef WIN32
std::string hostname = org::apache::nifi::minifi::io::Socket::getMyHostName();
std::string hostname = org::apache::nifi::minifi::utils::net::getMyHostName();
#else
std::string hostname = "localhost";
#endif
Expand Down
1 change: 0 additions & 1 deletion extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "InvokeHTTP.h"
#include "TestBase.h"
#include "FlowController.h"
#include "io/StreamFactory.h"
#include "RemoteProcessorGroupPort.h"
#include "core/ConfigurableComponent.h"
#include "HTTPIntegrationBase.h"
Expand Down
Loading