Skip to content

Commit

Permalink
added goroutine to ready the events from queue
Browse files Browse the repository at this point in the history
  • Loading branch information
kumari-anupam committed Apr 22, 2024
1 parent 60ce8ad commit 1be78b7
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 26 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,3 @@ require (
k8s.io/utils v0.0.0-20240310230437-4693a0247e57 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
)

replace github.com/intelops/tarian-detector => github.com/andylibrian/tarian-detector v0.0.0-20240314095358-bd4d5419e74a
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/andylibrian/tarian-detector v0.0.0-20240314095358-bd4d5419e74a h1:8nYnMuaN3B0iflTDJrjh1SOQUijvIR4Qw+0Q+GHQYYk=
github.com/andylibrian/tarian-detector v0.0.0-20240314095358-bd4d5419e74a/go.mod h1:dXcRWq8AHABseHsjcnM8iJqwXCGX+dGGOR8kiXw1acY=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
Expand Down Expand Up @@ -280,6 +278,8 @@ github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/intelops/tarian-detector v0.0.0-20240226164335-7701e4e67daa h1:ExaZjScIYDDIfCOygau+d09cvJdJdrWEN3yfHdehgbE=
github.com/intelops/tarian-detector v0.0.0-20240226164335-7701e4e67daa/go.mod h1:u7VW9+KOi2ujvIevz/LtfaXkjfkBp7BKgGuPcSq814E=
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
51 changes: 29 additions & 22 deletions pkg/server/ingestion_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,40 @@ func NewIngestionWorker(logger *logrus.Logger, eventStore store.EventStore, queu
}

// Start starts the IngestionWorker, continuously processing messages from the ingestion queue.
//
// Working:
// - The IngestionWorker continuously fetches messages from the ingestion queue.
// - It checks if the message is a valid event.
// - If it is a valid event, it updates the server timestamp and stores the event in the event store.
// - If there are errors during processing, they are logged.
// It uses a goroutine and a buffered channel to read events from the queue in the background.
func (iw *IngestionWorker) Start() {
for {
msg, err := iw.IngestionQueue.NextMessage(&tarianpb.Event{})
if err != nil {
iw.logger.WithError(err).Error("error while processing event")
continue
}
eventChan := make(chan *tarianpb.Event, 1000) // buffered channel with capacity 1000

go func() {
defer close(eventChan) // close the channel on exit

for {
event, err := iw.IngestionQueue.NextMessage(&tarianpb.Event{})
if err != nil {
iw.logger.WithError(err).Error("error while processing event")
continue
}

event, ok := msg.(*tarianpb.Event)
if !ok {
iw.logger.WithError(err).Error("error while processing event")
continue
eventChan <- event.(*tarianpb.Event)
}
}()

event.ServerTimestamp = timestamppb.Now()
uid := uuid.NewV4()
event.Uid = uid.String()
err = iw.eventStore.Add(event)
go func() {
defer iw.logger.Info("stopped consuming events from ingestion queue")

if err != nil {
iw.logger.WithError(err).Error("error while processing event")
for event := range eventChan {
iw.processEvent(event)
}
}()
}

func (iw *IngestionWorker) processEvent(event *tarianpb.Event) {
event.ServerTimestamp = timestamppb.Now()
uid := uuid.NewV4()
event.Uid = uid.String()
err := iw.eventStore.Add(event)

if err != nil {
iw.logger.WithError(err).Error("error while processing event")
}
}

0 comments on commit 1be78b7

Please sign in to comment.