Skip to content

Commit

Permalink
Merge pull request #196 from golos-blockchain/livetest
Browse files Browse the repository at this point in the history
ElasticSearch - add post/comment versions
  • Loading branch information
Lex-Ai authored May 13, 2022
2 parents 9b910f0 + a6ac212 commit 37331d2
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 40 deletions.
3 changes: 2 additions & 1 deletion libraries/api/include/golos/api/comment_api_object.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace golos { namespace api {
fc::optional<time_point_sec> last_update;
time_point_sec created;
fc::optional<time_point_sec> active;
fc::optional<uint16_t> num_changes;
time_point_sec last_payout;
comment_object::id_type last_reply_id; // not reflected

Expand Down Expand Up @@ -93,7 +94,7 @@ namespace golos { namespace api {
FC_REFLECT(
(golos::api::comment_api_object),
(id)(author)(permlink)(parent_author)(parent_permlink)(category)(title)(body)(json_metadata)(last_update)
(created)(active)(last_payout)(depth)(children)(children_rshares2)(net_rshares)(abs_rshares)
(created)(active)(num_changes)(last_payout)(depth)(children)(children_rshares2)(net_rshares)(abs_rshares)
(vote_rshares)(children_abs_rshares)(cashout_time)(max_cashout_time)(total_vote_weight)
(reward_weight)(donates)(donates_uia)(total_payout_value)(beneficiary_payout_value)(beneficiary_gests_payout_value)(curator_payout_value)(curator_gests_payout_value)
(author_rewards)(author_payout_in_golos)(author_gbg_payout_value)(author_golos_payout_value)(author_gests_payout_value)(net_votes)
Expand Down
15 changes: 8 additions & 7 deletions libraries/api/include/golos/api/discussion.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ namespace golos { namespace api {
} } // golos::api

FC_REFLECT_DERIVED( (golos::api::discussion), ((golos::api::comment_api_object)),
(url)(pending_author_payout_value)(pending_author_payout_in_golos)
(pending_author_payout_gbg_value)(pending_author_payout_gests_value)(pending_author_payout_golos_value)
(pending_benefactor_payout_value)(pending_benefactor_payout_gests_value)
(pending_curator_payout_value)(pending_curator_payout_gests_value)
(pending_payout_value)(total_pending_payout_value)(active_votes)(active_votes_count)(replies)
(author_reputation)(promoted)(body_length)(reblogged_by)(first_reblogged_by)(first_reblogged_on)
(reblog_author)(reblog_title)(reblog_body)(reblog_json_metadata)(reblog_entries)(last_reply))
(url)(pending_author_payout_value)(pending_author_payout_in_golos)
(pending_author_payout_gbg_value)(pending_author_payout_gests_value)(pending_author_payout_golos_value)
(pending_benefactor_payout_value)(pending_benefactor_payout_gests_value)
(pending_curator_payout_value)(pending_curator_payout_gests_value)
(pending_payout_value)(total_pending_payout_value)(active_votes)(active_votes_count)(replies)
(author_reputation)(promoted)(body_length)(reblogged_by)(first_reblogged_by)(first_reblogged_on)
(reblog_author)(reblog_title)(reblog_body)(reblog_json_metadata)(reblog_entries)(last_reply)
)
22 changes: 19 additions & 3 deletions plugins/elastic_search/elastic_search_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ using golos::protocol::signed_block;

class elastic_search_plugin::elastic_search_plugin_impl final {
public:
elastic_search_plugin_impl(const std::string& url, const std::string& login, const std::string& password)
elastic_search_plugin_impl(const std::string& url, const std::string& login, const std::string& password,
uint16_t versions_depth, fc::time_point_sec skip_comments_before)
: _db(appbase::app().get_plugin<golos::plugins::chain::plugin>().db()),
writer(url, login, password, _db) {
writer(url, login, password, _db, versions_depth, skip_comments_before) {
}

~elastic_search_plugin_impl() {
Expand Down Expand Up @@ -47,6 +48,12 @@ void elastic_search_plugin::set_program_options(bpo::options_description& cli, b
) (
"elastic-search-password", bpo::value<string>(),
"Elastic Search Password"
) (
"elastic-search-versions-depth", bpo::value<uint16_t>()->default_value(10),
"Count of post/comment versions stored"
) (
"elastic-search-skip-comments-before", bpo::value<string>()->default_value("2019-01-01T00:00:00"),
"Do not track version history (excl. first version) for comments/posts before this date. Saves disk space"
);
}

Expand All @@ -59,8 +66,17 @@ void elastic_search_plugin::plugin_initialize(const bpo::variables_map &options)

auto login = options.at("elastic-search-login").as<std::string>();
auto password = options.at("elastic-search-password").as<std::string>();
auto versions_depth = options.at("elastic-search-versions-depth").as<uint16_t>();
auto skip_comments_before = options.at("elastic-search-skip-comments-before").as<std::string>();

time_point_sec skb;
try {
skb = time_point_sec::from_iso_string(skip_comments_before);
} catch (...) {
elog("Cannot parse elastic-search-skip-comments-before - use proper format, example: 2019-01-01T00:00:00");
}

my = std::make_unique<elastic_search_plugin::elastic_search_plugin_impl>(uri_str, login, password);
my = std::make_unique<elastic_search_plugin::elastic_search_plugin_impl>(uri_str, login, password, versions_depth, skb);

my->_db.post_apply_operation.connect([&](const operation_notification& note) {
my->on_operation(note);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ namespace golos { namespace plugins { namespace elastic_search {
#define TAG_MAX_LENGTH 512

using boost::locale::conv::utf_to_utf;
using golos::plugins::social_network::comment_last_update_index;
using golos::plugins::social_network::by_comment;

#ifdef STEEMIT_BUILD_TESTNET
#define ELASTIC_WRITE_EACH_N_OP 3
#else
#define ELASTIC_WRITE_EACH_N_OP 100
#endif

using es_buffer_type = std::map<std::string, fc::mutable_variant_object>;

class elastic_search_state_writer {
public:
using result_type = void;
Expand All @@ -32,12 +36,17 @@ class elastic_search_state_writer {
std::string login;
std::string password;
database& _db;
uint16_t versions_depth;
fc::time_point_sec skip_comments_before;
fc::http::connection conn;
std::map<std::string, fc::mutable_variant_object> buffer;
es_buffer_type buffer;
es_buffer_type buffer_versions;
int op_num = 0;

elastic_search_state_writer(const std::string& url, const std::string& login, const std::string& password, database& db)
: url(url), login(login), password(password), _db(db) {
elastic_search_state_writer(const std::string& url, const std::string& login, const std::string& password, database& db,
uint16_t versions_depth, fc::time_point_sec skip_comments_before)
: url(url), login(login), password(password), _db(db),
versions_depth(versions_depth), skip_comments_before(skip_comments_before) {
auto fc_url = fc::url(url);
auto host_port = *fc_url.host() + (fc_url.port() ? ":" + std::to_string(*fc_url.port()) : "");
auto ep = fc::ip::endpoint::from_string(host_port);
Expand All @@ -49,6 +58,10 @@ class elastic_search_state_writer {

}

std::string make_id(std::string author, const std::string& permlink) const {
return author + "." + permlink;
}

std::wstring utf8_to_wstring(const std::string& str) const {
return utf_to_utf<wchar_t>(str.c_str(), str.c_str() + str.size());
}
Expand Down Expand Up @@ -102,40 +115,110 @@ class elastic_search_state_writer {
return false;
}

void save_version(const comment_operation& op, const comment_object& cmt, const fc::mutable_variant_object* prevPtr, fc::time_point_sec now) {
if (_db.has_index<comment_last_update_index>()) {
const auto& clu_idx = _db.get_index<comment_last_update_index, by_comment>();
auto clu_itr = clu_idx.find(cmt.id);
if (clu_itr != clu_idx.end() && clu_itr->num_changes > 0 && clu_itr->num_changes <= versions_depth) {
auto id = make_id(op.author, op.permlink);
auto v = clu_itr->num_changes;
auto vid = id + "," + std::to_string(v);

fc::mutable_variant_object prev_doc;
bool from_buffer;
if (prevPtr == nullptr || !prevPtr->size()) {
bool found = find_post(id, prev_doc, from_buffer);
if (found) {
prevPtr = &prev_doc;
} else { // post not exists because of some mistake in elastic node maintenance
prevPtr = nullptr;
}
}

if (prevPtr) {
fc::mutable_variant_object doc;
const auto& prev = *prevPtr;
auto itr = prev.find("body_patch");
if (itr != prev.end()) {
doc["is_patch"] = true;
doc["body"] = itr->value();
} else {
doc["body"] = prev["body"];
}
auto lu_itr = prev.find("last_update");
if (lu_itr != prev.end()) {
doc["time"] = lu_itr->value();
} else {
doc["time"] = cmt.created;
}
doc["v"] = v;
doc["post"] = id;

buffer_versions[vid] = std::move(doc);
}
}
} else {
wlog("no comment_last_update_index (no social_network plugin or comment-last-update-depth in config is 0), so we will not save comment/post versions");
}
}

bool just_created(const comment_object& cmt) {
if (_db.has_index<comment_last_update_index>()) {
const auto& clu_idx = _db.get_index<comment_last_update_index, by_comment>();
auto clu_itr = clu_idx.find(cmt.id);
if (clu_itr != clu_idx.end()) {
return !clu_itr->num_changes;
}
}
return false;
}

result_type operator()(const comment_operation& op) {
auto id = std::string(op.author) + "." + op.permlink;
auto id = make_id(op.author, op.permlink);

if (!op.body.size()) {
return;
}

const auto& cmt = _db.get_comment(op.author, op.permlink);

const auto now = _db.head_block_time();

fc::mutable_variant_object doc;
bool from_buffer = false;

bool has_patch = false;
std::string body = op.body;
try {
diff_match_patch<std::wstring> dmp;
auto patch = dmp.patch_fromText(utf8_to_wstring(body));
if (patch.size()) {
find_post(id, doc, from_buffer);
std::string base_body = doc["body"].as_string();
if (base_body.size()) {
auto result = dmp.patch_apply(patch, utf8_to_wstring(base_body));
auto patched_body = wstring_to_utf8(result.first);
if(!fc::is_utf8(patched_body)) {
body = fc::prune_invalid_utf8(patched_body);
} else {
body = patched_body;
}

std::string base_body;
if (!just_created(cmt) && doc.size()) {
base_body = doc["body"].as_string();
}
auto result = dmp.patch_apply(patch, utf8_to_wstring(base_body));
auto patched_body = wstring_to_utf8(result.first);
if(!fc::is_utf8(patched_body)) {
body = fc::prune_invalid_utf8(patched_body);
} else {
body = patched_body;
}
if (now >= skip_comments_before) {
has_patch = true;
}
}
} catch ( ... ) {
}

const auto& cmt = _db.get_comment(op.author, op.permlink);
if (now >= skip_comments_before) {
save_version(op, cmt, &doc, now);
}

doc["id"] = cmt.id;
doc["created"] = _db.head_block_time();
doc["created"] = now;
doc["author"] = op.author;
doc["permlink"] = op.permlink;
doc["parent_author"] = op.parent_author;
Expand Down Expand Up @@ -164,14 +247,21 @@ class elastic_search_state_writer {

doc["author_reputation"] = std::string(_db.get_account_reputation(op.author));

if (has_patch) {
doc["body_patch"] = op.body;
}
if (now >= skip_comments_before) {
doc["last_update"] = now;
}

buffer[id] = std::move(doc);

++op_num;
if (op_num % ELASTIC_WRITE_EACH_N_OP != 0) {
return;
}

write_buffer();
write_buffers();
}

result_type operator()(const comment_reward_operation& op) {
Expand All @@ -181,7 +271,7 @@ class elastic_search_state_writer {
}
#endif

auto id = std::string(op.author) + "." + op.permlink;
auto id = make_id(op.author, op.permlink);

fc::mutable_variant_object doc;
bool from_buffer = false;
Expand All @@ -206,7 +296,7 @@ class elastic_search_state_writer {
return;
}

write_buffer();
write_buffers();
}

result_type operator()(const donate_operation& op) {
Expand All @@ -219,7 +309,7 @@ class elastic_search_state_writer {

const auto* comment = _db.find_comment(author, permlink);
if (comment) {
auto id = author_str + "." + permlink;
auto id = make_id(author_str, permlink);

fc::mutable_variant_object doc;
bool from_buffer = false;
Expand Down Expand Up @@ -252,26 +342,30 @@ class elastic_search_state_writer {
return;
}

write_buffer();
write_buffers();
}

void write_buffer() {
op_num = 0;

void _write_buffer(const std::string& _index, const std::string& _type, es_buffer_type& buf, bool auto_increment = false) {
std::string bulk;
for (auto& obj : buffer) {
for (auto& obj : buf) {
fc::mutable_variant_object idx;
idx["_index"] = "blog";
idx["_type"] = "post";
idx["_id"] = obj.first;
idx["_index"] = _index;
idx["_type"] = _type;
if (!auto_increment) {
idx["_id"] = obj.first;
}
fc::mutable_variant_object idx2;
idx2["index"] = idx;
bulk += fc::json::to_string(idx2) + "\r\n";
bulk += fc::json::to_string(obj.second) + "\r\n";
}
buffer.clear();
buf.clear();

auto bulk_url = url + "/blog/_bulk";
if (bulk.empty()) {
return;
}

auto bulk_url = url + "/" + _index + "/_bulk";

auto headers = get_es_headers();
//headers.emplace_back("Content-Type", "application/json"); // already set - hardcoded
Expand All @@ -281,6 +375,13 @@ class elastic_search_state_writer {
wlog("status: " + std::to_string(reply.status) + ", " + reply_body);
}
}

void write_buffers() {
op_num = 0;

_write_buffer("blog", "post", buffer);
_write_buffer("blog_versions", "version", buffer_versions);
}
};

} } } // golos::plugins::elastic_search
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ namespace golos { namespace plugins { namespace social_network {
time_point_sec last_update; ///< the last time this post was "touched" by modify
time_point_sec active; ///< the last time this post was "touched" by reply
comment_id_type last_reply;
uint16_t num_changes;

uint32_t block_number;
};
Expand Down
5 changes: 5 additions & 0 deletions plugins/social_network/social_network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ namespace golos { namespace plugins { namespace social_network {
}
if (set_last_update) {
clu.last_update = active;
if (clu.num_changes < UINT16_MAX) {
++clu.num_changes;
}
}
if (last_reply != comment_id_type()) {
clu.last_reply = last_reply;
Expand All @@ -321,6 +324,7 @@ namespace golos { namespace plugins { namespace social_network {
if (set_last_update) {
clu.last_update = active;
}
clu.num_changes = 0;
clu.last_reply = last_reply;
});
return false;
Expand Down Expand Up @@ -1351,6 +1355,7 @@ namespace golos { namespace plugins { namespace social_network {
con.active = last_update->active;
con.last_update = last_update->last_update;
con.last_reply_id = last_update->last_reply;
con.num_changes = last_update->num_changes;
} else {
con.active = time_point_sec::min();
con.last_update = time_point_sec::min();
Expand Down

0 comments on commit 37331d2

Please sign in to comment.