diff --git a/appender.go b/appender.go index 80703eb..ea9ba6a 100644 --- a/appender.go +++ b/appender.go @@ -153,6 +153,7 @@ func New(client esapi.Transport, cfg Config) (*Appender, error) { RetryOnDocumentStatus: cfg.RetryOnDocumentStatus, CompressionLevel: cfg.CompressionLevel, Pipeline: cfg.Pipeline, + RequireDataStream: cfg.RequireDataStream, }) if err != nil { return nil, fmt.Errorf("error creating bulk indexer: %w", err) diff --git a/appender_test.go b/appender_test.go index f6cbeaf..4dc4fa4 100644 --- a/appender_test.go +++ b/appender_test.go @@ -1350,6 +1350,36 @@ func TestAppenderPipeline(t *testing.T) { assert.Equal(t, expected, actual) } +func TestAppenderRequireDataStream(t *testing.T) { + const expected = "true" + var actual string + client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + actual = r.URL.Query().Get("require_data_stream") + _, result := docappendertest.DecodeBulkRequest(r) + json.NewEncoder(w).Encode(result) + }) + indexer, err := docappender.New(client, docappender.Config{ + FlushInterval: time.Minute, + RequireDataStream: true, + }) + require.NoError(t, err) + defer indexer.Close(context.Background()) + + err = indexer.Add(context.Background(), "logs-foo-testing", newJSONReader(map[string]any{ + "@timestamp": time.Unix(123, 456789111).UTC().Format(docappendertest.TimestampFormat), + "data_stream.type": "logs", + "data_stream.dataset": "foo", + "data_stream.namespace": "testing", + })) + require.NoError(t, err) + + // Closing the indexer flushes enqueued documents. + err = indexer.Close(context.Background()) + require.NoError(t, err) + + assert.Equal(t, expected, actual) +} + func TestAppenderScaling(t *testing.T) { newIndexer := func(t *testing.T, cfg docappender.Config) *docappender.Appender { t.Helper() diff --git a/bulk_indexer.go b/bulk_indexer.go index 5d0ea85..0efa378 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -71,6 +71,16 @@ type BulkIndexerConfig struct { // // If Pipeline is empty, no ingest pipeline will be specified in the Bulk request. Pipeline string + + // RequireDataStream, If set to true, an index will be created only if a + // matching index template is found and it contains a data stream template. + // When true, `require_data_stream=true` is set in the bulk request. + // When false or not set, `require_data_stream` is not set in the bulk request. + // Which could cause a classic index to be created if no data stream template + // matches the index in the request. + // + // RequireDataStream is disabled by default. + RequireDataStream bool } // BulkIndexer issues bulk requests to Elasticsearch. It is NOT safe for concurrent use @@ -86,6 +96,7 @@ type BulkIndexer struct { copyBuf []byte buf bytes.Buffer retryCounts map[int]int + requireDataStream bool } type BulkIndexerResponseStat struct { @@ -195,8 +206,9 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (*BulkIndexer, error) { } b := &BulkIndexer{ - config: cfg, - retryCounts: make(map[int]int), + config: cfg, + retryCounts: make(map[int]int), + requireDataStream: cfg.RequireDataStream, } // use a len check instead of a nil check because document level retries @@ -328,6 +340,9 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error FilterPath: []string{"items.*._index", "items.*.status", "items.*.error.type", "items.*.error.reason"}, Pipeline: b.config.Pipeline, } + if b.requireDataStream { + req.RequireDataStream = &b.requireDataStream + } if b.gzipw != nil { req.Header.Set("Content-Encoding", "gzip") } diff --git a/config.go b/config.go index 83ab497..c7a268e 100644 --- a/config.go +++ b/config.go @@ -90,6 +90,16 @@ type Config struct { // If Pipeline is empty, no ingest pipeline will be specified in the Bulk request. Pipeline string + // RequireDataStream, If set to true, an index will be created only if a + // matching index template is found and it contains a data stream template. + // When true, `require_data_stream=true` is set in the bulk request. + // When false or not set, `require_data_stream` is not set in the bulk request. + // Which could cause a classic index to be created if no data stream template + // matches the index in the request. + // + // RequireDataStream is disabled by default. + RequireDataStream bool + // Scaling configuration for the docappender. // // If unspecified, scaling is enabled by default.