diff --git a/lib/logproto/logproto-buffered-server.h b/lib/logproto/logproto-buffered-server.h index 55a92f969..1b5019308 100644 --- a/lib/logproto/logproto-buffered-server.h +++ b/lib/logproto/logproto-buffered-server.h @@ -118,6 +118,8 @@ LogProtoPrepareAction log_proto_buffered_server_prepare(LogProtoServer *s, GIOCo gint *timeout G_GNUC_UNUSED); LogProtoBufferedServerState *log_proto_buffered_server_get_state(LogProtoBufferedServer *self); void log_proto_buffered_server_put_state(LogProtoBufferedServer *self); +gboolean log_proto_buffered_server_restart_with_state(LogProtoServer *s, + PersistState *persist_state, const gchar *persist_name); /* LogProtoBufferedServer */ gboolean log_proto_buffered_server_validate_options_method(LogProtoServer *s); diff --git a/lib/logproto/logproto-text-server.c b/lib/logproto/logproto-text-server.c index 674fb686d..15f4136ed 100644 --- a/lib/logproto/logproto-text-server.c +++ b/lib/logproto/logproto-text-server.c @@ -291,6 +291,33 @@ log_proto_text_server_set_multi_line(LogProtoServer *s, MultiLineLogic *multi_li self->multi_line = multi_line; } +static gboolean +log_proto_text_server_restart_with_state(LogProtoServer *s, PersistState *persist_state, const gchar *persist_name) +{ + LogProtoTextServer *self = (LogProtoTextServer *) s; + + gboolean res = log_proto_buffered_server_restart_with_state(s, persist_state, persist_name); + if (!res) + return FALSE; + + if (!self->super.buffer) + return FALSE; + + LogProtoBufferedServerState *state = log_proto_buffered_server_get_state(&self->super); + const guchar *buffer_start = self->super.buffer + state->pending_buffer_pos; + gsize buffer_bytes = state->pending_buffer_end - state->pending_buffer_pos; + + if (buffer_bytes > 0) + { + const guchar *eom = self->find_eom(buffer_start, buffer_bytes); + if (eom) + self->cached_eol_pos = eom - self->super.buffer; + } + log_proto_buffered_server_put_state(&self->super); + + return TRUE; +} + void log_proto_text_server_free(LogProtoServer *s) { @@ -306,6 +333,7 @@ log_proto_text_server_init(LogProtoTextServer *self, LogTransport *transport, co log_proto_buffered_server_init(&self->super, transport, options); self->super.super.prepare = log_proto_text_server_prepare_method; self->super.super.free_fn = log_proto_text_server_free; + self->super.super.restart_with_state = log_proto_text_server_restart_with_state; self->super.fetch_from_buffer = log_proto_text_server_fetch_from_buffer; self->super.flush = log_proto_text_server_flush; self->find_eom = find_eom; diff --git a/lib/logreader.c b/lib/logreader.c index f59a28c07..8b558f8bf 100644 --- a/lib/logreader.c +++ b/lib/logreader.c @@ -55,14 +55,6 @@ log_reader_set_local_addr(LogReader *s, GSockAddr *local_addr) self->local_addr = g_sockaddr_ref(local_addr); } -void -log_reader_set_immediate_check(LogReader *s) -{ - LogReader *self = (LogReader *) s; - - self->immediate_check = TRUE; -} - void log_reader_set_options(LogReader *s, LogPipe *control, LogReaderOptions *options, const gchar *stats_id, StatsClusterKeyBuilder *kb) @@ -160,7 +152,6 @@ log_reader_disable_watches(LogReader *self) static void log_reader_suspend_until_awoken(LogReader *self) { - self->immediate_check = FALSE; log_reader_disable_watches(self); self->suspended = TRUE; } @@ -168,7 +159,6 @@ log_reader_suspend_until_awoken(LogReader *self) static void log_reader_force_check_in_next_poll(LogReader *self) { - self->immediate_check = FALSE; log_reader_disable_watches(self); self->suspended = FALSE; @@ -326,12 +316,6 @@ log_reader_update_watches(LogReader *self) iv_timer_register(&self->idle_timer); } - if (self->immediate_check) - { - log_reader_force_check_in_next_poll(self); - return; - } - switch (prepare_action) { case LPPA_POLL_IO: @@ -574,8 +558,6 @@ log_reader_fetch_log(LogReader *self) } log_transport_aux_data_destroy(aux); - if (msg_count == self->options->fetch_limit) - self->immediate_check = TRUE; return 0; } @@ -785,7 +767,6 @@ log_reader_new(GlobalConfig *cfg) self->super.wakeup = log_reader_wakeup; self->super.schedule_dynamic_window_realloc = _schedule_dynamic_window_realloc; self->super.metrics.raw_bytes_enabled = TRUE; - self->immediate_check = FALSE; self->handshake_in_progress = TRUE; log_reader_init_watches(self); g_mutex_init(&self->pending_close_lock); diff --git a/lib/logreader.h b/lib/logreader.h index fa0ed3422..563ffdcaa 100644 --- a/lib/logreader.h +++ b/lib/logreader.h @@ -62,7 +62,7 @@ struct _LogReader { LogSource super; LogProtoServer *proto; - gboolean immediate_check, handshake_in_progress; + gboolean handshake_in_progress; LogPipe *control; LogReaderOptions *options; PollEvents *poll_events; @@ -99,7 +99,6 @@ void log_reader_set_follow_filename(LogReader *self, const gchar *follow_filenam void log_reader_set_name(LogReader *s, const gchar *name); void log_reader_set_peer_addr(LogReader *s, GSockAddr *peer_addr); void log_reader_set_local_addr(LogReader *s, GSockAddr *local_addr); -void log_reader_set_immediate_check(LogReader *s); void log_reader_disable_bookmark_saving(LogReader *s); void log_reader_open(LogReader *s, LogProtoServer *proto, PollEvents *poll_events); void log_reader_close_proto(LogReader *s); diff --git a/modules/affile/file-reader.c b/modules/affile/file-reader.c index 7439d6dd5..ab9086773 100644 --- a/modules/affile/file-reader.c +++ b/modules/affile/file-reader.c @@ -155,7 +155,7 @@ _deinit_sd_logreader(FileReader *self) } static void -_setup_logreader(LogPipe *s, PollEvents *poll_events, LogProtoServer *proto, gboolean check_immediately) +_setup_logreader(LogPipe *s, PollEvents *poll_events, LogProtoServer *proto) { FileReader *self = (FileReader *) s; @@ -172,25 +172,12 @@ _setup_logreader(LogPipe *s, PollEvents *poll_events, LogProtoServer *proto, gbo self->owner->super.id, kb); - if (check_immediately) - log_reader_set_immediate_check(self->reader); - /* NOTE: if the file could not be opened, we ignore the last * remembered file position, if the file is created in the future * we're going to read from the start. */ log_pipe_append((LogPipe *) self->reader, s); } -static gboolean -_is_immediate_check_needed(gboolean file_opened, gboolean open_deferred) -{ - if (file_opened) - return TRUE; - else if (open_deferred) - return FALSE; - return FALSE; -} - static gboolean _reader_open_file(LogPipe *s, gboolean recover_state) { @@ -214,7 +201,6 @@ _reader_open_file(LogPipe *s, gboolean recover_state) { LogProtoServer *proto; PollEvents *poll_events; - gboolean check_immediately; poll_events = _construct_poll_events(self, fd); if (!poll_events) @@ -224,8 +210,9 @@ _reader_open_file(LogPipe *s, gboolean recover_state) } proto = _construct_proto(self, fd); - check_immediately = _is_immediate_check_needed(file_opened, open_deferred); - _setup_logreader(s, poll_events, proto, check_immediately); + _setup_logreader(s, poll_events, proto); + if (recover_state) + _recover_state(s, cfg, proto); if (!log_pipe_init((LogPipe *) self->reader)) { msg_error("Error initializing log_reader, closing fd", @@ -235,8 +222,6 @@ _reader_open_file(LogPipe *s, gboolean recover_state) close(fd); return FALSE; } - if (recover_state) - _recover_state(s, cfg, proto); } else { diff --git a/modules/affile/poll-file-changes.c b/modules/affile/poll-file-changes.c index b8669497b..cb76affda 100644 --- a/modules/affile/poll-file-changes.c +++ b/modules/affile/poll-file-changes.c @@ -169,11 +169,11 @@ poll_file_changes_stop_watches(PollEvents *s) } static void -poll_file_changes_rearm_timer(PollFileChanges *self) +poll_file_changes_rearm_timer(PollFileChanges *self, glong delay) { iv_validate_now(); self->follow_timer.expires = iv_now; - timespec_add_msec(&self->follow_timer.expires, self->follow_freq); + timespec_add_msec(&self->follow_timer.expires, delay); iv_timer_register(&self->follow_timer); } @@ -181,8 +181,6 @@ static gboolean poll_file_changes_check_eof(PollFileChanges *self) { gint fd = self->fd; - if (fd < 0) - return FALSE; off_t pos = lseek(fd, 0, SEEK_CUR); if (pos == (off_t) -1) @@ -202,22 +200,32 @@ void poll_file_changes_update_watches(PollEvents *s, GIOCondition cond) { PollFileChanges *self = (PollFileChanges *) s; - gboolean check_again = TRUE; /* we can only provide input events */ g_assert((cond & ~G_IO_IN) == 0); poll_file_changes_stop_watches(s); + if (self->fd < 0) + { + /* file does not exist yet, go back checking after follow_freq */ + poll_file_changes_rearm_timer(self, self->follow_freq); + return; + } + if (poll_file_changes_check_eof(self)) { msg_trace("End of file, following file", evt_tag_str("follow_filename", self->follow_filename)); - check_again = poll_file_changes_on_eof(self); + if (poll_file_changes_on_eof(self)) + poll_file_changes_rearm_timer(self, self->follow_freq); + } + else + { + msg_trace("File exists and contains data", + evt_tag_str("follow_filename", self->follow_filename)); + poll_file_changes_rearm_timer(self, 0); } - - if (check_again) - poll_file_changes_rearm_timer(self); } void