Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revise wait_for_integration ES implementation #12150

Merged
merged 3 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ https://github.com/elastic/apm-server/compare/8.11\...main[View commits]
[float]
==== Breaking Changes
- The unsupported apm_data_stream_migration ingest pipeline has been removed {pull}12102[12102].
- "publish_ready" is always false in the "GET /" response until events are received by apm-server {pull}12150[12150]

[float]
==== Bug fixes
Expand All @@ -23,3 +24,4 @@ https://github.com/elastic/apm-server/compare/8.11\...main[View commits]
- Add support for returning partial success response in OTLP input {pull}11883[11883]
- Setting event timestamp from OTel observed timestamp when needed {pull}11935[11935]
- Field mappings have been added for various formerly unindexed fields {pull}12102[12102]
- We now assert that index templates are installed by attempting to create data streams {pull}12150[12150]
46 changes: 31 additions & 15 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/http"
"os"
"runtime"
"sync"
"time"

"github.com/dustin/go-humanize"
Expand All @@ -42,11 +43,9 @@ import (
_ "google.golang.org/grpc/encoding/gzip"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/licenser"
"github.com/elastic/beats/v7/libbeat/outputs"
esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
agentconfig "github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -347,7 +346,13 @@ func (s *Runner) Run(ctx context.Context) error {
// any events to Elasticsearch before the integration is ready.
publishReady := make(chan struct{})
drain := make(chan struct{})
startWaitReady := make(chan struct{})
var waitReadyOnce sync.Once
g.Go(func() error {
select {
case <-ctx.Done():
case <-startWaitReady:
}
if err := s.waitReady(ctx, kibanaClient, tracer); err != nil {
// One or more preconditions failed; drop events.
close(drain)
Expand All @@ -358,24 +363,25 @@ func (s *Runner) Run(ctx context.Context) error {
close(publishReady)
return nil
})
callbackUUID, err := esoutput.RegisterConnectCallback(func(*eslegclient.Connection) error {
prePublish := func(ctx context.Context) error {
waitReadyOnce.Do(func() {
close(startWaitReady)
})
select {
case <-ctx.Done():
return ctx.Err()
case <-drain:
return errServerShuttingDown
case <-publishReady:
return nil
default:
}
return errors.New("not ready for publishing events")
})
if err != nil {
return err
return nil
}
defer esoutput.DeregisterConnectCallback(callbackUUID)
newElasticsearchClient := func(cfg *elasticsearch.Config) (*elasticsearch.Client, error) {
httpTransport, err := elasticsearch.NewHTTPTransport(cfg)
if err != nil {
return nil, err
}
transport := &waitReadyRoundTripper{Transport: httpTransport, ready: publishReady, drain: drain}
transport := &waitReadyRoundTripper{Transport: httpTransport, onBulk: prePublish}
return elasticsearch.NewClientParams(elasticsearch.ClientParams{
Config: cfg,
Transport: transport,
Expand Down Expand Up @@ -432,7 +438,7 @@ func (s *Runner) Run(ctx context.Context) error {
// Create the BatchProcessor chain that is used to process all events,
// including the metrics aggregated by APM Server.
finalBatchProcessor, closeFinalBatchProcessor, err := s.newFinalBatchProcessor(
tracer, newElasticsearchClient, memLimitGB,
tracer, newElasticsearchClient, memLimitGB, prePublish,
)
if err != nil {
return err
Expand Down Expand Up @@ -647,7 +653,9 @@ func (s *Runner) waitReady(
return errors.New("cannot wait for integration without either Kibana or Elasticsearch config")
}
preconditions = append(preconditions, func(ctx context.Context) error {
return checkIntegrationInstalled(ctx, kibanaClient, esOutputClient, s.logger)
return checkIndexTemplatesInstalled(
ctx, kibanaClient, esOutputClient, s.config.DataStreams.Namespace, s.logger,
)
})
}

Expand All @@ -672,12 +680,13 @@ func (s *Runner) newFinalBatchProcessor(
tracer *apm.Tracer,
newElasticsearchClient func(cfg *elasticsearch.Config) (*elasticsearch.Client, error),
memLimit float64,
prePublish func(context.Context) error,
) (modelpb.BatchProcessor, func(context.Context) error, error) {

monitoring.Default.Remove("libbeat")
libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat")
if s.elasticsearchOutputConfig == nil {
return s.newLibbeatFinalBatchProcessor(tracer, libbeatMonitoringRegistry)
return s.newLibbeatFinalBatchProcessor(tracer, prePublish, libbeatMonitoringRegistry)
}

stateRegistry := monitoring.GetNamespace("state").GetRegistry()
Expand Down Expand Up @@ -829,6 +838,7 @@ func docappenderConfig(

func (s *Runner) newLibbeatFinalBatchProcessor(
tracer *apm.Tracer,
prePublish func(context.Context) error,
libbeatMonitoringRegistry *monitoring.Registry,
) (modelpb.BatchProcessor, func(context.Context) error, error) {
// When the publisher stops cleanly it will close its pipeline client,
Expand Down Expand Up @@ -889,7 +899,13 @@ func (s *Runner) newLibbeatFinalBatchProcessor(
}
return acker.Wait(ctx)
}
return publisher, stop, nil
processor := modelprocessor.Chained{
modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: what's the reason for this? Isn't this handled by the ES client?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is for libbeat outputs, which is anything other than Elasticsearch.

return prePublish(ctx)
}),
publisher,
}
return processor, stop, nil
}

const sourcemapIndex = ".apm-source-map"
Expand Down
3 changes: 2 additions & 1 deletion internal/beater/beatertest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func NewUnstartedServer(t testing.TB, opts ...option) *Server {
require.NoError(t, err)
if !outputConfig.Output.IsSet() {
err = cfg.Merge(map[string]any{
"output.null": map[string]any{},
"output.null": map[string]any{},
"queue.mem.flush": map[string]any{"min_events": 1, "timeout": "1ns"},
})
require.NoError(t, err)
}
Expand Down
91 changes: 37 additions & 54 deletions internal/beater/checkintegration.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,26 @@ import (
"fmt"
"io"
"net/http"
"strings"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/apm-server/internal/elasticsearch"
"github.com/elastic/apm-server/internal/kibana"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/elastic/go-elasticsearch/v8/typedapi/indices/createdatastream"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
)

// checkIntegrationInstalled checks if the APM integration is installed by querying Kibana
// and/or Elasticsearch, returning nil if and only if it is installed.
func checkIntegrationInstalled(
// checkIndexTemplatesInstalled checks if the APM index templates are installed by querying the
// APM integration status via Kibana, or by attempting to create a data stream via Elasticsearch,
// returning nil if and only if they are installed.
func checkIndexTemplatesInstalled(
ctx context.Context,
kibanaClient *kibana.Client,
esClient *elasticsearch.Client,
namespace string,
logger *logp.Logger,
) (err error) {
defer func() {
Expand All @@ -53,37 +56,36 @@ func checkIntegrationInstalled(
}
}
}()
if esClient != nil {
installed, err := checkCreateDataStream(ctx, esClient, namespace)
if err != nil {
return fmt.Errorf("error checking Elasticsearch index template setup: %w", err)
}
if !installed {
return errors.New("index templates not installed")
}
return nil
}
if kibanaClient != nil {
installed, err := checkIntegrationInstalledKibana(ctx, kibanaClient, logger)
installed, err := checkIntegrationInstalled(ctx, kibanaClient, logger)
if err != nil {
// We only return the Kibana error if we have no Elasticsearch client,
// as we may not have sufficient privileges to query the Fleet API.
if esClient == nil {
return fmt.Errorf("error querying Kibana for integration package status: %w", err)
}
} else if !installed {
}
if !installed {
// We were able to query Kibana, but the package is not yet installed.
// We should continue querying the package status via Kibana, as it is
// more authoritative than checking for index template installation.
return errors.New("integration package not yet installed")
}
// Fall through and query Elasticsearch (if we have a client). Kibana may prematurely
// report packages as installed: https://github.com/elastic/kibana/issues/108649
}
if esClient != nil {
installed, err := checkIntegrationInstalledElasticsearch(ctx, esClient, logger)
if err != nil {
return fmt.Errorf("error querying Elasticsearch for integration index templates: %w", err)
} else if !installed {
return errors.New("integration index templates not installed")
}
}
return nil
}

// checkIntegrationInstalledKibana checks if the APM integration package
// is installed by querying Kibana.
func checkIntegrationInstalledKibana(ctx context.Context, kibanaClient *kibana.Client, logger *logp.Logger) (bool, error) {
func checkIntegrationInstalled(ctx context.Context, kibanaClient *kibana.Client, logger *logp.Logger) (bool, error) {
resp, err := kibanaClient.Send(ctx, "GET", "/api/fleet/epm/packages/apm", nil, nil, nil)
if err != nil {
return false, err
Expand All @@ -106,41 +108,22 @@ func checkIntegrationInstalledKibana(ctx context.Context, kibanaClient *kibana.C
return result.Response.Status == "installed", nil
}

func checkIntegrationInstalledElasticsearch(ctx context.Context, esClient *elasticsearch.Client, _ *logp.Logger) (bool, error) {
// TODO(axw) generate the list of expected index templates.
templates := []string{
"traces-apm",
"traces-apm.sampled",
"metrics-apm.app",
"metrics-apm.internal",
"logs-apm.error",
}
for _, intervals := range []string{"1m", "10m", "60m"} {
for _, ds := range []string{"metrics-apm.transaction", "metrics-apm.service_transaction", "metrics-apm.service_destination", "metrics-apm.service_summary"} {
templates = append(templates, fmt.Sprintf("%s.%s", ds, intervals))
}
}
// IndicesGetIndexTemplateRequest accepts a slice of template names,
// but the REST API expects just one index template name. Query them
// in parallel.
g, ctx := errgroup.WithContext(ctx)
for _, template := range templates {
template := template // copy for closure
g.Go(func() error {
req := esapi.IndicesGetIndexTemplateRequest{Name: template}
resp, err := req.Do(ctx, esClient)
if err != nil {
return err
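// checkCreateDataStream attempts to create a traces-apm-<namespace> data stream.
// It reports true if the data stream was created or already exists, and false
// (with a nil error) if creation failed because no index template matches the
// data stream name. Any other failure is returned as a non-nil error.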
func checkCreateDataStream(ctx context.Context, esClient *elasticsearch.Client, namespace string) (bool, error) {
if _, err := createdatastream.New(esClient).Name("traces-apm-" + namespace).Do(ctx); err != nil {
var esError *types.ElasticsearchError
if errors.As(err, &esError) {
cause := esError.ErrorCause
if cause.Type == "resource_already_exists_exception" {
return true, nil
}
defer resp.Body.Close()

if resp.IsError() {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("unexpected HTTP status: %s (%s)", resp.Status(), bytes.TrimSpace(body))
if cause.Reason != nil && strings.HasPrefix(*cause.Reason, "no matching index template") {
return false, nil
}
return nil
})
}
return false, err
}
err := g.Wait()
return err == nil, err
return true, nil
}
46 changes: 31 additions & 15 deletions internal/beater/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func TestServerOTLPGRPC(t *testing.T) {

func TestServerWaitForIntegrationKibana(t *testing.T) {
var requests int64
requestCh := make(chan struct{})
requestCh := make(chan struct{}, 3)
mux := http.NewServeMux()
mux.HandleFunc("/api/status", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"version":{"number":"1.2.3"}}`))
Expand Down Expand Up @@ -363,6 +363,14 @@ func TestServerWaitForIntegrationKibana(t *testing.T) {
},
})))

// Send some events to the server. They should be accepted and enqueued.
req := makeTransactionRequest(t, srv.URL)
req.Header.Add("Content-Type", "application/x-ndjson")
resp, err := srv.Client.Do(req)
assert.NoError(t, err)
assert.Equal(t, http.StatusAccepted, resp.StatusCode)
resp.Body.Close()

timeout := time.After(10 * time.Second)
for i := 0; i < 3; i++ {
select {
Expand All @@ -387,8 +395,8 @@ func TestServerWaitForIntegrationKibana(t *testing.T) {

func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
var mu sync.Mutex
var tracesRequests int
tracesRequestsCh := make(chan int)
var createDataStreamRequests int
createDataStreamRequestsCh := make(chan int)
bulkCh := make(chan struct{}, 1)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -397,17 +405,24 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
// elasticsearch client to send bulk requests.
fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
})
mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/_data_stream/", func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
template := path.Base(r.URL.Path)
if template == "traces-apm" {
tracesRequests++
if tracesRequests == 1 {
w.WriteHeader(404)
}
tracesRequestsCh <- tracesRequests
name := path.Base(r.URL.Path)
if name != "traces-apm-testing" {
panic("unexpected data stream name: " + name)
}
createDataStreamRequests++
switch createDataStreamRequests {
case 1:
w.WriteHeader(500)
case 2:
w.WriteHeader(400)
w.Write([]byte(`{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"no matching index template found for data stream [traces-apm-testing]"}],"type":"illegal_argument_exception","reason":"no matching index template found for data stream [traces-apm-testing]"},"status":400}`))
case 3:
w.Write([]byte(`{"acknowledged":true}`))
}
createDataStreamRequestsCh <- createDataStreamRequests
})
mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
select {
Expand All @@ -422,6 +437,7 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
"apm-server": map[string]interface{}{
"wait_ready_interval": "100ms",
"data_streams.wait_for_integration": true,
"data_streams.namespace": "testing",
},
"output.elasticsearch": map[string]interface{}{
"hosts": []string{elasticsearchServer.URL},
Expand Down Expand Up @@ -456,8 +472,8 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
var done bool
for !done {
select {
case n := <-tracesRequestsCh:
done = n == 2
case n := <-createDataStreamRequestsCh:
done = n == 3
case <-timeout:
t.Fatal("timed out waiting for request")
}
Expand All @@ -471,7 +487,7 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
}

logs := srv.Logs.FilterMessageSnippet("please install the apm integration")
assert.Len(t, logs.All(), 1, "couldn't find remediation message logs")
assert.Len(t, logs.All(), 2, "couldn't find remediation message logs")

// Healthcheck should now report that the server is publish-ready.
resp, err = srv.Client.Get(srv.URL + api.RootPath)
Expand All @@ -490,7 +506,7 @@ func TestServerFailedPreconditionDoesNotIndex(t *testing.T) {
// elasticsearch client to send bulk requests.
fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
})
mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/_data_stream/traces-apm-default", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(404)
})
mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading
Loading