Skip to content

Commit

Permalink
Revert "Merge safe features branch"
Browse files Browse the repository at this point in the history
  • Loading branch information
redwan-islam-rumman authored Sep 29, 2024
1 parent 1bef6df commit bc0bfca
Show file tree
Hide file tree
Showing 39 changed files with 207 additions and 1,010 deletions.
20 changes: 10 additions & 10 deletions adnl/adnl-channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,16 @@ void AdnlChannelImpl::send_message(td::uint32 priority, td::actor::ActorId<AdnlN
}

void AdnlChannelImpl::receive(td::IPAddress addr, td::BufferSlice data) {
auto P = td::PromiseCreator::lambda([peer = peer_pair_, channel_id = channel_in_id_, addr, id = print_id(),
size = data.size()](td::Result<AdnlPacket> R) {
if (R.is_error()) {
VLOG(ADNL_WARNING) << id << ": dropping IN message: can not decrypt: " << R.move_as_error();
} else {
auto packet = R.move_as_ok();
packet.set_remote_addr(addr);
td::actor::send_closure(peer, &AdnlPeerPair::receive_packet_from_channel, channel_id, std::move(packet), size);
}
});
auto P = td::PromiseCreator::lambda(
[peer = peer_pair_, channel_id = channel_in_id_, addr, id = print_id()](td::Result<AdnlPacket> R) {
if (R.is_error()) {
VLOG(ADNL_WARNING) << id << ": dropping IN message: can not decrypt: " << R.move_as_error();
} else {
auto packet = R.move_as_ok();
packet.set_remote_addr(addr);
td::actor::send_closure(peer, &AdnlPeerPair::receive_packet_from_channel, channel_id, std::move(packet));
}
});

decrypt(std::move(data), std::move(P));
}
Expand Down
97 changes: 11 additions & 86 deletions adnl/adnl-local-id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,18 @@ AdnlAddressList AdnlLocalId::get_addr_list() const {
}

void AdnlLocalId::receive(td::IPAddress addr, td::BufferSlice data) {
InboundRateLimiter& rate_limiter = inbound_rate_limiter_[addr];
if (!rate_limiter.rate_limiter.take()) {
VLOG(ADNL_NOTICE) << this << ": dropping IN message: rate limit exceeded";
add_dropped_packet_stats(addr);
return;
}
++rate_limiter.currently_decrypting_packets;
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), peer_table = peer_table_, dst = short_id_, addr,
id = print_id(), size = data.size()](td::Result<AdnlPacket> R) {
td::actor::send_closure(SelfId, &AdnlLocalId::decrypt_packet_done, addr);
if (R.is_error()) {
VLOG(ADNL_WARNING) << id << ": dropping IN message: cannot decrypt: " << R.move_as_error();
} else {
auto packet = R.move_as_ok();
packet.set_remote_addr(addr);
td::actor::send_closure(peer_table, &AdnlPeerTable::receive_decrypted_packet, dst, std::move(packet), size);
}
});
decrypt(std::move(data), std::move(P));
}
auto P = td::PromiseCreator::lambda(
[peer_table = peer_table_, dst = short_id_, addr, id = print_id()](td::Result<AdnlPacket> R) {
if (R.is_error()) {
VLOG(ADNL_WARNING) << id << ": dropping IN message: cannot decrypt: " << R.move_as_error();
} else {
auto packet = R.move_as_ok();
packet.set_remote_addr(addr);
td::actor::send_closure(peer_table, &AdnlPeerTable::receive_decrypted_packet, dst, std::move(packet));
}
});

void AdnlLocalId::decrypt_packet_done(td::IPAddress addr) {
auto it = inbound_rate_limiter_.find(addr);
CHECK(it != inbound_rate_limiter_.end());
--it->second.currently_decrypting_packets;
add_decrypted_packet_stats(addr);
decrypt(std::move(data), std::move(P));
}

void AdnlLocalId::deliver(AdnlNodeIdShort src, td::BufferSlice data) {
Expand Down Expand Up @@ -306,67 +292,6 @@ void AdnlLocalId::update_packet(AdnlPacket packet, bool update_id, bool sign, td
}
}

void AdnlLocalId::get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_localId>> promise) {
auto stats = create_tl_object<ton_api::adnl_stats_localId>();
stats->short_id_ = short_id_.bits256_value();
for (auto &[ip, x] : inbound_rate_limiter_) {
if (x.currently_decrypting_packets != 0) {
stats->current_decrypt_.push_back(create_tl_object<ton_api::adnl_stats_ipPackets>(
ip.is_valid() ? PSTRING() << ip.get_ip_str() << ":" << ip.get_port() : "", x.currently_decrypting_packets));
}
}
prepare_packet_stats();
stats->packets_recent_ = packet_stats_prev_.tl();
stats->packets_total_ = packet_stats_total_.tl();
stats->packets_total_->ts_start_ = (double)Adnl::adnl_start_time();
stats->packets_total_->ts_end_ = td::Clocks::system();
promise.set_result(std::move(stats));
}

