diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index eafa4b47d..b4903516e 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -385,16 +385,16 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ { partitionTopic := partitions[partitionIdx] - go func(idx int, pt string) { + go func() { defer wg.Done() - opts := newPartitionConsumerOpts(pt, c.consumerName, idx, c.options) + opts := newPartitionConsumerOpts(partitionTopic, c.consumerName, partitionIdx, c.options) cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics) ch <- ConsumerError{ err: err, - partition: idx, + partition: partitionIdx, consumer: cons, } - }(partitionIdx, partitionTopic) + }() } go func() { @@ -776,7 +776,6 @@ func (c *consumer) hasNext() bool { hasNext := make(chan bool) for _, pc := range c.consumers { - pc := pc go func() { defer wg.Done() if pc.hasNext() {