From 33b52e7151fb2eeb8d107b398e4a583274bb5bc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Hejman?= Date: Thu, 18 Jul 2024 14:54:35 +0200 Subject: [PATCH] Add `_bulk` batch size logging (#508) As we've found out, the size of a bulk ingest payload is crucial to the ingest performance. Hence, we should log this piece of information. --------- Co-authored-by: Piotr Grabowski --- quesma/quesma/functionality/bulk/bulk.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/quesma/quesma/functionality/bulk/bulk.go b/quesma/quesma/functionality/bulk/bulk.go index 1986b20a4..551f54440 100644 --- a/quesma/quesma/functionality/bulk/bulk.go +++ b/quesma/quesma/functionality/bulk/bulk.go @@ -13,6 +13,7 @@ import ( "quesma/stats" "quesma/stats/errorstats" "quesma/telemetry" + "sync" ) type ( @@ -37,7 +38,9 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, lm *cli cfg config.QuesmaConfiguration, phoneHomeAgent telemetry.PhoneHomeAgent) (results []WriteResult) { defer recovery.LogPanic() - indicesWithDocumentsToInsert := make(map[string][]types.JSON, len(bulk)) + bulkSize := len(bulk) + maybeLogBatchSize(bulkSize / 2) // halve the NDJSON entry count so the `action_and_meta_data` line paired with each document is not counted as a document; NOTE(review): presumably only an estimate when an action (e.g. delete) carries no document line, confirm; ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html + indicesWithDocumentsToInsert := make(map[string][]types.JSON, bulkSize) err := bulk.BulkForEach(func(op types.BulkOperation, document types.JSON) { @@ -109,3 +112,17 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, lm *cli } return results } + +// loggedBatchSizes is the package-level set of batch sizes that have already been logged; maybeLogBatchSize accesses it only while holding mutex +var loggedBatchSizes = make(map[int]struct{}) +var mutex sync.Mutex + +// maybeLogBatchSize logs batchSize at Info level the first time that value is seen and stays silent for repeats; it holds mutex for the whole check-and-record step +func maybeLogBatchSize(batchSize int) { + mutex.Lock() + defer mutex.Unlock() + if _, alreadyLogged := loggedBatchSizes[batchSize]; !alreadyLogged { + 
logger.Info().Msgf("Ingesting via _bulk API, batch size=%d documents", batchSize) + loggedBatchSizes[batchSize] = struct{}{} + } +}