Skip to content

Commit

Permalink
Changed goflow decoding to match new goflow version
Browse files Browse the repository at this point in the history
  • Loading branch information
ynHuber committed Feb 26, 2025
1 parent 2f0b000 commit 949565e
Showing 1 changed file with 29 additions and 23 deletions.
52 changes: 29 additions & 23 deletions segments/input/goflow/goflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package goflow

import (
"bytes"
"context"
"fmt"
"log"
"log/slog"
"net/url"
Expand All @@ -17,18 +17,22 @@ import (

"github.com/BelWue/flowpipeline/pb"
"github.com/BelWue/flowpipeline/segments"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/encoding/protodelim"

_ "github.com/netsampler/goflow2/v2/format/binary"
"github.com/netsampler/goflow2/v2/utils/debug"

"github.com/netsampler/goflow2/v2/transport"
_ "github.com/netsampler/goflow2/v2/transport/file"
_ "github.com/netsampler/goflow2/v2/transport/kafka"

"github.com/netsampler/goflow2/v2/metrics"
rawproducer "github.com/netsampler/goflow2/v2/producer/raw"
"github.com/netsampler/goflow2/v2/utils"

// various formatters
"github.com/netsampler/goflow2/v2/format"
_ "github.com/netsampler/goflow2/v2/format/binary"

protoproducer "github.com/netsampler/goflow2/v2/producer/proto"
)

type Goflow struct {
Expand Down Expand Up @@ -127,13 +131,13 @@ type channelDriver struct {
}

func (d *channelDriver) Send(key, data []byte) error {
msg := &pb.EnrichedFlow{}
// TODO: can we shave of this Unmarshal here and the Marshal in line 138
if err := proto.Unmarshal(data, msg); err != nil {
msg := new(pb.ProtoProducerMessage)
// TODO: can we shave of this Unmarshal here by writing a custom formatter
if err := protodelim.UnmarshalFrom(bytes.NewReader(data), msg); err != nil {
log.Println("[error] Goflow: Conversion error for received flow.")
return nil
}
d.out <- msg
d.out <- &msg.EnrichedFlow
return nil
}

Expand All @@ -142,21 +146,12 @@ func (d *channelDriver) Close(context.Context) error {
return nil
}

type myProtobufDriver struct {
}

func (d *myProtobufDriver) Format(data interface{}) ([]byte, []byte, error) {
msg, ok := data.(proto.Message)
if !ok {
return nil, nil, fmt.Errorf("message is not protobuf")
}
// TODO: can we shave of this Marshal here and the Unmarshal in line 116
b, err := proto.Marshal(msg)
return nil, b, err
}

func (segment *Goflow) startGoFlow(transport transport.TransportInterface) {
formatter := &myProtobufDriver{}
formatter, err := format.FindFormat("bin")
if err != nil {
slog.Error("error formatter", slog.String("error", err.Error()))
os.Exit(1)
}
var pipes []utils.FlowPipe

for _, listenAddrUrl := range segment.Listen {
Expand Down Expand Up @@ -192,10 +187,21 @@ func (segment *Goflow) startGoFlow(transport transport.TransportInterface) {
os.Exit(1)
}

var cfgProducer = &protoproducer.ProducerConfig{}
cfgm, err := cfgProducer.Compile()
if err != nil {
log.Fatal(err)
}
flowProducer, err := protoproducer.CreateProtoProducer(cfgm, protoproducer.CreateSamplingSystem)
if err != nil {
slog.Error("error producer", slog.String("error", err.Error()))
os.Exit(1)
}

cfgPipe := &utils.PipeConfig{
Format: formatter,
Transport: transport,
Producer: &rawproducer.RawProducer{},
Producer: flowProducer,
NetFlowTemplater: metrics.NewDefaultPromTemplateSystem, // wrap template system to get Prometheus info
}

Expand Down

0 comments on commit 949565e

Please sign in to comment.