Skip to content

Commit

Permalink
Merge remote-tracking branch 'centreon-gorgone/gorgone-fix-fd-leak' i…
Browse files Browse the repository at this point in the history
…nto gorgone-fix-fd-leak-migrate
  • Loading branch information
kduret committed Jul 11, 2024
2 parents c82befa + 904635e commit 30e8c82
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 39 deletions.
103 changes: 65 additions & 38 deletions gorgone/gorgone/class/clientzmq.pm
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,20 @@ sub init {
$callbacks->{ $self->{identity} } = $options{callback} if (defined($options{callback}));
}

sub cleanup {
my ($self, %options) = @_;

delete $callbacks->{ $self->{identity} };
delete $connectors->{ $self->{identity} };
delete $sockets->{ $self->{identity} };
}

sub close {
my ($self, %options) = @_;

$sockets->{ $self->{identity} }->close() if (defined($sockets->{ $self->{identity} }));
$self->{core_watcher}->stop() if (defined($self->{core_watcher}));
delete $self->{core_watcher};
$sockets->{ $self->{identity} }->close();
}

sub get_connect_identity {
Expand All @@ -122,20 +131,49 @@ sub get_connect_identity {
return $self->{identity} . '-' . $self->{extra_identity};
}

sub connect_wait_response {
my ($self, %options) = @_;

my $w1 = $self->{connect_loop}->io(
$sockets->{ $self->{identity} }->get_fd(),
EV::READ,
sub {
$self->event(identity => $self->{identity});
}
);

# Soemtimes EV library sent timer event directly (it doesn't wait the timeout)
my $timeout = 10;
my $ctime = time();
while (1) {
my $w2 = $self->{connect_loop}->timer(
1,
0,
sub {}
);
$self->{connect_loop}->run(EV::RUN_ONCE);

last if ($self->{handshake} != $options{handshake_continue});
if (time() > ($ctime + $timeout)) {
$self->{verbose_last_message} = "$options{event} timeout";
$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - $options{event} timeout [1]");
last;
}
}

# we try a last time
if ($self->{handshake} == $options{handshake_continue}) {
$self->event(identity => $self->{identity});
}
}

sub get_server_pubkey {
my ($self, %options) = @_;

$sockets->{ $self->{identity} }->send('[GETPUBKEY]', ZMQ_DONTWAIT);
$self->event(identity => $self->{identity});

my $w1 = $self->{connect_loop}->timer(
10,
0,
sub {
$self->{connect_loop}->break();
}
);
$self->{connect_loop}->run();
$self->connect_wait_response(event => 'get_server_pubkey', handshake_continue => 0);
}

sub read_key_protocol {
Expand Down Expand Up @@ -268,6 +306,7 @@ sub ping {
if ($self->{ping} > 0 && $self->{ping_progress} == 0 &&
time() - $self->{ping_time} > $self->{ping}) {
$self->{ping_progress} = 1;
$self->{ping_time} = time();
$self->{ping_timeout_time} = time();
my $action = defined($options{action}) ? $options{action} : 'PING';
$self->send_message(action => $action, data => $options{data}, json_encode => $options{json_encode});
Expand All @@ -278,9 +317,9 @@ sub ping {
time() - $self->{ping_timeout_time} > $self->{ping_timeout}) {
$self->{logger}->writeLogError("[clientzmq] No ping response") if (defined($self->{logger}));
$self->{ping_progress} = 0;
$sockets->{ $self->{identity} }->close();
delete $self->{core_watcher};

$self->close();
# new identity for a new handshake (for module pull)
$self->{extra_identity} = gorgone::standard::library::generate_token(length => 12);
$self->init();
$status = 1;
}
Expand All @@ -303,27 +342,26 @@ sub add_watcher {
sub event {
my ($self, %options) = @_;

$connectors->{ $options{identity} }->{ping_time} = time();

while ($sockets->{ $options{identity} }->has_pollin()) {
my ($rv, $message) = gorgone::standard::library::zmq_dealer_read_message(socket => $sockets->{ $options{identity} });
next if ($connectors->{ $options{identity} }->{handshake} == -1);
next if ($rv);

# We have a response. So it's ok :)
if ($connectors->{ $options{identity} }->{ping_progress} == 1) {
$connectors->{ $options{identity} }->{ping_time} = time();
$connectors->{ $options{identity} }->{ping_progress} = 0;
}

my ($rv, $message) = gorgone::standard::library::zmq_dealer_read_message(socket => $sockets->{ $options{identity} });
last if ($rv);

# in progress
if ($connectors->{ $options{identity} }->{handshake} == 0) {
$self->{connect_loop}->break();
$connectors->{ $options{identity} }->{handshake} = 1;
if ($connectors->{ $options{identity} }->check_server_pubkey(message => $message) == 0) {
$connectors->{ $options{identity} }->{handshake} = -1;

}
} elsif ($connectors->{ $options{identity} }->{handshake} == 1) {
$self->{connect_loop}->break();

$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - client_get_secret recv [3]");
my ($status, $verbose, $symkey, $hostname) = $connectors->{ $options{identity} }->client_get_secret(
message => $message
Expand All @@ -332,7 +370,7 @@ sub event {
$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - client_get_secret $verbose [3]");
$connectors->{ $options{identity} }->{handshake} = -1;
$connectors->{ $options{identity} }->{verbose_last_message} = $verbose;
return ;
next;
}
$connectors->{ $options{identity} }->{handshake} = 2;
if (defined($connectors->{ $options{identity} }->{logger})) {
Expand All @@ -348,7 +386,7 @@ sub event {
if ($rv == -1 || $data !~ /^\[([a-zA-Z0-9:\-_]+?)\]\s+/) {
$connectors->{ $options{identity} }->{handshake} = -1;
$connectors->{ $options{identity} }->{verbose_last_message} = 'decrypt issue: ' . $data;
return ;
next;
}

if ($1 eq 'KEY') {
Expand Down Expand Up @@ -389,14 +427,7 @@ sub send_message {
my ($self, %options) = @_;

if ($self->{handshake} == 0) {
$self->{connect_loop} = new EV::Loop();
$self->{connect_watcher} = $self->{connect_loop}->io(
$sockets->{ $self->{identity} }->get_fd(),
EV::READ,
sub {
$self->event(identity => $self->{identity});
}
);
$self->{connect_loop} = EV::Loop->new();

if (!defined($self->{server_pubkey})) {
$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - get_server_pubkey sent [1]");
Expand All @@ -414,25 +445,21 @@ sub send_message {
);
if ($status == -1) {
$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - client_helo crypt handshake issue [2]");
$self->{verbose_last_message} = 'crypt handshake issue';
$self->{verbose_last_message} = 'client_helo crypt handshake issue';
return (-1, $self->{verbose_last_message});
}

$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - client_helo sent [2]");

$self->{verbose_last_message} = 'Handshake timeout';
$sockets->{ $self->{identity} }->send($ciphertext, ZMQ_DONTWAIT);
$self->event(identity => $self->{identity});

my $w1 = $self->{connect_loop}->timer(
10,
0,
sub { $self->{connect_loop}->break(); }
);
$self->{connect_loop}->run();
$self->connect_wait_response(event => 'client_helo', handshake_continue => 1);
}

undef $self->{connect_loop} if (defined($self->{connect_loop}));
if (defined($self->{connect_loop})) {
delete $self->{connect_loop};
}

if ($self->{handshake} < 2) {
$self->{handshake} = 0;
Expand Down
8 changes: 7 additions & 1 deletion gorgone/gorgone/modules/core/proxy/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ sub action_proxyaddnode {
});

$self->{clients}->{ $data->{id} }->{class}->close();
$self->{clients}->{ $data->{id} }->{class}->cleanup();
} else {
$self->{internal_channels}->{ $data->{id} } = gorgone::standard::library::connect_com(
context => $self->{zmq_context},
Expand Down Expand Up @@ -282,6 +283,7 @@ sub action_proxycloseconnection {
$self->{logger}->writeLogInfo("[proxy] Close connectionn for $data->{id}");

$self->{clients}->{ $data->{id} }->{class}->close();
$self->{clients}->{ $data->{id} }->{class}->cleanup();
$self->{clients}->{ $data->{id} }->{delete} = 0;
$self->{clients}->{ $data->{id} }->{class} = undef;
}
Expand All @@ -293,6 +295,7 @@ sub close_connections {
if (defined($self->{clients}->{$_}->{class}) && $self->{clients}->{$_}->{type} eq 'push_zmq') {
$self->{logger}->writeLogInfo("[proxy] Close connection for $_");
$self->{clients}->{$_}->{class}->close();
$self->{clients}->{$_}->{class}->cleanup();
}
}
}
Expand Down Expand Up @@ -497,7 +500,10 @@ sub periodic_exec {
token => $connector->generate_token(),
target => ''
});
$connector->{clients}->{$_}->{class}->close() if (defined($connector->{clients}->{$_}->{class}));
if (defined($connector->{clients}->{$_}->{class})) {
$connector->{clients}->{$_}->{class}->close();
$connector->{clients}->{$_}->{class}->cleanup();
}
$connector->{clients}->{$_}->{class} = undef;
$connector->{clients}->{$_}->{delete} = 0;
$connector->{clients}->{$_}->{com_read_internal} = 0;
Expand Down
2 changes: 2 additions & 0 deletions gorgone/gorgone/modules/core/proxy/sshclient.pm
Original file line number Diff line number Diff line change
Expand Up @@ -552,4 +552,6 @@ sub close {
$self->disconnect();
}

sub cleanup {}

1;

0 comments on commit 30e8c82

Please sign in to comment.