Skip to content

Commit

Permalink
fix first record missing from journal
Browse files Browse the repository at this point in the history
This change fixes the condition when the first record after telempostd
starts is not inserted in the telemetry journal.

Notice! there is a change in the logic, while previously records
where inserted to juornal as soon the record was processed. After this
change records will be inserted in the journal only when successfully
delivered or record_server_delivery_enabled is set to false.
  • Loading branch information
alexjch committed Feb 24, 2020
1 parent 9db4e1d commit f4f012a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 19 deletions.
27 changes: 15 additions & 12 deletions src/telempostdaemon.c
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ static bool deliver_record(TelemPostDaemon *daemon, char *headers[], char *body,
return ret;
}

bool process_staged_record(char *filename, bool is_retry, TelemPostDaemon *daemon)
bool process_staged_record(char *filename, TelemPostDaemon *daemon)
{
int k;
bool ret = false;
Expand Down Expand Up @@ -530,20 +530,12 @@ bool process_staged_record(char *filename, bool is_retry, TelemPostDaemon *daemo
goto end_processing_file;
}

/* Retries should not be recorded */
if (is_retry == false) {
/** Journal entry **/
save_entry_to_journal(daemon, current_time, headers);
/** Record retention **/
apply_retention_policies(daemon, body);
}

/** Record delivery **/
if (!daemon->record_server_delivery_enabled) {
telem_log(LOG_INFO, "record server delivery disabled\n");
// Not an error condition
ret = true;
goto end_processing_file;
goto end_record_delivery;
}

/** Spool policies **/
Expand Down Expand Up @@ -571,6 +563,17 @@ bool process_staged_record(char *filename, bool is_retry, TelemPostDaemon *daemo
/** Deliver or spool **/
ret = deliver_record(daemon, headers, body, cfg_file);

end_record_delivery:
/** Save record once it is properly delivered, if record
* is spooled the record is not saved to journal until
* delievered on a re-try **/
if (ret) {
/** Save to journal **/
save_entry_to_journal(daemon, current_time, headers);
/** Record retention **/
apply_retention_policies(daemon, body);
}

end_processing_file:
/** Update spool size if record will be removed **/
if (ret) {
Expand Down Expand Up @@ -625,7 +628,7 @@ int staging_records_loop(TelemPostDaemon *daemon)
telem_log(LOG_ERR, "Failed to allocate memory for staging record full path\n");
exit(EXIT_FAILURE);
}
if (process_staged_record(record_path, true, daemon)) {
if (process_staged_record(record_path, daemon)) {
unlink(record_path);
processed++;
}
Expand Down Expand Up @@ -721,7 +724,7 @@ void run_daemon(TelemPostDaemon *daemon)
exit(EXIT_FAILURE);
}
/* Process inotify event */
if (process_staged_record(record_name, false, daemon)) {
if (process_staged_record(record_name, daemon)) {
unlink(record_name);
}
free(record_name);
Expand Down
5 changes: 2 additions & 3 deletions src/telempostdaemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,9 @@ void close_daemon(TelemPostDaemon *daemon);
* Processed record written on disk
*
* @param filename a pointor to record on disk
* @param is_retry a boolean value that indicates if
* the record has been previously processed.
* @param daemon post to telemetry post daemon
*/
bool process_staged_record(char *filename, bool is_retry, TelemPostDaemon *daemon);
bool process_staged_record(char *filename, TelemPostDaemon *daemon);

/**
* Scans staging directory to process files that were
Expand All @@ -107,6 +105,7 @@ int staging_records_loop(TelemPostDaemon *daemon);
* @param body a pointer to the payload
* @param cfg_file a pointer to a non-default configuration
* file to be used.
* @return true if successful, false otherwise
*/
bool post_record_http(char *headers[], char *body, char *cfg_file);

Expand Down
8 changes: 4 additions & 4 deletions tests/check_postd.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ START_TEST(check_handle_client_with_no_data)
bool success;
char *filename = ABSTOPSRCDIR "/tests/telempostd/empty_message";

success = process_staged_record(filename, false, &tdaemon);
success = process_staged_record(filename, &tdaemon);
// Return true to remove corrupted record
ck_assert(success == true);
}
Expand All @@ -75,7 +75,7 @@ START_TEST(check_handle_client_with_incorrect_data)
bool success;
char *filename = ABSTOPSRCDIR "/tests/telempostd/incorrect_message";

success = process_staged_record(filename, false, &tdaemon);
success = process_staged_record(filename, &tdaemon);
// Return true to remove corrupted record
ck_assert(success == true);
}
Expand All @@ -88,7 +88,7 @@ START_TEST(check_process_record_with_correct_size_and_data)
bool success;
char *filename = ABSTOPSRCDIR "/tests/telempostd/correct_message";

success = process_staged_record(filename, false, &tdaemon);
success = process_staged_record(filename, &tdaemon);
ck_assert(success == true);
}
END_TEST
Expand All @@ -100,7 +100,7 @@ START_TEST(check_process_record_with_incorrect_headers)
bool success;
char *filename = ABSTOPSRCDIR "/tests/telempostd/incorrect_headers";

success = process_staged_record(filename, false, &tdaemon);
success = process_staged_record(filename, &tdaemon);
// Return true to remove corrupted record
ck_assert(success == true);
}
Expand Down

0 comments on commit f4f012a

Please sign in to comment.