diff --git a/quesma/quesma/functionality/bulk/bulk.go b/quesma/quesma/functionality/bulk/bulk.go index 1986b20a4..551f54440 100644 --- a/quesma/quesma/functionality/bulk/bulk.go +++ b/quesma/quesma/functionality/bulk/bulk.go @@ -13,6 +13,7 @@ import ( "quesma/stats" "quesma/stats/errorstats" "quesma/telemetry" + "sync" ) type ( @@ -37,7 +38,9 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, lm *cli cfg config.QuesmaConfiguration, phoneHomeAgent telemetry.PhoneHomeAgent) (results []WriteResult) { defer recovery.LogPanic() - indicesWithDocumentsToInsert := make(map[string][]types.JSON, len(bulk)) + bulkSize := len(bulk) + maybeLogBatchSize(bulkSize / 2) // we divided payload by 2 so that we don't take into account the `action_and_meta_data` line, ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html + indicesWithDocumentsToInsert := make(map[string][]types.JSON, bulkSize) err := bulk.BulkForEach(func(op types.BulkOperation, document types.JSON) { @@ -109,3 +112,17 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, lm *cli } return results } + +// Global set to keep track of logged batch sizes +var loggedBatchSizes = make(map[int]struct{}) +var mutex sync.Mutex + +// maybeLogBatchSize logs only unique batch sizes +func maybeLogBatchSize(batchSize int) { + mutex.Lock() + defer mutex.Unlock() + if _, alreadyLogged := loggedBatchSizes[batchSize]; !alreadyLogged { + logger.Info().Msgf("Ingesting via _bulk API, batch size=%d documents", batchSize) + loggedBatchSizes[batchSize] = struct{}{} + } +}