Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-2174 Send all cached compressed log files through C2 #1631

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions libminifi/include/core/logging/LoggerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@ class LoggerConfiguration {
*/
void initialize(const std::shared_ptr<LoggerProperties> &logger_properties);

static std::unique_ptr<io::InputStream> getCompressedLog(bool flush = false) {
return getCompressedLog(std::chrono::milliseconds{0}, flush);
static std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs() {
return getCompressedLogs(std::chrono::milliseconds{0});
}

void initializeAlertSinks(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<AgentIdentificationProvider>& agent_id);

template<class Rep, class Period>
static std::unique_ptr<io::InputStream> getCompressedLog(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
return getConfiguration().compression_manager_.getCompressedLog(time, flush);
static std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs(const std::chrono::duration<Rep, Period>& time) {
return getConfiguration().compression_manager_.getCompressedLogs(time);
}

/**
Expand Down
7 changes: 4 additions & 3 deletions libminifi/include/core/logging/internal/CompressionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <functional>
#include <utility>
#include <string>
#include <vector>

#include "core/logging/Logger.h"
#include "LogCompressorSink.h"
Expand All @@ -50,12 +51,12 @@ class CompressionManager {
std::shared_ptr<LogCompressorSink> initialize(const std::shared_ptr<LoggerProperties>& properties, const std::shared_ptr<Logger>& error_logger, const LoggerFactory& logger_factory);

template<class Rep, class Period>
std::unique_ptr<io::InputStream> getCompressedLog(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
std::vector<std::unique_ptr<io::InputStream>> getCompressedLogs(const std::chrono::duration<Rep, Period>& time) {
std::shared_ptr<internal::LogCompressorSink> sink = getSink();
if (sink) {
return sink->getContent(time, flush);
return sink->getContent(time);
}
return nullptr;
return {};
}

static constexpr const char* compression_cached_log_max_size_ = "compression.cached.log.max.size";
Expand Down
33 changes: 25 additions & 8 deletions libminifi/include/core/logging/internal/LogCompressorSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <memory>
#include <atomic>
#include <utility>
#include <vector>

#include "spdlog/common.h"
#include "spdlog/details/log_msg.h"
Expand Down Expand Up @@ -57,26 +58,42 @@ class LogCompressorSink : public spdlog::sinks::base_sink<std::mutex> {
~LogCompressorSink() override;

template<class Rep, class Period>
std::unique_ptr<io::InputStream> getContent(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
if (flush) {
cached_logs_.commit();
compress(true);
std::vector<std::unique_ptr<io::InputStream>> getContent(const std::chrono::duration<Rep, Period>& time) {
cached_logs_.commit();
compress(true);

std::vector<std::unique_ptr<io::InputStream>> log_segments;
const auto segment_count = compressed_logs_.itemCount();
for (size_t i = 0; i < segment_count; ++i) {
LogBuffer compressed;
if (!compressed_logs_.tryDequeue(compressed, time)) {
break;
}
log_segments.push_back(std::move(compressed.buffer_));
}
LogBuffer compressed;
if (!compressed_logs_.tryDequeue(compressed, time) && flush) {
return createEmptyArchive();

if (log_segments.empty()) {
log_segments.push_back(createEmptyArchive());
}
return std::move(compressed.buffer_);
return log_segments;
}

size_t getMaxCacheSize() const {
return cached_logs_.getMaxSize();
}

size_t getMaxCacheSegmentSize() const {
return cached_logs_.getMaxItemSize();
}

size_t getMaxCompressedSize() const {
return compressed_logs_.getMaxSize();
}

size_t getMaxCompressedSegmentSize() const {
return compressed_logs_.getMaxItemSize();
}

private:
enum class CompressionResult {
Success,
Expand Down
8 changes: 8 additions & 0 deletions libminifi/include/utils/StagingQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ class StagingQueue {
return max_size_;
}

size_t getMaxItemSize() const {
return max_item_size_;
}

void discardOverflow() {
while (total_size_ > max_size_) {
Item item;
Expand All @@ -157,6 +161,10 @@ class StagingQueue {
return total_size_;
}

size_t itemCount() const {
return queue_.size();
}

private:
void commit(std::unique_lock<std::mutex>& /*lock*/) {
queue_.enqueue(active_item_.commit());
Expand Down
6 changes: 4 additions & 2 deletions libminifi/src/FlowController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,10 @@ std::vector<BackTrace> FlowController::getTraces() {

std::map<std::string, std::unique_ptr<io::InputStream>> FlowController::getDebugInfo() {
std::map<std::string, std::unique_ptr<io::InputStream>> debug_info;
if (auto logs = core::logging::LoggerConfiguration::getCompressedLog(true)) {
debug_info["minifi.log.gz"] = std::move(logs);
auto logs = core::logging::LoggerConfiguration::getCompressedLogs();
for (size_t i = 0; i < logs.size(); ++i) {
std::string index_str = i == logs.size() - 1 ? "" : "." + std::to_string(logs.size() - 1 - i);
debug_info["minifi.log" + index_str + ".gz"] = std::move(logs[i]);
szaszm marked this conversation as resolved.
Show resolved Hide resolved
}
if (auto opt_flow_path = flow_configuration_->getConfigurationPath()) {
debug_info["config.yml"] = std::make_unique<io::FileStream>(opt_flow_path.value(), 0, false);
Expand Down
3 changes: 2 additions & 1 deletion libminifi/src/core/logging/internal/CompressionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ std::shared_ptr<LogCompressorSink> CompressionManager::initialize(
return sink_;
}
// do not create new sink if all relevant parameters match
if (!sink_ || sink_->getMaxCacheSize() != cached_log_max_size || sink_->getMaxCompressedSize() != compressed_log_max_size) {
if (!sink_ || sink_->getMaxCacheSize() != cached_log_max_size || sink_->getMaxCompressedSize() != compressed_log_max_size ||
sink_->getMaxCacheSegmentSize() != cache_segment_size || sink_->getMaxCompressedSegmentSize() != compressed_segment_size) {
sink_ = std::make_shared<internal::LogCompressorSink>(
LogQueueSize{cached_log_max_size, cache_segment_size},
LogQueueSize{compressed_log_max_size, compressed_segment_size},
Expand Down
78 changes: 57 additions & 21 deletions libminifi/test/unit/LoggerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <memory>
#include <vector>
#include <ctime>
#include <random>
#include "../TestBase.h"
#include "../Catch.h"
#include "core/logging/LoggerConfiguration.h"
Expand Down Expand Up @@ -189,7 +190,7 @@ TEST_CASE("Test ShortenNames", "[ttl8]") {

using namespace minifi::io;

std::string decompress(const std::shared_ptr<InputStream>& input) {
std::string decompress(const std::unique_ptr<InputStream>& input) {
auto output = std::make_unique<BufferStream>();
auto decompressor = std::make_shared<ZlibDecompressStream>(gsl::make_not_null(output.get()));
minifi::internal::pipe(*input, *decompressor);
Expand All @@ -214,9 +215,9 @@ TEST_CASE("Test Compression", "[ttl9]") {
log_config.initialize(properties);
auto logger = log_config.getLogger(className);
logger->log_error("Hi there");
std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
REQUIRE(compressed_log);
auto logs = decompress(compressed_log);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
REQUIRE(compressed_logs.size() == 1);
auto logs = decompress(compressed_logs[0]);
REQUIRE(logs.find("Hi there") != std::string::npos);
}

Expand All @@ -234,6 +235,9 @@ class LoggerTestAccessor {
static size_t getCompressedSize(logging::LoggerConfiguration& log_config) {
return log_config.compression_manager_.getSink()->compressed_logs_.size();
}
static void runCompression(logging::LoggerConfiguration& log_config) {
while (logging::internal::LogCompressorSink::CompressionResult::Success == log_config.compression_manager_.getSink()->compress()){}
}
};

TEST_CASE("Test Compression cache overflow is discarded intermittently", "[ttl10]") {
Expand All @@ -258,25 +262,25 @@ TEST_CASE("Test Compression cache overflow is discarded intermittently", "[ttl10
TEST_CASE("Setting either properties to 0 disables in-memory compressed logs", "[ttl11]") {
auto& log_config = logging::LoggerConfiguration::getConfiguration();
auto properties = std::make_shared<logging::LoggerProperties>();
bool is_nullptr = false;
bool is_empty = false;
SECTION("Cached log size is set to 0") {
is_nullptr = true;
is_empty = true;
properties->set(logging::internal::CompressionManager::compression_cached_log_max_size_, "0");
}
SECTION("Compressed log size is set to 0") {
is_nullptr = true;
is_empty = true;
properties->set(logging::internal::CompressionManager::compression_compressed_log_max_size_, "0");
}
SECTION("Sanity check") {
is_nullptr = false;
is_empty = false;
// pass
}
// by default the root logger is OFF
properties->set("logger.root", "INFO");
log_config.initialize(properties);
auto logger = log_config.getLogger("DisableCompressionTestLogger");
logger->log_error("Hi there");
REQUIRE((logging::LoggerConfiguration::getCompressedLog(true) == nullptr) == is_nullptr);
REQUIRE(logging::LoggerConfiguration::getCompressedLogs().empty() == is_empty);
}

TEST_CASE("Setting max log entry length property trims long log entries", "[ttl12]") {
Expand All @@ -288,9 +292,9 @@ TEST_CASE("Setting max log entry length property trims long log entries", "[ttl1
auto logger = log_config.getLogger("SetMaxLogEntryLengthTestLogger");
logger->log_error("Hi there");

std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
REQUIRE(compressed_log);
auto logs = decompress(compressed_log);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
REQUIRE(compressed_logs.size() == 1);
auto logs = decompress(compressed_logs[0]);
REQUIRE(logs.find("Hi ") == std::string::npos);
REQUIRE(logs.find("Hi") != std::string::npos);
}
Expand All @@ -304,9 +308,9 @@ TEST_CASE("Setting max log entry length property trims long formatted log entrie
auto logger = log_config.getLogger("SetMaxLogEntryLengthTestLogger");
logger->log_error("Hi there %s", "John");

std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
REQUIRE(compressed_log);
auto logs = decompress(compressed_log);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
REQUIRE(compressed_logs.size() == 1);
auto logs = decompress(compressed_logs[0]);
REQUIRE(logs.find("Hi ") == std::string::npos);
REQUIRE(logs.find("Hi") != std::string::npos);
}
Expand All @@ -322,9 +326,9 @@ TEST_CASE("Setting max log entry length to a size larger than the internal buffe
std::string expected_log(1500, 'a');
logger->log_error(log.c_str());

std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
REQUIRE(compressed_log);
auto logs = decompress(compressed_log);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
REQUIRE(compressed_logs.size() == 1);
auto logs = decompress(compressed_logs[0]);
REQUIRE(logs.find(log) == std::string::npos);
REQUIRE(logs.find(expected_log) != std::string::npos);
}
Expand All @@ -344,8 +348,40 @@ TEST_CASE("Setting max log entry length to unlimited results in unlimited log en
std::string log(5000, 'a');
logger->log_error(log.c_str());

std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
REQUIRE(compressed_log);
auto logs = decompress(compressed_log);
auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
REQUIRE(compressed_logs.size() == 1);
auto logs = decompress(compressed_logs[0]);
REQUIRE(logs.find(log) != std::string::npos);
}

TEST_CASE("Test sending multiple segments at once", "[ttl16]") {
auto& log_config = logging::LoggerConfiguration::getConfiguration();
LoggerTestAccessor::setCompressionCompressedSegmentSize(log_config, 100);
LoggerTestAccessor::setCompressionCacheSegmentSize(log_config, 100);
auto properties = std::make_shared<logging::LoggerProperties>();
// by default the root logger is OFF
properties->set("logger.root", "INFO");
log_config.initialize(properties);
auto logger = log_config.getLogger("CompressionTestMultiSegment");

std::random_device rd;
std::mt19937 eng(rd());
constexpr const char * TEXT_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
const int index_of_last_char = gsl::narrow<int>(strlen(TEXT_CHARS)) - 1;
std::uniform_int_distribution<> distr(0, index_of_last_char);
std::vector<char> data(100);
std::string log_str;
const size_t SEGMENT_COUNT = 5;
for (size_t idx = 0; idx < SEGMENT_COUNT; ++idx) {
std::generate_n(data.begin(), data.size(), [&] { return TEXT_CHARS[static_cast<uint8_t>(distr(eng))]; });
log_str = std::string{data.begin(), data.end()} + "." + std::to_string(idx);
logger->log_error(log_str.c_str());
}

LoggerTestAccessor::runCompression(log_config);

auto compressed_logs = logging::LoggerConfiguration::getCompressedLogs();
REQUIRE(compressed_logs.size() == SEGMENT_COUNT);
auto logs = decompress(compressed_logs[SEGMENT_COUNT - 1]);
REQUIRE(logs.find(log_str) != std::string::npos);
}
Loading