Skip to content

Commit

Permalink
Merge pull request #407 from bazsi/fix-handshake-regression
Browse files Browse the repository at this point in the history
Fix handshake regression
  • Loading branch information
MrAnno authored Dec 10, 2024
2 parents 00625f4 + 0cac6bf commit 2032241
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 49 deletions.
2 changes: 2 additions & 0 deletions lib/logproto/logproto-buffered-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ LogProtoPrepareAction log_proto_buffered_server_prepare(LogProtoServer *s, GIOCo
gint *timeout G_GNUC_UNUSED);
LogProtoBufferedServerState *log_proto_buffered_server_get_state(LogProtoBufferedServer *self);
void log_proto_buffered_server_put_state(LogProtoBufferedServer *self);
gboolean log_proto_buffered_server_restart_with_state(LogProtoServer *s,
PersistState *persist_state, const gchar *persist_name);

/* LogProtoBufferedServer */
gboolean log_proto_buffered_server_validate_options_method(LogProtoServer *s);
Expand Down
28 changes: 28 additions & 0 deletions lib/logproto/logproto-text-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,33 @@ log_proto_text_server_set_multi_line(LogProtoServer *s, MultiLineLogic *multi_li
self->multi_line = multi_line;
}

static gboolean
log_proto_text_server_restart_with_state(LogProtoServer *s, PersistState *persist_state, const gchar *persist_name)
{
LogProtoTextServer *self = (LogProtoTextServer *) s;

gboolean res = log_proto_buffered_server_restart_with_state(s, persist_state, persist_name);
if (!res)
return FALSE;

if (!self->super.buffer)
return FALSE;

LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(&self->super);
const guchar *buffer_start = self->super.buffer + state->pending_buffer_pos;
gsize buffer_bytes = state->pending_buffer_end - state->pending_buffer_pos;

if (buffer_bytes > 0)
{
const guchar *eom = self->find_eom(buffer_start, buffer_bytes);
if (eom)
self->cached_eol_pos = eom - self->super.buffer;
}
log_proto_buffered_server_put_state(&self->super);

return TRUE;
}

void
log_proto_text_server_free(LogProtoServer *s)
{
Expand All @@ -306,6 +333,7 @@ log_proto_text_server_init(LogProtoTextServer *self, LogTransport *transport, co
log_proto_buffered_server_init(&self->super, transport, options);
self->super.super.prepare = log_proto_text_server_prepare_method;
self->super.super.free_fn = log_proto_text_server_free;
self->super.super.restart_with_state = log_proto_text_server_restart_with_state;
self->super.fetch_from_buffer = log_proto_text_server_fetch_from_buffer;
self->super.flush = log_proto_text_server_flush;
self->find_eom = find_eom;
Expand Down
19 changes: 0 additions & 19 deletions lib/logreader.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,6 @@ log_reader_set_local_addr(LogReader *s, GSockAddr *local_addr)
self->local_addr = g_sockaddr_ref(local_addr);
}

void
log_reader_set_immediate_check(LogReader *s)
{
LogReader *self = (LogReader *) s;

self->immediate_check = TRUE;
}

void
log_reader_set_options(LogReader *s, LogPipe *control, LogReaderOptions *options,
const gchar *stats_id, StatsClusterKeyBuilder *kb)
Expand Down Expand Up @@ -160,15 +152,13 @@ log_reader_disable_watches(LogReader *self)
static void
log_reader_suspend_until_awoken(LogReader *self)
{
self->immediate_check = FALSE;
log_reader_disable_watches(self);
self->suspended = TRUE;
}

static void
log_reader_force_check_in_next_poll(LogReader *self)
{
self->immediate_check = FALSE;
log_reader_disable_watches(self);
self->suspended = FALSE;

Expand Down Expand Up @@ -326,12 +316,6 @@ log_reader_update_watches(LogReader *self)
iv_timer_register(&self->idle_timer);
}

if (self->immediate_check)
{
log_reader_force_check_in_next_poll(self);
return;
}

switch (prepare_action)
{
case LPPA_POLL_IO:
Expand Down Expand Up @@ -574,8 +558,6 @@ log_reader_fetch_log(LogReader *self)
}
log_transport_aux_data_destroy(aux);

if (msg_count == self->options->fetch_limit)
self->immediate_check = TRUE;
return 0;
}

