Skip to content

Commit

Permalink
refactor: factor out validateMsg (#1117)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun authored Oct 24, 2023
1 parent e61e966 commit af56e60
Showing 1 changed file with 25 additions and 15 deletions.
40 changes: 25 additions & 15 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,14 +495,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
return
}

if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return
}
}
var schema Schema
var schemaVersion []byte
if msg.Schema != nil {
Expand Down Expand Up @@ -1121,17 +1113,35 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
p.internalSendAsync(ctx, msg, callback, false)
}

func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
if msg == nil {
p.log.Error("Message is nil")
runCallback(callback, nil, msg, newError(InvalidMessage, "Message is nil"))
return
return newError(InvalidMessage, "Message is nil")
}

if msg.Value != nil && msg.Payload != nil {
p.log.Error("Can not set Value and Payload both")
runCallback(callback, nil, msg, newError(InvalidMessage, "Can not set Value and Payload both"))
return newError(InvalidMessage, "Can not set Value and Payload both")
}

if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
p.log.Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return fmt.Errorf("msg schema can not match with producer schema")
}
}

return nil
}

func (p *partitionProducer) internalSendAsync(
ctx context.Context,
msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error),
flushImmediately bool,
) {
if err := p.validateMsg(msg); err != nil {
p.log.Error(err)
runCallback(callback, nil, msg, err)
return
}

Expand Down

0 comments on commit af56e60

Please sign in to comment.