-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix ES integration test race conditions #6229
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ import ( | |
"path/filepath" | ||
"strings" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/spf13/viper" | ||
"go.opentelemetry.io/otel" | ||
|
@@ -185,7 +186,9 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { | |
|
||
// CreateSpanWriter implements storage.Factory | ||
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { | ||
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger) | ||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | ||
defer cancel() | ||
return createSpanWriter(ctx, f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger) | ||
} | ||
|
||
// CreateDependencyReader implements storage.Factory | ||
|
@@ -238,6 +241,7 @@ func createSpanReader( | |
} | ||
|
||
func createSpanWriter( | ||
ctx context.Context, | ||
clientFn func() es.Client, | ||
cfg *config.Configuration, | ||
archive bool, | ||
|
@@ -276,13 +280,36 @@ func createSpanWriter( | |
if err != nil { | ||
return nil, err | ||
} | ||
if err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.Indices.IndexPrefix); err != nil { | ||
if err := CreateTemplatesWithRetry(ctx, writer, spanMapping, serviceMapping, cfg.Indices.IndexPrefix, logger); err != nil { | ||
return nil, err | ||
} | ||
} | ||
return writer, nil | ||
} | ||
|
||
func CreateTemplatesWithRetry( | ||
ctx context.Context, | ||
writer *esSpanStore.SpanWriter, | ||
spanMapping string, | ||
serviceMapping string, | ||
indexPrefix cfg.IndexPrefix, | ||
logger *zap.Logger, | ||
) error { | ||
for i := 0; i < 3; i++ { | ||
select { | ||
case <-ctx.Done(): | ||
return fmt.Errorf("template creation timeout: %w", ctx.Err()) | ||
default: | ||
if err := writer.CreateTemplates(ctx, spanMapping, serviceMapping, indexPrefix); err == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not seeing how this addresses the original issue. The assumption was that when we call CreateTemplates it may return before the templates are actually created in the ES, and the immediately-after span writes might fail. Doing retries does not address that. I would also like to understand if that assumption is correct in the first place and why it might happen - is it indeed that ES acks a success before the template changes are actually applied to the db? Or could it be our fault, e.g. by using an async connection? We do something funky with batch inserts that happens in the background, that could be the source of a race. |
||
return nil | ||
} | ||
logger.Warn("Template creation failed, retrying", zap.Int("attempt", i+1)) | ||
time.Sleep(time.Duration(100*(1<<i)) * time.Millisecond) | ||
} | ||
} | ||
return errors.New("failed to create templates after retries") | ||
} | ||
|
||
func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) { | ||
params := esSampleStore.Params{ | ||
Client: f.getPrimaryClient, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.