From dc53f618a681dc7ed9e7da9b5013c7f945b8d1db Mon Sep 17 00:00:00 2001 From: rathnapandi Date: Wed, 10 May 2023 11:15:47 -0700 Subject: [PATCH] traceability agent to read data from transactions logs. --- pkg/cmd/traceability/root.go | 8 +-- pkg/traceability/agent.go | 5 +- pkg/traceability/eventprocessor.go | 47 ++++++++-------- pkg/traceability/traceconfig.go | 67 +++++++++++++++++++++++ pkg/traceability/types.go | 88 ++++++++++++++++++++++++++++-- 5 files changed, 181 insertions(+), 34 deletions(-) create mode 100644 pkg/traceability/traceconfig.go diff --git a/pkg/cmd/traceability/root.go b/pkg/cmd/traceability/root.go index 08df1ce..3ba56d4 100644 --- a/pkg/cmd/traceability/root.go +++ b/pkg/cmd/traceability/root.go @@ -48,10 +48,10 @@ func run() error { // Callback that agent will call to initialize the config. CentralConfig is parsed by Agent SDK // and passed to the callback allowing the agent code to access the central config func initConfig(centralConfig corecfg.CentralConfig) (interface{}, error) { - agentConfig := &config.AgentConfig{ - CentralConfig: centralConfig, - WebMethodConfig: config.NewWebmothodsConfig(RootCmd.GetProperties(), centralConfig.GetAgentType()), + agentConfig := &traceability.AgentConfigTraceability{ + CentralConfig: centralConfig, + WebMethodConfigTracability: traceability.NewWebmothodsConfig(RootCmd.GetProperties(), centralConfig.GetAgentType()), } - config.SetConfig(agentConfig) + traceability.SetConfig(agentConfig) return agentConfig, nil } diff --git a/pkg/traceability/agent.go b/pkg/traceability/agent.go index 52325e0..0a7b6fc 100644 --- a/pkg/traceability/agent.go +++ b/pkg/traceability/agent.go @@ -9,7 +9,6 @@ import ( "github.com/Axway/agent-sdk/pkg/transaction" agenterrors "github.com/Axway/agent-sdk/pkg/util/errors" hc "github.com/Axway/agent-sdk/pkg/util/healthcheck" - "github.com/Axway/agents-webmethods/pkg/config" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" ) @@ -26,12 +25,12 @@ type Agent struct { // NewBeater creates an instance of webmethods_traceability_agent. func NewBeater(_ *beat.Beat, _ *common.Config) (beat.Beater, error) { eventChannel := make(chan string) - agentConfig := config.GetConfig() + agentConfig := GetConfig() generator := transaction.NewEventGenerator() mapper := &EventMapper{} processor := NewEventProcessor(agentConfig, generator, mapper) - emitter := NewWebmethodsEventEmitter(agentConfig.WebMethodConfig.LogFile, eventChannel) + emitter := NewWebmethodsEventEmitter(agentConfig.WebMethodConfigTracability.LogFile, eventChannel) return newAgent(processor, emitter, eventChannel) } diff --git a/pkg/traceability/eventprocessor.go b/pkg/traceability/eventprocessor.go index 0b5d869..ab85f87 100644 --- a/pkg/traceability/eventprocessor.go +++ b/pkg/traceability/eventprocessor.go @@ -1,13 +1,10 @@ package traceability import ( - "encoding/json" - "time" + "fmt" "github.com/Axway/agent-sdk/pkg/transaction" - "github.com/Axway/agent-sdk/pkg/util/log" - "github.com/Axway/agents-webmethods/pkg/config" "github.com/elastic/beats/v7/libbeat/beat" ) @@ -25,13 +22,13 @@ type Processor interface { // log entry and performs the mapping to structure expected for Amplify Central Observer. The method returns the converted Events to // transport publisher which then produces the events over the transport. type EventProcessor struct { - cfg *config.AgentConfig + cfg *AgentConfigTraceability eventGenerator transaction.EventGenerator eventMapper *EventMapper } func NewEventProcessor( - gateway *config.AgentConfig, + gateway *AgentConfigTraceability, eventGenerator transaction.EventGenerator, mapper *EventMapper, ) *EventProcessor { @@ -45,23 +42,27 @@ func NewEventProcessor( // ProcessRaw - process the received log entry and returns the event to be published to Amplifyingestion service func (ep *EventProcessor) ProcessRaw(rawEvent []byte) []beat.Event { - var gatewayTrafficLogEntry GwTrafficLogEntry - err := json.Unmarshal(rawEvent, &gatewayTrafficLogEntry) - if err != nil { - log.Error(err.Error()) - return nil - } + // var gatewayTrafficLogEntry GwTrafficLogEntry + // err := json.Unmarshal(rawEvent, &gatewayTrafficLogEntry) + // if err != nil { + // log.Error(err.Error()) + // return nil + // } + fmt.Println(string(rawEvent)) + // data := strings.Split(string(rawEvent), "|") + // fmt.Println(len(data)) // Map the log entry to log event structure expected by AmplifyCentral Observer - summaryEvent, logEvents, err := ep.eventMapper.processMapping(gatewayTrafficLogEntry) - if err != nil { - log.Error(err.Error()) - return nil - } + // summaryEvent, logEvents, err := ep.eventMapper.processMapping(gatewayTrafficLogEntry) + // if err != nil { + // log.Error(err.Error()) + // return nil + // } - events, err := ep.eventGenerator.CreateEvents(*summaryEvent, logEvents, time.Now(), nil, nil, nil) - if err != nil { - log.Error(err.Error()) - return nil - } - return events + // events, err := ep.eventGenerator.CreateEvents(*summaryEvent, logEvents, time.Now(), nil, nil, nil) + // if err != nil { + // log.Error(err.Error()) + // return nil + // } + // return events + return nil } diff --git a/pkg/traceability/traceconfig.go b/pkg/traceability/traceconfig.go new file mode 100644 index 0000000..78d3955 --- /dev/null +++ b/pkg/traceability/traceconfig.go @@ -0,0 +1,67 @@ +package traceability + +import ( + "fmt" + "os" + + "github.com/Axway/agent-sdk/pkg/cmd/properties" + corecfg "github.com/Axway/agent-sdk/pkg/config" +) + +var agentConfig *AgentConfigTraceability + +const ( + pathLogFile = "webmethods.logFile" +) + +// SetConfig sets the global AgentConfig reference. +func SetConfig(newConfig *AgentConfigTraceability) { + agentConfig = newConfig +} + +// GetConfig gets the AgentConfig +func GetConfig() *AgentConfigTraceability { + return agentConfig +} + +// AgentConfig - represents the config for agent +type AgentConfigTraceability struct { + CentralConfig corecfg.CentralConfig `config:"central"` + WebMethodConfigTracability *WebMethodConfigTracability `config:"webmethod"` +} + +// WebMethodConfig - represents the config for the Webmethods APIM +type WebMethodConfigTracability struct { + corecfg.IConfigValidator + AgentType corecfg.AgentType + LogFile string `config:"logFile"` + TLS corecfg.TLSConfig `config:"ssl"` +} + +// ValidateCfg - Validates the gateway config +func (c *WebMethodConfigTracability) ValidateCfg() (err error) { + + if c.LogFile == "" { + return fmt.Errorf("invalid Webmethods APIM configuration: logFile is not configured") + } + + if c.AgentType == corecfg.TraceabilityAgent && c.LogFile != "" { + if _, err := os.Stat(c.LogFile); os.IsNotExist(err) { + return fmt.Errorf("invalid Webmethods APIM log path: path does not exist: %s", c.LogFile) + } + } + return +} + +// AddConfigProperties - Adds the command properties needed for Webmethods agent +func AddConfigProperties(props properties.Properties) { + props.AddStringProperty(pathLogFile, "./logs/traffic.log", "Sample log file with traffic event from gateway") +} + +// NewWebmothodsConfig - parse the props and create an Webmethods Configuration structure +func NewWebmothodsConfig(props properties.Properties, agentType corecfg.AgentType) *WebMethodConfigTracability { + return &WebMethodConfigTracability{ + AgentType: agentType, + LogFile: props.StringPropertyValue(pathLogFile), + } +} diff --git a/pkg/traceability/types.go b/pkg/traceability/types.go index cc2e1cf..42dc6e4 100644 --- a/pkg/traceability/types.go +++ b/pkg/traceability/types.go @@ -19,10 +19,90 @@ type GwTransaction struct { ResponseBytes int `json:"responseByte"` } +// # Transaction event logs +// 1 EVENT_TYPE +// 2 ROOT_CONTEXT +// 3 PARENT_CONTEXT +// 4 CURRENT_CONTEXT +// 5 UUID +// 6 SERVER_ID +// 7 EVENT_TIMESTAMP +// 8 CURRENT_TIMESTAMP +// 9 SESSION_ID +// 10 API_NAME +// 11 API_VERSION +// 12 TARGET_NAME +// 13 APPLICATION_NAME +// 14 APPLICATION_IP +// 15 APPLICATION_ID +// 16 RESPONSE +// 17 REQUEST +// 18 TOTAL_TIME +// 19 NATIVE_TIME +// 20 REQUEST_STATUS +// 21 OPERATION_NAME +// 22 NATIVE_ENDPOINT +// 23 PARTNER_ID +// 24 API_ID +// 25 SERVICE_NAME +// 26 REQ_HEADERS +// 27 QUERY_PARAM +// 28 RES_HEADERS +// 29 CORRELATIONID +// 30 ERROR_ORIGIN +// 31 CUSTOM +// 32 NATIVE_REQUEST_HEADERS +// 33 NATIVE_REQUEST_PAYLOAD +// 34 NATIVE_RESPONSE_HEADERS +// 35 NATIVE_RESPONSE_PAYLOAD +// 36 NATIVE_HTTP_METHOD GET +// 37 NATIVE_URL +// 38 EXTERNAL_CALLS +// 39 SOURCE_GATEWAY_NODE + // GwTrafficLogEntry - Represents the structure of log entry the agent will receive type GwTrafficLogEntry struct { - TraceID string `json:"traceId"` - APIName string `json:"apiName"` - InboundTransaction GwTransaction `json:"inbound"` - OutboundTransaction GwTransaction `json:"outbound"` + TraceID string `json:"traceId"` + APIName string `json:"apiName"` + InboundTransaction GwTransaction `json:"inbound"` + OutboundTransaction GwTransaction `json:"outbound"` + EventType string + RootContext string + ParentContext string + CurrentContext string + Uuid string + ServerId string + EventTimestamp string + CurrentTimestamp string + SessionId string + ApiName string + ApiVersion string + TargetName string + ApplicationName string + ApplicationIp string + ApplicationId string + Request string + Response string + TotalTime int + NativeTime int + RequestStatus string + OperationName string + NatvieEndpoint string + PartnerId string + ApiId string + ServiceName string + RequestHeaders string + QueryParam string + ResponseHeaders string + CorrelationId string + ErrorOrigin string + Custom string + NativeRequestHeaders string + NativeRequestPayload string + NativeResponseHeaders string + NativeResponsePayload string + NativeHttpMethod string + NativeUrl string + ExternalCalls string + SourceGatewayNode string }