Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: report linked tsfile size & Subscription: decrease reference count for other enriched events & add logging to observe possible stuck situations #14668

Merged
merged 2 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ private void ttlCheck() throws InterruptedException {
} else {
logBuilder.append(
String.format(
"<%s , %d times> ", entry.getKey(), entry.getValue().getReferenceCount()));
"<%s , %d times, %d bytes> ",
entry.getKey(),
entry.getValue().getReferenceCount(),
entry.getValue().getFileSize()));
}
} catch (final IOException e) {
LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ private void tryPrefetch() {
"Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent {} when prefetching.",
this,
event);
((EnrichedEvent) event)
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), false);
if (onEvent()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public class SubscriptionEvent {
// record file name for file payload
private String fileName;

private static final long NACK_COUNT_REPORT_THRESHOLD = 3;
private final AtomicLong nackCount = new AtomicLong();

/**
* Constructs a {@link SubscriptionEvent} with the response type of {@link
* SubscriptionEventSingleResponse}.
Expand Down Expand Up @@ -143,11 +146,11 @@ public boolean isCommittable() {
}

public void ack(final Consumer<SubscriptionEvent> onCommittedHook) {
// ack response
response.ack(onCommittedHook);

// ack pipe events
// NOTE: we should ack pipe events before ack response since multiple events may reuse the same
// batch (as pipe events)
// TODO: consider more elegant design for this method
pipeEvents.ack();
response.ack(onCommittedHook);
}

/**
Expand All @@ -156,11 +159,8 @@ public void ack(final Consumer<SubscriptionEvent> onCommittedHook) {
* SubscriptionPrefetchingQueue} or {@link SubscriptionPrefetchingQueue#cleanUp}.
*/
public void cleanUp() {
// reset serialized responses
response.cleanUp();

// clean up pipe events
pipeEvents.cleanUp();
response.cleanUp();

// TODO: clean more fields
}
Expand Down Expand Up @@ -216,6 +216,11 @@ public void nack() {

// reset lastPolledTimestamp makes this event pollable
lastPolledTimestamp.set(INVALID_TIMESTAMP);

// record nack count
if (nackCount.getAndIncrement() > NACK_COUNT_REPORT_THRESHOLD) {
LOGGER.warn("{} has been nacked {} times", this, nackCount);
}
}

public void recordLastPolledConsumerId(final String consumerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch
implements Iterator<List<Tablet>> {
Expand All @@ -62,6 +63,10 @@ public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch

private final List<EnrichedEvent> iteratedEnrichedEvents = new ArrayList<>();

private static final long ITERATED_COUNT_REPORT_FREQ =
30000; // based on the full parse of a 128MB tsfile estimate
private final AtomicLong iteratedCount = new AtomicLong();

public SubscriptionPipeTabletEventBatch(
final int regionId,
final SubscriptionPrefetchingTabletQueue prefetchingQueue,
Expand Down Expand Up @@ -230,6 +235,23 @@ public boolean hasNext() {

@Override
public List<Tablet> next() {
final List<Tablet> tablets = nextInternal();
if (Objects.isNull(tablets)) {
return null;
}
if (iteratedCount.incrementAndGet() % ITERATED_COUNT_REPORT_FREQ == 0) {
LOGGER.info(
"{} has been iterated {} times, current TsFileInsertionEvent {}",
this,
iteratedCount,
Objects.isNull(currentTsFileInsertionEvent)
? "<unknown>"
: ((EnrichedEvent) currentTsFileInsertionEvent).coreReportMessage());
}
return tablets;
}

private List<Tablet> nextInternal() {
if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
if (currentTabletInsertionEventsIterator.hasNext()) {
final TabletInsertionEvent tabletInsertionEvent =
Expand Down
Loading