Skip to content

Commit

Permalink
allow several <graphite> targets (ClickHouse#603)
Browse files Browse the repository at this point in the history
* allow several <graphite> targets

* fix

* fix

* Adjustable parts

* changelog version

* fix

* changelog

* Style fixes

* attachSystemTables

* config describe

* fixes

* fixes
  • Loading branch information
proller authored and alexey-milovidov committed Mar 21, 2017
1 parent daefb87 commit 670e98f
Show file tree
Hide file tree
Showing 17 changed files with 212 additions and 96 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@

## [1.1.54189](https://github.com/yandex/Clickhouse/tree/v1.1.54189-testing) (2017-03-17)
[Full Changelog](https://github.com/yandex/Clickhouse/compare/v1.1.54188-stable...v1.1.54189-testing)

- Config: Allow define several graphite blocks, graphite.interval=60 option added. use_graphite option deleted.


## [1.1.54181](https://github.com/yandex/Clickhouse/tree/v1.1.54181-testing) (2017-03-10)
[Full Changelog](https://github.com/yandex/Clickhouse/compare/v1.1.54165-stable...v1.1.54181-testing)

Expand Down
16 changes: 16 additions & 0 deletions dbms/include/DB/Common/getMultipleKeysFromConfig.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once
#include <string>
#include <vector>

namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
}
namespace DB
{
/// get all internal key names for given key
std::vector<std::string> getMultipleKeysFromConfig(Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name);
}
13 changes: 13 additions & 0 deletions dbms/include/DB/Storages/System/attachSystemTables.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#pragma once

#include <DB/Databases/IDatabase.h>

namespace DB
{
class Context;
class AsynchronousMetrics;

void attachSystemTablesServer(DatabasePtr system_database, Context * global_context, bool has_zookeeper);
void attachSystemTablesLocal(DatabasePtr system_database);
void attachSystemTablesAsync(DatabasePtr system_database, AsynchronousMetrics & async_metrics);
}
13 changes: 0 additions & 13 deletions dbms/include/DB/Storages/System/attach_system_tables.h

This file was deleted.

21 changes: 21 additions & 0 deletions dbms/src/Common/getMultipleKeysFromConfig.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include <DB/Common/getMultipleKeysFromConfig.h>

#include <Poco/Util/AbstractConfiguration.h>
#include <DB/Common/StringUtils.h>

namespace DB
{
std::vector<std::string> getMultipleKeysFromConfig(Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name)
{
std::vector<std::string> values;
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(root, config_keys);
for (const auto & key : config_keys)
{
if (key != name && !(startsWith(key.data(), name + "[") && endsWith(key.data(), "]")))
continue;
values.emplace_back(key);
}
return values;
}
}
4 changes: 2 additions & 2 deletions dbms/src/Server/LocalServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <Poco/Util/OptionCallback.h>
#include <Poco/String.h>
#include <DB/Databases/DatabaseOrdinary.h>
#include <DB/Storages/System/attach_system_tables.h>
#include <DB/Storages/System/attachSystemTables.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/executeQuery.h>
Expand Down Expand Up @@ -379,7 +379,7 @@ void LocalServer::attachSystemTables()
context->addDatabase("system", system_database);
}

attach_system_tables_local(system_database);
attachSystemTablesLocal(system_database);
}


Expand Down
67 changes: 42 additions & 25 deletions dbms/src/Server/MetricsTransmitter.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
#include "MetricsTransmitter.h"

#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <daemon/BaseDaemon.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/Common/Exception.h>
#include <DB/Common/setThreadName.h>
#include <DB/Interpreters/AsynchronousMetrics.h>


