From a6ac2121855148a2dcfa41ff5c4571d9ffd7b800 Mon Sep 17 00:00:00 2001 From: 1aerostorm Date: Wed, 4 May 2022 16:47:31 +0000 Subject: [PATCH] ElasticSearch - add post/comment versions --- .../include/golos/api/comment_api_object.hpp | 3 +- .../api/include/golos/api/discussion.hpp | 15 +- .../elastic_search/elastic_search_plugin.cpp | 22 ++- .../elastic_search/elastic_search_state.hpp | 159 ++++++++++++++---- .../social_network/social_network_types.hpp | 1 + plugins/social_network/social_network.cpp | 5 + 6 files changed, 165 insertions(+), 40 deletions(-) diff --git a/libraries/api/include/golos/api/comment_api_object.hpp b/libraries/api/include/golos/api/comment_api_object.hpp index 619d08a690..af8e1a0a3b 100644 --- a/libraries/api/include/golos/api/comment_api_object.hpp +++ b/libraries/api/include/golos/api/comment_api_object.hpp @@ -28,6 +28,7 @@ namespace golos { namespace api { fc::optional last_update; time_point_sec created; fc::optional active; + fc::optional num_changes; time_point_sec last_payout; comment_object::id_type last_reply_id; // not reflected @@ -93,7 +94,7 @@ namespace golos { namespace api { FC_REFLECT( (golos::api::comment_api_object), (id)(author)(permlink)(parent_author)(parent_permlink)(category)(title)(body)(json_metadata)(last_update) - (created)(active)(last_payout)(depth)(children)(children_rshares2)(net_rshares)(abs_rshares) + (created)(active)(num_changes)(last_payout)(depth)(children)(children_rshares2)(net_rshares)(abs_rshares) (vote_rshares)(children_abs_rshares)(cashout_time)(max_cashout_time)(total_vote_weight) (reward_weight)(donates)(donates_uia)(total_payout_value)(beneficiary_payout_value)(beneficiary_gests_payout_value)(curator_payout_value)(curator_gests_payout_value) (author_rewards)(author_payout_in_golos)(author_gbg_payout_value)(author_golos_payout_value)(author_gests_payout_value)(net_votes) diff --git a/libraries/api/include/golos/api/discussion.hpp b/libraries/api/include/golos/api/discussion.hpp index 4d5c799936..489c438f48 100644 --- a/libraries/api/include/golos/api/discussion.hpp +++ b/libraries/api/include/golos/api/discussion.hpp @@ -53,10 +53,11 @@ namespace golos { namespace api { } } // golos::api FC_REFLECT_DERIVED( (golos::api::discussion), ((golos::api::comment_api_object)), - (url)(pending_author_payout_value)(pending_author_payout_in_golos) - (pending_author_payout_gbg_value)(pending_author_payout_gests_value)(pending_author_payout_golos_value) - (pending_benefactor_payout_value)(pending_benefactor_payout_gests_value) - (pending_curator_payout_value)(pending_curator_payout_gests_value) - (pending_payout_value)(total_pending_payout_value)(active_votes)(active_votes_count)(replies) - (author_reputation)(promoted)(body_length)(reblogged_by)(first_reblogged_by)(first_reblogged_on) - (reblog_author)(reblog_title)(reblog_body)(reblog_json_metadata)(reblog_entries)(last_reply)) + (url)(pending_author_payout_value)(pending_author_payout_in_golos) + (pending_author_payout_gbg_value)(pending_author_payout_gests_value)(pending_author_payout_golos_value) + (pending_benefactor_payout_value)(pending_benefactor_payout_gests_value) + (pending_curator_payout_value)(pending_curator_payout_gests_value) + (pending_payout_value)(total_pending_payout_value)(active_votes)(active_votes_count)(replies) + (author_reputation)(promoted)(body_length)(reblogged_by)(first_reblogged_by)(first_reblogged_on) + (reblog_author)(reblog_title)(reblog_body)(reblog_json_metadata)(reblog_entries)(last_reply) +) diff --git a/plugins/elastic_search/elastic_search_plugin.cpp b/plugins/elastic_search/elastic_search_plugin.cpp index 7791386f4b..8099143d54 100644 --- a/plugins/elastic_search/elastic_search_plugin.cpp +++ b/plugins/elastic_search/elastic_search_plugin.cpp @@ -10,9 +10,10 @@ using golos::protocol::signed_block; class elastic_search_plugin::elastic_search_plugin_impl final { public: - elastic_search_plugin_impl(const std::string& url, const std::string& login, const std::string& password) + elastic_search_plugin_impl(const std::string& url, const std::string& login, const std::string& password, + uint16_t versions_depth, fc::time_point_sec skip_comments_before) : _db(appbase::app().get_plugin().db()), - writer(url, login, password, _db) { + writer(url, login, password, _db, versions_depth, skip_comments_before) { } ~elastic_search_plugin_impl() { @@ -47,6 +48,12 @@ void elastic_search_plugin::set_program_options(bpo::options_description& cli, b ) ( "elastic-search-password", bpo::value(), "Elastic Search Password" + ) ( + "elastic-search-versions-depth", bpo::value()->default_value(10), + "Count of post/comment versions stored" + ) ( + "elastic-search-skip-comments-before", bpo::value()->default_value("2019-01-01T00:00:00"), + "Do not track version history (excl. first version) for comments/posts before this date. Saves disk space" ); } @@ -59,8 +66,17 @@ void elastic_search_plugin::plugin_initialize(const bpo::variables_map &options) auto login = options.at("elastic-search-login").as(); auto password = options.at("elastic-search-password").as(); + auto versions_depth = options.at("elastic-search-versions-depth").as(); + auto skip_comments_before = options.at("elastic-search-skip-comments-before").as(); + + time_point_sec skb; + try { + skb = time_point_sec::from_iso_string(skip_comments_before); + } catch (...) { + elog("Cannot parse elastic-search-skip-comments-before - use proper format, example: 2019-01-01T00:00:00"); + } - my = std::make_unique(uri_str, login, password); + my = std::make_unique(uri_str, login, password, versions_depth, skb); my->_db.post_apply_operation.connect([&](const operation_notification& note) { my->on_operation(note); diff --git a/plugins/elastic_search/include/golos/plugins/elastic_search/elastic_search_state.hpp b/plugins/elastic_search/include/golos/plugins/elastic_search/elastic_search_state.hpp index 2192ab770c..51dee1c846 100644 --- a/plugins/elastic_search/include/golos/plugins/elastic_search/elastic_search_state.hpp +++ b/plugins/elastic_search/include/golos/plugins/elastic_search/elastic_search_state.hpp @@ -17,6 +17,8 @@ namespace golos { namespace plugins { namespace elastic_search { #define TAG_MAX_LENGTH 512 using boost::locale::conv::utf_to_utf; +using golos::plugins::social_network::comment_last_update_index; +using golos::plugins::social_network::by_comment; #ifdef STEEMIT_BUILD_TESTNET #define ELASTIC_WRITE_EACH_N_OP 3 @@ -24,6 +26,8 @@ using boost::locale::conv::utf_to_utf; #define ELASTIC_WRITE_EACH_N_OP 100 #endif +using es_buffer_type = std::map; + class elastic_search_state_writer { public: using result_type = void; @@ -32,12 +36,17 @@ class elastic_search_state_writer { std::string login; std::string password; database& _db; + uint16_t versions_depth; + fc::time_point_sec skip_comments_before; fc::http::connection conn; - std::map buffer; + es_buffer_type buffer; + es_buffer_type buffer_versions; int op_num = 0; - elastic_search_state_writer(const std::string& url, const std::string& login, const std::string& password, database& db) - : url(url), login(login), password(password), _db(db) { + elastic_search_state_writer(const std::string& url, const std::string& login, const std::string& password, database& db, + uint16_t versions_depth, fc::time_point_sec skip_comments_before) + : url(url), login(login), password(password), _db(db), + versions_depth(versions_depth), skip_comments_before(skip_comments_before) { auto fc_url = fc::url(url); auto host_port = *fc_url.host() + (fc_url.port() ? ":" + std::to_string(*fc_url.port()) : ""); auto ep = fc::ip::endpoint::from_string(host_port); @@ -49,6 +58,10 @@ class elastic_search_state_writer { } + std::string make_id(std::string author, const std::string& permlink) const { + return author + "." + permlink; + } + std::wstring utf8_to_wstring(const std::string& str) const { return utf_to_utf(str.c_str(), str.c_str() + str.size()); } @@ -102,40 +115,110 @@ class elastic_search_state_writer { return false; } + void save_version(const comment_operation& op, const comment_object& cmt, const fc::mutable_variant_object* prevPtr, fc::time_point_sec now) { + if (_db.has_index()) { + const auto& clu_idx = _db.get_index(); + auto clu_itr = clu_idx.find(cmt.id); + if (clu_itr != clu_idx.end() && clu_itr->num_changes > 0 && clu_itr->num_changes <= versions_depth) { + auto id = make_id(op.author, op.permlink); + auto v = clu_itr->num_changes; + auto vid = id + "," + std::to_string(v); + + fc::mutable_variant_object prev_doc; + bool from_buffer; + if (prevPtr == nullptr || !prevPtr->size()) { + bool found = find_post(id, prev_doc, from_buffer); + if (found) { + prevPtr = &prev_doc; + } else { // post not exists because of some mistake in elastic node maintenance + prevPtr = nullptr; + } + } + + if (prevPtr) { + fc::mutable_variant_object doc; + const auto& prev = *prevPtr; + auto itr = prev.find("body_patch"); + if (itr != prev.end()) { + doc["is_patch"] = true; + doc["body"] = itr->value(); + } else { + doc["body"] = prev["body"]; + } + auto lu_itr = prev.find("last_update"); + if (lu_itr != prev.end()) { + doc["time"] = lu_itr->value(); + } else { + doc["time"] = cmt.created; + } + doc["v"] = v; + doc["post"] = id; + + buffer_versions[vid] = std::move(doc); + } + } + } else { + wlog("no comment_last_update_index (no social_network plugin or comment-last-update-depth in config is 0), so we will not save comment/post versions"); + } + } + + bool just_created(const comment_object& cmt) { + if (_db.has_index()) { + const auto& clu_idx = _db.get_index(); + auto clu_itr = clu_idx.find(cmt.id); + if (clu_itr != clu_idx.end()) { + return !clu_itr->num_changes; + } + } + return false; + } + result_type operator()(const comment_operation& op) { - auto id = std::string(op.author) + "." + op.permlink; + auto id = make_id(op.author, op.permlink); if (!op.body.size()) { return; } + const auto& cmt = _db.get_comment(op.author, op.permlink); + + const auto now = _db.head_block_time(); + fc::mutable_variant_object doc; bool from_buffer = false; + bool has_patch = false; std::string body = op.body; try { diff_match_patch dmp; auto patch = dmp.patch_fromText(utf8_to_wstring(body)); if (patch.size()) { find_post(id, doc, from_buffer); - std::string base_body = doc["body"].as_string(); - if (base_body.size()) { - auto result = dmp.patch_apply(patch, utf8_to_wstring(base_body)); - auto patched_body = wstring_to_utf8(result.first); - if(!fc::is_utf8(patched_body)) { - body = fc::prune_invalid_utf8(patched_body); - } else { - body = patched_body; - } + + std::string base_body; + if (!just_created(cmt) && doc.size()) { + base_body = doc["body"].as_string(); + } + auto result = dmp.patch_apply(patch, utf8_to_wstring(base_body)); + auto patched_body = wstring_to_utf8(result.first); + if(!fc::is_utf8(patched_body)) { + body = fc::prune_invalid_utf8(patched_body); + } else { + body = patched_body; + } + if (now >= skip_comments_before) { + has_patch = true; } } } catch ( ... ) { } - const auto& cmt = _db.get_comment(op.author, op.permlink); + if (now >= skip_comments_before) { + save_version(op, cmt, &doc, now); + } doc["id"] = cmt.id; - doc["created"] = _db.head_block_time(); + doc["created"] = now; doc["author"] = op.author; doc["permlink"] = op.permlink; doc["parent_author"] = op.parent_author; @@ -164,6 +247,13 @@ class elastic_search_state_writer { doc["author_reputation"] = std::string(_db.get_account_reputation(op.author)); + if (has_patch) { + doc["body_patch"] = op.body; + } + if (now >= skip_comments_before) { + doc["last_update"] = now; + } + buffer[id] = std::move(doc); ++op_num; @@ -171,7 +261,7 @@ class elastic_search_state_writer { return; } - write_buffer(); + write_buffers(); } result_type operator()(const comment_reward_operation& op) { @@ -181,7 +271,7 @@ class elastic_search_state_writer { } #endif - auto id = std::string(op.author) + "." + op.permlink; + auto id = make_id(op.author, op.permlink); fc::mutable_variant_object doc; bool from_buffer = false; @@ -206,7 +296,7 @@ class elastic_search_state_writer { return; } - write_buffer(); + write_buffers(); } result_type operator()(const donate_operation& op) { @@ -219,7 +309,7 @@ class elastic_search_state_writer { const auto* comment = _db.find_comment(author, permlink); if (comment) { - auto id = author_str + "." + permlink; + auto id = make_id(author_str, permlink); fc::mutable_variant_object doc; bool from_buffer = false; @@ -252,26 +342,30 @@ class elastic_search_state_writer { return; } - write_buffer(); + write_buffers(); } - void write_buffer() { - op_num = 0; - + void _write_buffer(const std::string& _index, const std::string& _type, es_buffer_type& buf, bool auto_increment = false) { std::string bulk; - for (auto& obj : buffer) { + for (auto& obj : buf) { fc::mutable_variant_object idx; - idx["_index"] = "blog"; - idx["_type"] = "post"; - idx["_id"] = obj.first; + idx["_index"] = _index; + idx["_type"] = _type; + if (!auto_increment) { + idx["_id"] = obj.first; + } fc::mutable_variant_object idx2; idx2["index"] = idx; bulk += fc::json::to_string(idx2) + "\r\n"; bulk += fc::json::to_string(obj.second) + "\r\n"; } - buffer.clear(); + buf.clear(); - auto bulk_url = url + "/blog/_bulk"; + if (bulk.empty()) { + return; + } + + auto bulk_url = url + "/" + _index + "/_bulk"; auto headers = get_es_headers(); //headers.emplace_back("Content-Type", "application/json"); // already set - hardcoded @@ -281,6 +375,13 @@ class elastic_search_state_writer { wlog("status: " + std::to_string(reply.status) + ", " + reply_body); } } + + void write_buffers() { + op_num = 0; + + _write_buffer("blog", "post", buffer); + _write_buffer("blog_versions", "version", buffer_versions); + } }; } } } // golos::plugins::elastic_search diff --git a/plugins/social_network/include/golos/plugins/social_network/social_network_types.hpp b/plugins/social_network/include/golos/plugins/social_network/social_network_types.hpp index 195d7ec240..5664998a26 100644 --- a/plugins/social_network/include/golos/plugins/social_network/social_network_types.hpp +++ b/plugins/social_network/include/golos/plugins/social_network/social_network_types.hpp @@ -75,6 +75,7 @@ namespace golos { namespace plugins { namespace social_network { time_point_sec last_update; ///< the last time this post was "touched" by modify time_point_sec active; ///< the last time this post was "touched" by reply comment_id_type last_reply; + uint16_t num_changes; uint32_t block_number; }; diff --git a/plugins/social_network/social_network.cpp b/plugins/social_network/social_network.cpp index 2002ddf40a..a37774f30c 100644 --- a/plugins/social_network/social_network.cpp +++ b/plugins/social_network/social_network.cpp @@ -305,6 +305,9 @@ namespace golos { namespace plugins { namespace social_network { } if (set_last_update) { clu.last_update = active; + if (clu.num_changes < UINT16_MAX) { + ++clu.num_changes; + } } if (last_reply != comment_id_type()) { clu.last_reply = last_reply; @@ -321,6 +324,7 @@ namespace golos { namespace plugins { namespace social_network { if (set_last_update) { clu.last_update = active; } + clu.num_changes = 0; clu.last_reply = last_reply; }); return false; @@ -1351,6 +1355,7 @@ namespace golos { namespace plugins { namespace social_network { con.active = last_update->active; con.last_update = last_update->last_update; con.last_reply_id = last_update->last_reply; + con.num_changes = last_update->num_changes; } else { con.active = time_point_sec::min(); con.last_update = time_point_sec::min();