Skip to content

Commit

Permalink
Merge pull request #152 from xmidt-org/limit-panic-damage
Browse files Browse the repository at this point in the history
Add a fix where a panic in the sender go routine doesn't kill the ent…
  • Loading branch information
schmidtw authored Aug 7, 2019
2 parents 7e3adb6 + 0daf464 commit 58479b0
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 1 deletion.
8 changes: 8 additions & 0 deletions src/caduceus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
IncomingEventTypeCounter = "incoming_event_type_count"
DropsDueToInvalidPayload = "drops_due_to_invalid_payload"
OutgoingQueueDepth = "outgoing_queue_depths"
DropsDueToPanic = "drops_due_to_panic"
)

func Metrics() []xmetrics.Metric {
Expand Down Expand Up @@ -82,6 +83,12 @@ func Metrics() []xmetrics.Metric {
Type: "counter",
LabelNames: []string{"event"},
},
{
Name: DropsDueToPanic,
Help: "The outgoing message delivery pipeline panicked.",
Type: "counter",
LabelNames: []string{"url"},
},
}
}

Expand All @@ -94,6 +101,7 @@ func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSende
c.droppedCutoffCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off")
c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config")
c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "network_err")
c.droppedPanic = m.NewCounter(DropsDueToPanic).With("url", c.id)
c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With("url", c.id)
c.contentTypeCounter = m.NewCounter(IncomingContentTypeCounter)
}
11 changes: 11 additions & 0 deletions src/caduceus/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"net/http"
"time"
"unicode/utf8"

"github.com/Comcast/webpa-common/health"
"github.com/Comcast/webpa-common/webhook"
Expand Down Expand Up @@ -99,6 +100,11 @@ func (m *mockCounter) Add(delta float64) {
}

// With is the mocked counter's label-binding method. Before delegating to the
// mock it checks every supplied label value and panics with "not UTF-8" as
// soon as one is not a valid UTF-8 string. Otherwise the call is recorded with
// the label values passed as a single slice argument, and the first configured
// return value is handed back as a metrics.Counter.
//
// NOTE(review): the panic presumably imitates how the real metrics provider
// reacts to invalid label values — confirm against the provider in use.
func (m *mockCounter) With(labelValues ...string) metrics.Counter {
	for i := range labelValues {
		if utf8.ValidString(labelValues[i]) {
			continue
		}
		panic("not UTF-8")
	}
	return m.Called(labelValues).Get(0).(metrics.Counter)
}
Expand All @@ -117,6 +123,11 @@ func (m *mockGauge) Set(value float64) {
}

// With is the mocked gauge's label-binding method. Before delegating to the
// mock it checks every supplied label value and panics with "not UTF-8" as
// soon as one is not a valid UTF-8 string. Otherwise the call is recorded with
// the label values passed as a single slice argument, and the first configured
// return value is handed back as a metrics.Gauge.
//
// NOTE(review): the panic presumably imitates how the real metrics provider
// reacts to invalid label values — confirm against the provider in use.
func (m *mockGauge) With(labelValues ...string) metrics.Gauge {
	for i := range labelValues {
		if utf8.ValidString(labelValues[i]) {
			continue
		}
		panic("not UTF-8")
	}
	return m.Called(labelValues).Get(0).(metrics.Gauge)
}
Expand Down
10 changes: 9 additions & 1 deletion src/caduceus/outboundSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ type CaduceusOutboundSender struct {
droppedExpiredCounter metrics.Counter
droppedNetworkErrCounter metrics.Counter
droppedInvalidConfig metrics.Counter
droppedPanic metrics.Counter
cutOffCounter metrics.Counter
contentTypeCounter metrics.Counter
queueDepthGauge metrics.Gauge
Expand Down Expand Up @@ -481,7 +482,14 @@ func (obs *CaduceusOutboundSender) dispatcher() {
// send is the routine that actually takes the queued messages and delivers
// them to the listeners outside webpa
func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType string, msg *wrp.Message) {
defer obs.workers.Release()
defer func() {
if r := recover(); nil != r {
obs.droppedPanic.Add(1.0)
logging.Error(obs.logger).Log(logging.MessageKey(), "goroutine send() panicked",
"id", obs.id, "panic", r)
}
obs.workers.Release()
}()

payload := msg.Payload
body := payload
Expand Down
13 changes: 13 additions & 0 deletions src/caduceus/outboundSender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher []
fakeQdepth.On("With", []string{"url", w.Config.URL}).Return(fakeQdepth)
fakeQdepth.On("Add", 1.0).Return().On("Add", -1.0).Return()

// DropsDueToPanic case
fakePanicDrop := new(mockCounter)
fakePanicDrop.On("With", []string{"url", w.Config.URL}).Return(fakePanicDrop)
fakePanicDrop.On("Add", 1.0).Return()

// Build a registry and register all fake metrics; these are synonymous with the metrics in
// metrics.go
//
Expand All @@ -148,6 +153,7 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher []
fakeRegistry.On("NewCounter", SlowConsumerCounter).Return(fakeSlow)
fakeRegistry.On("NewCounter", SlowConsumerDroppedMsgCounter).Return(fakeDroppedSlow)
fakeRegistry.On("NewCounter", IncomingContentTypeCounter).Return(fakeContentType)
fakeRegistry.On("NewCounter", DropsDueToPanic).Return(fakePanicDrop)
fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeQdepth)

return &OutboundSenderFactory{
Expand Down Expand Up @@ -398,6 +404,13 @@ func TestSimpleWrpWithWildcardMatchers(t *testing.T) {
r4.Destination = "event:test"
obs.Queue(r4)

/* This will panic. */
r5 := simpleRequest()
r5.TransactionUUID = "1234"
r5.Source = "mac:112233445560"
r5.Destination = "event:test\xedoops"
obs.Queue(r5)

obs.Shutdown(true)

assert.Equal(int32(4), trans.i)
Expand Down
1 change: 1 addition & 0 deletions src/caduceus/senderWrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func getFakeFactory() *SenderWrapperFactory {
fakeRegistry.On("NewCounter", SlowConsumerDroppedMsgCounter).Return(fakeIgnore)
fakeRegistry.On("NewCounter", IncomingContentTypeCounter).Return(fakeIgnore)
fakeRegistry.On("NewCounter", IncomingEventTypeCounter).Return(fakeIgnore)
fakeRegistry.On("NewCounter", DropsDueToPanic).Return(fakeIgnore)
fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeGauge)

return &SenderWrapperFactory{
Expand Down

0 comments on commit 58479b0

Please sign in to comment.