namespace DB
{

MetricsTransmitter::~MetricsTransmitter()
{
try
Expand All @@ -32,13 +32,19 @@ MetricsTransmitter::~MetricsTransmitter()

void MetricsTransmitter::run()
{
setThreadName("MetricsTransmit");

/// Next minute at 00 seconds. To avoid time drift and transmit values exactly each minute.
const auto get_next_minute = []
{
return std::chrono::time_point_cast<std::chrono::minutes, std::chrono::system_clock>(
std::chrono::system_clock::now() + std::chrono::minutes(1));
auto & config = Poco::Util::Application::instance().config();
auto interval = config.getInt(config_name + ".interval", 60);

const std::string thread_name = "MericsTrns " + std::to_string(interval) + "s";
setThreadName(thread_name.c_str());

const auto get_next_time = [](size_t seconds) {
/// To avoid time drift and transmit values exactly each interval:
/// next time aligned to system seconds
/// (60s -> every minute at 00 seconds, 5s -> every minute:[00, 05, 15 ... 55]s, 3600 -> every hour:00:00
return std::chrono::system_clock::time_point(
(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()) / seconds) * seconds
+ std::chrono::seconds(seconds));
};

std::vector<ProfileEvents::Count> prev_counters(ProfileEvents::end());
Expand All @@ -47,7 +53,7 @@ void MetricsTransmitter::run()

while (true)
{
if (cond.wait_until(lock, get_next_minute(), [this] { return quit; }))
if (cond.wait_until(lock, get_next_time(interval), [this] { return quit; }))
break;

transmit(prev_counters);
Expand All @@ -57,35 +63,46 @@ void MetricsTransmitter::run()

void MetricsTransmitter::transmit(std::vector<ProfileEvents::Count> & prev_counters)
{
auto & config = Poco::Util::Application::instance().config();
auto async_metrics_values = async_metrics.getValues();

GraphiteWriter::KeyValueVector<ssize_t> key_vals{};
key_vals.reserve(ProfileEvents::end() + CurrentMetrics::end() + async_metrics_values.size());

for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)

if (config.getBool(config_name + ".events", true))
{
const auto counter = ProfileEvents::counters[i].load(std::memory_order_relaxed);
const auto counter_increment = counter - prev_counters[i];
prev_counters[i] = counter;
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
{
const auto counter = ProfileEvents::counters[i].load(std::memory_order_relaxed);
const auto counter_increment = counter - prev_counters[i];
prev_counters[i] = counter;

std::string key {ProfileEvents::getDescription(static_cast<ProfileEvents::Event>(i))};
key_vals.emplace_back(profile_events_path_prefix + key, counter_increment);
std::string key{ProfileEvents::getDescription(static_cast<ProfileEvents::Event>(i))};
key_vals.emplace_back(profile_events_path_prefix + key, counter_increment);
}
}

for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
if (config.getBool(config_name + ".metrics", true))
{
const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);
for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
{
const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);

std::string key {CurrentMetrics::getDescription(static_cast<CurrentMetrics::Metric>(i))};
key_vals.emplace_back(current_metrics_path_prefix + key, value);
std::string key{CurrentMetrics::getDescription(static_cast<CurrentMetrics::Metric>(i))};
key_vals.emplace_back(current_metrics_path_prefix + key, value);
}
}

for (const auto & name_value : async_metrics_values)
if (config.getBool(config_name + ".asynchronous_metrics", true))
{
key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second);
for (const auto & name_value : async_metrics_values)
{
key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second);
}
}

BaseDaemon::instance().writeToGraphite(key_vals);
if (key_vals.size())
BaseDaemon::instance().writeToGraphite(key_vals, config_name);
}

}
18 changes: 10 additions & 8 deletions dbms/src/Server/MetricsTransmitter.h
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
#pragma once

#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>

#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <DB/Common/ProfileEvents.h>


namespace DB
{

class AsynchronousMetrics;

/** Automatically sends
Expand All @@ -22,23 +21,26 @@ class AsynchronousMetrics;
class MetricsTransmitter
{
public:
MetricsTransmitter(const AsynchronousMetrics & async_metrics_) : async_metrics(async_metrics_) {}
MetricsTransmitter(const AsynchronousMetrics & async_metrics, const std::string & config_name)
: async_metrics{async_metrics}, config_name{config_name}
{
}
~MetricsTransmitter();

private:
void run();
void transmit(std::vector<ProfileEvents::Count> & prev_counters);

const AsynchronousMetrics & async_metrics;
const std::string config_name;

bool quit = false;
std::mutex mutex;
std::condition_variable cond;
std::thread thread {&MetricsTransmitter::run, this};
std::thread thread{&MetricsTransmitter::run, this};

static constexpr auto profile_events_path_prefix = "ClickHouse.ProfileEvents.";
static constexpr auto current_metrics_path_prefix = "ClickHouse.Metrics.";
static constexpr auto asynchronous_metrics_path_prefix = "ClickHouse.AsynchronousMetrics.";
};

}
45 changes: 24 additions & 21 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@
#include <common/ErrorHandlers.h>
#include <ext/scope_guard.hpp>
#include <zkutil/ZooKeeper.h>
#include <zkutil/ZooKeeperNodeCache.h>
#include <DB/Common/Macros.h>
#include <DB/Common/StringUtils.h>
#include <DB/Common/getFQDNOrHostName.h>
#include <DB/Common/getMultipleKeysFromConfig.h>
#include <DB/Databases/DatabaseOrdinary.h>
#include <DB/IO/HTTPCommon.h>
#include <DB/Interpreters/AsynchronousMetrics.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/loadMetadata.h>
#include <DB/Storages/MergeTree/ReshardingWorker.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/System/attach_system_tables.h>
#include <DB/Storages/System/attachSystemTables.h>
#include "ConfigReloader.h"
#include "HTTPHandler.h"
#include "InterserverIOHTTPHandler.h"
Expand All @@ -32,6 +34,7 @@
#include "StatusFile.h"
#include "TCPHandler.h"


namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -221,7 +224,7 @@ int Server::main(const std::vector<std::string> & args)
{
auto old_configuration = loaded_config.configuration;
loaded_config = ConfigProcessor().loadConfigWithZooKeeperIncludes(
config_path, main_config_zk_node_cache, /* fallback_to_preprocessed = */ true);
config_path, main_config_zk_node_cache, /* fallback_to_preprocessed = */ true);
config().removeConfiguration(old_configuration.get());
config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
}
Expand Down Expand Up @@ -321,11 +324,11 @@ int Server::main(const std::vector<std::string> & args)

