Skip to content

Commit

Permalink
chore(decoders.sflow): Cleanup constant definitions and use switch st…
Browse files Browse the repository at this point in the history
…atements
  • Loading branch information
srebhan committed May 23, 2024
1 parent 7265159 commit cbea0d6
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 48 deletions.
119 changes: 81 additions & 38 deletions decoders/sflow/sflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,48 @@ import (
"github.com/netsampler/goflow2/v2/decoders/utils"
)

// Opaque sample_data types according to https://sflow.org/SFLOW-DATAGRAM5.txt
const (
SAMPLE_FORMAT_FLOW = 1
SAMPLE_FORMAT_COUNTER = 2
SAMPLE_FORMAT_EXPANDED_FLOW = 3
SAMPLE_FORMAT_EXPANDED_COUNTER = 4
)

// Opaque flow_data types according to https://sflow.org/SFLOW-STRUCTS5.txt
const (
FLOW_TYPE_RAW = 1
FLOW_TYPE_ETH = 2
FLOW_TYPE_IPV4 = 3
FLOW_TYPE_IPV6 = 4
FLOW_TYPE_EXT_SWITCH = 1001
FLOW_TYPE_EXT_ROUTER = 1002
FLOW_TYPE_EXT_GATEWAY = 1003
FLOW_TYPE_EXT_USER = 1004
FLOW_TYPE_EXT_URL = 1005
FLOW_TYPE_EXT_MPLS = 1006
FLOW_TYPE_EXT_NAT = 1007
FLOW_TYPE_EXT_MPLS_TUNNEL = 1008
FLOW_TYPE_EXT_MPLS_VC = 1009
FLOW_TYPE_EXT_MPLS_FEC = 1010
FLOW_TYPE_EXT_MPLS_LVP_FEC = 1011
FLOW_TYPE_EXT_VLAN_TUNNEL = 1012
)

// Opaque counter_data types according to https://sflow.org/SFLOW-STRUCTS5.txt
const (
COUNTER_TYPE_IF = 1
COUNTER_TYPE_ETH = 2
COUNTER_TYPE_TOKENRING = 3
COUNTER_TYPE_VG = 4
COUNTER_TYPE_VLAN = 5
COUNTER_TYPE_CPU = 1001
)

