Skip to content

Commit

Permalink
Log more message stats
Browse files Browse the repository at this point in the history
  • Loading branch information
radazen committed Apr 23, 2024
1 parent 4d91f25 commit 24fd4cb
Showing 1 changed file with 44 additions and 8 deletions.
52 changes: 44 additions & 8 deletions kafka-streamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,48 @@ func newDeserializerFromRegistry() (*avro.GenericDeserializer, error) {
return deserializer, nil
}

type messageStats struct {
interval time.Duration
nextReportTime time.Time
inserts int64
updates int64
deletes int64
}

func newMessageStats(reportingInterval time.Duration) *messageStats {
return &messageStats{
interval: reportingInterval,
nextReportTime: time.Now().Add(reportingInterval),
}
}

func (m *messageStats) Update(ctx context.Context, msg *kafka.Message) {
actionType, err := getActionType(msg)
if err != nil {
logger.For(ctx).Errorf("failed to get action type for message: %v", msg)
return
}

switch actionType {
case "insert":
m.inserts++
case "update":
m.updates++
case "delete":
m.deletes++
default:
logger.For(ctx).Errorf("invalid action type %s for message %v", actionType, msg)
}

if time.Now().After(m.nextReportTime) {
logger.For(ctx).Infof("processed %d messages in the last %s (%d inserts, %d updates, and %d deletes)", m.inserts+m.updates+m.deletes, m.interval, m.inserts, m.updates, m.deletes)
m.inserts = 0
m.updates = 0
m.deletes = 0
m.nextReportTime = time.Now().Add(m.interval)
}
}

type batcher[T any] struct {
maxSize int
timeoutDuration time.Duration
Expand Down Expand Up @@ -212,8 +254,7 @@ func runStreamer(ctx context.Context, pgx *pgxpool.Pool, deserializer *avro.Gene
return fmt.Errorf("failed to subscribe to topic %s: %w", config.Topic, err)
}

var messagesPerHour int64 = 0
var nextHourReportTime = time.Now().Add(time.Hour)
stats := newMessageStats(time.Hour)

for {
msg, err := c.ReadMessage(100)
Expand Down Expand Up @@ -245,12 +286,7 @@ func runStreamer(ctx context.Context, pgx *pgxpool.Pool, deserializer *avro.Gene
}
}

messagesPerHour++
if time.Now().After(nextHourReportTime) {
logger.For(ctx).Infof("Processed %d messages in the last hour", messagesPerHour)
messagesPerHour = 0
nextHourReportTime = time.Now().Add(time.Hour)
}
stats.Update(ctx, msg)
}
}

Expand Down

0 comments on commit 24fd4cb

Please sign in to comment.