/// Initialize main config reloader.
std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
auto main_config_reloader = std::make_unique<ConfigReloader>(
config_path, include_from_path,
std::move(main_config_zk_node_cache),
[&](ConfigurationPtr config) { global_context->setClustersConfig(config); },
/* already_loaded = */ true);
auto main_config_reloader = std::make_unique<ConfigReloader>(config_path,
include_from_path,
std::move(main_config_zk_node_cache),
[&](ConfigurationPtr config) { global_context->setClustersConfig(config); },
/* already_loaded = */ true);

/// Initialize users config reloader.
std::string users_config_path = config().getString("users_config", config_path);
Expand All @@ -337,11 +340,11 @@ int Server::main(const std::vector<std::string> & args)
if (Poco::File(config_dir + users_config_path).exists())
users_config_path = config_dir + users_config_path;
}
auto users_config_reloader = std::make_unique<ConfigReloader>(
users_config_path, include_from_path,
zkutil::ZooKeeperNodeCache([&] { return global_context->getZooKeeper(); }),
[&](ConfigurationPtr config) { global_context->setUsersConfig(config); },
/* already_loaded = */ false);
auto users_config_reloader = std::make_unique<ConfigReloader>(users_config_path,
include_from_path,
zkutil::ZooKeeperNodeCache([&] { return global_context->getZooKeeper(); }),
[&](ConfigurationPtr config) { global_context->setUsersConfig(config); },
/* already_loaded = */ false);

/// Limit on total number of coucurrently executed queries.
global_context->getProcessList().setMaxSize(config().getInt("max_concurrent_queries", 0));
Expand Down Expand Up @@ -385,7 +388,7 @@ int Server::main(const std::vector<std::string> & args)

DatabasePtr system_database = global_context->getDatabase("system");

attach_system_tables_server(system_database, global_context.get(), has_zookeeper);
attachSystemTablesServer(system_database, global_context.get(), has_zookeeper);

bool has_resharding_worker = false;
if (has_zookeeper && config().has("resharding"))
Expand Down Expand Up @@ -425,12 +428,8 @@ int Server::main(const std::vector<std::string> & args)
std::vector<std::unique_ptr<Poco::Net::TCPServer>> servers;

std::vector<std::string> listen_hosts;
Poco::Util::AbstractConfiguration::Keys config_keys;
config().keys("", config_keys);
for (const auto & key : config_keys)
for (const auto & key : DB::getMultipleKeysFromConfig(config(), "", "listen_host"))
{
if (!startsWith(key.data(), "listen_host"))
continue;
listen_hosts.emplace_back(config().getString(key));
}

Expand Down Expand Up @@ -585,10 +584,14 @@ int Server::main(const std::vector<std::string> & args)
/// This object will periodically calculate some metrics.
AsynchronousMetrics async_metrics(*global_context);

attach_system_tables_async(system_database, async_metrics);
attachSystemTablesAsync(system_database, async_metrics);

std::vector<std::unique_ptr<MetricsTransmitter>> metrics_transmitters;
for (const auto & graphite_key : DB::getMultipleKeysFromConfig(config(), "", "graphite"))
{
metrics_transmitters.emplace_back(std::make_unique<MetricsTransmitter>(async_metrics, graphite_key));
}

const auto metrics_transmitter
= config().getBool("use_graphite", true) ? std::make_unique<MetricsTransmitter>(async_metrics) : nullptr;

waitForTerminationRequest();
}
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/Server/config.d/no_graphite.xml

This file was deleted.

Loading

0 comments on commit 670e98f

Please sign in to comment.