Skip to content
This repository has been archived by the owner on Sep 9, 2024. It is now read-only.

Commit

Permalink
traceability cahnges
Browse files Browse the repository at this point in the history
  • Loading branch information
rathnapandi committed May 10, 2023
1 parent dc53f61 commit d5378c3
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 93 deletions.
1 change: 0 additions & 1 deletion pkg/traceability/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type Agent struct {
func NewBeater(_ *beat.Beat, _ *common.Config) (beat.Beater, error) {
eventChannel := make(chan string)
agentConfig := GetConfig()

generator := transaction.NewEventGenerator()
mapper := &EventMapper{}
processor := NewEventProcessor(agentConfig, generator, mapper)
Expand Down
95 changes: 49 additions & 46 deletions pkg/traceability/eventmapper.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package traceability

import (
"encoding/json"
"net/http"
"strconv"
"strings"
"time"

"github.com/Axway/agent-sdk/pkg/agent"
"github.com/Axway/agent-sdk/pkg/transaction"
transutil "github.com/Axway/agent-sdk/pkg/transaction/util"
"github.com/Axway/agent-sdk/pkg/util/log"
)

// EventMapper -
Expand All @@ -19,19 +18,12 @@ type EventMapper struct {
func (m *EventMapper) processMapping(gatewayTrafficLogEntry GwTrafficLogEntry) (*transaction.LogEvent, []transaction.LogEvent, error) {
centralCfg := agent.GetCentralConfig()

eventTime := time.Now().UnixNano() / int64(time.Millisecond)
txID := gatewayTrafficLogEntry.TraceID
txEventID := gatewayTrafficLogEntry.InboundTransaction.ID
txDetails := gatewayTrafficLogEntry.InboundTransaction
transInboundLogEventLeg, err := m.createTransactionEvent(eventTime, txID, txDetails, txEventID, "", "Inbound")
if err != nil {
return nil, nil, err
}

txEventID = gatewayTrafficLogEntry.OutboundTransaction.ID
txParentEventID := gatewayTrafficLogEntry.InboundTransaction.ID
txDetails = gatewayTrafficLogEntry.OutboundTransaction
transOutboundLogEventLeg, err := m.createTransactionEvent(eventTime, txID, txDetails, txEventID, txParentEventID, "Outbound")
eventTimestamp, _ := time.Parse(time.RFC3339, gatewayTrafficLogEntry.EventTimestamp)
eventTime := eventTimestamp.UTC().UnixNano() / int64(time.Millisecond)
//eventTime := time.Now().UTC().Format(gatewayTrafficLogEntry.EventTimestamp)
txID := gatewayTrafficLogEntry.Uuid
txEventID := gatewayTrafficLogEntry.CorrelationId
transInboundLogEventLeg, err := m.createTransactionEvent(eventTime, txID, gatewayTrafficLogEntry, txEventID, "Inbound")
if err != nil {
return nil, nil, err
}
Expand All @@ -43,15 +35,21 @@ func (m *EventMapper) processMapping(gatewayTrafficLogEntry GwTrafficLogEntry) (

return transSummaryLogEvent, []transaction.LogEvent{
*transInboundLogEventLeg,
*transOutboundLogEventLeg,
}, nil
}

func (m *EventMapper) getTransactionEventStatus(code int) transaction.TxEventStatus {
if code >= 400 {
func (m *EventMapper) getTransactionEventStatus(code string) transaction.TxEventStatus {
if code != "SUCCESS" {
return transaction.TxEventStatusFail
}
return transaction.TxEventStatusFail
return transaction.TxEventStatusPass
}

func (m *EventMapper) getHttpStatusCode(code string) int {
if code == "SUCCESS" {
return 200
}
return 500
}

func (m *EventMapper) getTransactionSummaryStatus(statusCode int) transaction.TxSummaryStatus {
Expand All @@ -66,25 +64,25 @@ func (m *EventMapper) getTransactionSummaryStatus(statusCode int) transaction.Tx
return transSummaryStatus
}

func (m *EventMapper) buildHeaders(headers map[string]string) string {
jsonHeader, err := json.Marshal(headers)
if err != nil {
log.Error(err.Error())
}
return string(jsonHeader)
}
func (m *EventMapper) createTransactionEvent(eventTime int64, txID string, txDetails GwTrafficLogEntry, eventID, direction string) (*transaction.LogEvent, error) {

httpStatus := m.getHttpStatusCode(txDetails.RequestStatus)

func (m *EventMapper) createTransactionEvent(eventTime int64, txID string, txDetails GwTransaction, eventID, parentEventID, direction string) (*transaction.LogEvent, error) {
host := txDetails.ServerId
port := 443
if strings.Index(host, ":") != -1 {
uris := strings.Split(host, ":")
host = uris[0]
port, _ = strconv.Atoi(uris[1])

}

httpProtocolDetails, err := transaction.NewHTTPProtocolBuilder().
SetURI(txDetails.URI).
SetMethod(txDetails.Method).
SetStatus(txDetails.StatusCode, http.StatusText(txDetails.StatusCode)).
SetHost(txDetails.SourceHost).
SetHeaders(m.buildHeaders(txDetails.RequestHeaders), m.buildHeaders(txDetails.ResponseHeaders)).
SetByteLength(txDetails.RequestBytes, txDetails.ResponseBytes).
SetRemoteAddress("", txDetails.DesHost, txDetails.DestPort).
SetLocalAddress(txDetails.SourceHost, txDetails.SourcePort).
SetURI(txDetails.OperationName).
SetMethod(txDetails.NativeHttpMethod).
SetStatus(httpStatus, http.StatusText(httpStatus)).
SetHost(host).
SetLocalAddress(host, port).
Build()
if err != nil {
return nil, err
Expand All @@ -94,30 +92,35 @@ func (m *EventMapper) createTransactionEvent(eventTime int64, txID string, txDet
SetTimestamp(eventTime).
SetTransactionID(txID).
SetID(eventID).
SetParentID(parentEventID).
SetSource(txDetails.SourceHost + ":" + strconv.Itoa(txDetails.SourcePort)).
SetDestination(txDetails.DesHost + ":" + strconv.Itoa(txDetails.DestPort)).
SetSource(txDetails.ServerId).
SetDirection(direction).
SetStatus(m.getTransactionEventStatus(txDetails.StatusCode)).
SetStatus(m.getTransactionEventStatus(txDetails.RequestStatus)).
SetProtocolDetail(httpProtocolDetails).
Build()
}

func (m *EventMapper) createSummaryEvent(eventTime int64, txID string, gatewayTrafficLogEntry GwTrafficLogEntry, teamID string) (*transaction.LogEvent, error) {
statusCode := gatewayTrafficLogEntry.InboundTransaction.StatusCode
method := gatewayTrafficLogEntry.InboundTransaction.Method
uri := gatewayTrafficLogEntry.InboundTransaction.URI
host := gatewayTrafficLogEntry.InboundTransaction.SourceHost
statusCode := m.getHttpStatusCode(gatewayTrafficLogEntry.RequestStatus)
method := gatewayTrafficLogEntry.NativeHttpMethod
uri := gatewayTrafficLogEntry.OperationName
host := gatewayTrafficLogEntry.ApplicationIp

return transaction.NewTransactionSummaryBuilder().
builder := transaction.NewTransactionSummaryBuilder().
SetTimestamp(eventTime).
SetTransactionID(txID).
SetStatus(m.getTransactionSummaryStatus(statusCode), strconv.Itoa(statusCode)).
SetTeam(teamID).
SetDuration(gatewayTrafficLogEntry.TotalTime).
SetEntryPoint("http", method, uri, host).
// If the API is published to Central as unified catalog item/API service, se the Proxy details with the API definition
// The Proxy.Name represents the name of the API
// The Proxy.ID should be of format "remoteApiId_<ID Of the API on remote gateway>". Use transaction.FormatProxyID(<ID Of the API on remote gateway>) to get the formatted value.
SetProxy(transutil.FormatProxyID(gatewayTrafficLogEntry.APIName), gatewayTrafficLogEntry.APIName, 0).
Build()
SetProxy(transutil.FormatProxyID(gatewayTrafficLogEntry.ApiName), gatewayTrafficLogEntry.ApiName, 0)

if gatewayTrafficLogEntry.ApplicationName != "Unknown" && gatewayTrafficLogEntry.ApplicationId != "Unknown" {
builder.SetApplication(gatewayTrafficLogEntry.ApplicationId, gatewayTrafficLogEntry.ApplicationName)
}

return builder.Build()

}
91 changes: 68 additions & 23 deletions pkg/traceability/eventprocessor.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package traceability

import (
"fmt"
"strconv"
"strings"
"time"

"github.com/Axway/agent-sdk/pkg/transaction"

"github.com/elastic/beats/v7/libbeat/beat"

"github.com/Axway/agent-sdk/pkg/util/log"
)

type Processor interface {
Expand Down Expand Up @@ -42,27 +46,68 @@ func NewEventProcessor(

// ProcessRaw - process the received log entry and returns the event to be published to Amplifyingestion service
func (ep *EventProcessor) ProcessRaw(rawEvent []byte) []beat.Event {
// var gatewayTrafficLogEntry GwTrafficLogEntry
// err := json.Unmarshal(rawEvent, &gatewayTrafficLogEntry)
// if err != nil {
// log.Error(err.Error())
// return nil
// }
fmt.Println(string(rawEvent))
// data := strings.Split(string(rawEvent), "|")
// fmt.Println(len(data))
// Map the log entry to log event structure expected by AmplifyCentral Observer
// summaryEvent, logEvents, err := ep.eventMapper.processMapping(gatewayTrafficLogEntry)
// if err != nil {
// log.Error(err.Error())
// return nil
// }
var gatewayTrafficLogEntry GwTrafficLogEntry
data := strings.Split(string(rawEvent), "|")
if len(data) == 39 && data[0] == "#AGW_EVENT_TXN" {
totalTime, _ := strconv.Atoi(data[17])
nativeTime, _ := strconv.Atoi(data[18])
gatewayTrafficLogEntry = GwTrafficLogEntry{
EventType: data[0],
RootContext: data[1],
ParentContext: data[2],
CurrentContext: data[3],
Uuid: data[4],
ServerId: data[5],
EventTimestamp: data[6],
CurrentTimestamp: data[7],
SessionId: data[8],
ApiName: data[9],
ApiVersion: data[10],
TargetName: data[11],
ApplicationName: data[12],
ApplicationIp: data[13],
ApplicationId: data[14],
Request: data[15],
Response: data[16],
TotalTime: totalTime,
NativeTime: nativeTime,
RequestStatus: data[19],
OperationName: data[20],
NatvieEndpoint: data[21],
PartnerId: data[22],
ApiId: data[23],
ServiceName: data[24],
RequestHeaders: data[25],
QueryParam: data[26],
ResponseHeaders: data[27],
CorrelationId: data[28],
ErrorOrigin: data[29],
Custom: data[30],
NativeRequestHeaders: data[31],
NativeRequestPayload: data[32],
NativeResponseHeaders: data[33],
NativeResponsePayload: data[34],
NativeHttpMethod: data[35],
NativeUrl: data[36],
ExternalCalls: data[37],
SourceGatewayNode: data[38],
}
} else {
log.Errorf("Invalid record %s", string(rawEvent))
return nil
}

// events, err := ep.eventGenerator.CreateEvents(*summaryEvent, logEvents, time.Now(), nil, nil, nil)
// if err != nil {
// log.Error(err.Error())
// return nil
// }
// return events
return nil
//Map the log entry to log event structure expected by AmplifyCentral Observer
summaryEvent, logEvents, err := ep.eventMapper.processMapping(gatewayTrafficLogEntry)
if err != nil {
log.Error(err.Error())
return nil
}

events, err := ep.eventGenerator.CreateEvents(*summaryEvent, logEvents, time.Now(), nil, nil, nil)
if err != nil {
log.Error(err.Error())
return nil
}
return events
}
23 changes: 0 additions & 23 deletions pkg/traceability/types.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,5 @@
package traceability

// Headers - Type for request/response headers
type Headers map[string]string

// GwTransaction - Type for gateway transaction detail
type GwTransaction struct {
ID string `json:"id"`
SourceHost string `json:"srcHost"`
SourcePort int `json:"srcPort"`
DesHost string `json:"destHost"`
DestPort int `json:"destPort"`
URI string `json:"uri"`
Method string `json:"method"`
StatusCode int `json:"statusCode"`
RequestHeaders Headers `json:"requestHeaders"`
ResponseHeaders Headers `json:"responseHeaders"`
RequestBytes int `json:"requestByte"`
ResponseBytes int `json:"responseByte"`
}

// # Transaction event logs
// 1 EVENT_TYPE
// 2 ROOT_CONTEXT
Expand Down Expand Up @@ -62,10 +43,6 @@ type GwTransaction struct {

// GwTrafficLogEntry - Represents the structure of log entry the agent will receive
type GwTrafficLogEntry struct {
TraceID string `json:"traceId"`
APIName string `json:"apiName"`
InboundTransaction GwTransaction `json:"inbound"`
OutboundTransaction GwTransaction `json:"outbound"`
EventType string
RootContext string
ParentContext string
Expand Down

0 comments on commit d5378c3

Please sign in to comment.