Skip to content

Commit

Permalink
add a bunch of logging and a little delay so that we have time to set…
Browse files Browse the repository at this point in the history
… up the listener
  • Loading branch information
reillyse committed Jan 22, 2025
1 parent 0f1cbd9 commit e648220
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions api/v1/server/handlers/monitoring/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (m *MonitoringService) MonitoringPostRunProbe(ctx echo.Context, request gen

cleanup, err := m.run(cancellableContext, cf, m.workflowName, events, stepChan, errorChan)
if err != nil {
m.l.Error().Msgf("error running probe: %s", err)
m.l.Error().Msgf("probe: error running probe: %s", err)
return nil, err
}

Expand All @@ -85,26 +85,29 @@ func (m *MonitoringService) MonitoringPostRunProbe(ctx echo.Context, request gen
case <-cancellableContext.Done():

if cancellableContext.Err() == context.DeadlineExceeded {
m.l.Error().Msg("timed out waiting for probe to complete")
return nil, fmt.Errorf("timed out waiting for probe to complete")
m.l.Error().Msg("probe: timed out waiting for probe to complete")
return nil, fmt.Errorf("probe: timed out waiting for probe to complete")
}

case err := <-errorChan:
m.l.Error().Msgf("error during probe: %s", err)
m.l.Error().Msgf("probe: error during probe: %s", err)
return nil, err

case e := <-events:
m.l.Debug().Msgf("probe: received event: %s", e)
if !strings.HasPrefix(e, messages[messageIndex]) {
return nil, fmt.Errorf("expected message %s, to start with %s", messages[messageIndex], e)
m.l.Error().Msgf("probe: expected message %s, to start with %s", e, messages[messageIndex])
return nil, fmt.Errorf("probe: expected message %s, to start with %s", messages[messageIndex], e)
}
m.l.Debug().Msgf("probe: received expected message: %s", e)
messageIndex++

if messageIndex == len(messages) {
for i := range stepMessages {
stepMessage := <-stepChan
if stepMessage != stepMessages[i] {

return nil, fmt.Errorf("probe did not complete successfully - step messages failed")
m.l.Error().Msgf("probe: expected step message %s, got %s", stepMessages[i], stepMessage)
return nil, fmt.Errorf("probe: probe did not complete successfully - step messages failed")
}
}
return nil, nil
Expand Down Expand Up @@ -214,15 +217,18 @@ func (m *MonitoringService) run(ctx context.Context, cf clientconfig.ClientConfi

go func() {
err = c.Subscribe().StreamByAdditionalMetadata(ctx, streamKey, streamValue, func(event client.StreamEvent) error {
m.l.Info().Msgf("probe: received stream event: %s", string(event.Message))
events <- string(event.Message)

return nil
})
if err != nil {
m.l.Error().Msgf("error subscribing to stream: %s", err)
errors <- fmt.Errorf("error subscribing to stream: %w", err)
}
}()

time.Sleep(100 * time.Millisecond)
go func() {
testEvent := probeEvent{
UniqueStreamId: streamValue,
Expand All @@ -239,13 +245,15 @@ func (m *MonitoringService) run(ctx context.Context, cf clientconfig.ClientConfi
)

if err != nil {
m.l.Error().Msgf("error pushing event: %s", err)
errors <- fmt.Errorf("error pushing event: %w", err)
}
}()

cleanupWorker, err := w.Start()

if err != nil {
m.l.Error().Msgf("error starting worker: %s", err)
return nil, fmt.Errorf("error starting worker: %w", err)
}

Expand Down

0 comments on commit e648220

Please sign in to comment.