void AdnlLocalId::add_decrypted_packet_stats(td::IPAddress addr) {
prepare_packet_stats();
++packet_stats_cur_.decrypted_packets[addr];
++packet_stats_total_.decrypted_packets[addr];
}

void AdnlLocalId::add_dropped_packet_stats(td::IPAddress addr) {
prepare_packet_stats();
++packet_stats_cur_.dropped_packets[addr];
++packet_stats_total_.dropped_packets[addr];
}

void AdnlLocalId::prepare_packet_stats() {
double now = td::Clocks::system();
if (now >= packet_stats_cur_.ts_end) {
packet_stats_prev_ = std::move(packet_stats_cur_);
packet_stats_cur_ = {};
auto now_int = (int)td::Clocks::system();
packet_stats_cur_.ts_start = (double)(now_int / 60 * 60);
packet_stats_cur_.ts_end = packet_stats_cur_.ts_start + 60.0;
if (packet_stats_prev_.ts_end < now - 60.0) {
packet_stats_prev_ = {};
packet_stats_prev_.ts_end = packet_stats_cur_.ts_start;
packet_stats_prev_.ts_start = packet_stats_prev_.ts_end - 60.0;
}
}
}

tl_object_ptr<ton_api::adnl_stats_localIdPackets> AdnlLocalId::PacketStats::tl() const {
auto obj = create_tl_object<ton_api::adnl_stats_localIdPackets>();
obj->ts_start_ = ts_start;
obj->ts_end_ = ts_end;
for (const auto &[ip, packets] : decrypted_packets) {
obj->decrypted_packets_.push_back(create_tl_object<ton_api::adnl_stats_ipPackets>(
ip.is_valid() ? PSTRING() << ip.get_ip_str() << ":" << ip.get_port() : "", packets));
}
for (const auto &[ip, packets] : dropped_packets) {
obj->dropped_packets_.push_back(create_tl_object<ton_api::adnl_stats_ipPackets>(
ip.is_valid() ? PSTRING() << ip.get_ip_str() << ":" << ip.get_port() : "", packets));
}
return obj;
}


} // namespace adnl

} // namespace ton
19 changes: 0 additions & 19 deletions adnl/adnl-local-id.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ class AdnlLocalId : public td::actor::Actor {
void deliver(AdnlNodeIdShort src, td::BufferSlice data);
void deliver_query(AdnlNodeIdShort src, td::BufferSlice data, td::Promise<td::BufferSlice> promise);
void receive(td::IPAddress addr, td::BufferSlice data);
void decrypt_packet_done(td::IPAddress addr);

void subscribe(std::string prefix, std::unique_ptr<AdnlPeerTable::Callback> callback);
void unsubscribe(std::string prefix);
Expand All @@ -78,8 +77,6 @@ class AdnlLocalId : public td::actor::Actor {
void update_packet(AdnlPacket packet, bool update_id, bool sign, td::int32 update_addr_list_if,
td::int32 update_priority_addr_list_if, td::Promise<AdnlPacket> promise);

void get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_localId>> promise);

td::uint32 get_mode() {
return mode_;
}
Expand All @@ -104,22 +101,6 @@ class AdnlLocalId : public td::actor::Actor {

td::uint32 mode_;

struct InboundRateLimiter {
RateLimiter rate_limiter = RateLimiter(75, 0.33);
td::uint64 currently_decrypting_packets = 0;
};
std::map<td::IPAddress, InboundRateLimiter> inbound_rate_limiter_;
struct PacketStats {
double ts_start = 0.0, ts_end = 0.0;
std::map<td::IPAddress, td::uint64> decrypted_packets;
std::map<td::IPAddress, td::uint64> dropped_packets;

tl_object_ptr<ton_api::adnl_stats_localIdPackets> tl() const;
} packet_stats_cur_, packet_stats_prev_, packet_stats_total_;
void add_decrypted_packet_stats(td::IPAddress addr);
void add_dropped_packet_stats(td::IPAddress addr);
void prepare_packet_stats();

void publish_address_list();
};

Expand Down
86 changes: 2 additions & 84 deletions adnl/adnl-peer-table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void AdnlPeerTableImpl::receive_packet(td::IPAddress addr, AdnlCategoryMask cat_
<< " (len=" << (data.size() + 32) << ")";
}

void AdnlPeerTableImpl::receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket packet, td::uint64 serialized_size) {
void AdnlPeerTableImpl::receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket packet) {
packet.run_basic_checks().ensure();

if (!packet.inited_from_short()) {
Expand Down Expand Up @@ -119,7 +119,7 @@ void AdnlPeerTableImpl::receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket
return;
}
td::actor::send_closure(it->second, &AdnlPeer::receive_packet, dst, it2->second.mode, it2->second.local_id.get(),
std::move(packet), serialized_size);
std::move(packet));
}

