Skip to content

Commit

Permalink
Support require_data_stream new config option (#180)
Browse files Browse the repository at this point in the history
Adds a new `RequireDataStream` option to the Appender, and BulkIndexer
configurations. When set, it will send `?require_data_stream=true` in
the `_bulk` request to Elasticsearch.

---------

Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop authored Jun 7, 2024
1 parent 5aa2b40 commit 16dea31
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 2 deletions.
1 change: 1 addition & 0 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func New(client esapi.Transport, cfg Config) (*Appender, error) {
RetryOnDocumentStatus: cfg.RetryOnDocumentStatus,
CompressionLevel: cfg.CompressionLevel,
Pipeline: cfg.Pipeline,
RequireDataStream: cfg.RequireDataStream,
})
if err != nil {
return nil, fmt.Errorf("error creating bulk indexer: %w", err)
Expand Down
30 changes: 30 additions & 0 deletions appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1350,6 +1350,36 @@ func TestAppenderPipeline(t *testing.T) {
assert.Equal(t, expected, actual)
}

// TestAppenderRequireDataStream verifies that enabling the
// RequireDataStream config option makes the appender send the
// `require_data_stream=true` query parameter on its _bulk request.
func TestAppenderRequireDataStream(t *testing.T) {
	const expected = "true"
	var got string
	client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
		// Capture the query parameter the appender sent.
		got = r.URL.Query().Get("require_data_stream")
		_, result := docappendertest.DecodeBulkRequest(r)
		json.NewEncoder(w).Encode(result)
	})
	indexer, err := docappender.New(client, docappender.Config{
		FlushInterval:     time.Minute,
		RequireDataStream: true,
	})
	require.NoError(t, err)
	defer indexer.Close(context.Background())

	doc := newJSONReader(map[string]any{
		"@timestamp":            time.Unix(123, 456789111).UTC().Format(docappendertest.TimestampFormat),
		"data_stream.type":      "logs",
		"data_stream.dataset":   "foo",
		"data_stream.namespace": "testing",
	})
	require.NoError(t, indexer.Add(context.Background(), "logs-foo-testing", doc))

	// Closing the indexer flushes enqueued documents, triggering the
	// bulk request whose query string we assert on below.
	require.NoError(t, indexer.Close(context.Background()))

	assert.Equal(t, expected, got)
}

func TestAppenderScaling(t *testing.T) {
newIndexer := func(t *testing.T, cfg docappender.Config) *docappender.Appender {
t.Helper()
Expand Down
19 changes: 17 additions & 2 deletions bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ type BulkIndexerConfig struct {
//
// If Pipeline is empty, no ingest pipeline will be specified in the Bulk request.
Pipeline string

// RequireDataStream, if set to true, causes an index to be created only
// if a matching index template is found and it contains a data stream
// template. When true, `require_data_stream=true` is set in the bulk
// request. When false or not set, `require_data_stream` is omitted from
// the bulk request, which could cause a classic index to be created if
// no data stream template matches the index in the request.
//
// RequireDataStream is disabled by default.
RequireDataStream bool
}

// BulkIndexer issues bulk requests to Elasticsearch. It is NOT safe for concurrent use
Expand All @@ -86,6 +96,7 @@ type BulkIndexer struct {
copyBuf []byte
buf bytes.Buffer
retryCounts map[int]int
requireDataStream bool
}

type BulkIndexerResponseStat struct {
Expand Down Expand Up @@ -195,8 +206,9 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (*BulkIndexer, error) {
}

b := &BulkIndexer{
config: cfg,
retryCounts: make(map[int]int),
config: cfg,
retryCounts: make(map[int]int),
requireDataStream: cfg.RequireDataStream,
}

// use a len check instead of a nil check because document level retries
Expand Down Expand Up @@ -328,6 +340,9 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
FilterPath: []string{"items.*._index", "items.*.status", "items.*.error.type", "items.*.error.reason"},
Pipeline: b.config.Pipeline,
}
if b.requireDataStream {
req.RequireDataStream = &b.requireDataStream
}
if b.gzipw != nil {
req.Header.Set("Content-Encoding", "gzip")
}
Expand Down
10 changes: 10 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ type Config struct {
// If Pipeline is empty, no ingest pipeline will be specified in the Bulk request.
Pipeline string

// RequireDataStream, if set to true, causes an index to be created only
// if a matching index template is found and it contains a data stream
// template. When true, `require_data_stream=true` is set in the bulk
// request. When false or not set, `require_data_stream` is omitted from
// the bulk request, which could cause a classic index to be created if
// no data stream template matches the index in the request.
//
// RequireDataStream is disabled by default.
RequireDataStream bool

// Scaling configuration for the docappender.
//
// If unspecified, scaling is enabled by default.
Expand Down

0 comments on commit 16dea31

Please sign in to comment.