Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cloudevents-server): do not stop the consumer when error happened in loop #204

Merged
merged 3 commits into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cloudevents-server/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM golang:1.23.3 AS builder
COPY . /app
RUN cd /app && go build -o server
RUN cd /app && go build -o server ./cmd/server

FROM debian:12
LABEL org.opencontainers.image.licenses="MIT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package main
import (
"context"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/gin-gonic/gin"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -67,33 +68,44 @@ func main() {
}

func startServices(srv *http.Server, cg handler.EventConsumerGroup) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Create channel used by both the signal handler and server goroutines
// to notify the main goroutine when to stop the server.
errc := make(chan error)

// Setup interrupt handler. This optional step configures the process so
// that SIGINT and SIGTERM signals cause the services to stop gracefully.
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
errc <- fmt.Errorf("%s", <-c)
}()

// Start http server
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal().Err(err).Msg("server error")
}
}()
log.Info().Str("address", srv.Addr).Msg("server started.")

// Start consumer workers
cgWg := new(sync.WaitGroup)
cgCtx, cgCancel := context.WithCancel(context.Background())
go cg.Start(cgCtx)
cg.Start(cgCtx, cgWg)

// Wait for interrupt signal to gracefully shutdown
sig := <-sigChan
log.Warn().Str("signal", sig.String()).Msg("signal received")
// Wait for signal.
log.Warn().Msgf("exiting (%v)", <-errc)

// shutdown consumer group.
// Send cancellation signal to the goroutines.
cgCancel()
cgWg.Wait()
log.Warn().Msg("Workers are gracefully stopped")

// shutdown http server.
shutdownSrvCtx, shutdownSrvCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownSrvCancel()
if err := srv.Shutdown(shutdownSrvCtx); err != nil {
if err := srv.Shutdown(context.Background()); err != nil {
log.Error().Err(err).Msg("server shutdown error")
}

log.Info().Msg("server gracefully stopped")
log.Warn().Msg("server gracefully stopped")
}

