Skip to content

Commit

Permalink
Remove flush argument
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Sep 19, 2023
1 parent cfbf781 commit 84abc4d
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 20 deletions.
8 changes: 4 additions & 4 deletions libminifi/include/core/logging/LoggerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@ class LoggerConfiguration {
*/
void initialize(const std::shared_ptr<LoggerProperties> &logger_properties);

static std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs(bool flush = false) {
return getCompressedLogs(std::chrono::milliseconds{0}, flush);
static std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs() {
return getCompressedLogs(std::chrono::milliseconds{0});
}

void initializeAlertSinks(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<AgentIdentificationProvider>& agent_id);

template<class Rep, class Period>
static std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
return getConfiguration().compression_manager_.getCompressedLogs(time, flush);
static std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs(const std::chrono::duration<Rep, Period>& time) {
return getConfiguration().compression_manager_.getCompressedLogs(time);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions libminifi/include/core/logging/internal/CompressionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ class CompressionManager {
std::shared_ptr<LogCompressorSink> initialize(const std::shared_ptr<LoggerProperties>& properties, const std::shared_ptr<Logger>& error_logger, const LoggerFactory& logger_factory);

template<class Rep, class Period>
std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs(const std::chrono::duration<Rep, Period>& time) {
std::shared_ptr<internal::LogCompressorSink> sink = getSink();
if (sink) {
return sink->getContent(time, flush);
return sink->getContent(time);
}
return {};
}
Expand Down
10 changes: 4 additions & 6 deletions libminifi/include/core/logging/internal/LogCompressorSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,15 @@ class LogCompressorSink : public spdlog::sinks::base_sink<std::mutex> {
~LogCompressorSink() override;

template<class Rep, class Period>
std::vector<std::unique_ptr<io::InputStream>> getContent(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
if (flush) {
cached_logs_.commit();
compress(true);
}
std::vector<std::unique_ptr<io::InputStream>> getContent(const std::chrono::duration<Rep, Period>& time) {
cached_logs_.commit();
compress(true);

std::vector<std::unique_ptr<io::InputStream>> log_segments;
const auto segment_count = compressed_logs_.itemCount();
for (size_t i = 0; i < segment_count; ++i) {
LogBuffer compressed;
if (!compressed_logs_.tryDequeue(compressed, time) && flush) {
if (!compressed_logs_.tryDequeue(compressed, time)) {
break;
}
log_segments.push_back(std::move(compressed.buffer_));
Expand Down
2 changes: 1 addition & 1 deletion libminifi/src/FlowController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ std::vector<BackTrace> FlowController::getTraces() {

std::map<std::string, std::unique_ptr<io::InputStream>> FlowController::getDebugInfo() {
std::map<std::string, std::unique_ptr<io::InputStream>> debug_info;
auto logs = core::logging::LoggerConfiguration::getCompressedLogs(true);
auto logs = core::logging::LoggerConfiguration::getCompressedLogs();
for (size_t i = 0; i < logs.size(); ++i) {
std::string index_str = i == logs.size() - 1 ? "" : "." + std::to_string(logs.size() - 1 - i);
debug_info["minifi.log" + index_str + ".gz"] = std::move(logs[i]);
Expand Down
14 changes: 7 additions & 7 deletions libminifi/test/unit/LoggerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ TEST_CASE("Test Compression", "[ttl9]") {
log_config.initialize(properties);
auto logger = log_config.getLogger(className);
logger->log_error("Hi there");
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(true);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
REQUIRE(compressed_logs.size() == 1);
auto logs = decompress(compressed_logs[0]);
REQUIRE(logs.find("Hi there") != std::string::npos);
Expand Down Expand Up @@ -280,7 +280,7 @@ TEST_CASE("Setting either properties to 0 disables in-memory compressed logs", "
log_config.initialize(properties);
auto logger = log_config.getLogger("DisableCompressionTestLogger");
logger->log_error("Hi there");
REQUIRE(logging::LoggerConfiguration::getCompressedLogs(true).empty() == is_nullptr);
REQUIRE(logging::LoggerConfiguration::getCompressedLogs().empty() == is_nullptr);
}

TEST_CASE("Setting max log entry length property trims long log entries", "[ttl12]") {
Expand All @@ -292,7 +292,7 @@ TEST_CASE("Setting max log entry length property trims long log entries", "[ttl1
auto logger = log_config.getLogger("SetMaxLogEntryLengthTestLogger");
logger->log_error("Hi there");

auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(true);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
REQUIRE(compressed_logs.size() == 1);
auto logs = decompress(compressed_logs[0]);
REQUIRE(logs.find("Hi ") == std::string::npos);
Expand All @@ -308,7 +308,7 @@ TEST_CASE("Setting max log entry length property trims long formatted log entrie
auto logger = log_config.getLogger("SetMaxLogEntryLengthTestLogger");
logger->log_error("Hi there %s", "John");

auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(true);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
REQUIRE(compressed_logs.size() == 1);
auto logs = decompress(compressed_logs[0]);
REQUIRE(logs.find("Hi ") == std::string::npos);
Expand All @@ -326,7 +326,7 @@ TEST_CASE("Setting max log entry length to a size larger than the internal buffe
std::string expected_log(1500, 'a');
logger->log_error(log.c_str());

auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(true);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
REQUIRE(compressed_logs.size() == 1);
auto logs = decompress(compressed_logs[0]);
REQUIRE(logs.find(log) == std::string::npos);
Expand All @@ -348,7 +348,7 @@ TEST_CASE("Setting max log entry length to unlimited results in unlimited log en
std::string log(5000, 'a');
logger->log_error(log.c_str());

auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(true);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
REQUIRE(compressed_logs.size() == 1);
auto logs = decompress(compressed_logs[0]);
REQUIRE(logs.find(log) != std::string::npos);
Expand Down Expand Up @@ -380,7 +380,7 @@ TEST_CASE("Test sending multiple segments at once", "[ttl16]") {

LoggerTestAccessor::runCompression(log_config);

auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(true);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
REQUIRE(compressed_logs.size() == SEGMENT_COUNT);
auto logs = decompress(compressed_logs[SEGMENT_COUNT - 1]);
REQUIRE(logs.find(log_str) != std::string::npos);
Expand Down

0 comments on commit 84abc4d

Please sign in to comment.