Expand Down Expand Up @@ -785,7 +767,6 @@ log_reader_new(GlobalConfig *cfg)
self->super.wakeup = log_reader_wakeup;
self->super.schedule_dynamic_window_realloc = _schedule_dynamic_window_realloc;
self->super.metrics.raw_bytes_enabled = TRUE;
self->immediate_check = FALSE;
self->handshake_in_progress = TRUE;
log_reader_init_watches(self);
g_mutex_init(&self->pending_close_lock);
Expand Down
3 changes: 1 addition & 2 deletions lib/logreader.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ struct _LogReader
{
LogSource super;
LogProtoServer *proto;
gboolean immediate_check, handshake_in_progress;
gboolean handshake_in_progress;
LogPipe *control;
LogReaderOptions *options;
PollEvents *poll_events;
Expand Down Expand Up @@ -99,7 +99,6 @@ void log_reader_set_follow_filename(LogReader *self, const gchar *follow_filenam
void log_reader_set_name(LogReader *s, const gchar *name);
void log_reader_set_peer_addr(LogReader *s, GSockAddr *peer_addr);
void log_reader_set_local_addr(LogReader *s, GSockAddr *local_addr);
void log_reader_set_immediate_check(LogReader *s);
void log_reader_disable_bookmark_saving(LogReader *s);
void log_reader_open(LogReader *s, LogProtoServer *proto, PollEvents *poll_events);
void log_reader_close_proto(LogReader *s);
Expand Down
23 changes: 4 additions & 19 deletions modules/affile/file-reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ _deinit_sd_logreader(FileReader *self)
}

static void
_setup_logreader(LogPipe *s, PollEvents *poll_events, LogProtoServer *proto, gboolean check_immediately)
_setup_logreader(LogPipe *s, PollEvents *poll_events, LogProtoServer *proto)
{
FileReader *self = (FileReader *) s;

Expand All @@ -172,25 +172,12 @@ _setup_logreader(LogPipe *s, PollEvents *poll_events, LogProtoServer *proto, gbo
self->owner->super.id,
kb);

if (check_immediately)
log_reader_set_immediate_check(self->reader);

/* NOTE: if the file could not be opened, we ignore the last
* remembered file position, if the file is created in the future
* we're going to read from the start. */
log_pipe_append((LogPipe *) self->reader, s);
}

static gboolean
_is_immediate_check_needed(gboolean file_opened, gboolean open_deferred)
{
if (file_opened)
return TRUE;
else if (open_deferred)
return FALSE;
return FALSE;
}

static gboolean
_reader_open_file(LogPipe *s, gboolean recover_state)
{
Expand All @@ -214,7 +201,6 @@ _reader_open_file(LogPipe *s, gboolean recover_state)
{
LogProtoServer *proto;
PollEvents *poll_events;
gboolean check_immediately;

poll_events = _construct_poll_events(self, fd);
if (!poll_events)
Expand All @@ -224,8 +210,9 @@ _reader_open_file(LogPipe *s, gboolean recover_state)
}
proto = _construct_proto(self, fd);

check_immediately = _is_immediate_check_needed(file_opened, open_deferred);
_setup_logreader(s, poll_events, proto, check_immediately);
_setup_logreader(s, poll_events, proto);
if (recover_state)
_recover_state(s, cfg, proto);
if (!log_pipe_init((LogPipe *) self->reader))
{
msg_error("Error initializing log_reader, closing fd",
Expand All @@ -235,8 +222,6 @@ _reader_open_file(LogPipe *s, gboolean recover_state)
close(fd);
return FALSE;
}
if (recover_state)
_recover_state(s, cfg, proto);
}
else
{
Expand Down
26 changes: 17 additions & 9 deletions modules/affile/poll-file-changes.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,20 +169,18 @@ poll_file_changes_stop_watches(PollEvents *s)
}

static void
poll_file_changes_rearm_timer(PollFileChanges *self)
poll_file_changes_rearm_timer(PollFileChanges *self, glong delay)
{
iv_validate_now();
self->follow_timer.expires = iv_now;
timespec_add_msec(&self->follow_timer.expires, self->follow_freq);
timespec_add_msec(&self->follow_timer.expires, delay);
iv_timer_register(&self->follow_timer);
}

static gboolean
poll_file_changes_check_eof(PollFileChanges *self)
{
gint fd = self->fd;
if (fd < 0)
return FALSE;

off_t pos = lseek(fd, 0, SEEK_CUR);
if (pos == (off_t) -1)
Expand All @@ -202,22 +200,32 @@ void
poll_file_changes_update_watches(PollEvents *s, GIOCondition cond)
{
PollFileChanges *self = (PollFileChanges *) s;
gboolean check_again = TRUE;

/* we can only provide input events */
g_assert((cond & ~G_IO_IN) == 0);

poll_file_changes_stop_watches(s);

if (self->fd < 0)
{
/* file does not exist yet, go back checking after follow_freq */
poll_file_changes_rearm_timer(self, self->follow_freq);
return;
}

if (poll_file_changes_check_eof(self))
{
msg_trace("End of file, following file",
evt_tag_str("follow_filename", self->follow_filename));
check_again = poll_file_changes_on_eof(self);
if (poll_file_changes_on_eof(self))
poll_file_changes_rearm_timer(self, self->follow_freq);
}
else
{
msg_trace("File exists and contains data",
evt_tag_str("follow_filename", self->follow_filename));
poll_file_changes_rearm_timer(self, 0);
}

if (check_again)
poll_file_changes_rearm_timer(self);
}

void
Expand Down

0 comments on commit 2032241

Please sign in to comment.