void AdnlPeerTableImpl::add_peer(AdnlNodeIdShort local_id, AdnlNodeIdFull id, AdnlAddressList addr_list) {
Expand Down Expand Up @@ -385,88 +385,6 @@ void AdnlPeerTableImpl::get_conn_ip_str(AdnlNodeIdShort l_id, AdnlNodeIdShort p_
td::actor::send_closure(it->second, &AdnlPeer::get_conn_ip_str, l_id, std::move(promise));
}

void AdnlPeerTableImpl::get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) {
class Cb : public td::actor::Actor {
public:
explicit Cb(td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) : promise_(std::move(promise)) {
}

void got_local_id_stats(tl_object_ptr<ton_api::adnl_stats_localId> local_id) {
auto &local_id_stats = local_id_stats_[local_id->short_id_];
if (local_id_stats) {
local_id->peers_ = std::move(local_id_stats->peers_);
}
local_id_stats = std::move(local_id);
dec_pending();
}

void got_peer_stats(std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>> peer_pairs) {
for (auto &peer_pair : peer_pairs) {
auto &local_id_stats = local_id_stats_[peer_pair->local_id_];
if (local_id_stats == nullptr) {
local_id_stats = create_tl_object<ton_api::adnl_stats_localId>();
local_id_stats->short_id_ = peer_pair->local_id_;
}
local_id_stats->peers_.push_back(std::move(peer_pair));
}
dec_pending();
}

void inc_pending() {
++pending_;
}

void dec_pending() {
CHECK(pending_ > 0);
--pending_;
if (pending_ == 0) {
auto stats = create_tl_object<ton_api::adnl_stats>();
stats->timestamp_ = td::Clocks::system();
for (auto &[id, local_id_stats] : local_id_stats_) {
stats->local_ids_.push_back(std::move(local_id_stats));
}
promise_.set_result(std::move(stats));
stop();
}
}

private:
td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise_;
size_t pending_ = 1;

std::map<td::Bits256, tl_object_ptr<ton_api::adnl_stats_localId>> local_id_stats_;
};
auto callback = td::actor::create_actor<Cb>("adnlstats", std::move(promise)).release();

for (auto &[id, local_id] : local_ids_) {
td::actor::send_closure(callback, &Cb::inc_pending);
td::actor::send_closure(local_id.local_id, &AdnlLocalId::get_stats,
[id = id, callback](td::Result<tl_object_ptr<ton_api::adnl_stats_localId>> R) {
if (R.is_error()) {
VLOG(ADNL_NOTICE)
<< "failed to get stats for local id " << id << " : " << R.move_as_error();
td::actor::send_closure(callback, &Cb::dec_pending);
} else {
td::actor::send_closure(callback, &Cb::got_local_id_stats, R.move_as_ok());
}
});
}
for (auto &[id, peer] : peers_) {
td::actor::send_closure(callback, &Cb::inc_pending);
td::actor::send_closure(
peer, &AdnlPeer::get_stats,
[id = id, callback](td::Result<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> R) {
if (R.is_error()) {
VLOG(ADNL_NOTICE) << "failed to get stats for peer " << id << " : " << R.move_as_error();
td::actor::send_closure(callback, &Cb::dec_pending);
} else {
td::actor::send_closure(callback, &Cb::got_peer_stats, R.move_as_ok());
}
});
}
td::actor::send_closure(callback, &Cb::dec_pending);
}

} // namespace adnl

} // namespace ton
2 changes: 1 addition & 1 deletion adnl/adnl-peer-table.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class AdnlPeerTable : public Adnl {
virtual void answer_query(AdnlNodeIdShort src, AdnlNodeIdShort dst, AdnlQueryId query_id, td::BufferSlice data) = 0;

virtual void receive_packet(td::IPAddress addr, AdnlCategoryMask cat_mask, td::BufferSlice data) = 0;
virtual void receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket packet, td::uint64 serialized_size) = 0;
virtual void receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket packet) = 0;
virtual void send_message_in(AdnlNodeIdShort src, AdnlNodeIdShort dst, AdnlMessage message, td::uint32 flags) = 0;

virtual void register_channel(AdnlChannelIdShort id, AdnlNodeIdShort local_id,
Expand Down
4 changes: 1 addition & 3 deletions adnl/adnl-peer-table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class AdnlPeerTableImpl : public AdnlPeerTable {
void add_static_nodes_from_config(AdnlNodesList nodes) override;

void receive_packet(td::IPAddress addr, AdnlCategoryMask cat_mask, td::BufferSlice data) override;
void receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket data, td::uint64 serialized_size) override;
void receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket data) override;
void send_message_in(AdnlNodeIdShort src, AdnlNodeIdShort dst, AdnlMessage message, td::uint32 flags) override;
void send_message(AdnlNodeIdShort src, AdnlNodeIdShort dst, td::BufferSlice data) override {
send_message_ex(src, dst, std::move(data), 0);
Expand Down Expand Up @@ -108,8 +108,6 @@ class AdnlPeerTableImpl : public AdnlPeerTable {
td::Promise<std::pair<td::actor::ActorOwn<AdnlTunnel>, AdnlAddress>> promise) override;
void get_conn_ip_str(AdnlNodeIdShort l_id, AdnlNodeIdShort p_id, td::Promise<td::string> promise) override;

void get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) override;

struct PrintId {};
PrintId print_id() const {
return PrintId{};
Expand Down
Loading

0 comments on commit bc0bfca

Please sign in to comment.