Skip to content
This repository has been archived by the owner on Sep 9, 2024. It is now read-only.

Commit

Permalink
traceability agent based on filebeat
Browse files Browse the repository at this point in the history
  • Loading branch information
rathnapandi committed May 13, 2023
1 parent d5378c3 commit c8fab35
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 279 deletions.
12 changes: 8 additions & 4 deletions pkg/cmd/traceability/root.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package traceability

import (
"fmt"

corecmd "github.com/Axway/agent-sdk/pkg/cmd"
"github.com/Axway/agent-sdk/pkg/cmd/service"
corecfg "github.com/Axway/agent-sdk/pkg/config"
"github.com/Axway/agents-webmethods/pkg/config"
"github.com/Axway/agents-webmethods/pkg/traceability"

libcmd "github.com/elastic/beats/v7/libbeat/cmd"
Expand All @@ -16,6 +17,7 @@ var RootCmd corecmd.AgentRootCmd
var beatCmd *libcmd.BeatsRootCmd

func init() {
fmt.Println("initanliing root")
name := "webmethods_traceability_agent"
settings := instance.Settings{
Name: name,
Expand All @@ -24,7 +26,7 @@ func init() {
}

// Initialize the beat command
beatCmd = libcmd.GenRootCmdWithSettings(traceability.NewBeater, settings)
beatCmd = libcmd.GenRootCmdWithSettings(traceability.New, settings)
cmd := beatCmd.Command
// Wrap the beat command with the agent command processor with callbacks to initialize the agent config and command execution.
// The first parameter identifies the name of the yaml file that agent will look for to load the config
Expand All @@ -36,12 +38,14 @@ func init() {
run,
corecfg.TraceabilityAgent,
)
config.AddConfigProperties(RootCmd.GetProperties())
RootCmd.AddCommand(service.GenServiceCmd("pathConfig"))
traceability.AddConfigProperties(RootCmd.GetProperties())
RootCmd.AddCommand(service.GenServiceCmd("path.Config"))
}

// Callback that agent will call to process the execution
func run() error {
fmt.Println("initanliing root")

return beatCmd.Execute()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "fmt"

const (
AppID = "appID"
AttrAPIID = "sampleApiId"
AttrAPIID = "apiId"
AttrChecksum = "checksum"
AttrAppID = "webmethodsApplicationId"
)
Expand Down
28 changes: 28 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package errors

import "github.com/Axway/agent-sdk/pkg/util/errors"

// Error definitions
var (
ErrGatewayConfig = errors.Newf(3500, "error apigateway.getHeaders is set to true and apigateway.%s not set in config")

ErrV7HealthcheckHost = errors.Newf(3501, "%s Failed. Error communicating with API Gateway: %s. Check API Gateway host configuration values")
ErrV7HealthcheckAuth = errors.Newf(3502, "%s Failed. Error sending request to API Gateway. HTTP response code %v. Check API Gateway authentication configuration values")

// Event Processing
ErrEventNoMsg = errors.Newf(3510, "the log event had no message field: %s")
ErrEventMsgStructure = errors.Newf(3511, "could not parse the log event: %s")
ErrTrxnDataGet = errors.New(3512, "could not retrieve the transaction data from API Gateway")
ErrTrxnDataProcess = errors.New(3513, "could not process the transaction data")
ErrTrxnHeaders = errors.Newf(3514, "could not process the transaction headers: %s")
ErrProtocolStructure = errors.Newf(3515, "could not parse the %s transaction details: %s")
ErrCreateCondorEvent = errors.New(3516, "error creating the Amplify Visibility event")
ErrNotHTTPService = errors.New(3517, "the event processor can only handle http services")
ErrWrongProcessor = errors.Newf(3518, "the message looks to be from the %s log but open traffic process is %s, check OPENTRAFFIC_LOG_INPUT")
ErrInvalidInputConfig = errors.Newf(3519, "input configuration is not valid: %s")
ErrConfigFile = errors.New(3520, "could not find the 'traceability_agent' section in the configuration file")

// API Gateway Communication
ErrAPIGWRequest = errors.New(3530, "error encountered sending a request to API Gateway")
ErrAPIGWResponse = errors.Newf(3531, "unexpected HTTP response code %v in response from API Gateway")
)
134 changes: 67 additions & 67 deletions pkg/traceability/agent.go
Original file line number Diff line number Diff line change
@@ -1,93 +1,93 @@
package traceability

import (
"errors"
"fmt"
"os"
"os/signal"
"syscall"
"path/filepath"
"strings"

coreagent "github.com/Axway/agent-sdk/pkg/agent"
"github.com/Axway/agent-sdk/pkg/transaction"
agenterrors "github.com/Axway/agent-sdk/pkg/util/errors"
hc "github.com/Axway/agent-sdk/pkg/util/healthcheck"
"github.com/elastic/beats/v7/filebeat/beater"
fbcfg "github.com/elastic/beats/v7/filebeat/config"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)

// Agent - Webmethods Beater configuration. Implements the beat.Beater interface.
type Agent struct {
client beat.Client
doneCh chan struct{}
eventChannel chan string
eventProcessor Processor
webmethods Emitter
}
"github.com/elastic/beats/v7/libbeat/logp"

// NewBeater creates an instance of webmethods_traceability_agent.
func NewBeater(_ *beat.Beat, _ *common.Config) (beat.Beater, error) {
eventChannel := make(chan string)
agentConfig := GetConfig()
generator := transaction.NewEventGenerator()
mapper := &EventMapper{}
processor := NewEventProcessor(agentConfig, generator, mapper)
emitter := NewWebmethodsEventEmitter(agentConfig.WebMethodConfigTracability.LogFile, eventChannel)
"github.com/Axway/agent-sdk/pkg/traceability"
localerrors "github.com/Axway/agents-webmethods/pkg/errors"
)

return newAgent(processor, emitter, eventChannel)
}
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
fmt.Println("beater")
err := validateInput(cfg)
if err != nil {
return nil, localerrors.ErrInvalidInputConfig.FormatError(err.Error())
}

func newAgent(
processor Processor,
emitter Emitter,
eventChannel chan string,
) (*Agent, error) {
a := &Agent{
doneCh: make(chan struct{}),
eventChannel: eventChannel,
eventProcessor: processor,
webmethods: emitter,
agentCfg := GetConfig()
if agentCfg == nil {
return nil, localerrors.ErrConfigFile
}
eventProcessor := NewEventProcessor(agentCfg, traceability.GetMaxRetries())
traceability.SetOutputEventProcessor(eventProcessor)

// Validate that all necessary services are up and running. If not, return error
if hc.RunChecks() != hc.OK {
return nil, agenterrors.ErrInitServicesNotReady
factory := func(beat.Info, *logp.Logger, beater.StateStore) []v2.Plugin {
return []v2.Plugin{}
}

return a, nil
// Initialize the filebeat to read events
creator := beater.New(factory)
return creator(b, cfg)
}

// Run starts the webmethods traceability agent.
func (a *Agent) Run(b *beat.Beat) error {
coreagent.OnConfigChange(a.onConfigChange)

var err error
a.client, err = b.Publisher.Connect()
func validateInput(cfg *common.Config) error {
filebeatConfig := fbcfg.DefaultConfig
err := cfg.Unpack(&filebeatConfig)
if err != nil {
coreagent.UpdateStatus(coreagent.AgentFailed, err.Error())
return err
}

go a.webmethods.Start()

gracefulStop := make(chan os.Signal, 1)
signal.Notify(gracefulStop, syscall.SIGTERM, os.Interrupt)
if len(filebeatConfig.Inputs) == 0 {
return errors.New("no inputs configured")
}

for {
select {
case <-a.doneCh:
return a.client.Close()
case <-gracefulStop:
return a.client.Close()
case event := <-a.eventChannel:
eventsToPublish := a.eventProcessor.ProcessRaw([]byte(event))
a.client.PublishAll(eventsToPublish)
inputsEnabled := 0
for _, input := range filebeatConfig.Inputs {
inputConfig := struct {
Enabled bool `config:"enabled"`
Paths []string `config:"paths"`
}{}
input.Unpack(&inputConfig)
if inputConfig.Enabled {
inputsEnabled++
err = validateInputPaths(inputConfig.Paths)
if err != nil {
return err
}
}
}
return nil
}

// onConfigChange apply configuration changes
func (a *Agent) onConfigChange() {
}

// Stop stops the agent.
func (a *Agent) Stop() {
a.doneCh <- struct{}{}
func validateInputPaths(paths []string) error {
foundPath := false
for _, path := range paths {
path = strings.TrimSpace(path)
if path != "" {
parentDir := filepath.Dir(path)
fileInfo, err := os.Stat(parentDir)
if err != nil {
return err
}
if !fileInfo.IsDir() {
return errors.New("invalid path " + path)
}
foundPath = true
}
}
if !foundPath {
return errors.New("no paths were defined for input processing")
}
return nil
}
126 changes: 0 additions & 126 deletions pkg/traceability/eventmapper.go

This file was deleted.

Loading

0 comments on commit c8fab35

Please sign in to comment.