
[Bug][Producer] The callback was not invoked during reconnecting. #1332

Open
gunli opened this issue Feb 17, 2025 · 5 comments · May be fixed by #1333
@gunli
Contributor

gunli commented Feb 17, 2025

Expected behavior

Callback should be called with a timeout error.

Actual behavior

Callback was not called

Steps to reproduce

  1. Connect to the server;
  2. Call SendAsync() to send a message; it is sent successfully;
  3. Stop the server;
  4. Call SendAsync() to send some more messages; their callbacks are never invoked (a reproducer sketch follows below).
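
A minimal reproducer sketch for these steps (my own sketch, not from the original report; it assumes a local standalone broker at pulsar://localhost:6650, and the topic name, SendTimeout, and sleep durations are illustrative):

package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:       "persistent://public/default/reconnect-test",
		SendTimeout: 3 * time.Second, // callbacks should fail with ErrSendTimeout after this
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	send := func(payload string) {
		producer.SendAsync(context.Background(), &pulsar.ProducerMessage{Payload: []byte(payload)},
			func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
				log.Printf("callback for %q: err=%v", payload, err)
			})
	}

	send("before-broker-stop") // step 2: callback fires with err == nil

	log.Println("stop the broker now (step 3), then wait for the client to notice...")
	time.Sleep(15 * time.Second)

	send("during-reconnect") // step 4: expected ErrSendTimeout after 3s; observed: no callback at all
	time.Sleep(30 * time.Second)
}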

System configuration

Pulsar version: x.y

@gunli
Contributor Author

gunli commented Feb 17, 2025

I don't know whether the consumer has the same bug.

It seems that when we call SendAsync() at this point, the messages are written into p.dataChan, but runEventsLoop() is busy reconnecting in p.reconnectToBroker(connectionClosed) and never gets a chance to handle the case data, ok := <-p.dataChan. Meanwhile, failTimeoutMessages() only checks the messages in p.pendingQueue, so nothing can tell that the messages stuck in p.dataChan have timed out.

func (p *partitionProducer) runEventsLoop() {
	for {
		select {
		case data, ok := <-p.dataChan:
			// when doClose() is called, p.dataChan will be closed and data will be nil
			if !ok {
				return
			}
			p.internalSend(data)
		case cmd, ok := <-p.cmdChan:
			// when doClose() is called, p.cmdChan will be closed and cmd will be nil
			if !ok {
				return
			}
			switch v := cmd.(type) {
			case *flushRequest:
				p.internalFlush(v)
			case *closeProducer:
				p.internalClose(v)
				return
			}
		case connectionClosed := <-p.connectClosedCh:
			p.log.Info("runEventsLoop will reconnect in producer")
			p.reconnectToBroker(connectionClosed)
		case <-p.batchFlushTicker.C:
			p.internalFlushCurrentBatch()
		}
	}
}

func (p *partitionProducer) failTimeoutMessages() {
	diff := func(sentAt time.Time) time.Duration {
		return p.options.SendTimeout - time.Since(sentAt)
	}

	t := time.NewTimer(p.options.SendTimeout)
	defer t.Stop()

	for range t.C {
		state := p.getProducerState()
		if state == producerClosing || state == producerClosed {
			return
		}

		item := p.pendingQueue.Peek()
		if item == nil {
			// pending queue is empty
			t.Reset(p.options.SendTimeout)
			continue
		}
		oldestItem := item.(*pendingItem)
		if nextWaiting := diff(oldestItem.createdAt); nextWaiting > 0 {
			// none of these pending messages have timed out, wait and retry
			t.Reset(nextWaiting)
			continue
		}

		// the pending queue is not thread safe: there is no global iteration lock to
		// control polling from it, so this goroutine and the connection receipt handler
		// may iterate the pending queue at the same time; this may be a performance trade-off
		// see https://github.com/apache/pulsar-client-go/pull/301
		curViewItems := p.pendingQueue.ReadableSlice()
		viewSize := len(curViewItems)
		if viewSize <= 0 {
			// double check
			t.Reset(p.options.SendTimeout)
			continue
		}
		p.log.Infof("Failing %d messages on timeout %s", viewSize, p.options.SendTimeout)
		lastViewItem := curViewItems[viewSize-1].(*pendingItem)

		// iterate at most viewSize items
		for i := 0; i < viewSize; i++ {
			tickerNeedWaiting := time.Duration(0)
			item := p.pendingQueue.CompareAndPoll(
				func(m interface{}) bool {
					if m == nil {
						return false
					}

					pi := m.(*pendingItem)
					pi.Lock()
					defer pi.Unlock()
					if nextWaiting := diff(pi.createdAt); nextWaiting > 0 {
						// current and subsequent items not timeout yet, stop iterating
						tickerNeedWaiting = nextWaiting
						return false
					}
					return true
				})

			if item == nil {
				t.Reset(p.options.SendTimeout)
				break
			}

			if tickerNeedWaiting > 0 {
				t.Reset(tickerNeedWaiting)
				break
			}

			pi := item.(*pendingItem)
			pi.Lock()

			for _, i := range pi.sendRequests {
				sr := i.(*sendRequest)
				sr.done(nil, ErrSendTimeout)
			}

			// flag the sending has completed with error, flush make no effect
			pi.done(ErrSendTimeout)
			pi.Unlock()

			// finally reached the last view item, current iteration ends
			if pi == lastViewItem {
				t.Reset(p.options.SendTimeout)
				break
			}
		}
	}
}
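
To make the stall easier to see in isolation, here is a standalone toy sketch of that interaction (my own illustration, not the client's code; the names mirror the fields quoted above):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	dataChan := make(chan string, 10)
	connectClosedCh := make(chan struct{}, 1)

	var mu sync.Mutex
	var pendingQueue []string // only messages that reach "internalSend" land here

	reconnectToBroker := func() {
		fmt.Println("event loop: reconnecting...")
		time.Sleep(time.Hour) // broker stays down; retries keep the loop busy
	}

	// Event loop shaped like runEventsLoop(): one goroutine serves every channel.
	go func() {
		for {
			select {
			case <-connectClosedCh:
				reconnectToBroker() // blocks here; dataChan is not read meanwhile
			case msg := <-dataChan:
				mu.Lock()
				pendingQueue = append(pendingQueue, msg)
				mu.Unlock()
			}
		}
	}()

	// Timeout checker shaped like failTimeoutMessages(): it only sees pendingQueue.
	go func() {
		for range time.Tick(time.Second) {
			mu.Lock()
			fmt.Printf("timeout checker: %d pending item(s) eligible to fail\n", len(pendingQueue))
			mu.Unlock()
		}
	}()

	connectClosedCh <- struct{}{}      // step 3: the connection is closed
	time.Sleep(100 * time.Millisecond) // let the loop enter reconnectToBroker

	dataChan <- "sent while reconnecting" // step 4: SendAsync only enqueues here
	time.Sleep(3 * time.Second)
	fmt.Println("the message is still sitting in dataChan; its callback was never invoked")
}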

@gunli
Contributor Author

gunli commented Feb 17, 2025

We discussed in #1249 whether we should block send and flush requests while reconnecting, but that discussion was not conclusive.
Now, IMO, we should not block send/flush while we are reconnecting, because letting them through has no side effect: each send/flush will either succeed or time out on its own. If we block them, as this bug shows, the timeout is never triggered.
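
One possible shape of that idea, as a sketch only (my illustration against the toy model above, not necessarily what #1333 implements): while waiting out the reconnect backoff, keep moving requests from dataChan into pendingQueue so that the timeout path can still fail them.

package main

import (
	"fmt"
	"time"
)

// drainWhileReconnecting keeps accepting send requests between reconnect attempts
// so they become visible to the timeout path instead of sitting unread in dataChan.
// All names mirror the toy model above, not the real partitionProducer.
func drainWhileReconnecting(dataChan <-chan string, pendingQueue *[]string,
	tryReconnect func() bool, backoff time.Duration) {
	for !tryReconnect() {
		select {
		case msg := <-dataChan:
			*pendingQueue = append(*pendingQueue, msg) // now the timeout checker can fail it
		case <-time.After(backoff):
			// fall through to the next reconnect attempt
		}
	}
}

func main() {
	dataChan := make(chan string, 1)
	var pendingQueue []string

	dataChan <- "sent while reconnecting"

	attempts := 0
	drainWhileReconnecting(dataChan, &pendingQueue,
		func() bool { attempts++; return attempts >= 3 }, // succeeds on the 3rd try
		10*time.Millisecond)

	fmt.Printf("reconnected after %d attempts; %d request(s) reached the timeout path\n",
		attempts, len(pendingQueue))
}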

@BewareMyPower
Contributor

What are your client and producer configs? A similar issue happened when maxReconnectAttempts was 1: #1312 (comment)

@gunli
Contributor Author

gunli commented Feb 18, 2025

What are your client and producer configs? A similar issue happened when maxReconnectAttempts was 1: #1312 (comment)

My maxReconnectAttempts is the default value: -1
