Skip to content
This repository has been archived by the owner on Sep 9, 2024. It is now read-only.

Commit

Permalink
traceability agent to read data from transactions logs.
Browse files Browse the repository at this point in the history
  • Loading branch information
rathnapandi committed May 10, 2023
1 parent e66c1db commit dc53f61
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 34 deletions.
8 changes: 4 additions & 4 deletions pkg/cmd/traceability/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func run() error {
// Callback that agent will call to initialize the config. CentralConfig is parsed by Agent SDK
// and passed to the callback allowing the agent code to access the central config
func initConfig(centralConfig corecfg.CentralConfig) (interface{}, error) {
agentConfig := &config.AgentConfig{
CentralConfig: centralConfig,
WebMethodConfig: config.NewWebmothodsConfig(RootCmd.GetProperties(), centralConfig.GetAgentType()),
agentConfig := &traceability.AgentConfigTraceability{
CentralConfig: centralConfig,
WebMethodConfigTracability: traceability.NewWebmothodsConfig(RootCmd.GetProperties(), centralConfig.GetAgentType()),
}
config.SetConfig(agentConfig)
traceability.SetConfig(agentConfig)
return agentConfig, nil
}
5 changes: 2 additions & 3 deletions pkg/traceability/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/Axway/agent-sdk/pkg/transaction"
agenterrors "github.com/Axway/agent-sdk/pkg/util/errors"
hc "github.com/Axway/agent-sdk/pkg/util/healthcheck"
"github.com/Axway/agents-webmethods/pkg/config"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)
Expand All @@ -26,12 +25,12 @@ type Agent struct {
// NewBeater creates an instance of webmethods_traceability_agent.
func NewBeater(_ *beat.Beat, _ *common.Config) (beat.Beater, error) {
eventChannel := make(chan string)
agentConfig := config.GetConfig()
agentConfig := GetConfig()

generator := transaction.NewEventGenerator()
mapper := &EventMapper{}
processor := NewEventProcessor(agentConfig, generator, mapper)
emitter := NewWebmethodsEventEmitter(agentConfig.WebMethodConfig.LogFile, eventChannel)
emitter := NewWebmethodsEventEmitter(agentConfig.WebMethodConfigTracability.LogFile, eventChannel)

return newAgent(processor, emitter, eventChannel)
}
Expand Down
47 changes: 24 additions & 23 deletions pkg/traceability/eventprocessor.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package traceability

import (
"encoding/json"
"time"
"fmt"

"github.com/Axway/agent-sdk/pkg/transaction"
"github.com/Axway/agent-sdk/pkg/util/log"

"github.com/Axway/agents-webmethods/pkg/config"
"github.com/elastic/beats/v7/libbeat/beat"
)

Expand All @@ -25,13 +22,13 @@ type Processor interface {
// log entry and performs the mapping to structure expected for Amplify Central Observer. The method returns the converted Events to
// transport publisher which then produces the events over the transport.
type EventProcessor struct {
cfg *config.AgentConfig
cfg *AgentConfigTraceability
eventGenerator transaction.EventGenerator
eventMapper *EventMapper
}

func NewEventProcessor(
gateway *config.AgentConfig,
gateway *AgentConfigTraceability,
eventGenerator transaction.EventGenerator,
mapper *EventMapper,
) *EventProcessor {
Expand All @@ -45,23 +42,27 @@ func NewEventProcessor(

// ProcessRaw - process the received log entry and returns the event to be published to Amplifyingestion service
func (ep *EventProcessor) ProcessRaw(rawEvent []byte) []beat.Event {
var gatewayTrafficLogEntry GwTrafficLogEntry
err := json.Unmarshal(rawEvent, &gatewayTrafficLogEntry)
if err != nil {
log.Error(err.Error())
return nil
}
// var gatewayTrafficLogEntry GwTrafficLogEntry
// err := json.Unmarshal(rawEvent, &gatewayTrafficLogEntry)
// if err != nil {
// log.Error(err.Error())
// return nil
// }
fmt.Println(string(rawEvent))
// data := strings.Split(string(rawEvent), "|")
// fmt.Println(len(data))
// Map the log entry to log event structure expected by AmplifyCentral Observer
summaryEvent, logEvents, err := ep.eventMapper.processMapping(gatewayTrafficLogEntry)
if err != nil {
log.Error(err.Error())
return nil
}
// summaryEvent, logEvents, err := ep.eventMapper.processMapping(gatewayTrafficLogEntry)
// if err != nil {
// log.Error(err.Error())
// return nil
// }

events, err := ep.eventGenerator.CreateEvents(*summaryEvent, logEvents, time.Now(), nil, nil, nil)
if err != nil {
log.Error(err.Error())
return nil
}
return events
// events, err := ep.eventGenerator.CreateEvents(*summaryEvent, logEvents, time.Now(), nil, nil, nil)
// if err != nil {
// log.Error(err.Error())
// return nil
// }
// return events
return nil
}
67 changes: 67 additions & 0 deletions pkg/traceability/traceconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package traceability

import (
"fmt"
"os"

"github.com/Axway/agent-sdk/pkg/cmd/properties"
corecfg "github.com/Axway/agent-sdk/pkg/config"
)

var agentConfig *AgentConfigTraceability

const (
pathLogFile = "webmethods.logFile"
)

// SetConfig sets the global AgentConfig reference.
func SetConfig(newConfig *AgentConfigTraceability) {
agentConfig = newConfig
}

// GetConfig gets the AgentConfig
func GetConfig() *AgentConfigTraceability {
return agentConfig
}

// AgentConfig - represents the config for agent
type AgentConfigTraceability struct {
CentralConfig corecfg.CentralConfig `config:"central"`
WebMethodConfigTracability *WebMethodConfigTracability `config:"webmethod"`
}

// WebMethodConfig - represents the config for the Webmethods APIM
type WebMethodConfigTracability struct {
corecfg.IConfigValidator
AgentType corecfg.AgentType
LogFile string `config:"logFile"`
TLS corecfg.TLSConfig `config:"ssl"`
}

// ValidateCfg - Validates the gateway config
func (c *WebMethodConfigTracability) ValidateCfg() (err error) {

if c.LogFile == "" {
return fmt.Errorf("invalid Webmethods APIM configuration: logFile is not configured")
}

if c.AgentType == corecfg.TraceabilityAgent && c.LogFile != "" {
if _, err := os.Stat(c.LogFile); os.IsNotExist(err) {
return fmt.Errorf("invalid Webmethods APIM log path: path does not exist: %s", c.LogFile)
}
}
return
}

// AddConfigProperties - Adds the command properties needed for Webmethods agent
func AddConfigProperties(props properties.Properties) {
props.AddStringProperty(pathLogFile, "./logs/traffic.log", "Sample log file with traffic event from gateway")
}

// NewWebmothodsConfig - parse the props and create an Webmethods Configuration structure
func NewWebmothodsConfig(props properties.Properties, agentType corecfg.AgentType) *WebMethodConfigTracability {
return &WebMethodConfigTracability{
AgentType: agentType,
LogFile: props.StringPropertyValue(pathLogFile),
}
}
88 changes: 84 additions & 4 deletions pkg/traceability/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,90 @@ type GwTransaction struct {
ResponseBytes int `json:"responseByte"`
}

// # Transaction event logs
// 1 EVENT_TYPE
// 2 ROOT_CONTEXT
// 3 PARENT_CONTEXT
// 4 CURRENT_CONTEXT
// 5 UUID
// 6 SERVER_ID
// 7 EVENT_TIMESTAMP
// 8 CURRENT_TIMESTAMP
// 9 SESSION_ID
// 10 API_NAME
// 11 API_VERSION
// 12 TARGET_NAME
// 13 APPLICATION_NAME
// 14 APPLICATION_IP
// 15 APPLICATION_ID
// 16 RESPONSE
// 17 REQUEST
// 18 TOTAL_TIME
// 19 NATIVE_TIME
// 20 REQUEST_STATUS
// 21 OPERATION_NAME
// 22 NATIVE_ENDPOINT
// 23 PARTNER_ID
// 24 API_ID
// 25 SERVICE_NAME
// 26 REQ_HEADERS
// 27 QUERY_PARAM
// 28 RES_HEADERS
// 29 CORRELATIONID
// 30 ERROR_ORIGIN
// 31 CUSTOM
// 32 NATIVE_REQUEST_HEADERS
// 33 NATIVE_REQUEST_PAYLOAD
// 34 NATIVE_RESPONSE_HEADERS
// 35 NATIVE_RESPONSE_PAYLOAD
// 36 NATIVE_HTTP_METHOD GET
// 37 NATIVE_URL
// 38 EXTERNAL_CALLS
// 39 SOURCE_GATEWAY_NODE

// GwTrafficLogEntry - Represents the structure of log entry the agent will receive
type GwTrafficLogEntry struct {
TraceID string `json:"traceId"`
APIName string `json:"apiName"`
InboundTransaction GwTransaction `json:"inbound"`
OutboundTransaction GwTransaction `json:"outbound"`
TraceID string `json:"traceId"`
APIName string `json:"apiName"`
InboundTransaction GwTransaction `json:"inbound"`
OutboundTransaction GwTransaction `json:"outbound"`
EventType string
RootContext string
ParentContext string
CurrentContext string
Uuid string
ServerId string
EventTimestamp string
CurrentTimestamp string
SessionId string
ApiName string
ApiVersion string
TargetName string
ApplicationName string
ApplicationIp string
ApplicationId string
Request string
Response string
TotalTime int
NativeTime int
RequestStatus string
OperationName string
NatvieEndpoint string
PartnerId string
ApiId string
ServiceName string
RequestHeaders string
QueryParam string
ResponseHeaders string
CorrelationId string
ErrorOrigin string
Custom string
NativeRequestHeaders string
NativeRequestPayload string
NativeResponseHeaders string
NativeResponsePayload string
NativeHttpMethod string
NativeUrl string
ExternalCalls string
SourceGatewayNode string
}

0 comments on commit dc53f61

Please sign in to comment.