Skip to content

Commit

Permalink
MINIFICPP-2152 Remove own server and client socket implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Jul 3, 2023
1 parent 22b1ef4 commit f0caa49
Show file tree
Hide file tree
Showing 116 changed files with 296 additions and 4,655 deletions.
2 changes: 1 addition & 1 deletion cmake/BuildTests.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ endfunction()
enable_testing(test)

SET(TEST_BASE_LIB test_base)
set(TEST_BASE_SOURCES "TestBase.cpp" "RandomServerSocket.cpp" "StatefulProcessor.cpp" "WriteToFlowFileTestProcessor.cpp" "ReadFromFlowFileTestProcessor.cpp" "DummyProcessor.cpp")
set(TEST_BASE_SOURCES "TestBase.cpp" "StatefulProcessor.cpp" "WriteToFlowFileTestProcessor.cpp" "ReadFromFlowFileTestProcessor.cpp" "DummyProcessor.cpp")
list(TRANSFORM TEST_BASE_SOURCES PREPEND "${TEST_DIR}/")
add_library(${TEST_BASE_LIB} STATIC "${TEST_BASE_SOURCES}")
target_link_libraries(${TEST_BASE_LIB} core-minifi)
Expand Down
131 changes: 27 additions & 104 deletions controller/Controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,79 +29,9 @@