// Deprecated: These definitions are mixing sample-format, flow-data type and
// counter-data type definitions and are left here for compatibility only.
//
// Use the sample-format or flow/counter type definitions instead!
const (
FORMAT_EXT_SWITCH = 1001
FORMAT_EXT_ROUTER = 1002
Expand Down Expand Up @@ -84,7 +126,7 @@ func DecodeCounterRecord(header *RecordHeader, payload *bytes.Buffer) (CounterRe
Header: *header,
}
switch header.DataFormat {
case 1:
case COUNTER_TYPE_IF:
var ifCounters IfCounters
if err := utils.BinaryDecoder(payload,
&ifCounters.IfIndex,
Expand All @@ -110,7 +152,7 @@ func DecodeCounterRecord(header *RecordHeader, payload *bytes.Buffer) (CounterRe
return counterRecord, &RecordError{header.DataFormat, err}
}
counterRecord.Data = ifCounters
case 2:
case COUNTER_TYPE_ETH:
var ethernetCounters EthernetCounters
if err := utils.BinaryDecoder(payload,
&ethernetCounters.Dot3StatsAlignmentErrors,
Expand Down Expand Up @@ -145,14 +187,7 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
}
var err error
switch header.DataFormat {
case FORMAT_EXT_SWITCH:
extendedSwitch := ExtendedSwitch{}
err := utils.BinaryDecoder(payload, &extendedSwitch.SrcVlan, &extendedSwitch.SrcPriority, &extendedSwitch.DstVlan, &extendedSwitch.DstPriority)
if err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
flowRecord.Data = extendedSwitch
case FORMAT_RAW_PKT:
case FLOW_TYPE_RAW:
sampledHeader := SampledHeader{}
if err := utils.BinaryDecoder(payload,
&sampledHeader.Protocol,
Expand All @@ -164,7 +199,7 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
}
sampledHeader.HeaderData = payload.Bytes()
flowRecord.Data = sampledHeader
case FORMAT_IPV4:
case FLOW_TYPE_IPV4:
sampledIP := SampledIPv4{
SampledIPBase: SampledIPBase{
SrcIP: make([]byte, 4),
Expand All @@ -184,7 +219,7 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
return flowRecord, &RecordError{header.DataFormat, err}
}
flowRecord.Data = sampledIP
case FORMAT_IPV6:
case FLOW_TYPE_IPV6:
sampledIP := SampledIPv6{
SampledIPBase: SampledIPBase{
SrcIP: make([]byte, 16),
Expand All @@ -204,7 +239,14 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
return flowRecord, &RecordError{header.DataFormat, err}
}
flowRecord.Data = sampledIP
case FORMAT_EXT_ROUTER:
case FLOW_TYPE_EXT_SWITCH:
extendedSwitch := ExtendedSwitch{}
err := utils.BinaryDecoder(payload, &extendedSwitch.SrcVlan, &extendedSwitch.SrcPriority, &extendedSwitch.DstVlan, &extendedSwitch.DstPriority)
if err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
flowRecord.Data = extendedSwitch
case FLOW_TYPE_EXT_ROUTER:
extendedRouter := ExtendedRouter{}
if extendedRouter.NextHopIPVersion, extendedRouter.NextHop, err = DecodeIP(payload); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
Expand All @@ -216,7 +258,7 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
return flowRecord, &RecordError{header.DataFormat, err}
}
flowRecord.Data = extendedRouter
case FORMAT_EXT_GATEWAY:
case FLOW_TYPE_EXT_GATEWAY:
extendedGateway := ExtendedGateway{}
if extendedGateway.NextHopIPVersion, extendedGateway.NextHop, err = DecodeIP(payload); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
Expand Down Expand Up @@ -295,33 +337,35 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
return sample, fmt.Errorf("header seq [%w]", err)
}
seq := header.SampleSequenceNumber
if format == FORMAT_RAW_PKT || format == FORMAT_ETH {
switch format {
case SAMPLE_FORMAT_FLOW, SAMPLE_FORMAT_COUNTER:
// Interlaced data-source format
var sourceId uint32
if err := utils.BinaryDecoder(payload, &sourceId); err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("header source [%w]", err)}
}

header.SourceIdType = sourceId >> 24
header.SourceIdValue = sourceId & 0x00ffffff
} else if format == FORMAT_IPV4 || format == FORMAT_IPV6 {
case SAMPLE_FORMAT_EXPANDED_FLOW, SAMPLE_FORMAT_EXPANDED_COUNTER:
// Explicit data-source format
if err := utils.BinaryDecoder(payload,
&header.SourceIdType,
&header.SourceIdValue,
); err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("header source [%w]", err)}
}
} else {
default:
return sample, &FlowError{format, seq, fmt.Errorf("unknown format %d", format)}
}

var recordsCount uint32
var flowSample FlowSample
var counterSample CounterSample
var expandedFlowSample ExpandedFlowSample
if format == FORMAT_RAW_PKT {
flowSample = FlowSample{
Header: *header,
}
switch format {
case SAMPLE_FORMAT_FLOW:
flowSample.Header = *header
if err := utils.BinaryDecoder(payload,
&flowSample.SamplingRate,
&flowSample.SamplePool,
Expand All @@ -338,23 +382,19 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
}
flowSample.Records = make([]FlowRecord, recordsCount) // max size of 1000 for protection
sample = flowSample
} else if format == FORMAT_ETH || format == FORMAT_IPV6 {
if err := utils.BinaryDecoder(payload, &recordsCount); err != nil {
case SAMPLE_FORMAT_COUNTER, SAMPLE_FORMAT_EXPANDED_COUNTER:
counterSample.Header = *header
if err := utils.BinaryDecoder(payload, &counterSample.CounterRecordsCount); err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("eth [%w]", err)}
}
recordsCount = counterSample.CounterRecordsCount
if recordsCount > 1000 { // protection against ddos
return sample, &FlowError{format, seq, fmt.Errorf("too many flow records: %d", recordsCount)}
}
counterSample = CounterSample{
Header: *header,
CounterRecordsCount: recordsCount,
}
counterSample.Records = make([]CounterRecord, recordsCount) // max size of 1000 for protection
sample = counterSample
} else if format == FORMAT_IPV4 {
expandedFlowSample = ExpandedFlowSample{
Header: *header,
}
case SAMPLE_FORMAT_EXPANDED_FLOW:
expandedFlowSample.Header = *header
if err := utils.BinaryDecoder(payload,
&expandedFlowSample.SamplingRate,
&expandedFlowSample.SamplePool,
Expand Down Expand Up @@ -383,22 +423,25 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
break
}
recordReader := bytes.NewBuffer(payload.Next(int(recordHeader.Length)))
if format == FORMAT_RAW_PKT || format == FORMAT_IPV4 {
switch format {
case SAMPLE_FORMAT_FLOW:
record, err := DecodeFlowRecord(&recordHeader, recordReader)
if err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("record [%w]", err)}
}
if format == FORMAT_RAW_PKT {
flowSample.Records[i] = record
} else if format == FORMAT_IPV4 {
expandedFlowSample.Records[i] = record
}
} else if format == FORMAT_ETH || format == FORMAT_IPV6 {
flowSample.Records[i] = record
case SAMPLE_FORMAT_COUNTER, SAMPLE_FORMAT_EXPANDED_COUNTER:
record, err := DecodeCounterRecord(&recordHeader, recordReader)
if err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("counter [%w]", err)}
}
counterSample.Records[i] = record
case SAMPLE_FORMAT_EXPANDED_FLOW:
record, err := DecodeFlowRecord(&recordHeader, recordReader)
if err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("record [%w]", err)}
}
expandedFlowSample.Records[i] = record
}
}
return sample, nil
Expand Down
16 changes: 6 additions & 10 deletions decoders/sflow/sflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,11 @@ func TestSFlowDecode(t *testing.T) {
}
buf := bytes.NewBuffer(data)
var packet Packet
assert.Nil(t, DecodeMessageVersion(buf, &packet))
assert.NoError(t, DecodeMessageVersion(buf, &packet))
}

func TestExpandedSFlowDecode(t *testing.T) {
data := getExpandedSFlowDecode()

buf := bytes.NewBuffer(data)
var packet Packet
assert.Nil(t, DecodeMessageVersion(buf, &packet))
}

func getExpandedSFlowDecode() []byte {
return []byte{
data := []byte{
0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x01, 0x01, 0x02, 0x03, 0x04, 0x00, 0x00, 0x00, 0x00,
0x0f, 0xa7, 0x72, 0xc2, 0x0f, 0x76, 0x73, 0x48, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x03,
0x00, 0x00, 0x00, 0xdc, 0x20, 0x90, 0x93, 0x26, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x42, 0xa4,
Expand Down Expand Up @@ -131,4 +123,8 @@ func getExpandedSFlowDecode() []byte {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
}

buf := bytes.NewBuffer(data)
var packet Packet
assert.NoError(t, DecodeMessageVersion(buf, &packet))
}

0 comments on commit cbea0d6

Please sign in to comment.