diff --git a/components/consumers/elasticsearch/main.go b/components/consumers/elasticsearch/main.go index 81c3d3c5a..52f271a02 100644 --- a/components/consumers/elasticsearch/main.go +++ b/components/consumers/elasticsearch/main.go @@ -10,12 +10,14 @@ import ( "strings" "time" + elasticsearch "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esapi" + "github.com/go-errors/errors" + v1 "github.com/ocurity/dracon/api/proto/v1" "github.com/ocurity/dracon/components/consumers" "github.com/ocurity/dracon/pkg/enumtransformers" "github.com/ocurity/dracon/pkg/templating" - - elasticsearch "github.com/elastic/go-elasticsearch/v8" ) var ( @@ -61,19 +63,21 @@ func main() { flag.Parse() if err := parseFlags(); err != nil { - log.Fatal(err) + log.Fatalf("could not parse incoming flags error: %s", err) } + slog.Debug("connecting to elasticsearch") es, err := getESClient() if err != nil { - log.Fatal("could not contact remote Elasticsearch: ", err) + log.Fatalf("could not contact remote Elasticsearch error: %s", err) } + slog.Debug("successfully connected to elasticsearch") if consumers.Raw { slog.Debug("Parsing Raw results") responses, err := consumers.LoadToolResponse() if err != nil { - log.Fatal("could not load raw results, file malformed: ", err) + log.Fatalf("could not load raw results, file malformed: %s", err) } numIssues := 0 for _, res := range responses { @@ -85,18 +89,17 @@ func main() { log.Fatal("Could not parse raw issue", err) } res, err := es.Index(esIndex, bytes.NewBuffer(b)) - log.Printf("%+v", res) - if err != nil { - log.Fatal("Could not push raw issue", err) + if err != nil || res.StatusCode != 200 || res.IsError() { + log.Fatalf("could not push raw issue to index: %s, status code received: %d, elasticsearch result: %s, error:%s", esIndex, res.StatusCode, dumpStringResponse(res), err) } } } - slog.Info("Pushed", "numIssues", numIssues, "issues to Elasticsearch", "") + slog.Info("Pushed issues to Elasticsearch", slog.Int("numIssues", numIssues)) } else { - log.Print("Parsing Enriched results") + slog.Debug("Parsing Enriched results") responses, err := consumers.LoadEnrichedToolResponse() if err != nil { - log.Fatal("could not load enriched results, file malformed: ", err) + log.Fatalf("could not load enriched results, file malformed error: %s ", err) } numIssues := 0 for _, res := range responses { @@ -105,22 +108,24 @@ func main() { for _, iss := range res.GetIssues() { b, err := getEnrichedIssue(scanStartTime, res, iss) if err != nil { - log.Fatal("Could not parse enriched issue", err) + log.Fatalf("Could not parse enriched issue error:%s", err) } res, err := es.Index(esIndex, bytes.NewBuffer(b)) - if err != nil || res.IsError() { - log.Fatal("Could not push enriched issue", err, "received", res.StatusCode) + if err != nil || res.StatusCode != 200 || res.IsError() { + log.Fatalf("could not push enriched issue to index: %s, status code received: %d, elasticsearch result: %s, error:%s", esIndex, res.StatusCode, dumpStringResponse(res), err) } } } - slog.Info("Pushed", "numIssues", numIssues, "issues to Elasticsearch", "") + slog.Info("Pushed issues to Elasticsearch", slog.Int("numIssues", numIssues)) } } - +func dumpStringResponse(res *esapi.Response) string { + return res.String() +} func getRawIssue(scanStartTime time.Time, res *v1.LaunchToolResponse, iss *v1.Issue) ([]byte, error) { description, err := templating.TemplateStringRaw(issueTemplate, iss) if err != nil { - log.Fatal("Could not template raw issue ", err) + return nil, errors.Errorf("Could not template raw issue %w", err) } jBytes, err := json.Marshal(&esDocument{ ScanStartTime: scanStartTime, @@ -140,7 +145,7 @@ func getRawIssue(scanStartTime time.Time, res *v1.LaunchToolResponse, iss *v1.Is CVE: iss.GetCve(), }) if err != nil { - return []byte{}, err + return nil, errors.Errorf("could not marshal elasticsearch document, err: %w", err) } return jBytes, nil } @@ -148,7 +153,7 @@ func getRawIssue(scanStartTime time.Time, res *v1.LaunchToolResponse, iss *v1.Is func getEnrichedIssue(scanStartTime time.Time, res *v1.EnrichedLaunchToolResponse, iss *v1.EnrichedIssue) ([]byte, error) { description, err := templating.TemplateStringEnriched(issueTemplate, iss) if err != nil { - log.Fatal("Could not template raw issue ", err) + return nil, errors.Errorf("Could not template raw issue %w", err) } firstSeenTime := iss.GetFirstSeen().AsTime() jBytes, err := json.Marshal(&esDocument{ @@ -172,7 +177,7 @@ func getEnrichedIssue(scanStartTime time.Time, res *v1.EnrichedLaunchToolRespons Annotations: iss.GetAnnotations(), }) if err != nil { - return []byte{}, err + return nil, errors.Errorf("could not marshal elasticsearch document, err: %w", err) } return jBytes, nil } @@ -202,6 +207,11 @@ func getESClient() (*elasticsearch.Client, error) { var es *elasticsearch.Client var err error var esConfig elasticsearch.Config = elasticsearch.Config{} + type esInfo struct { + Version struct { + Number string `json:"number"` + } `json:"version"` + } if basicAuthUser != "" && basicAuthPass != "" { esConfig.Username = basicAuthUser @@ -219,24 +229,27 @@ func getESClient() (*elasticsearch.Client, error) { es, err = elasticsearch.NewClient(esConfig) if err != nil { - return nil, err - } - type esInfo struct { - Version struct { - Number string `json:"number"` - } `json:"version"` + return nil, errors.Errorf("could not get elasticsearch client err: %w", err) } + // prove connection by attempting to retrieve cluster info, this requires read access to the cluster info + var info esInfo res, err := es.Info() if err != nil { - return nil, err + return nil, errors.Errorf("could not get cluster information as proof of connection, err: %w, raw response: %s", err, dumpStringResponse(res)) } - var info esInfo - if err := json.NewDecoder(res.Body).Decode(&info); err != nil { - return nil, err + if res.StatusCode != 200 || res.IsError() { + return nil, errors.Errorf("could not contact Elasticsearch, attempted to retrieve cluster info and got status code: %d as a result, body: %s", res.StatusCode, dumpStringResponse(res)) } + + slog.Debug("received info from elasticsearch successfully") + body := json.NewDecoder(res.Body) + if err := body.Decode(&info); err != nil { + return nil, errors.Errorf("could not decode elasticsearch cluster information %w", err) + } + if info.Version.Number[0] != '8' { - return nil, fmt.Errorf("unsupported ES Server version %s", info.Version.Number) + return nil, errors.Errorf("unsupported elasticsearch server version %s only version 8.x is supported, got %s instead", info.Version.Number, info.Version.Number) } - return es, err + return es, nil }