namespace org::apache::nifi::minifi::controller {

namespace {

class ClientConnection {
public:
explicit ClientConnection(const ControllerSocketData& socket_data) {
if (socket_data.ssl_context_service) {
connectTcpSocketOverSsl(socket_data);
} else {
connectTcpSocket(socket_data);
}
}

[[nodiscard]] io::BaseStream* getStream() const {
return stream_.get();
}

private:
void connectTcpSocketOverSsl(const ControllerSocketData& socket_data) {
auto ssl_context = utils::net::getSslContext(*socket_data.ssl_context_service);
asio::ssl::stream<asio::ip::tcp::socket> socket(io_context_, ssl_context);

asio::ip::tcp::resolver resolver(io_context_);
asio::error_code err;
asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(socket_data.host, std::to_string(socket_data.port), err);
if (err) {
logger_->log_error("Resolving host '%s' on port '%s' failed with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
return;
}

asio::connect(socket.lowest_layer(), endpoints, err);
if (err) {
logger_->log_error("Connecting to host '%s' on port '%s' failed with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
return;
}
socket.handshake(asio::ssl::stream_base::client, err);
if (err) {
logger_->log_error("SSL handshake failed while connecting to host '%s' on port '%s' with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
return;
}
stream_ = std::make_unique<io::AsioStream<asio::ssl::stream<asio::ip::tcp::socket>>>(std::move(socket));
}

void connectTcpSocket(const ControllerSocketData& socket_data) {
asio::ip::tcp::socket socket(io_context_);

asio::ip::tcp::resolver resolver(io_context_);
asio::error_code err;
asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(socket_data.host, std::to_string(socket_data.port));
if (err) {
logger_->log_error("Resolving host '%s' on port '%s' failed with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
return;
}

asio::connect(socket, endpoints, err);
if (err) {
logger_->log_error("Connecting to host '%s' on port '%s' failed with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
return;
}
stream_ = std::make_unique<io::AsioStream<asio::ip::tcp::socket>>(std::move(socket));
}

asio::io_context io_context_;
std::unique_ptr<io::BaseStream> stream_;
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ClientConnection>::getLogger()};
};

} // namespace


bool sendSingleCommand(const ControllerSocketData& socket_data, uint8_t op, const std::string& value) {
ClientConnection connection(socket_data);
auto connection_stream = connection.getStream();
if (!connection_stream) {
bool sendSingleCommand(const utils::net::SocketData& socket_data, uint8_t op, const std::string& value) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
io::BufferStream buffer;
Expand All @@ -110,22 +40,21 @@ bool sendSingleCommand(const ControllerSocketData& socket_data, uint8_t op, cons
return connection_stream->write(buffer.getBuffer()) == buffer.size();
}

bool stopComponent(const ControllerSocketData& socket_data, const std::string& component) {
bool stopComponent(const utils::net::SocketData& socket_data, const std::string& component) {
return sendSingleCommand(socket_data, c2::Operation::STOP, component);
}

bool startComponent(const ControllerSocketData& socket_data, const std::string& component) {
bool startComponent(const utils::net::SocketData& socket_data, const std::string& component) {
return sendSingleCommand(socket_data, c2::Operation::START, component);
}

bool clearConnection(const ControllerSocketData& socket_data, const std::string& connection) {
bool clearConnection(const utils::net::SocketData& socket_data, const std::string& connection) {
return sendSingleCommand(socket_data, c2::Operation::CLEAR, connection);
}

bool updateFlow(const ControllerSocketData& socket_data, std::ostream &out, const std::string& file) {
ClientConnection connection(socket_data);
auto connection_stream = connection.getStream();
if (!connection_stream) {
bool updateFlow(const utils::net::SocketData& socket_data, std::ostream &out, const std::string& file) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
uint8_t op = c2::Operation::UPDATE;
Expand All @@ -152,10 +81,9 @@ bool updateFlow(const ControllerSocketData& socket_data, std::ostream &out, cons
return true;
}

bool getFullConnections(const ControllerSocketData& socket_data, std::ostream &out) {
ClientConnection connection(socket_data);
auto connection_stream = connection.getStream();
if (!connection_stream) {
bool getFullConnections(const utils::net::SocketData& socket_data, std::ostream &out) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
uint8_t op = c2::Operation::DESCRIBE;
Expand All @@ -181,10 +109,9 @@ bool getFullConnections(const ControllerSocketData& socket_data, std::ostream &o
return true;
}

bool getConnectionSize(const ControllerSocketData& socket_data, std::ostream &out, const std::string& connection) {
ClientConnection client_connection(socket_data);
auto connection_stream = client_connection.getStream();
if (!connection_stream) {
bool getConnectionSize(const utils::net::SocketData& socket_data, std::ostream &out, const std::string& connection) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
uint8_t op = c2::Operation::DESCRIBE;
Expand All @@ -206,10 +133,9 @@ bool getConnectionSize(const ControllerSocketData& socket_data, std::ostream &ou
return true;
}

bool listComponents(const ControllerSocketData& socket_data, std::ostream &out, bool show_header) {
ClientConnection connection(socket_data);
auto connection_stream = connection.getStream();
if (!connection_stream) {
bool listComponents(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
io::BufferStream buffer;
Expand All @@ -235,10 +161,9 @@ bool listComponents(const ControllerSocketData& socket_data, std::ostream &out,
return true;
}

bool listConnections(const ControllerSocketData& socket_data, std::ostream &out, bool show_header) {
ClientConnection connection(socket_data);
auto connection_stream = connection.getStream();
if (!connection_stream) {
bool listConnections(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
io::BufferStream buffer;
Expand All @@ -262,10 +187,9 @@ bool listConnections(const ControllerSocketData& socket_data, std::ostream &out,
return true;
}

bool printManifest(const ControllerSocketData& socket_data, std::ostream &out) {
ClientConnection connection(socket_data);
auto connection_stream = connection.getStream();
if (!connection_stream) {
bool printManifest(const utils::net::SocketData& socket_data, std::ostream &out) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
io::BufferStream buffer;
Expand All @@ -282,10 +206,9 @@ bool printManifest(const ControllerSocketData& socket_data, std::ostream &out) {
return true;
}

bool getJstacks(const ControllerSocketData& socket_data, std::ostream &out) {
ClientConnection connection(socket_data);
auto connection_stream = connection.getStream();
if (!connection_stream) {
bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
io::BufferStream buffer;
Expand Down
30 changes: 12 additions & 18 deletions controller/Controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,20 @@
#include <memory>
#include <string>

#include "controllers/SSLContextService.h"
#include "utils/net/AsioSocketUtils.h"

namespace org::apache::nifi::minifi::controller {

struct ControllerSocketData {
std::string host = "localhost";
int port = -1;
std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service;
};

bool sendSingleCommand(const ControllerSocketData& socket_data, uint8_t op, const std::string& value);
bool stopComponent(const ControllerSocketData& socket_data, const std::string& component);
bool startComponent(const ControllerSocketData& socket_data, const std::string& component);
bool clearConnection(const ControllerSocketData& socket_data, const std::string& connection);
bool updateFlow(const ControllerSocketData& socket_data, std::ostream &out, const std::string& file);
bool getFullConnections(const ControllerSocketData& socket_data, std::ostream &out);
bool getConnectionSize(const ControllerSocketData& socket_data, std::ostream &out, const std::string& connection);
bool listComponents(const ControllerSocketData& socket_data, std::ostream &out, bool show_header = true);
bool listConnections(const ControllerSocketData& socket_data, std::ostream &out, bool show_header = true);
bool printManifest(const ControllerSocketData& socket_data, std::ostream &out);
bool getJstacks(const ControllerSocketData& socket_data, std::ostream &out);
bool sendSingleCommand(const utils::net::SocketData& socket_data, uint8_t op, const std::string& value);
bool stopComponent(const utils::net::SocketData& socket_data, const std::string& component);
bool startComponent(const utils::net::SocketData& socket_data, const std::string& component);
bool clearConnection(const utils::net::SocketData& socket_data, const std::string& connection);
bool updateFlow(const utils::net::SocketData& socket_data, std::ostream &out, const std::string& file);
bool getFullConnections(const utils::net::SocketData& socket_data, std::ostream &out);
bool getConnectionSize(const utils::net::SocketData& socket_data, std::ostream &out, const std::string& connection);
bool listComponents(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header = true);
bool listConnections(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header = true);
bool printManifest(const utils::net::SocketData& socket_data, std::ostream &out);
bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out);

} // namespace org::apache::nifi::minifi::controller
7 changes: 1 addition & 6 deletions controller/MiNiFiController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "c2/ControllerSocketProtocol.h"
#include "core/controller/ControllerService.h"
#include "core/extension/ExtensionManager.h"
#include "io/StreamFactory.h"
#include "core/ConfigurationFactory.h"
#include "Exception.h"

Expand All @@ -39,12 +38,10 @@ std::shared_ptr<minifi::core::controller::ControllerService> getControllerServic
minifi::core::extension::ExtensionManager::get().initialize(configuration);

configuration->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name);
const auto stream_factory = minifi::io::StreamFactory::getInstance(configuration);
auto flow_configuration = minifi::core::createFlowConfiguration(
minifi::core::ConfigurationContext{
.flow_file_repo = nullptr,
.content_repo = nullptr,
.stream_factory = stream_factory,
.configuration = configuration,
.path = configuration->get(minifi::Configure::nifi_flow_configuration_file)},
nifi_configuration_class_name);
Expand Down Expand Up @@ -104,15 +101,13 @@ int main(int argc, char **argv) {
log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
minifi::core::logging::LoggerConfiguration::getConfiguration().initialize(log_properties);

minifi::controller::ControllerSocketData socket_data;
minifi::utils::net::SocketData socket_data;
try {
socket_data.ssl_context_service = getSSLContextService(configuration);
} catch(const minifi::Exception& ex) {
logger->log_error(ex.what());
exit(1);
}
auto stream_factory_ = minifi::io::StreamFactory::getInstance(configuration);


std::string port_str;
std::string ca_cert;
Expand Down
2 changes: 1 addition & 1 deletion controller/tests/ControllerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class ControllerTestFixture {
std::unique_ptr<minifi::c2::ControllerSocketProtocol> controller_socket_protocol_;
std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
std::unique_ptr<TestControllerServiceProvider> controller_service_provider_;
minifi::controller::ControllerSocketData controller_socket_data_;
minifi::utils::net::SocketData controller_socket_data_;
};

TEST_CASE_METHOD(ControllerTestFixture, "Test listComponents", "[controllerTests]") {
Expand Down
1 change: 0 additions & 1 deletion extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
#include "FlowController.h"
#include "properties/Configure.h"
#include "unit/ProvenanceTestHelper.h"
#include "io/StreamFactory.h"
#include "CivetServer.h"
#include "RemoteProcessorGroupPort.h"
#include "core/ConfigurableComponent.h"
Expand Down
5 changes: 2 additions & 3 deletions extensions/coap/tests/CoapIntegrationBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ class CoapIntegrationBase : public IntegrationBase {

std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);
std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
auto yaml_ptr = std::make_shared<core::YamlConfiguration>(core::ConfigurationContext{test_repo, content_repo, stream_factory, configuration, test_file_location});
auto yaml_ptr = std::make_shared<core::YamlConfiguration>(core::ConfigurationContext{test_repo, content_repo, configuration, test_file_location});

core::YamlConfiguration yaml_config({test_repo, content_repo, stream_factory, configuration, test_file_location});
core::YamlConfiguration yaml_config({test_repo, content_repo, configuration, test_file_location});

std::shared_ptr<core::ProcessGroup> pg{ yaml_config.getRoot() };

Expand Down
1 change: 0 additions & 1 deletion extensions/http-curl/processors/InvokeHTTP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include "core/Relationship.h"
#include "core/Resource.h"
#include "io/BufferStream.h"
#include "io/StreamFactory.h"
#include "ResourceClaim.h"
#include "utils/gsl.h"
#include "utils/ProcessorConfigUtils.h"
Expand Down
6 changes: 2 additions & 4 deletions extensions/http-curl/tests/C2PauseResumeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include "core/yaml/YamlConfiguration.h"
#include "FlowController.h"
#include "properties/Configure.h"
#include "io/StreamFactory.h"
#include "integration/IntegrationBase.h"

class VerifyC2PauseResume : public VerifyC2Base {
Expand Down Expand Up @@ -129,18 +128,17 @@ int main(int argc, char **argv) {
configuration->set(minifi::Configure::nifi_default_directory, args.key_dir);
configuration->set(minifi::Configure::nifi_flow_configuration_file, args.test_file);

std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);

auto yaml_ptr = std::make_shared<core::YamlConfiguration>(core::ConfigurationContext{test_repo, content_repo, stream_factory, configuration, args.test_file});
auto yaml_ptr = std::make_shared<core::YamlConfiguration>(core::ConfigurationContext{test_repo, content_repo, configuration, args.test_file});

std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repo_metric_sources{test_repo, test_flow_repo, content_repo};
auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration, repo_metric_sources, yaml_ptr);
std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration,
std::move(yaml_ptr), content_repo, std::move(metrics_publisher_store));

core::YamlConfiguration yaml_config({test_repo, content_repo, stream_factory, configuration, args.test_file});
core::YamlConfiguration yaml_config({test_repo, content_repo, configuration, args.test_file});

auto root = yaml_config.getRoot();
const auto proc = root->findProcessorByName("invoke");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,17 @@ int main(int argc, char **argv) {
configuration->set(minifi::Configure::nifi_security_client_pass_phrase, passphrase);
configuration->set(minifi::Configure::nifi_default_directory, args.key_dir);

std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);
std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::make_unique<core::YamlConfiguration>(
core::ConfigurationContext{test_repo, content_repo, stream_factory, configuration, args.test_file});
core::ConfigurationContext{test_repo, content_repo, configuration, args.test_file});

const auto controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo);

disabled = false;
std::shared_ptr<core::controller::ControllerServiceMap> map = std::make_shared<core::controller::ControllerServiceMap>();

core::YamlConfiguration yaml_config({test_repo, content_repo, stream_factory, configuration, args.test_file});
core::YamlConfiguration yaml_config({test_repo, content_repo, configuration, args.test_file});

auto pg = yaml_config.getRoot();

Expand Down
3 changes: 2 additions & 1 deletion extensions/http-curl/tests/HTTPHandlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "range/v3/algorithm/contains.hpp"
#include "range/v3/view/filter.hpp"
#include "range/v3/view/view.hpp"
#include "utils/net/DNS.h"

static std::atomic<int> transaction_id;
static std::atomic<int> transaction_id_output;
Expand Down Expand Up @@ -100,7 +101,7 @@ class PeerResponder : public ServerAwareHandler {

bool handleGet(CivetServer* /*server*/, struct mg_connection *conn) override {
#ifdef WIN32
std::string hostname = org::apache::nifi::minifi::io::Socket::getMyHostName();
std::string hostname = org::apache::nifi::minifi::utils::net::getMyHostName();
#else
std::string hostname = "localhost";
#endif
Expand Down
1 change: 0 additions & 1 deletion extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "TestBase.h"
#include "Catch.h"
#include "FlowController.h"
#include "io/StreamFactory.h"
#include "RemoteProcessorGroupPort.h"
#include "core/ConfigurableComponent.h"
#include "HTTPIntegrationBase.h"
Expand Down
Loading

0 comments on commit f0caa49

Please sign in to comment.