func setRouters(r gin.IRoutes, cfg *config.Config) {
Expand Down
2 changes: 1 addition & 1 deletion cloudevents-server/ent/ent.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 11 additions & 10 deletions cloudevents-server/ent/problemcaserun_query.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions cloudevents-server/ent/runtime/runtime.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions cloudevents-server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/PingCAP-QE/ee-apps/cloudevents-server
go 1.23

require (
entgo.io/ent v0.13.1
entgo.io/ent v0.14.2-0.20241102145616-365b49817603
github.com/Masterminds/sprig/v3 v3.3.0
github.com/cloudevents/sdk-go/v2 v2.15.2
github.com/gin-gonic/gin v1.10.0
Expand All @@ -20,7 +20,7 @@ require (
)

require (
ariga.io/atlas v0.19.1-0.20240203083654-5948b60a8e43 // indirect
ariga.io/atlas v0.27.1-0.20240912191503-92195304dbe1 // indirect
contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
dario.cat/mergo v1.0.1 // indirect
Expand All @@ -30,6 +30,7 @@ require (
github.com/Masterminds/semver/v3 v3.3.0 // indirect
github.com/agext/levenshtein v1.2.1 // indirect
github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect
github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blendle/zapdriver v1.3.1 // indirect
github.com/bytedance/sonic v1.12.3 // indirect
Expand Down Expand Up @@ -105,7 +106,7 @@ require (
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/zclconf/go-cty v1.8.0 // indirect
github.com/zclconf/go-cty v1.14.4 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
22 changes: 14 additions & 8 deletions cloudevents-server/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
ariga.io/atlas v0.19.1-0.20240203083654-5948b60a8e43 h1:GwdJbXydHCYPedeeLt4x/lrlIISQ4JTH1mRWuE5ZZ14=
ariga.io/atlas v0.19.1-0.20240203083654-5948b60a8e43/go.mod h1:uj3pm+hUTVN/X5yfdBexHlZv+1Xu5u5ZbZx7+CDavNU=
ariga.io/atlas v0.27.1-0.20240912191503-92195304dbe1 h1:tqUmxmE7r2qqWmgczyNZNOyhzXLqmdUpRhVSuvCUqTU=
ariga.io/atlas v0.27.1-0.20240912191503-92195304dbe1/go.mod h1:cpxrs2J9HTrjMrfNKFmUpONY9L0vzAHZ1pY++nTtVw0=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -44,8 +44,8 @@ contrib.go.opencensus.io/exporter/prometheus v0.4.0/go.mod h1:o7cosnyfuPVK0tB8q0
dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
entgo.io/ent v0.13.1 h1:uD8QwN1h6SNphdCCzmkMN3feSUzNnVvV/WIkHKMbzOE=
entgo.io/ent v0.13.1/go.mod h1:qCEmo+biw3ccBn9OyL4ZK5dfpwg++l1Gxwac5B1206A=
entgo.io/ent v0.14.2-0.20241102145616-365b49817603 h1:mdW4xCZP/KQOGI8xMn1+ARzlucpmG44ux9ZqwYbYpIg=
entgo.io/ent v0.14.2-0.20241102145616-365b49817603/go.mod h1:nPwpHTcyAjBMt+hT6q7Y/SdF41e6IJudQPbTj1P5nYY=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
Expand Down Expand Up @@ -78,6 +78,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apparentlymart/go-textseg/v13 v13.0.0 h1:Y+KvPE1NYz0xl601PVImeQfFyEy6iT90AvPUL1NNfNw=
github.com/apparentlymart/go-textseg/v13 v13.0.0/go.mod h1:ZK2fH7c4NqDTLtiYLvIkEghdlcqw7yxLeM89kiTRPUo=
github.com/apparentlymart/go-textseg/v15 v15.0.0 h1:uYvfpb3DyLSCGWnctWKGj857c6ew1u1fNQOlOtuGxQY=
github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmmsvpAG721bKi0joRfFdHIWJ4=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down Expand Up @@ -354,6 +356,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
Expand Down Expand Up @@ -382,6 +386,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
Expand Down Expand Up @@ -453,6 +459,8 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
Expand All @@ -479,8 +487,6 @@ github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65E
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/vmihailenco/msgpack/v4 v4.3.12/go.mod h1:gborTTJjAo/GWTqqRjrLCn9pgNN+NXzzngzBKDPIqw4=
github.com/vmihailenco/tagparser v0.1.1/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
Expand All @@ -493,8 +499,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/zclconf/go-cty v1.8.0 h1:s4AvqaeQzJIu3ndv4gVIhplVD0krU+bgrcLSVUnaWuA=
github.com/zclconf/go-cty v1.8.0/go.mod h1:vVKLxnk3puL4qRAv72AO+W99LUD4da90g3uUAzyuvAk=
github.com/zclconf/go-cty v1.14.4 h1:uXXczd9QDGsgu0i/QFR/hzI5NYCHLf6NQw/atrbnhq8=
github.com/zclconf/go-cty v1.14.4/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
Expand Down
67 changes: 36 additions & 31 deletions cloudevents-server/pkg/events/handler/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,12 @@ func (ecs EventConsumerGroup) Close() {
// Start runs the EventConsumerGroup in parallel, starting each EventConsumer
// in a separate goroutine. It waits for all EventConsumers to finish before
// returning.
func (ecs EventConsumerGroup) Start(ctx context.Context) {
wg := new(sync.WaitGroup)
func (ecs EventConsumerGroup) Start(ctx context.Context, wg *sync.WaitGroup) {
for _, ec := range ecs {
if ec != nil {
wg.Add(1)
go func(c *EventConsumer) {
c.Start(ctx)
wg.Done()
}(ec)
ec.Start(ctx, wg)
}
}
wg.Wait()
}

type EventConsumer struct {
Expand All @@ -135,32 +129,43 @@ type EventConsumer struct {
}

// consumer workers
func (ec *EventConsumer) Start(ctx context.Context) error {
func (ec *EventConsumer) Start(ctx context.Context, wg *sync.WaitGroup) {
wg.Add(1)
defer ec.Close()

for {
select {
case <-ctx.Done():
return ctx.Err()
default:
m, err := ec.reader.ReadMessage(ctx)
if err != nil {
return err
}

var event cloudevents.Event
err = event.UnmarshalJSON(m.Value)
if err != nil {
return err
}

result := ec.handler.Handle(event)
if !cloudevents.IsACK(result) {
log.Error().Err(err).Msg("error handling event")
ec.writer.WriteMessages(ctx, kafka.Message{Topic: ec.faultTopic, Key: m.Key, Value: m.Value})
go func() {
defer wg.Done()
defer ec.Close()

log.Info().Msg("Kafka consumer started")
for {
select {
case <-ctx.Done():
return
default:
msg, err := ec.reader.ReadMessage(ctx)
if err != nil {
log.Err(err).Msg("Error reading message")
continue
}

var event cloudevents.Event
if err := event.UnmarshalJSON(msg.Value); err != nil {
log.Err(err).Msg("Error unmarshaling CloudEvent")
continue
}

log.Debug().Str("ce-id", event.ID()).Str("ce-type", event.Type()).Msg("received cloud event")

result := ec.handler.Handle(event)
if !cloudevents.IsACK(result) {
log.Error().Err(err).Msg("error handling event")
ec.writer.WriteMessages(ctx, kafka.Message{Topic: ec.faultTopic, Key: msg.Key, Value: msg.Value})
}
}
}
}
}()

log.Info().Msg("Kafka consumer started")
}

func (ec *EventConsumer) Close() {
Expand Down