Skip to content

Commit

Permalink
Add in memory serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
thegodenage committed Apr 13, 2024
1 parent 066aea5 commit dedb184
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 23 deletions.
17 changes: 12 additions & 5 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,31 @@ package main
import (
"context"
"log"
"time"
"waffle/internal/packet"

"waffle/internal/worker"
)

const networkInterfaceDescription = "WAN Miniport (Network Monitor)"
const networkInterfaceDescription = "Intel(R) I211 Gigabit Network Connection"

func main() {
ctx := context.Background()

log.Println("starting collector")

inMemoryPacketSerializer := packet.NewMemoryPacketSerializer(time.Minute * 5)

// NEXT TODO: add BPF filter builder
// https://www.ibm.com/docs/en/qsip/7.4?topic=queries-berkeley-packet-filters
collector := worker.NewCollector(worker.CollectorConfig{
Protocol: "ip",
Port: "8080",
}, packet.NewWindowsNetworkInterfaceProvider(networkInterfaceDescription))
cfg := worker.CollectorConfig{
BPF: "ip",
}

collector := worker.NewCollector(
cfg,
packet.NewWindowsNetworkInterfaceProvider(networkInterfaceDescription),
inMemoryPacketSerializer)

if err := collector.Run(ctx); err != nil {
panic(err.Error())
Expand Down
8 changes: 6 additions & 2 deletions internal/packet/windows.go → internal/packet/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package packet
import (
"errors"
"fmt"
"strings"

"github.com/google/gopacket/pcap"

Expand Down Expand Up @@ -33,11 +34,14 @@ func (w *WindowsNetworkInterfaceProvider) GetNetworkInterface() (*pcap.Interface
return nil, fmt.Errorf("find all network interfaces, %w", err)
}

for _, netInterface := range interfaces {
devicesDescriptions := make([]string, len(interfaces))
for i, netInterface := range interfaces {
devicesDescriptions[i] = netInterface.Description

if netInterface.Description == w.interfaceDescription {
return &netInterface, nil
}
}

return nil, ErrNetworkInterfaceNotFound
return nil, fmt.Errorf("%w, available devices descriptions: %s", ErrNetworkInterfaceNotFound, strings.Join(devicesDescriptions, " ; "))
}
112 changes: 112 additions & 0 deletions internal/packet/memserializer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package packet

import (
"context"
"fmt"
"log"
"sync"
"time"

"github.com/google/gopacket"

"waffle/internal/worker"
)

// MemoryPacketSerializer is used to temp serialize
// packets in the memory, this way we can use this
// serialized packets to validate against DDOS attacks
type MemoryPacketSerializer struct {
//ttl of the packets
ttl time.Duration
addrPacketsMap map[string][]*inMemoryPacket

mu sync.Mutex
}

var _ worker.PacketSerializer = (*MemoryPacketSerializer)(nil)

// NewMemoryPacketSerializer creates new memory packet serializer.
//
// ttl - ttl of the packet, after this time packets are cleaned
// from the memory.
func NewMemoryPacketSerializer(ttl time.Duration) *MemoryPacketSerializer {
mps := &MemoryPacketSerializer{
ttl: ttl,
addrPacketsMap: make(map[string][]*inMemoryPacket),
}

go mps.runCleaner()

return mps
}

// SerializePackets is used to serialize packets in memory. Those packets are then removed after TTL in the another
// goroutine that is created when new instance of MemoryPacketSerializer is created by NewMemoryPacketSerializer.
func (m *MemoryPacketSerializer) SerializePackets(ctx context.Context, packetsChan <-chan gopacket.Packet) error {
for {
select {
case packet, ok := <-packetsChan:
if !ok {
return nil
}

func() {
m.mu.Lock()
defer m.mu.Unlock()

id := packet.NetworkLayer().NetworkFlow().Src().String()

inMemPacket := &inMemoryPacket{
obtainedAt: time.Now(),
packet: packet,
}

v, ok := m.addrPacketsMap[id]
if !ok {
m.addrPacketsMap[id] = []*inMemoryPacket{inMemPacket}
return
}

v = append(v, inMemPacket)

m.addrPacketsMap[id] = v
}()
case <-ctx.Done():
return nil
}
}
}

func (m *MemoryPacketSerializer) runCleaner() {
for {
// for now let's go with two minutes, but I guess it should be adjusted later
time.Sleep(time.Minute * 2)

log.Println("started cleaning the serialized packets")

now := time.Now()

m.mu.Lock()

// each few minutes we are clearing map from the packets that are no longer relevant
for k, v := range m.addrPacketsMap {
for i, packet := range v {
if packet.obtainedAt.Add(m.ttl).After(time.Now()) {
// TODO: slice out of bounds, we need to gather id, and which packets to remove and then remove those in the other loop
m.addrPacketsMap[k] = append(m.addrPacketsMap[k][:i], m.addrPacketsMap[k][i+1:]...)
}
}
}

m.mu.Unlock()

elapsed := time.Since(now)

log.Println(fmt.Sprintf("serialzied packets clean done. Elapsed: %d", elapsed))
}
}

type inMemoryPacket struct {
obtainedAt time.Time
packet gopacket.Packet
}
45 changes: 29 additions & 16 deletions internal/worker/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,47 @@ package worker
import (
"context"
"fmt"
"log"

"github.com/google/gopacket"
"github.com/google/gopacket/pcap"
"log"
)

type PacketSerializer interface {
SerializePackets(ctx context.Context, packetsChan chan<- gopacket.Packet) error
SerializePackets(ctx context.Context, packetsChan <-chan gopacket.Packet) error
}

type NetworkInterfaceProvider interface {
GetNetworkInterface() (*pcap.Interface, error)
}

type CollectorConfig struct {
Protocol string
Port string
// BPF is a filter to filter out desired packets. IMPORTANT filter should only read the incoming packets.
BPF string
}

// Collector collects packets from the network interfaces
// on the device it works (it could be a PC, server).
// Then based on the BPF it filters the packets.
type Collector struct {
cfg *CollectorConfig
serializer PacketSerializer
netInterfaceProvider NetworkInterfaceProvider
}

func NewCollector(cfg CollectorConfig, deviceProvider NetworkInterfaceProvider) *Collector {
// NewCollector creates a new collector.
//
// cfg - configuration of the collector
// networkInterfaceProvider - provider of the network interface
// serializer - serializer used to serialize packets.
func NewCollector(
cfg CollectorConfig,
networkInterfaceProvider NetworkInterfaceProvider,
serializer PacketSerializer,
) *Collector {
return &Collector{
cfg: &cfg,
netInterfaceProvider: deviceProvider,
netInterfaceProvider: networkInterfaceProvider,
serializer: serializer,
}
}

Expand All @@ -46,33 +58,34 @@ func (c *Collector) Run(ctx context.Context) error {
return fmt.Errorf("pcap open live: %w", err)
}

if err := handle.SetBPFFilter("ip"); err != nil {
return fmt.Errorf("set BPFF filter: %w", err)
if err := handle.SetBPFFilter(c.cfg.BPF); err != nil {
return fmt.Errorf("set BPF filter: %w", err)
}

packetSource := gopacket.NewPacketSource(handle, handle.LinkType())

packetsChan := make(chan gopacket.Packet)

defer func() {
close(packetsChan)

log.Println("collector closed")
}()

//go func() {
// if err := c.serializer.SerializePackets(ctx, packetsChan); err != nil {
// log.Println("error in serialize packets")
// }
//}()
go func() {
if err := c.serializer.SerializePackets(ctx, packetsChan); err != nil {
log.Println("error in serialize packets")
}
}()

for {
select {
case packet, ok := <-packetSource.Packets():
if !ok {
log.Println("error reading packet")
}
log.Println(packet.String())
//packetsChan <- packet

packetsChan <- packet

case <-ctx.Done():
return nil
Expand Down

0 comments on commit dedb184

Please sign in to comment.