Skip to content

Commit

Permalink
fix #379, add significantly more error logging to the elasticsearch c…
Browse files Browse the repository at this point in the history
…onsumer
  • Loading branch information
northdpole committed Oct 1, 2024
1 parent 80cac7e commit dc70704
Showing 1 changed file with 45 additions and 32 deletions.
77 changes: 45 additions & 32 deletions components/consumers/elasticsearch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
"strings"
"time"

elasticsearch "github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/go-errors/errors"

v1 "github.com/ocurity/dracon/api/proto/v1"
"github.com/ocurity/dracon/components/consumers"
"github.com/ocurity/dracon/pkg/enumtransformers"
"github.com/ocurity/dracon/pkg/templating"

elasticsearch "github.com/elastic/go-elasticsearch/v8"
)

var (
Expand Down Expand Up @@ -61,19 +63,21 @@ func main() {
flag.Parse()

if err := parseFlags(); err != nil {
log.Fatal(err)
log.Fatalf("could not parse incoming flags error: %s", err)
}

slog.Debug("connecting to elasticsearch")
es, err := getESClient()
if err != nil {
log.Fatal("could not contact remote Elasticsearch: ", err)
log.Fatalf("could not contact remote Elasticsearch error: %s", err)
}
slog.Debug("successfully connected to elasticsearch")

if consumers.Raw {
slog.Debug("Parsing Raw results")
responses, err := consumers.LoadToolResponse()
if err != nil {
log.Fatal("could not load raw results, file malformed: ", err)
log.Fatalf("could not load raw results, file malformed: %s", err)
}
numIssues := 0
for _, res := range responses {
Expand All @@ -85,18 +89,17 @@ func main() {
log.Fatal("Could not parse raw issue", err)
}
res, err := es.Index(esIndex, bytes.NewBuffer(b))
log.Printf("%+v", res)
if err != nil {
log.Fatal("Could not push raw issue", err)
if err != nil || res.StatusCode != 200 || res.IsError() {
log.Fatalf("could not push raw issue to index: %s, status code received: %d, elasticsearch result: %s, error:%s", esIndex, res.StatusCode, dumpStringResponse(res), err)
}
}
}
slog.Info("Pushed", "numIssues", numIssues, "issues to Elasticsearch", "")
slog.Info("Pushed issues to Elasticsearch", slog.Int("numIssues", numIssues))
} else {
log.Print("Parsing Enriched results")
slog.Debug("Parsing Enriched results")
responses, err := consumers.LoadEnrichedToolResponse()
if err != nil {
log.Fatal("could not load enriched results, file malformed: ", err)
log.Fatalf("could not load enriched results, file malformed error: %s ", err)
}
numIssues := 0
for _, res := range responses {
Expand All @@ -105,22 +108,24 @@ func main() {
for _, iss := range res.GetIssues() {
b, err := getEnrichedIssue(scanStartTime, res, iss)
if err != nil {
log.Fatal("Could not parse enriched issue", err)
log.Fatalf("Could not parse enriched issue error:%s", err)
}
res, err := es.Index(esIndex, bytes.NewBuffer(b))
if err != nil || res.IsError() {
log.Fatal("Could not push enriched issue", err, "received", res.StatusCode)
if err != nil || res.StatusCode != 200 || res.IsError() {
log.Fatalf("could not push enriched issue to index: %s, status code received: %d, elasticsearch result: %s, error:%s", esIndex, res.StatusCode, dumpStringResponse(res), err)
}
}
}
slog.Info("Pushed", "numIssues", numIssues, "issues to Elasticsearch", "")
slog.Info("Pushed issues to Elasticsearch", slog.Int("numIssues", numIssues))
}
}

func dumpStringResponse(res *esapi.Response) string {
return res.String()
}
func getRawIssue(scanStartTime time.Time, res *v1.LaunchToolResponse, iss *v1.Issue) ([]byte, error) {
description, err := templating.TemplateStringRaw(issueTemplate, iss)
if err != nil {
log.Fatal("Could not template raw issue ", err)
return nil, errors.Errorf("Could not template raw issue %w", err)
}
jBytes, err := json.Marshal(&esDocument{
ScanStartTime: scanStartTime,
Expand All @@ -140,15 +145,15 @@ func getRawIssue(scanStartTime time.Time, res *v1.LaunchToolResponse, iss *v1.Is
CVE: iss.GetCve(),
})
if err != nil {
return []byte{}, err
return nil, errors.Errorf("could not marshal elasticsearch document, err: %w", err)
}
return jBytes, nil
}

func getEnrichedIssue(scanStartTime time.Time, res *v1.EnrichedLaunchToolResponse, iss *v1.EnrichedIssue) ([]byte, error) {
description, err := templating.TemplateStringEnriched(issueTemplate, iss)
if err != nil {
log.Fatal("Could not template raw issue ", err)
return nil, errors.Errorf("Could not template raw issue %w", err)
}
firstSeenTime := iss.GetFirstSeen().AsTime()
jBytes, err := json.Marshal(&esDocument{
Expand All @@ -172,7 +177,7 @@ func getEnrichedIssue(scanStartTime time.Time, res *v1.EnrichedLaunchToolRespons
Annotations: iss.GetAnnotations(),
})
if err != nil {
return []byte{}, err
return nil, errors.Errorf("could not marshal elasticsearch document, err: %w", err)
}
return jBytes, nil
}
Expand Down Expand Up @@ -202,6 +207,11 @@ func getESClient() (*elasticsearch.Client, error) {
var es *elasticsearch.Client
var err error
var esConfig elasticsearch.Config = elasticsearch.Config{}
type esInfo struct {
Version struct {
Number string `json:"number"`
} `json:"version"`
}

if basicAuthUser != "" && basicAuthPass != "" {
esConfig.Username = basicAuthUser
Expand All @@ -219,24 +229,27 @@ func getESClient() (*elasticsearch.Client, error) {

es, err = elasticsearch.NewClient(esConfig)
if err != nil {
return nil, err
}
type esInfo struct {
Version struct {
Number string `json:"number"`
} `json:"version"`
return nil, errors.Errorf("could not get elasticsearch client err: %w", err)
}

// prove connection by attempting to retrieve cluster info, this requires read access to the cluster info
var info esInfo
res, err := es.Info()
if err != nil {
return nil, err
return nil, errors.Errorf("could not get cluster information as proof of connection, err: %w, raw response: %s", err, dumpStringResponse(res))
}
var info esInfo
if err := json.NewDecoder(res.Body).Decode(&info); err != nil {
return nil, err
if res.StatusCode != 200 || res.IsError() {
return nil, errors.Errorf("could not contact Elasticsearch, attempted to retrieve cluster info and got status code: %d as a result, body: %s", res.StatusCode, dumpStringResponse(res))
}

slog.Debug("received info from elasticsearch successfully")
body := json.NewDecoder(res.Body)
if err := body.Decode(&info); err != nil {
return nil, errors.Errorf("could not decode elasticsearch cluster information %w", err)
}

if info.Version.Number[0] != '8' {
return nil, fmt.Errorf("unsupported ES Server version %s", info.Version.Number)
return nil, errors.Errorf("unsupported elasticsearch server version %s only version 8.x is supported, got %s instead", info.Version.Number, info.Version.Number)
}
return es, err
return es, nil
}

0 comments on commit dc70704

Please sign in to comment.