diff --git a/pkg/cmd/traceability/root.go b/pkg/cmd/traceability/root.go index 3ba56d4..413f023 100644 --- a/pkg/cmd/traceability/root.go +++ b/pkg/cmd/traceability/root.go @@ -1,10 +1,11 @@ package traceability import ( + "fmt" + corecmd "github.com/Axway/agent-sdk/pkg/cmd" "github.com/Axway/agent-sdk/pkg/cmd/service" corecfg "github.com/Axway/agent-sdk/pkg/config" - "github.com/Axway/agents-webmethods/pkg/config" "github.com/Axway/agents-webmethods/pkg/traceability" libcmd "github.com/elastic/beats/v7/libbeat/cmd" @@ -16,6 +17,7 @@ var RootCmd corecmd.AgentRootCmd var beatCmd *libcmd.BeatsRootCmd func init() { + fmt.Println("initanliing root") name := "webmethods_traceability_agent" settings := instance.Settings{ Name: name, @@ -24,7 +26,7 @@ func init() { } // Initialize the beat command - beatCmd = libcmd.GenRootCmdWithSettings(traceability.NewBeater, settings) + beatCmd = libcmd.GenRootCmdWithSettings(traceability.New, settings) cmd := beatCmd.Command // Wrap the beat command with the agent command processor with callbacks to initialize the agent config and command execution. // The first parameter identifies the name of the yaml file that agent will look for to load the config @@ -36,12 +38,14 @@ func init() { run, corecfg.TraceabilityAgent, ) - config.AddConfigProperties(RootCmd.GetProperties()) - RootCmd.AddCommand(service.GenServiceCmd("pathConfig")) + traceability.AddConfigProperties(RootCmd.GetProperties()) + RootCmd.AddCommand(service.GenServiceCmd("path.Config")) } // Callback that agent will call to process the execution func run() error { + fmt.Println("initanliing root") + return beatCmd.Execute() } diff --git a/pkg/common/common.go b/pkg/common/common.go index 9536c17..509e1a3 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -4,7 +4,7 @@ import "fmt" const ( AppID = "appID" - AttrAPIID = "sampleApiId" + AttrAPIID = "apiId" AttrChecksum = "checksum" AttrAppID = "webmethodsApplicationId" ) diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go new file mode 100644 index 0000000..83dd77d --- /dev/null +++ b/pkg/errors/errors.go @@ -0,0 +1,28 @@ +package errors + +import "github.com/Axway/agent-sdk/pkg/util/errors" + +// Error definitions +var ( + ErrGatewayConfig = errors.Newf(3500, "error apigateway.getHeaders is set to true and apigateway.%s not set in config") + + ErrV7HealthcheckHost = errors.Newf(3501, "%s Failed. Error communicating with API Gateway: %s. Check API Gateway host configuration values") + ErrV7HealthcheckAuth = errors.Newf(3502, "%s Failed. Error sending request to API Gateway. HTTP response code %v. Check API Gateway authentication configuration values") + + // Event Processing + ErrEventNoMsg = errors.Newf(3510, "the log event had no message field: %s") + ErrEventMsgStructure = errors.Newf(3511, "could not parse the log event: %s") + ErrTrxnDataGet = errors.New(3512, "could not retrieve the transaction data from API Gateway") + ErrTrxnDataProcess = errors.New(3513, "could not process the transaction data") + ErrTrxnHeaders = errors.Newf(3514, "could not process the transaction headers: %s") + ErrProtocolStructure = errors.Newf(3515, "could not parse the %s transaction details: %s") + ErrCreateCondorEvent = errors.New(3516, "error creating the Amplify Visibility event") + ErrNotHTTPService = errors.New(3517, "the event processor can only handle http services") + ErrWrongProcessor = errors.Newf(3518, "the message looks to be from the %s log but open traffic process is %s, check OPENTRAFFIC_LOG_INPUT") + ErrInvalidInputConfig = errors.Newf(3519, "input configuration is not valid: %s") + ErrConfigFile = errors.New(3520, "could not find the 'traceability_agent' section in the configuration file") + + // API Gateway Communication + ErrAPIGWRequest = errors.New(3530, "error encountered sending a request to API Gateway") + ErrAPIGWResponse = errors.Newf(3531, "unexpected HTTP response code %v in response from API Gateway") +) diff --git a/pkg/traceability/agent.go b/pkg/traceability/agent.go index b40136c..b184088 100644 --- a/pkg/traceability/agent.go +++ b/pkg/traceability/agent.go @@ -1,93 +1,93 @@ package traceability import ( + "errors" + "fmt" "os" - "os/signal" - "syscall" + "path/filepath" + "strings" - coreagent "github.com/Axway/agent-sdk/pkg/agent" - "github.com/Axway/agent-sdk/pkg/transaction" - agenterrors "github.com/Axway/agent-sdk/pkg/util/errors" - hc "github.com/Axway/agent-sdk/pkg/util/healthcheck" + "github.com/elastic/beats/v7/filebeat/beater" + fbcfg "github.com/elastic/beats/v7/filebeat/config" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" -) - -// Agent - Webmethods Beater configuration. Implements the beat.Beater interface. -type Agent struct { - client beat.Client - doneCh chan struct{} - eventChannel chan string - eventProcessor Processor - webmethods Emitter -} + "github.com/elastic/beats/v7/libbeat/logp" -// NewBeater creates an instance of webmethods_traceability_agent. -func NewBeater(_ *beat.Beat, _ *common.Config) (beat.Beater, error) { - eventChannel := make(chan string) - agentConfig := GetConfig() - generator := transaction.NewEventGenerator() - mapper := &EventMapper{} - processor := NewEventProcessor(agentConfig, generator, mapper) - emitter := NewWebmethodsEventEmitter(agentConfig.WebMethodConfigTracability.LogFile, eventChannel) + "github.com/Axway/agent-sdk/pkg/traceability" + localerrors "github.com/Axway/agents-webmethods/pkg/errors" +) - return newAgent(processor, emitter, eventChannel) -} +func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { + fmt.Println("beater") + err := validateInput(cfg) + if err != nil { + return nil, localerrors.ErrInvalidInputConfig.FormatError(err.Error()) + } -func newAgent( - processor Processor, - emitter Emitter, - eventChannel chan string, -) (*Agent, error) { - a := &Agent{ - doneCh: make(chan struct{}), - eventChannel: eventChannel, - eventProcessor: processor, - webmethods: emitter, + agentCfg := GetConfig() + if agentCfg == nil { + return nil, localerrors.ErrConfigFile } + eventProcessor := NewEventProcessor(agentCfg, traceability.GetMaxRetries()) + traceability.SetOutputEventProcessor(eventProcessor) - // Validate that all necessary services are up and running. If not, return error - if hc.RunChecks() != hc.OK { - return nil, agenterrors.ErrInitServicesNotReady + factory := func(beat.Info, *logp.Logger, beater.StateStore) []v2.Plugin { + return []v2.Plugin{} } - return a, nil + // Initialize the filebeat to read events + creator := beater.New(factory) + return creator(b, cfg) } -// Run starts the webmethods traceability agent. -func (a *Agent) Run(b *beat.Beat) error { - coreagent.OnConfigChange(a.onConfigChange) - - var err error - a.client, err = b.Publisher.Connect() +func validateInput(cfg *common.Config) error { + filebeatConfig := fbcfg.DefaultConfig + err := cfg.Unpack(&filebeatConfig) if err != nil { - coreagent.UpdateStatus(coreagent.AgentFailed, err.Error()) return err } - go a.webmethods.Start() - - gracefulStop := make(chan os.Signal, 1) - signal.Notify(gracefulStop, syscall.SIGTERM, os.Interrupt) + if len(filebeatConfig.Inputs) == 0 { + return errors.New("no inputs configured") + } - for { - select { - case <-a.doneCh: - return a.client.Close() - case <-gracefulStop: - return a.client.Close() - case event := <-a.eventChannel: - eventsToPublish := a.eventProcessor.ProcessRaw([]byte(event)) - a.client.PublishAll(eventsToPublish) + inputsEnabled := 0 + for _, input := range filebeatConfig.Inputs { + inputConfig := struct { + Enabled bool `config:"enabled"` + Paths []string `config:"paths"` + }{} + input.Unpack(&inputConfig) + if inputConfig.Enabled { + inputsEnabled++ + err = validateInputPaths(inputConfig.Paths) + if err != nil { + return err + } } } + return nil } -// onConfigChange apply configuration changes -func (a *Agent) onConfigChange() { -} - -// Stop stops the agent. -func (a *Agent) Stop() { - a.doneCh <- struct{}{} +func validateInputPaths(paths []string) error { + foundPath := false + for _, path := range paths { + path = strings.TrimSpace(path) + if path != "" { + parentDir := filepath.Dir(path) + fileInfo, err := os.Stat(parentDir) + if err != nil { + return err + } + if !fileInfo.IsDir() { + return errors.New("invalid path " + path) + } + foundPath = true + } + } + if !foundPath { + return errors.New("no paths were defined for input processing") + } + return nil } diff --git a/pkg/traceability/eventmapper.go b/pkg/traceability/eventmapper.go deleted file mode 100644 index f60c248..0000000 --- a/pkg/traceability/eventmapper.go +++ /dev/null @@ -1,126 +0,0 @@ -package traceability - -import ( - "net/http" - "strconv" - "strings" - "time" - - "github.com/Axway/agent-sdk/pkg/agent" - "github.com/Axway/agent-sdk/pkg/transaction" - transutil "github.com/Axway/agent-sdk/pkg/transaction/util" -) - -// EventMapper - -type EventMapper struct { -} - -func (m *EventMapper) processMapping(gatewayTrafficLogEntry GwTrafficLogEntry) (*transaction.LogEvent, []transaction.LogEvent, error) { - centralCfg := agent.GetCentralConfig() - - eventTimestamp, _ := time.Parse(time.RFC3339, gatewayTrafficLogEntry.EventTimestamp) - eventTime := eventTimestamp.UTC().UnixNano() / int64(time.Millisecond) - //eventTime := time.Now().UTC().Format(gatewayTrafficLogEntry.EventTimestamp) - txID := gatewayTrafficLogEntry.Uuid - txEventID := gatewayTrafficLogEntry.CorrelationId - transInboundLogEventLeg, err := m.createTransactionEvent(eventTime, txID, gatewayTrafficLogEntry, txEventID, "Inbound") - if err != nil { - return nil, nil, err - } - - transSummaryLogEvent, err := m.createSummaryEvent(eventTime, txID, gatewayTrafficLogEntry, centralCfg.GetTeamID()) - if err != nil { - return nil, nil, err - } - - return transSummaryLogEvent, []transaction.LogEvent{ - *transInboundLogEventLeg, - }, nil -} - -func (m *EventMapper) getTransactionEventStatus(code string) transaction.TxEventStatus { - if code != "SUCCESS" { - return transaction.TxEventStatusFail - } - return transaction.TxEventStatusPass -} - -func (m *EventMapper) getHttpStatusCode(code string) int { - if code == "SUCCESS" { - return 200 - } - return 500 -} - -func (m *EventMapper) getTransactionSummaryStatus(statusCode int) transaction.TxSummaryStatus { - transSummaryStatus := transaction.TxSummaryStatusUnknown - if statusCode >= http.StatusOK && statusCode < http.StatusBadRequest { - transSummaryStatus = transaction.TxSummaryStatusSuccess - } else if statusCode >= http.StatusBadRequest && statusCode < http.StatusInternalServerError { - transSummaryStatus = transaction.TxSummaryStatusFailure - } else if statusCode >= http.StatusInternalServerError && statusCode < http.StatusNetworkAuthenticationRequired { - transSummaryStatus = transaction.TxSummaryStatusException - } - return transSummaryStatus -} - -func (m *EventMapper) createTransactionEvent(eventTime int64, txID string, txDetails GwTrafficLogEntry, eventID, direction string) (*transaction.LogEvent, error) { - - httpStatus := m.getHttpStatusCode(txDetails.RequestStatus) - - host := txDetails.ServerId - port := 443 - if strings.Index(host, ":") != -1 { - uris := strings.Split(host, ":") - host = uris[0] - port, _ = strconv.Atoi(uris[1]) - - } - - httpProtocolDetails, err := transaction.NewHTTPProtocolBuilder(). - SetURI(txDetails.OperationName). - SetMethod(txDetails.NativeHttpMethod). - SetStatus(httpStatus, http.StatusText(httpStatus)). - SetHost(host). - SetLocalAddress(host, port). - Build() - if err != nil { - return nil, err - } - - return transaction.NewTransactionEventBuilder(). - SetTimestamp(eventTime). - SetTransactionID(txID). - SetID(eventID). - SetSource(txDetails.ServerId). - SetDirection(direction). - SetStatus(m.getTransactionEventStatus(txDetails.RequestStatus)). - SetProtocolDetail(httpProtocolDetails). - Build() -} - -func (m *EventMapper) createSummaryEvent(eventTime int64, txID string, gatewayTrafficLogEntry GwTrafficLogEntry, teamID string) (*transaction.LogEvent, error) { - statusCode := m.getHttpStatusCode(gatewayTrafficLogEntry.RequestStatus) - method := gatewayTrafficLogEntry.NativeHttpMethod - uri := gatewayTrafficLogEntry.OperationName - host := gatewayTrafficLogEntry.ApplicationIp - - builder := transaction.NewTransactionSummaryBuilder(). - SetTimestamp(eventTime). - SetTransactionID(txID). - SetStatus(m.getTransactionSummaryStatus(statusCode), strconv.Itoa(statusCode)). - SetTeam(teamID). - SetDuration(gatewayTrafficLogEntry.TotalTime). - SetEntryPoint("http", method, uri, host). - // If the API is published to Central as unified catalog item/API service, se the Proxy details with the API definition - // The Proxy.Name represents the name of the API - // The Proxy.ID should be of format "remoteApiId_". Use transaction.FormatProxyID() to get the formatted value. - SetProxy(transutil.FormatProxyID(gatewayTrafficLogEntry.ApiName), gatewayTrafficLogEntry.ApiName, 0) - - if gatewayTrafficLogEntry.ApplicationName != "Unknown" && gatewayTrafficLogEntry.ApplicationId != "Unknown" { - builder.SetApplication(gatewayTrafficLogEntry.ApplicationId, gatewayTrafficLogEntry.ApplicationName) - } - - return builder.Build() - -} diff --git a/pkg/traceability/eventprocessor.go b/pkg/traceability/eventprocessor.go index 5acb5a3..4140d15 100644 --- a/pkg/traceability/eventprocessor.go +++ b/pkg/traceability/eventprocessor.go @@ -1,19 +1,33 @@ package traceability import ( + "fmt" + "net/http" "strconv" "strings" "time" + "github.com/elastic/beats/v7/libbeat/publisher" + + "github.com/Axway/agent-sdk/pkg/agent" + v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" "github.com/Axway/agent-sdk/pkg/transaction" + "github.com/Axway/agent-sdk/pkg/util" + "github.com/Axway/agent-sdk/pkg/util/log" - "github.com/elastic/beats/v7/libbeat/beat" + transutil "github.com/Axway/agent-sdk/pkg/transaction/util" + sdkerrors "github.com/Axway/agent-sdk/pkg/util/errors" + agenterrors "github.com/Axway/agents-webmethods/pkg/errors" +) - "github.com/Axway/agent-sdk/pkg/util/log" +const ( + condorKey = "condor" + retriesKey = "retries" ) -type Processor interface { - ProcessRaw(rawEvent []byte) []beat.Event +type cacheManager interface { + GetManagedApplicationCacheKeys() []string + GetManagedApplication(id string) *v1.ResourceInstance } // EventProcessor - represents the processor for received event for Amplify Central @@ -28,30 +42,69 @@ type Processor interface { type EventProcessor struct { cfg *AgentConfigTraceability eventGenerator transaction.EventGenerator - eventMapper *EventMapper + tenantID string + cacheManager cacheManager + appIDToManApp map[string]string } +// func NewEventProcessor( +// gateway *AgentConfigTraceability, +// eventGenerator transaction.EventGenerator, +// mapper *EventMapper, +// ) *EventProcessor { +// ep := &EventProcessor{ +// cfg: gateway, +// eventGenerator: eventGenerator, +// eventMapper: mapper, +// cacheManager: agent.GetCacheManager(), +// appIDToManApp: make(map[string]string), +// //tenantID: agentConfig.Central.GetTenantID(), +// } +// return ep +// } + func NewEventProcessor( gateway *AgentConfigTraceability, - eventGenerator transaction.EventGenerator, - mapper *EventMapper, + maxRetries int, ) *EventProcessor { ep := &EventProcessor{ cfg: gateway, - eventGenerator: eventGenerator, - eventMapper: mapper, + eventGenerator: transaction.NewEventGenerator(), + cacheManager: agent.GetCacheManager(), + appIDToManApp: make(map[string]string), + //tenantID: agentConfig.Central.GetTenantID(), } + log.Trace("Event Processor Created") return ep } -// ProcessRaw - process the received log entry and returns the event to be published to Amplifyingestion service -func (ep *EventProcessor) ProcessRaw(rawEvent []byte) []beat.Event { - var gatewayTrafficLogEntry GwTrafficLogEntry - data := strings.Split(string(rawEvent), "|") - if len(data) == 39 && data[0] == "#AGW_EVENT_TXN" { +// Process - Process the log file, waiting for events +func (p *EventProcessor) Process(events []publisher.Event) []publisher.Event { + newEvents := make([]publisher.Event, 0) + for _, event := range events { + newEvents, _ = p.ProcessEvent(newEvents, event) + } + for _, newEvent := range newEvents { + str := fmt.Sprintf("%v", newEvent) + log.Trace("New event message to process - " + str) + } + return newEvents +} + +func (p *EventProcessor) ProcessEvent(newEvents []publisher.Event, event publisher.Event) ([]publisher.Event, error) { + eventMsgFieldVal, err := event.Content.Fields.GetValue("message") + if err != nil { + return newEvents, sdkerrors.Wrap(agenterrors.ErrEventNoMsg, err.Error()).FormatError(event) + } + eventMsg, ok := eventMsgFieldVal.(string) + if !ok { + return newEvents, nil + } + data := strings.Split(string(eventMsg), "|") + if len(data) == 39 { totalTime, _ := strconv.Atoi(data[17]) nativeTime, _ := strconv.Atoi(data[18]) - gatewayTrafficLogEntry = GwTrafficLogEntry{ + gatewayTrafficLogEntry := GwTrafficLogEntry{ EventType: data[0], RootContext: data[1], ParentContext: data[2], @@ -92,22 +145,183 @@ func (ep *EventProcessor) ProcessRaw(rawEvent []byte) []beat.Event { ExternalCalls: data[37], SourceGatewayNode: data[38], } + summaryEvent, logEvents, err := p.processMapping(gatewayTrafficLogEntry) + if err != nil { + log.Error(err.Error()) + return newEvents, nil + } + + //events, err := p.eventGenerator.CreateEvents(*summaryEvent, logEvents, time.Now(), nil, nil, nil) + events, err := p.createCondorEvents(event, *summaryEvent, logEvents) + if err != nil { + return newEvents, nil + } + newEvents = append(newEvents, events...) + } else { - log.Errorf("Invalid record %s", string(rawEvent)) - return nil + log.Errorf("Invalid record %s", eventMsg) + } + return newEvents, nil + +} + +func (p *EventProcessor) getApplicationByName(appId string, appName string) (string, string) { + // Add the V7 Application ID, with prefix, and Name to the event + + // find the manged application in the cache + manAppName := p.getManagedApplicationNameByID(appId) + if manAppName != "" { + return appId, manAppName + } + return appId, appName + +} + +func (p *EventProcessor) getManagedApplicationNameByID(appID string) string { + if name, ok := p.appIDToManApp[appID]; ok { + return name + } + for _, key := range p.cacheManager.GetManagedApplicationCacheKeys() { + ri := p.cacheManager.GetManagedApplication(key) + val, _ := util.GetAgentDetailsValue(ri, "webmethodsApplicationId") + if val == appID { + p.appIDToManApp[appID] = ri.Name + return ri.Name + } + } + return "" +} + +func (p *EventProcessor) createCondorEvents(originalLogEvent publisher.Event, summaryEvent transaction.LogEvent, detailEvents []transaction.LogEvent) ([]publisher.Event, error) { + // Create the beat event then wrap that in the publisher event for Condor + + // Add a Retry count to Meta + if originalLogEvent.Content.Meta == nil { + originalLogEvent.Content.Meta = make(map[string]interface{}) + } + originalLogEvent.Content.Meta[retriesKey] = 0 + originalLogEvent.Content.Meta[condorKey] = true + + beatEvents, err := p.eventGenerator.CreateEvents(summaryEvent, detailEvents, originalLogEvent.Content.Timestamp, originalLogEvent.Content.Meta, originalLogEvent.Content.Fields, originalLogEvent.Content.Private) + + if err != nil { + return nil, sdkerrors.Wrap(agenterrors.ErrCreateCondorEvent, err.Error()) + } + + events := make([]publisher.Event, 0) + for _, beatEvent := range beatEvents { + events = append(events, publisher.Event{ + Content: beatEvent, + Flags: originalLogEvent.Flags, + }) + } + return events, nil +} + +func (ep *EventProcessor) processMapping(gatewayTrafficLogEntry GwTrafficLogEntry) (*transaction.LogEvent, []transaction.LogEvent, error) { + centralCfg := agent.GetCentralConfig() + + eventTimestamp, _ := time.Parse(time.RFC3339, gatewayTrafficLogEntry.EventTimestamp) + eventTime := eventTimestamp.UTC().UnixNano() / int64(time.Millisecond) + //eventTime := time.Now().UTC().Format(gatewayTrafficLogEntry.EventTimestamp) + txID := gatewayTrafficLogEntry.Uuid + txEventID := gatewayTrafficLogEntry.CorrelationId + transInboundLogEventLeg, err := ep.createTransactionEvent(eventTime, txID, gatewayTrafficLogEntry, txEventID, "Inbound") + if err != nil { + return nil, nil, err } - //Map the log entry to log event structure expected by AmplifyCentral Observer - summaryEvent, logEvents, err := ep.eventMapper.processMapping(gatewayTrafficLogEntry) + transSummaryLogEvent, err := ep.createSummaryEvent(eventTime, txID, gatewayTrafficLogEntry, centralCfg.GetTeamID()) if err != nil { - log.Error(err.Error()) - return nil + return nil, nil, err + } + + return transSummaryLogEvent, []transaction.LogEvent{ + *transInboundLogEventLeg, + }, nil +} + +func (ep *EventProcessor) getTransactionEventStatus(code string) transaction.TxEventStatus { + if code != "SUCCESS" { + return transaction.TxEventStatusFail + } + return transaction.TxEventStatusPass +} + +func (ep *EventProcessor) getHttpStatusCode(code string) int { + if code == "SUCCESS" { + return 200 + } + return 500 +} + +func (ep *EventProcessor) getTransactionSummaryStatus(statusCode int) transaction.TxSummaryStatus { + transSummaryStatus := transaction.TxSummaryStatusUnknown + if statusCode >= http.StatusOK && statusCode < http.StatusBadRequest { + transSummaryStatus = transaction.TxSummaryStatusSuccess + } else if statusCode >= http.StatusBadRequest && statusCode < http.StatusInternalServerError { + transSummaryStatus = transaction.TxSummaryStatusFailure + } else if statusCode >= http.StatusInternalServerError && statusCode < http.StatusNetworkAuthenticationRequired { + transSummaryStatus = transaction.TxSummaryStatusException + } + return transSummaryStatus +} + +func (ep *EventProcessor) createTransactionEvent(eventTime int64, txID string, txDetails GwTrafficLogEntry, eventID, direction string) (*transaction.LogEvent, error) { + + httpStatus := ep.getHttpStatusCode(txDetails.RequestStatus) + + host := txDetails.ServerId + port := 443 + if strings.Index(host, ":") != -1 { + uris := strings.Split(host, ":") + host = uris[0] + port, _ = strconv.Atoi(uris[1]) + } - events, err := ep.eventGenerator.CreateEvents(*summaryEvent, logEvents, time.Now(), nil, nil, nil) + httpProtocolDetails, err := transaction.NewHTTPProtocolBuilder(). + SetURI(txDetails.OperationName). + SetMethod(txDetails.NativeHttpMethod). + SetStatus(httpStatus, http.StatusText(httpStatus)). + SetHost(host). + SetLocalAddress(host, port). + Build() if err != nil { - log.Error(err.Error()) - return nil + return nil, err + } + + return transaction.NewTransactionEventBuilder(). + SetTimestamp(eventTime). + SetTransactionID(txID). + SetID(eventID). + SetSource(txDetails.ServerId). + SetDirection(direction). + SetStatus(ep.getTransactionEventStatus(txDetails.RequestStatus)). + SetProtocolDetail(httpProtocolDetails). + Build() +} + +func (ep *EventProcessor) createSummaryEvent(eventTime int64, txID string, gatewayTrafficLogEntry GwTrafficLogEntry, teamID string) (*transaction.LogEvent, error) { + statusCode := ep.getHttpStatusCode(gatewayTrafficLogEntry.RequestStatus) + method := gatewayTrafficLogEntry.NativeHttpMethod + uri := gatewayTrafficLogEntry.OperationName + host := gatewayTrafficLogEntry.ApplicationIp + + builder := transaction.NewTransactionSummaryBuilder(). + SetTimestamp(eventTime). + SetTransactionID(txID). + SetStatus(ep.getTransactionSummaryStatus(statusCode), strconv.Itoa(statusCode)). + SetTeam(teamID). + SetDuration(gatewayTrafficLogEntry.TotalTime). + SetEntryPoint("http", method, uri, host). + // If the API is published to Central as unified catalog item/API service, se the Proxy details with the API definition + // The Proxy.Name represents the name of the API + // The Proxy.ID should be of format "remoteApiId_". Use transaction.FormatProxyID() to get the formatted value. + SetProxy(transutil.FormatProxyID(gatewayTrafficLogEntry.ApiId), gatewayTrafficLogEntry.ApiName, 0) + + if gatewayTrafficLogEntry.ApplicationName != "Unknown" && gatewayTrafficLogEntry.ApplicationId != "Unknown" { + builder.SetApplication(gatewayTrafficLogEntry.ApplicationId, gatewayTrafficLogEntry.ApplicationName) } - return events + return builder.Build() } diff --git a/pkg/traceability/traceconfig.go b/pkg/traceability/traceconfig.go index 78d3955..e256f7e 100644 --- a/pkg/traceability/traceconfig.go +++ b/pkg/traceability/traceconfig.go @@ -1,9 +1,6 @@ package traceability import ( - "fmt" - "os" - "github.com/Axway/agent-sdk/pkg/cmd/properties" corecfg "github.com/Axway/agent-sdk/pkg/config" ) @@ -41,15 +38,6 @@ type WebMethodConfigTracability struct { // ValidateCfg - Validates the gateway config func (c *WebMethodConfigTracability) ValidateCfg() (err error) { - if c.LogFile == "" { - return fmt.Errorf("invalid Webmethods APIM configuration: logFile is not configured") - } - - if c.AgentType == corecfg.TraceabilityAgent && c.LogFile != "" { - if _, err := os.Stat(c.LogFile); os.IsNotExist(err) { - return fmt.Errorf("invalid Webmethods APIM log path: path does not exist: %s", c.LogFile) - } - } return } diff --git a/pkg/traceability/webmethodsemitter.go b/pkg/traceability/webmethodsemitter.go deleted file mode 100644 index 4713eca..0000000 --- a/pkg/traceability/webmethodsemitter.go +++ /dev/null @@ -1,44 +0,0 @@ -package traceability - -import ( - "github.com/hpcloud/tail" -) - -const ( - healthCheckEndpoint = "ingestion" - CacheKeyTimeStamp = "LAST_RUN" -) - -type Emitter interface { - Start() error -} - -// WebmethodsEventEmitter - Gathers analytics data for publishing to Central. -type WebmethodsEventEmitter struct { - eventChannel chan string - logFile string -} - -// NewWebmethodsEventEmitter - Creates a client to poll for events. -func NewWebmethodsEventEmitter(logFile string, eventChannel chan string) *WebmethodsEventEmitter { - me := &WebmethodsEventEmitter{ - eventChannel: eventChannel, - logFile: logFile, - } - return me -} - -// Start retrieves analytics data from anypoint and sends them on the event channel for processing. -func (me *WebmethodsEventEmitter) Start() error { - go me.tailFile() - - return nil - -} - -func (me WebmethodsEventEmitter) tailFile() { - t, _ := tail.TailFile(me.logFile, tail.Config{Follow: true}) - for line := range t.Lines { - me.eventChannel <- line.Text - } -}