Skip to content

Commit

Permalink
MINIFICPP-2174 Send all cached compressed log files through C2
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Sep 19, 2023
1 parent 7c64647 commit b004732
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 31 deletions.
8 changes: 4 additions & 4 deletions libminifi/include/core/logging/LoggerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@ class LoggerConfiguration {
*/
void initialize(const std::shared_ptr<LoggerProperties> &logger_properties);

static std::unique_ptr<io::InputStream> getCompressedLog(bool flush = false) {
return getCompressedLog(std::chrono::milliseconds{0}, flush);
static std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs(bool flush = false) {
return getCompressedLogs(std::chrono::milliseconds{0}, flush);
}

void initializeAlertSinks(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<AgentIdentificationProvider>& agent_id);

template<class Rep, class Period>
static std::unique_ptr<io::InputStream> getCompressedLog(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
return getConfiguration().compression_manager_.getCompressedLog(time, flush);
static std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
return getConfiguration().compression_manager_.getCompressedLogs(time, flush);
}

/**
Expand Down
5 changes: 3 additions & 2 deletions libminifi/include/core/logging/internal/CompressionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <functional>
#include <utility>
#include <string>
#include <vector>

#include "core/logging/Logger.h"
#include "LogCompressorSink.h"
Expand All @@ -50,12 +51,12 @@ class CompressionManager {
std::shared_ptr<LogCompressorSink> initialize(const std::shared_ptr<LoggerProperties>& properties, const std::shared_ptr<Logger>& error_logger, const LoggerFactory& logger_factory);

template<class Rep, class Period>
std::unique_ptr<io::InputStream> getCompressedLog(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
std::shared_ptr<internal::LogCompressorSink> sink = getSink();
if (sink) {
return sink->getContent(time, flush);
}
return nullptr;
return {};
}

static constexpr const char* compression_cached_log_max_size_ = "compression.cached.log.max.size";
Expand Down
29 changes: 24 additions & 5 deletions libminifi/include/core/logging/internal/LogCompressorSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <memory>
#include <atomic>
#include <utility>
#include <vector>

#include "spdlog/common.h"
#include "spdlog/details/log_msg.h"
Expand Down Expand Up @@ -57,26 +58,44 @@ class LogCompressorSink : public spdlog::sinks::base_sink<std::mutex> {
~LogCompressorSink() override;

template<class Rep, class Period>
std::unique_ptr<io::InputStream> getContent(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
std::vector<std::unique_ptr<io::InputStream>> getContent(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
if (flush) {
cached_logs_.commit();
compress(true);
}
LogBuffer compressed;
if (!compressed_logs_.tryDequeue(compressed, time) && flush) {
return createEmptyArchive();

std::vector<std::unique_ptr<io::InputStream>> log_segments;
const auto segment_count = compressed_logs_.itemCount();
for (size_t i = 0; i < segment_count; ++i) {
LogBuffer compressed;
if (!compressed_logs_.tryDequeue(compressed, time) && flush) {
break;
}
log_segments.push_back(std::move(compressed.buffer_));
}

if (log_segments.empty()) {
log_segments.push_back(createEmptyArchive());
}
return std::move(compressed.buffer_);
return log_segments;
}

size_t getMaxCacheSize() const {
return cached_logs_.getMaxSize();
}

size_t getMaxCacheSegmenSize() const {
return cached_logs_.getMaxItemSize();
}

size_t getMaxCompressedSize() const {
return compressed_logs_.getMaxSize();
}

size_t getMaxCompressedSegmentSize() const {
return compressed_logs_.getMaxItemSize();
}

private:
enum class CompressionResult {
Success,
Expand Down
8 changes: 8 additions & 0 deletions libminifi/include/utils/StagingQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ class StagingQueue {
return max_size_;
}

size_t getMaxItemSize() const {
return max_item_size_;
}

void discardOverflow() {
while (total_size_ > max_size_) {
Item item;
Expand All @@ -157,6 +161,10 @@ class StagingQueue {
return total_size_;
}

size_t itemCount() const {
return queue_.size();
}

private:
void commit(std::unique_lock<std::mutex>& /*lock*/) {
queue_.enqueue(active_item_.commit());
Expand Down
6 changes: 4 additions & 2 deletions libminifi/src/FlowController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,10 @@ std::vector<BackTrace> FlowController::getTraces() {

std::map<std::string, std::unique_ptr<io::InputStream>> FlowController::getDebugInfo() {
std::map<std::string, std::unique_ptr<io::InputStream>> debug_info;
if (auto logs = core::logging::LoggerConfiguration::getCompressedLog(true)) {
debug_info["minifi.log.gz"] = std::move(logs);
auto logs = core::logging::LoggerConfiguration::getCompressedLogs(true);
for (size_t i = 0; i < logs.size(); ++i) {
std::string index_str = i == logs.size() - 1 ? "" : "." + std::to_string(logs.size() - 1 - i);
debug_info["minifi.log" + index_str + ".gz"] = std::move(logs[i]);
}
if (auto opt_flow_path = flow_configuration_->getConfigurationPath()) {
debug_info["config.yml"] = std::make_unique<io::FileStream>(opt_flow_path.value(), 0, false);
Expand Down
3 changes: 2 additions & 1 deletion libminifi/src/core/logging/internal/CompressionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ std::shared_ptr<LogCompressorSink> CompressionManager::initialize(
return sink_;
}
// do not create new sink if all relevant parameters match
if (!sink_ || sink_->getMaxCacheSize() != cached_log_max_size || sink_->getMaxCompressedSize() != compressed_log_max_size) {
if (!sink_ || sink_->getMaxCacheSize() != cached_log_max_size || sink_->getMaxCompressedSize() != compressed_log_max_size ||
sink_->getMaxCacheSegmenSize() != cache_segment_size || sink_->getMaxCompressedSegmentSize() != compressed_segment_size) {
sink_ = std::make_shared<internal::LogCompressorSink>(
LogQueueSize{cached_log_max_size, cache_segment_size},
LogQueueSize{compressed_log_max_size, compressed_segment_size},
Expand Down
70 changes: 53 additions & 17 deletions libminifi/test/unit/LoggerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <memory>
#include <vector>
#include <ctime>
#include <random>
#include "../TestBase.h"
#include "../Catch.h"
#include "core/logging/LoggerConfiguration.h"
Expand Down Expand Up @@ -189,7 +190,7 @@ TEST_CASE("Test ShortenNames", "[ttl8]") {

using namespace minifi::io;

std::string decompress(const std::shared_ptr<InputStream>& input) {
std::string decompress(const std::unique_ptr<InputStream>& input) {
auto output = std::make_unique<BufferStream>();
auto decompressor = std::make_shared<ZlibDecompressStream>(gsl::make_not_null(output.get()));
minifi::internal::pipe(*input, *decompressor);
Expand All @@ -214,9 +215,9 @@ TEST_CASE("Test Compression", "[ttl9]") {
log_config.initialize(properties);
auto logger = log_config.getLogger(className);
logger->log_error("Hi there");
std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
REQUIRE(compressed_log);
auto logs = decompress(compressed_log);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(true);
REQUIRE(compressed_logs.size() == 1);
auto logs = decompress(compressed_logs[0]);
REQUIRE(logs.find("Hi there") != std::string::npos);
}

Expand All @@ -234,6 +235,9 @@ class LoggerTestAccessor {
static size_t getCompressedSize(logging::LoggerConfiguration& log_config) {
return log_config.compression_manager_.getSink()->compressed_logs_.size();
}
static void runCompression(logging::LoggerConfiguration& log_config) {
while (logging::internal::LogCompressorSink::CompressionResult::Success == log_config.compression_manager_.getSink()->compress()){}
}
};

TEST_CASE("Test Compression cache overflow is discarded intermittently", "[ttl10]") {
Expand Down Expand Up @@ -276,7 +280,7 @@ TEST_CASE("Setting either properties to 0 disables in-memory compressed logs", "
log_config.initialize(properties);
auto logger = log_config.getLogger("DisableCompressionTestLogger");
logger->log_error("Hi there");
REQUIRE((logging::LoggerConfiguration::getCompressedLog(true) == nullptr) == is_nullptr);
REQUIRE(logging::LoggerConfiguration::getCompressedLogs(true).empty() == is_nullptr);
}

TEST_CASE("Setting max log entry length property trims long log entries", "[ttl12]") {
Expand All @@ -288,9 +292,9 @@ TEST_CASE("Setting max log entry length property trims long log entries", "[ttl1
auto logger = log_config.getLogger("SetMaxLogEntryLengthTestLogger");
logger->log_error("Hi there");

std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
REQUIRE(compressed_log);
auto logs = decompress(compressed_log);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(true);
REQUIRE(compressed_logs.size() == 1);
auto logs = decompress(compressed_logs[0]);
REQUIRE(logs.find("Hi ") == std::string::npos);
REQUIRE(logs.find("Hi") != std::string::npos);
}
Expand All @@ -304,9 +308,9 @@ TEST_CASE("Setting max log entry length property trims long formatted log entrie
auto logger = log_config.getLogger("SetMaxLogEntryLengthTestLogger");
logger->log_error("Hi there %s", "John");

std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
REQUIRE(compressed_log);
auto logs = decompress(compressed_log);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(true);
REQUIRE(compressed_logs.size() == 1);
auto logs = decompress(compressed_logs[0]);
REQUIRE(logs.find("Hi ") == std::string::npos);
REQUIRE(logs.find("Hi") != std::string::npos);
}
Expand All @@ -322,9 +326,9 @@ TEST_CASE("Setting max log entry length to a size larger than the internal buffe
std::string expected_log(1500, 'a');
logger->log_error(log.c_str());

std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
REQUIRE(compressed_log);
auto logs = decompress(compressed_log);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(true);
REQUIRE(compressed_logs.size() == 1);
auto logs = decompress(compressed_logs[0]);
REQUIRE(logs.find(log) == std::string::npos);
REQUIRE(logs.find(expected_log) != std::string::npos);
}
Expand All @@ -344,8 +348,40 @@ TEST_CASE("Setting max log entry length to unlimited results in unlimited log en
std::string log(5000, 'a');
logger->log_error(log.c_str());

std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
REQUIRE(compressed_log);
auto logs = decompress(compressed_log);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(true);
REQUIRE(compressed_logs.size() == 1);
auto logs = decompress(compressed_logs[0]);
REQUIRE(logs.find(log) != std::string::npos);
}

TEST_CASE("Test sending multiple segments at once", "[ttl16]") {
auto& log_config = logging::LoggerConfiguration::getConfiguration();
LoggerTestAccessor::setCompressionCompressedSegmentSize(log_config, 100);
LoggerTestAccessor::setCompressionCacheSegmentSize(log_config, 100);
auto properties = std::make_shared<logging::LoggerProperties>();
// by default the root logger is OFF
properties->set("logger.root", "INFO");
log_config.initialize(properties);
auto logger = log_config.getLogger("CompressionTestMultiSegment");

std::random_device rd;
std::mt19937 eng(rd());
constexpr const char * TEXT_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
const int index_of_last_char = gsl::narrow<int>(strlen(TEXT_CHARS)) - 1;
std::uniform_int_distribution<> distr(0, index_of_last_char);
std::vector<char> data(100);
std::string log_str;
const size_t SEGMENT_COUNT = 5;
for (size_t idx = 0; idx < SEGMENT_COUNT; ++idx) {
std::generate_n(data.begin(), data.size(), [&] { return TEXT_CHARS[static_cast<uint8_t>(distr(eng))]; });
log_str = std::string{data.begin(), data.end()} + "." + std::to_string(idx);
logger->log_error(log_str.c_str());
}

LoggerTestAccessor::runCompression(log_config);

auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs(true);
REQUIRE(compressed_logs.size() == SEGMENT_COUNT);
auto logs = decompress(compressed_logs[SEGMENT_COUNT - 1]);
REQUIRE(logs.find(log_str) != std::string::npos);
}

0 comments on commit b004732

Please sign in to comment.