Skip to content

Commit

Permalink
fix: Model gateway silently ignores errors (#6014)
Browse files Browse the repository at this point in the history
* fix: Model gateway silently ignores errors

* fix: adding log line for kafka topic and channel close
  • Loading branch information
abhimanyu003 authored Nov 20, 2024
1 parent 8c5b554 commit d8e69b0
Showing 1 changed file with 25 additions and 8 deletions.
33 changes: 25 additions & 8 deletions scheduler/pkg/kafka/gateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,13 @@ func (iw *InferWorker) produce(
return err
}
go func() {
<-deliveryChan
e := <-deliveryChan
span.End()
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
iw.logger.WithError(m.TopicPartition.Error).Errorf("Failed to produce event for model %s", topic)
}
close(deliveryChan)
}()

return nil
Expand All @@ -304,7 +309,7 @@ func (iw *InferWorker) restRequest(ctx context.Context, job *InferWork, maybeCon
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, restUrl.String(), bytes.NewBuffer(data))
if err != nil {
return err
return iw.produce(ctx, job, iw.topicNamer.GetModelErrorTopic(), []byte(err.Error()), true, nil)
}

req.Header.Set("Content-Type", "application/json")
Expand All @@ -315,17 +320,17 @@ func (iw *InferWorker) restRequest(ctx context.Context, job *InferWork, maybeCon

response, err := iw.httpClient.Do(req)
if err != nil {
return err
return iw.produce(ctx, job, iw.topicNamer.GetModelErrorTopic(), []byte(err.Error()), true, nil)
}

b, err := io.ReadAll(response.Body)
if err != nil {
return err
return iw.produce(ctx, job, iw.topicNamer.GetModelErrorTopic(), []byte(err.Error()), true, nil)
}

err = response.Body.Close()
if err != nil {
return err
return iw.produce(ctx, job, iw.topicNamer.GetModelErrorTopic(), []byte(err.Error()), true, nil)
}

iw.logger.Infof("v2 server response: %s", b)
Expand All @@ -335,14 +340,19 @@ func (iw *InferWorker) restRequest(ctx context.Context, job *InferWork, maybeCon
return iw.produce(ctx, job, iw.topicNamer.GetModelErrorTopic(), b, true, nil)
}

return iw.produce(
err = iw.produce(
ctx,
job,
iw.topicNamer.GetModelTopicOutputs(job.modelName),
b,
false,
extractHeadersHttp(response.Header),
)
if err != nil {
logger.WithError(err).Errorf("Failed infer request iw.produce")
return iw.produce(ctx, job, iw.topicNamer.GetModelErrorTopic(), []byte(err.Error()), true, nil)
}
return nil
}

// Add all external headers to request metadata
Expand Down Expand Up @@ -377,14 +387,21 @@ func (iw *InferWorker) grpcRequest(ctx context.Context, job *InferWork, req *v2.
}
b, err := proto.Marshal(resp)
if err != nil {
return err
logger.WithError(err).Errorf("Failed to proto.Marshal")
return iw.produce(ctx, job, iw.topicNamer.GetModelErrorTopic(), []byte(err.Error()), true, nil)
}
return iw.produce(

err = iw.produce(
ctx,
job,
iw.topicNamer.GetModelTopicOutputs(job.modelName),
b,
false,
extractHeadersGrpc(header, trailer),
)
if err != nil {
logger.WithError(err).Errorf("Failed infer request iw.produce")
return iw.produce(ctx, job, iw.topicNamer.GetModelErrorTopic(), []byte(err.Error()), true, nil)
}
return nil
}

0 comments on commit d8e69b0

Please sign in to comment.