-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathConnection.h
99 lines (78 loc) · 2.58 KB
/
Connection.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#ifndef __CONNECTION_H__
#define __CONNECTION_H__
#include <cassert>
#include <cstdint>
#include <memory>
#include <boost/asio.hpp>
#include <boost/ptr_container/ptr_deque.hpp>
#include <boost/optional.hpp>
#include "Endpoint.h"
#include "DestroyGuard.h"
#include "ID.h"
#include "PeriodicTimer.h"
#include "protocol.h"
// Forward declaration: this header only refers to Node by reference.
class Node;
/// A link from a Node to one remote endpoint.
///
/// What this header shows:
///  - Outgoing messages are queued in `_tx_messages`; each one is stamped
///    with the next transmit sequence number and the last accepted receive
///    sequence number.
///  - Incoming messages always have their acknowledgement processed, but are
///    handed to `use_message` only when they carry the next expected
///    sequence number.
///  - The class is non-copyable.
class Connection {
    using MessagePtr = std::unique_ptr<Message>;

public:
    Connection(Node&, Endpoint remote_endpoint);

    Connection(const Connection&) = delete;
    const Connection& operator=(const Connection&) = delete;

    /// Identifier built from the remote endpoint of this connection.
    ID id() const { return ID(_remote_endpoint); }

    /// Identifier of the owning node (defined out of line).
    ID node_id() const;

    /// Builds a `Msg` and appends it to the transmit queue.
    ///
    /// The message is constructed as
    /// `Msg(++_tx_sequence_id, _rx_sequence_id, args...)`, logged, and then
    /// pushed to the back of `_tx_messages`. Sending is started here only
    /// when the queue was empty before the push.
    ///
    /// @tparam Msg  Concrete message type; must be storable in
    ///              `boost::ptr_deque<Message>`.
    /// @param args  Extra constructor arguments forwarded (by copy) to `Msg`.
    template<class Msg, class... Args>
    void schedule_send(Args... args) {
        const bool was_empty = _tx_messages.empty();

        // Hold the message in a smart pointer until the queue owns it, so it
        // is released if anything between construction and push_back throws.
        std::unique_ptr<Msg> msg(
            new Msg(++_tx_sequence_id, _rx_sequence_id, args...));

        log(node_id(), " -> ", id(), " ", msg->label(), " ", *msg);

        // ptr_deque::push_back takes ownership of the raw pointer.
        _tx_messages.push_back(msg.release());

        // NOTE(review): presumably a non-empty queue means a send is already
        // in flight and will pick this message up later - confirm in the .cpp.
        if (was_empty) send_front_message();
    }

    /// Handles one incoming message.
    ///
    /// The acknowledgement carried by `msg` is always processed. If `msg`
    /// has the next expected sequence number it is logged (except for
    /// "ping"), the connection is kept alive, the receive sequence number
    /// advances and the message is dispatched to `use_message`. If `msg`
    /// repeats the current sequence number, only `keep_alive()` runs.
    ///
    /// Precondition (checked with `assert`):
    /// `msg.sequence_number <= _rx_sequence_id + 1`.
    template<class Msg> void receive(const Msg& msg) {
        assert(msg.sequence_number <= _rx_sequence_id + 1);

        ack_message(msg.ack_sequence_number);

        if (msg.sequence_number == _rx_sequence_id + 1) {
            // DEBUG
            // NOTE(review): this comparison is only a string compare if
            // label() returns a std::string-like type; with `const char*` it
            // would compare pointers - confirm against protocol.h.
            if (msg.label() != "ping") {
                log(node_id(), " <- ", id(), " ", msg.label(), " ", msg);
            }

            keep_alive();
            _rx_sequence_id = msg.sequence_number;
            use_message(msg);
        }
        else if (msg.sequence_number == _rx_sequence_id) {
            // Same sequence number as the last accepted message: not
            // dispatched again, but still counts as a sign of life.
            keep_alive();
        }
    }

private:
    void keep_alive();
    void on_tick();

    // One overload per message type; selected by `receive` via overload
    // resolution on the static type of the message.
    void use_message(const PingMsg&);
    void use_message(const StartMsg&);
    void use_message(const NumberMsg&);
    void use_message(const Update1Msg&);
    void use_message(const Update2Msg&);
    void use_message(const ResultMsg&);

    void ack_message(uint32_t ack_sequence_number);

    void send(const Message& msg);
    void send_front_message();

private:
    Node&                     _node;
    const Endpoint            _remote_endpoint;
    PeriodicTimer             _periodic_timer;
    unsigned int              _missed_ping_count;
    bool                      _is_sending;
    // Owning queue of messages scheduled by schedule_send().
    boost::ptr_deque<Message> _tx_messages;
    // Sequence number of the last message accepted by receive().
    uint32_t                  _rx_sequence_id;
    // Sequence number given to the most recently scheduled message.
    uint32_t                  _tx_sequence_id;
    DestroyGuard              _destroy_guard;

    // Increment geometrically, decrement linearly.
    void increment_timer_duration();
    void decrement_timer_duration();

public:
    // FastMIS related data.
    bool                          knows_my_result;
    boost::optional<float>        random_number;
    boost::optional<LeaderStatus> update1;
    boost::optional<LeaderStatus> update2;
    boost::optional<LeaderStatus> result;
    bool                          is_contender;
};
#endif // ifndef __CONNECTION_H__