From 5c023a5033b92cf94d0ba57f349782592b2faed6 Mon Sep 17 00:00:00 2001 From: wuzhanpeng Date: Tue, 15 Mar 2022 14:25:49 +0800 Subject: [PATCH 1/8] BP-49 support read ahead async resolve conflicts --- .../storage/ldb/DbLedgerStorageStats.java | 45 +- .../bookie/storage/ldb/ReadAheadManager.java | 652 ++++++++++++++++++ .../ldb/SingleDirectoryDbLedgerStorage.java | 93 ++- 3 files changed, 754 insertions(+), 36 deletions(-) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java index d947df56e13..e16ac108542 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java @@ -70,6 +70,11 @@ class DbLedgerStorageStats { private static final String READ_CACHE_SIZE = "read-cache-size"; private static final String READ_CACHE_COUNT = "read-cache-count"; + private static final String READ_AHEAD_TOTAL_TIME = "read-ahead-total-time"; + private static final String READ_AHEAD_ASYNC_QUEUE_TIME = "read-ahead-async-queue-time"; + private static final String READ_AHEAD_ASYNC_TOTAL_TIME = "read-ahead-async-total-time"; + private static final String READ_AHEAD_ASYNC_BLOCK_TIME = "read-ahead-async-block-time"; + @StatsDoc( name = ADD_ENTRY, help = "operation stats of adding entries to db ledger storage", @@ -122,12 +127,12 @@ class DbLedgerStorageStats { name = READAHEAD_BATCH_COUNT, help = "the distribution of num of entries to read in one readahead batch" ) - private final OpStatsLogger readAheadBatchCountStats; + private final Counter readAheadBatchCountCounter; @StatsDoc( name = READAHEAD_BATCH_SIZE, help = "the distribution of num of bytes to read in one readahead batch" ) - private final OpStatsLogger readAheadBatchSizeStats; + private final Counter readAheadBatchSizeCounter; @StatsDoc( name = READAHEAD_TIME, help = "Time spent on readahead operations" @@ -195,6 +200,33 @@ class DbLedgerStorageStats { ) private final Gauge readCacheCountGauge; + /********** Read-ahead async **********/ + @StatsDoc( + name = READ_AHEAD_TOTAL_TIME, + help = "time spent in reading ahead", + parent = READ_ENTRY + ) + private final OpStatsLogger readAheadTotalTime; + @StatsDoc( + name = READ_AHEAD_ASYNC_QUEUE_TIME, + help = "time spent in the queue of reading ahead async", + parent = READ_ENTRY + ) + private final OpStatsLogger readAheadAsyncQueueTime; + @StatsDoc( + name = READ_AHEAD_ASYNC_TOTAL_TIME, + help = "time spent of the entire task of reading ahead async", + parent = READ_ENTRY + ) + private final OpStatsLogger readAheadAsyncTotalTime; + @StatsDoc( + name = READ_AHEAD_ASYNC_BLOCK_TIME, + help = "time spent in waiting the completion of reading ahead async", + parent = READ_ENTRY + ) + private final OpStatsLogger readAheadAsyncBlockTime; + /********** End read-ahead async **********/ + DbLedgerStorageStats(StatsLogger stats, Supplier writeCacheSizeSupplier, Supplier writeCacheCountSupplier, @@ -208,8 +240,8 @@ class DbLedgerStorageStats { readCacheMissCounter = stats.getCounter(READ_CACHE_MISSES); writeCacheHitCounter = stats.getCounter(WRITE_CACHE_HITS); writeCacheMissCounter = stats.getCounter(WRITE_CACHE_MISSES); - readAheadBatchCountStats = stats.getOpStatsLogger(READAHEAD_BATCH_COUNT); - readAheadBatchSizeStats = stats.getOpStatsLogger(READAHEAD_BATCH_SIZE); + readAheadBatchCountCounter = stats.getCounter(READAHEAD_BATCH_COUNT); + readAheadBatchSizeCounter = stats.getCounter(READAHEAD_BATCH_SIZE); readAheadTime = stats.getThreadScopedCounter(READAHEAD_TIME); flushStats = stats.getOpStatsLogger(FLUSH); flushEntryLogStats = stats.getOpStatsLogger(FLUSH_ENTRYLOG); @@ -270,6 +302,11 @@ public Long getSample() { } }; stats.registerGauge(READ_CACHE_COUNT, readCacheCountGauge); + + readAheadTotalTime = stats.getOpStatsLogger(READ_AHEAD_TOTAL_TIME); + readAheadAsyncQueueTime = stats.getOpStatsLogger(READ_AHEAD_ASYNC_QUEUE_TIME); + readAheadAsyncTotalTime = stats.getOpStatsLogger(READ_AHEAD_ASYNC_TOTAL_TIME); + readAheadAsyncBlockTime = stats.getOpStatsLogger(READ_AHEAD_ASYNC_BLOCK_TIME); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java new file mode 100644 index 00000000000..4ef54df1d99 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java @@ -0,0 +1,652 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.DefaultThreadFactory; +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.EntryLogger; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.util.SafeRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A management tool that supports asynchronous read-ahead operations for {@link SingleDirectoryDbLedgerStorage}. + **/ +public class ReadAheadManager { + + private static final Logger log = LoggerFactory.getLogger(ReadAheadManager.class); + + private static final String READ_AHEAD_MAX_MESSAGES = "dbStorage_readAheadMaxMessages"; + private static final String READ_AHEAD_MAX_BYTES = "dbStorage_readAheadMaxBytes"; + private static final String READ_AHEAD_PRE_TRIGGER_RATIO = "dbStorage_readAheadPreTriggerRatio"; + private static final String READ_AHEAD_TASK_EXPIRED_TIME_MS = "dbStorage_readAheadTaskExpiredTimeMs"; + private static final String READ_AHEAD_TIMEOUT_MS = "dbStorage_readAheadTimeoutMs"; + + // operation behavior indicator + private static final String SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = "dbStorage_submitReadAheadTaskImmediately"; + private static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize"; + + private static final int DEFAULT_READ_AHEAD_MESSAGES = 1000; + private static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024; + private static final double DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO = 0.75; + + private static final long DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS = 60 * 1000; + private static final long DEFAULT_READ_AHEAD_TIMEOUT_MS = 30 * 1000; + + private static final boolean DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = false; + private static final int DEFAULT_READ_AHEAD_TASK_POOL_SIZE = 8; + + private static final class LedgerEntryPosition { + + private long ledgerId; + private long entryId; + + public LedgerEntryPosition(long ledgerId, long entryId) { + this.ledgerId = ledgerId; + this.entryId = entryId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + LedgerEntryPosition that = (LedgerEntryPosition) o; + return ledgerId == that.ledgerId && + entryId == that.entryId; + } + + @Override + public int hashCode() { + return Objects.hash(ledgerId, entryId); + } + } + + private static final class ReadAheadPos { + + private final long ledgerId; + private final long entryId; + private final long location; + private final long createTimeMs; + + private long readAheadTaskExpiredTimeMs; + + public ReadAheadPos(long ledgerId, long entryId, long location, long readAheadTaskExpiredTimeMs) { + + this.ledgerId = ledgerId; + this.entryId = entryId; + this.location = location; + + this.createTimeMs = System.currentTimeMillis(); + this.readAheadTaskExpiredTimeMs = readAheadTaskExpiredTimeMs; + } + + public boolean isExpired() { + return (System.currentTimeMillis() - createTimeMs) > readAheadTaskExpiredTimeMs; + } + + public long getLedgerId() { + return ledgerId; + } + + public long getEntryId() { + return entryId; + } + + public long getLocation() { + return location; + } + } + + private final ExecutorService readAheadExecutor; + private final ScheduledExecutorService cleanupExecutor; + + private final ConcurrentHashMap pendingReadAheadPositions; + private final ConcurrentLinkedQueue pendingDeletePositions; + + private final ConcurrentHashMap> inProgressReadAheadTaskStatuses; + private final ConcurrentLinkedQueue pendingDeleteReadAheadTaskStatuses; + + private final EntryLogger entryLogger; + private final EntryLocationIndex entryLocationIndex; + private final ReadCache cache; + + private final DbLedgerStorageStats dbLedgerStorageStats; + + private final boolean submitReadAheadTaskImmediately; + private final int readAheadMessages; + private final int readAheadBytes; + private final double preTriggerReadAheadRatio; + + private final long readAheadTaskExpiredTimeMs; + private final long readAheadTimeoutMs; + + public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex, + ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats) { + this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats, + DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY, DEFAULT_READ_AHEAD_TASK_POOL_SIZE, + DEFAULT_READ_AHEAD_MESSAGES, DEFAULT_READ_AHEAD_BYTES, DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO, + DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS, DEFAULT_READ_AHEAD_TIMEOUT_MS); + } + + /** + * Entrance for test cases. + * + * @param entryLogger + * @param entryLocationIndex + * @param cache + * @param dbLedgerStorageStats + * @param conf + */ + public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex, + ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, ServerConfiguration conf) { + this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats, + conf.getBoolean(SUBMIT_READ_AHEAD_TASK_IMMEDIATELY, DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY), + conf.getInt(READ_AHEAD_TASK_POOL_SIZE, DEFAULT_READ_AHEAD_TASK_POOL_SIZE), + conf.getInt(READ_AHEAD_MAX_MESSAGES, DEFAULT_READ_AHEAD_MESSAGES), + conf.getInt(READ_AHEAD_MAX_BYTES, DEFAULT_READ_AHEAD_BYTES), + conf.getDouble(READ_AHEAD_PRE_TRIGGER_RATIO, DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO), + conf.getLong(READ_AHEAD_TASK_EXPIRED_TIME_MS, DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS), + conf.getLong(READ_AHEAD_TIMEOUT_MS, DEFAULT_READ_AHEAD_TIMEOUT_MS)); + } + + /** + * Entrance for normal use. + * + * @param entryLogger + * @param entryLocationIndex + * @param cache + * @param dbLedgerStorageStats + * @param submitReadAheadTaskImmediately + * @param readAheadTaskPoolSize + * @param readAheadMessages + * @param readAheadBytes + * @param preTriggerReadAheadRatio + * @param readAheadTaskExpiredTimeMs + * @param readAheadTimeoutMs + */ + public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex, + ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, + boolean submitReadAheadTaskImmediately, int readAheadTaskPoolSize, + int readAheadMessages, int readAheadBytes, double preTriggerReadAheadRatio, + long readAheadTaskExpiredTimeMs, long readAheadTimeoutMs) { + // core components initialization + readAheadExecutor = Executors.newFixedThreadPool( + readAheadTaskPoolSize, new DefaultThreadFactory("read-ahead")); + cleanupExecutor = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("read-ahead-cleanup")); + + pendingReadAheadPositions = new ConcurrentHashMap<>(); + pendingDeletePositions = new ConcurrentLinkedQueue<>(); + + inProgressReadAheadTaskStatuses = new ConcurrentHashMap<>(); + pendingDeleteReadAheadTaskStatuses = new ConcurrentLinkedQueue<>(); + + // external assistant components assignment + this.entryLogger = entryLogger; + this.entryLocationIndex = entryLocationIndex; + this.cache = cache; + + // metrics + this.dbLedgerStorageStats = dbLedgerStorageStats; + + // configurable arguments + this.submitReadAheadTaskImmediately = submitReadAheadTaskImmediately; + this.readAheadMessages = readAheadMessages; + this.readAheadBytes = readAheadBytes; + this.preTriggerReadAheadRatio = preTriggerReadAheadRatio; + + this.readAheadTaskExpiredTimeMs = readAheadTaskExpiredTimeMs; + this.readAheadTimeoutMs = readAheadTimeoutMs; + + cleanupExecutor.scheduleAtFixedRate( + SafeRunnable.safeRun(this::removeExpiredReadAheadTasks), 30, 30, TimeUnit.SECONDS); + } + + public void shutdown() { + this.readAheadExecutor.shutdown(); + this.cleanupExecutor.shutdown(); + } + + private static void recordStatsInNano(OpStatsLogger logger, long startTimeNanos) { + logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + } + + private static void recordStatsInNano(Counter counter, long startTimeNanos) { + counter.add(MathUtils.elapsedNanos(startTimeNanos)); + } + + public void addNextReadPosition(long expectedLedgerId, long expectedEntryId, + long actualStartLedgerId, long actualStartEntryId, long location) { + LedgerEntryPosition lep = new LedgerEntryPosition(expectedLedgerId, expectedEntryId); + pendingReadAheadPositions.put(lep, new ReadAheadPos( + actualStartLedgerId, actualStartEntryId, location, readAheadTaskExpiredTimeMs)); + pendingDeletePositions.add(lep); + } + + private ReadAheadTaskStatus getNearestTask(long ledgerId, long entryId) { + NavigableMap ledgerReadAheadTaskStatuses + = inProgressReadAheadTaskStatuses.get(ledgerId); + if (ledgerReadAheadTaskStatuses != null) { + Map.Entry floorEntry = ledgerReadAheadTaskStatuses.floorEntry(entryId); + if (floorEntry != null) { + return floorEntry.getValue(); + } + } + return null; + } + + /** + * Remove those read-ahead tasks which have already exceeded expired time. + * NOTE: this method is NOT thread-safe, thus it should be kept in a single thread. + */ + private void removeExpiredReadAheadTasks() { + // cleanup read-ahead pos + int reclaimedPositions = 0; + while (!pendingDeletePositions.isEmpty()) { + ReadAheadPos pos = pendingReadAheadPositions.computeIfPresent( + pendingDeletePositions.peek(), + (lep, rap) -> { + if (rap.isExpired()) { + return null; + } + return rap; + }); + if (pos == null) { + pendingDeletePositions.poll(); + reclaimedPositions ++; + } else { + break; + } + } + + // cleanup read-ahead task + int reclaimedTasks = 0; + while (!pendingDeleteReadAheadTaskStatuses.isEmpty() + && pendingDeleteReadAheadTaskStatuses.peek().isExpired()) { + ReadAheadTaskStatus readAheadTaskStatus = pendingDeleteReadAheadTaskStatuses.poll(); + reclaimedTasks ++; + inProgressReadAheadTaskStatuses.computeIfPresent( + readAheadTaskStatus.ledgerId, + (lid, ledgerReadAheadTaskStatuses) -> { + ledgerReadAheadTaskStatuses.remove(readAheadTaskStatus.startEntryId); + return ledgerReadAheadTaskStatuses.isEmpty() ? null : ledgerReadAheadTaskStatuses; + }); + } + + if (log.isDebugEnabled()) { + log.debug("Pending position map reclaimed {} positions, now is {}. " + + "Read-ahead task map reclaimed {} tasks, now is {}", + reclaimedPositions, pendingDeletePositions.size(), + reclaimedTasks, pendingDeleteReadAheadTaskStatuses.size()); + } + } + + /** + * This method could be invoked frequently. Please make it short and simple. + */ + public boolean hitInReadAheadPositions(long ledgerId, long entryId) { + AtomicBoolean isHit = new AtomicBoolean(false); + pendingReadAheadPositions.computeIfPresent( + new LedgerEntryPosition(ledgerId, entryId), + (lep, rap) -> { + isHit.set(true); + readAheadAsync(rap.getLedgerId(), rap.getEntryId(), rap.getLocation(), + readAheadMessages, readAheadBytes); + + if (log.isDebugEnabled()) { + log.debug("Submitted read-ahead task. Info: hit-pos=[L{} E{}] / actual-start-pos=[L{} E{}]", + ledgerId, entryId, rap.getLedgerId(), rap.getEntryId()); + } + return null; + }); + return isHit.get(); + } + + private ByteBuf readAndAddNextReadAheadPosition(long ledgerId, long entryId) throws IOException { + ByteBuf entry; + long entryLocation; + + long getLocationIndexStartNanos = MathUtils.nowInNano(); + try { + entryLocation = entryLocationIndex.getLocation(ledgerId, entryId); + if (entryLocation == 0) { + throw new Bookie.NoEntryException(ledgerId, entryId); + } + } catch (Bookie.NoEntryException e) { + log.warn("[L{} E{}] Entry not found", ledgerId, entryId); + throw e; + } finally { + recordStatsInNano(dbLedgerStorageStats.getReadFromLocationIndexTime(), getLocationIndexStartNanos); + } + + long readEntryStartNanos = MathUtils.nowInNano(); + try { + entry = entryLogger.readEntry(ledgerId, entryId, entryLocation); + } finally { + recordStatsInNano(dbLedgerStorageStats.getReadFromEntryLogTime(), readEntryStartNanos); + } + + cache.put(ledgerId, entryId, entry); + // init position + if (submitReadAheadTaskImmediately) { + // submit the read-ahead task immediately + readAheadAsync(ledgerId, entryId + 1, entryLocation + 4 + entry.readableBytes(), + readAheadMessages, readAheadBytes); + } else { + // actually execute read-ahead task after hitting this position next time + addNextReadPosition(ledgerId, entryId + 1, + ledgerId, entryId + 1, entryLocation + 4 + entry.readableBytes()); + } + + if (log.isDebugEnabled()) { + log.debug("[L{} E{}] Read {} bytes from local storage, and put L{} E{} to the pending map" + + " or submit task immediately according to submitReadAheadTaskImmediately={}.", + ledgerId, entryId, entry.readableBytes(), ledgerId, entryId + 1, submitReadAheadTaskImmediately); + } + return entry; + } + + /** + * Read an entry or wait until it is ready to be read. + * + * @param ledgerId + * @param entryId + * @return + * @throws IOException + */ + public ByteBuf readEntryOrWait(long ledgerId, long entryId) throws IOException { + // check if we need to read ahead + boolean isHit = hitInReadAheadPositions(ledgerId, entryId); + if (log.isDebugEnabled()) { + if (isHit) { + log.debug("[L{} E{}] Trigger read-ahead task", ledgerId, entryId); + } else { + log.debug("[L{} E{}] Not found in pending map", ledgerId, entryId); + } + } + + // hit in the cache + ByteBuf entry = cache.get(ledgerId, entryId); + if (entry != null) { + if (log.isDebugEnabled()) { + log.debug("[L{} E{}] Hit cache directly", ledgerId, entryId); + } + dbLedgerStorageStats.getReadCacheHitCounter().inc(); + return entry; + } + dbLedgerStorageStats.getReadCacheMissCounter().inc(); + + // search entry in the read-ahead index + ReadAheadTaskStatus readAheadTaskStatus = getNearestTask(ledgerId, entryId); + if (readAheadTaskStatus != null && readAheadTaskStatus.hasEntry(entryId)) { + if (log.isDebugEnabled()) { + log.debug("[L{} E{}] Block until data is ready", ledgerId, entryId); + } + readAheadTaskStatus.waitUntilReadCompleted(dbLedgerStorageStats); + + entry = cache.get(ledgerId, entryId); + if (entry != null) { + if (log.isDebugEnabled()) { + log.debug("[L{} E{}] Hit cache after read-ahead", ledgerId, entryId); + } + return entry; + } + } + + // read the current pos entry, and add the next pos into read-ahead set + if (log.isDebugEnabled()) { + log.debug("[L{} E{}] Read from storage layer", ledgerId, entryId); + } + return readAndAddNextReadAheadPosition(ledgerId, entryId); + } + + private void internalReadAhead(long ledgerId, long entryId, long location, + int maxMessages, int maxBytes, long submitStartNanos) { + // record queue time + recordStatsInNano(dbLedgerStorageStats.getReadAheadAsyncQueueTime(), submitStartNanos); + long readAheadStartNanos = MathUtils.nowInNano(); + + // obtain the read-ahead info + ReadAheadTaskStatus readAheadTaskStatus = getNearestTask(ledgerId, entryId); + if (readAheadTaskStatus == null) { + return; + } + + // record all the positions of these entries + List entriesPos = new ArrayList<>(); + + // read from fc + int messages = 0; + int bytes = 0; + + // flag to determine whether keeps reading ahead + boolean earlyExit = false; + + // flag to indicate whether reads from cache since last time + boolean readFromCacheBefore = false; + + try { + while (messages <= maxMessages && bytes <= maxBytes) { + + ByteBuf entry = cache.get(ledgerId, entryId); + if (entry != null) { + if (log.isDebugEnabled()) { + log.debug("[L{} E{}] Entry already exists.", ledgerId, entryId); + } + readFromCacheBefore = true; + entryId ++; + location += (4 + entry.readableBytes()); + entry.release(); + continue; + } + + if (readFromCacheBefore) { + try { + entry = entryLogger.readEntry(ledgerId, entryId, location); + readFromCacheBefore = false; + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug("[L{} E{}] Failed to locate entry with location:{}", ledgerId, entryId, location); + } + break; + } + } else { + entry = entryLogger.internalReadEntry(ledgerId, entryId, location, false); + } + + try { + long currentEntryLedgerId = entry.getLong(0); + long currentEntryId = entry.getLong(8); + + if (currentEntryLedgerId != ledgerId) { + earlyExit = true; + break; + } + + // add to cache + cache.put(ledgerId, currentEntryId, entry); + entriesPos.add(currentEntryId); + + if (log.isDebugEnabled()) { + log.debug("[L{} E{}] Entry loaded with {} bytes", + ledgerId, entryId, entry.readableBytes()); + } + + // update stats + messages ++; + bytes += entry.readableBytes(); + + entryId ++; + location += (4 + entry.readableBytes()); + } finally { + entry.release(); + } + } + } catch (Exception e) { + log.error("Exception during reading ahead for L{} E{}", ledgerId, entryId, e); + return; + } finally { + // update the actual range + if (messages >= 1) { + readAheadTaskStatus.updateEndEntry(entriesPos.get(messages - 1)); + } + + // notify all waiting threads + readAheadTaskStatus.readCompleted(); + pendingDeleteReadAheadTaskStatuses.add(readAheadTaskStatus); + } + + // set next read-ahead pos + if (preTriggerReadAheadRatio > 0 && !earlyExit && messages >= 1) { + int index = Math.min(messages - 1, (int) (preTriggerReadAheadRatio * messages)); + long nextHitEntryId = entriesPos.get(index); + addNextReadPosition(ledgerId, nextHitEntryId, ledgerId, entryId, location); + } + + if (log.isDebugEnabled()) { + log.debug("Read-ahead task [L{} E:{} - E{} len={}] eventually collected {} messages and {} bytes in {}ms", + readAheadTaskStatus.ledgerId, readAheadTaskStatus.startEntryId, readAheadTaskStatus.endEntryId, + readAheadTaskStatus.readAheadEntries, messages, bytes, + TimeUnit.NANOSECONDS.toMillis(MathUtils.elapsedNanos(readAheadStartNanos))); + } + + // record exec time + dbLedgerStorageStats.getReadAheadBatchCountCounter().add(messages); + dbLedgerStorageStats.getReadAheadBatchSizeCounter().add(bytes); + recordStatsInNano(dbLedgerStorageStats.getReadAheadAsyncTotalTime(), submitStartNanos); + } + + public void readAheadAsync(long ledgerId, long entryId, long location, int maxMessages, int maxBytes) { + // add a new ReadAheadTaskStatus before actually read-ahead + inProgressReadAheadTaskStatuses.computeIfAbsent(ledgerId, lid -> new TreeMap<>()); + inProgressReadAheadTaskStatuses.computeIfPresent(ledgerId, (lid, ledgerReadAheadTaskStatuses) -> { + ledgerReadAheadTaskStatuses.put(entryId, new ReadAheadTaskStatus( + ledgerId, entryId, maxMessages, readAheadTaskExpiredTimeMs, readAheadTimeoutMs)); + return ledgerReadAheadTaskStatuses; + }); + + // submit the read-ahead task async + readAheadExecutor.submit(SafeRunnable.safeRun( + () -> internalReadAhead(ledgerId, entryId, location, maxMessages, maxBytes, MathUtils.nowInNano()))); + } + + public static class ReadAheadTaskStatus { + private long ledgerId; + private long startEntryId; + private long endEntryId = -1; + private long readAheadEntries; + + private final long readAheadTaskExpiredTimeMs; + private final long readAheadTimeoutMs; + private long readCompletedTimeMs = -1; + + private final ReentrantLock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + private volatile boolean readCompleted = false; + + public ReadAheadTaskStatus(long ledgerId, long startEntryId, int readAheadEntries, + long readAheadTaskExpiredTimeMs, long readAheadTimeoutMs) { + this.ledgerId = ledgerId; + this.startEntryId = startEntryId; + this.readAheadEntries = readAheadEntries; + + this.readAheadTaskExpiredTimeMs = readAheadTaskExpiredTimeMs; + this.readAheadTimeoutMs = readAheadTimeoutMs; + } + + public boolean hasEntry(long entryId) { + if (!readCompleted) { + // the max-estimate is return before the read-ahead task is completed + return entryId >= startEntryId && entryId <= startEntryId + readAheadEntries; + } else { + return entryId >= startEntryId && entryId <= endEntryId; + } + } + + public void updateEndEntry(long endEntryId) { + this.endEntryId = endEntryId; + } + + public boolean isExpired() { + return readCompletedTimeMs > 0 + && (System.currentTimeMillis() - readCompletedTimeMs) > readAheadTaskExpiredTimeMs; + } + + public void waitUntilReadCompleted(DbLedgerStorageStats dbLedgerStorageStats) { + long blockStartNanos = MathUtils.nowInNano(); + lock.lock(); + try { + while (!readCompleted) { + try { + condition.await(readAheadTimeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.warn("Failed to read entries={} ahead from L{} E{} due to timeout={}ms", + readAheadEntries, ledgerId, startEntryId, readAheadTimeoutMs); + readCompleted(); + } + } + } finally { + lock.unlock(); + + // record request blocking time + recordStatsInNano(dbLedgerStorageStats.getReadAheadAsyncBlockTime(), blockStartNanos); + } + } + + public void readCompleted() { + lock.lock(); + try { + readCompleted = true; + condition.signalAll(); + readCompletedTimeMs = System.currentTimeMillis(); + + if (log.isDebugEnabled()) { + log.debug("[start:[L{} E{}]][len:{}] Completed reading data ahead", + ledgerId, startEntryId, readAheadEntries); + } + } finally { + lock.unlock(); + } + } + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index f450906de56..58f312ff582 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -142,6 +142,12 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage private final Counter flushExecutorTime; + private static final String ENABLE_READ_AHEAD_ASYNC = "dbStorage_enableReadAheadAsync"; + private static final boolean DEFAULT_ENABLE_READ_AHEAD_ASYNC = false; + + private final boolean enableReadAheadAsync; + private final ReadAheadManager readAheadManager; + public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, EntryLogger entryLogger, StatsLogger statsLogger, ByteBufAllocator allocator, ScheduledExecutorService gcExecutor, @@ -219,6 +225,16 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le if (!ledgerBaseDir.equals(indexBaseDir)) { indexDirsManager.addLedgerDirsListener(getLedgerDirsListener()); } + + enableReadAheadAsync = conf.getBoolean(ENABLE_READ_AHEAD_ASYNC, DEFAULT_ENABLE_READ_AHEAD_ASYNC); + if (enableReadAheadAsync) { + readAheadManager = new ReadAheadManager( + entryLogger, entryLocationIndex, readCache, dbLedgerStorageStats, conf); + log.info("Read-ahead running in async mode."); + } else { + readAheadManager = null; + log.info("Read-ahead running in sync mode."); + } } @Override @@ -291,6 +307,10 @@ public void shutdown() throws InterruptedException { readCache.close(); executor.shutdown(); + if (readAheadManager != null) { + readAheadManager.shutdown(); + } + } catch (IOException e) { log.error("Error closing db storage", e); } @@ -549,44 +569,53 @@ private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, Book dbLedgerStorageStats.getWriteCacheMissCounter().inc(); - // Try reading from read-ahead cache - entry = readCache.get(ledgerId, entryId); - if (entry != null) { - dbLedgerStorageStats.getReadCacheHitCounter().inc(); - return entry; - } + // Get entry from storage and trigger read-ahead + long readAheadTotalStartNano = MathUtils.nowInNano(); + if (enableReadAheadAsync) { + // Async mode + entry = readAheadManager.readEntryOrWait(ledgerId, entryId); + } else { + // Sync mode + // Try reading from read-ahead cache + entry = readCache.get(ledgerId, entryId); + if (entry != null) { + dbLedgerStorageStats.getReadCacheHitCounter().inc(); + return entry; + } - dbLedgerStorageStats.getReadCacheMissCounter().inc(); + dbLedgerStorageStats.getReadCacheMissCounter().inc(); - // Read from main storage - long entryLocation; - long locationIndexStartNano = MathUtils.nowInNano(); - try { - entryLocation = entryLocationIndex.getLocation(ledgerId, entryId); - if (entryLocation == 0) { - // Only a negative result while in limbo equates to unknown - throwIfLimbo(ledgerId); + // Read from main storage + long entryLocation; + long locationIndexStartNano = MathUtils.nowInNano(); + try { + entryLocation = entryLocationIndex.getLocation(ledgerId, entryId); + if (entryLocation == 0) { + // Only a negative result while in limbo equates to unknown + throwIfLimbo(ledgerId); - throw new NoEntryException(ledgerId, entryId); - } - } finally { - dbLedgerStorageStats.getReadFromLocationIndexTime().addLatency( + throw new NoEntryException(ledgerId, entryId); + } + } finally { + dbLedgerStorageStats.getReadFromLocationIndexTime().addLatency( MathUtils.elapsedNanos(locationIndexStartNano), TimeUnit.NANOSECONDS); - } + } - long readEntryStartNano = MathUtils.nowInNano(); - try { - entry = entryLogger.readEntry(ledgerId, entryId, entryLocation); - } finally { - dbLedgerStorageStats.getReadFromEntryLogTime().addLatency( + long readEntryStartNano = MathUtils.nowInNano(); + try { + entry = entryLogger.readEntry(ledgerId, entryId, entryLocation); + } finally { + dbLedgerStorageStats.getReadFromEntryLogTime().addLatency( MathUtils.elapsedNanos(readEntryStartNano), TimeUnit.NANOSECONDS); - } + } - readCache.put(ledgerId, entryId, entry); + readCache.put(ledgerId, entryId, entry); - // Try to read more entries - long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes(); - fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation); + // Try to read more entries + long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes(); + fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation); + } + recordSuccessfulEvent(dbLedgerStorageStats.getReadAheadTotalTime(), readAheadTotalStartNano); return entry; } @@ -634,8 +663,8 @@ private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long fi log.debug("Exception during read ahead for ledger: {}: e", orginalLedgerId, e); } } finally { - dbLedgerStorageStats.getReadAheadBatchCountStats().registerSuccessfulValue(count); - dbLedgerStorageStats.getReadAheadBatchSizeStats().registerSuccessfulValue(size); + dbLedgerStorageStats.getReadAheadBatchCountCounter().add(count); + dbLedgerStorageStats.getReadAheadBatchSizeCounter().add(size); dbLedgerStorageStats.getReadAheadTime().addLatency( MathUtils.elapsedNanos(readAheadStartNano), TimeUnit.NANOSECONDS); } From a95dd019df23d387f55dc730c8bc9558411d35b1 Mon Sep 17 00:00:00 2001 From: wuzhanpeng Date: Wed, 16 Mar 2022 20:14:22 +0800 Subject: [PATCH 2/8] fix checkstyle --- .../storage/ldb/DbLedgerStorageStats.java | 2 - .../bookie/storage/ldb/ReadAheadManager.java | 93 ++++++++++--------- 2 files changed, 48 insertions(+), 47 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java index e16ac108542..600529b2b04 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java @@ -200,7 +200,6 @@ class DbLedgerStorageStats { ) private final Gauge readCacheCountGauge; - /********** Read-ahead async **********/ @StatsDoc( name = READ_AHEAD_TOTAL_TIME, help = "time spent in reading ahead", @@ -225,7 +224,6 @@ class DbLedgerStorageStats { parent = READ_ENTRY ) private final OpStatsLogger readAheadAsyncBlockTime; - /********** End read-ahead async **********/ DbLedgerStorageStats(StatsLogger stats, Supplier writeCacheSizeSupplier, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java index 4ef54df1d99..03389d904bf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java @@ -22,16 +22,6 @@ import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.DefaultThreadFactory; -import org.apache.bookkeeper.bookie.Bookie; -import org.apache.bookkeeper.bookie.EntryLogger; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.util.MathUtils; -import org.apache.bookkeeper.util.SafeRunnable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -48,6 +38,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.EntryLogger; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.util.SafeRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A management tool that supports asynchronous read-ahead operations for {@link SingleDirectoryDbLedgerStorage}. @@ -56,15 +55,15 @@ public class ReadAheadManager { private static final Logger log = LoggerFactory.getLogger(ReadAheadManager.class); - private static final String READ_AHEAD_MAX_MESSAGES = "dbStorage_readAheadMaxMessages"; - private static final String READ_AHEAD_MAX_BYTES = "dbStorage_readAheadMaxBytes"; - private static final String READ_AHEAD_PRE_TRIGGER_RATIO = "dbStorage_readAheadPreTriggerRatio"; - private static final String READ_AHEAD_TASK_EXPIRED_TIME_MS = "dbStorage_readAheadTaskExpiredTimeMs"; - private static final String READ_AHEAD_TIMEOUT_MS = "dbStorage_readAheadTimeoutMs"; + public static final String READ_AHEAD_MAX_MESSAGES = "dbStorage_readAheadMaxMessages"; + public static final String READ_AHEAD_MAX_BYTES = "dbStorage_readAheadMaxBytes"; + public static final String READ_AHEAD_PRE_TRIGGER_RATIO = "dbStorage_readAheadPreTriggerRatio"; + public static final String READ_AHEAD_TASK_EXPIRED_TIME_MS = "dbStorage_readAheadTaskExpiredTimeMs"; + public static final String READ_AHEAD_TIMEOUT_MS = "dbStorage_readAheadTimeoutMs"; // operation behavior indicator - private static final String SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = "dbStorage_submitReadAheadTaskImmediately"; - private static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize"; + public static final String SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = "dbStorage_submitReadAheadTaskImmediately"; + public static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize"; private static final int DEFAULT_READ_AHEAD_MESSAGES = 1000; private static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024; @@ -88,11 +87,15 @@ public LedgerEntryPosition(long ledgerId, long entryId) { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } LedgerEntryPosition that = (LedgerEntryPosition) o; - return ledgerId == that.ledgerId && - entryId == that.entryId; + return ledgerId == that.ledgerId + && entryId == that.entryId; } @Override @@ -160,6 +163,14 @@ public long getLocation() { private final long readAheadTaskExpiredTimeMs; private final long readAheadTimeoutMs; + /** + * Entrance for test cases. + * + * @param entryLogger + * @param entryLocationIndex + * @param cache + * @param dbLedgerStorageStats + */ public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex, ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats) { this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats, @@ -169,7 +180,7 @@ public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocatio } /** - * Entrance for test cases. + * Entrance for normal use. * * @param entryLogger * @param entryLocationIndex @@ -189,21 +200,6 @@ public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocatio conf.getLong(READ_AHEAD_TIMEOUT_MS, DEFAULT_READ_AHEAD_TIMEOUT_MS)); } - /** - * Entrance for normal use. - * - * @param entryLogger - * @param entryLocationIndex - * @param cache - * @param dbLedgerStorageStats - * @param submitReadAheadTaskImmediately - * @param readAheadTaskPoolSize - * @param readAheadMessages - * @param readAheadBytes - * @param preTriggerReadAheadRatio - * @param readAheadTaskExpiredTimeMs - * @param readAheadTimeoutMs - */ public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex, ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, boolean submitReadAheadTaskImmediately, int readAheadTaskPoolSize, @@ -263,9 +259,9 @@ public void addNextReadPosition(long expectedLedgerId, long expectedEntryId, pendingDeletePositions.add(lep); } - private ReadAheadTaskStatus getNearestTask(long ledgerId, long entryId) { - NavigableMap ledgerReadAheadTaskStatuses - = inProgressReadAheadTaskStatuses.get(ledgerId); + protected ReadAheadTaskStatus getNearestTask(long ledgerId, long entryId) { + NavigableMap ledgerReadAheadTaskStatuses = + inProgressReadAheadTaskStatuses.get(ledgerId); if (ledgerReadAheadTaskStatuses != null) { Map.Entry floorEntry = ledgerReadAheadTaskStatuses.floorEntry(entryId); if (floorEntry != null) { @@ -293,7 +289,7 @@ private void removeExpiredReadAheadTasks() { }); if (pos == null) { pendingDeletePositions.poll(); - reclaimedPositions ++; + reclaimedPositions++; } else { break; } @@ -304,7 +300,7 @@ private void removeExpiredReadAheadTasks() { while (!pendingDeleteReadAheadTaskStatuses.isEmpty() && pendingDeleteReadAheadTaskStatuses.peek().isExpired()) { ReadAheadTaskStatus readAheadTaskStatus = pendingDeleteReadAheadTaskStatuses.poll(); - reclaimedTasks ++; + reclaimedTasks++; inProgressReadAheadTaskStatuses.computeIfPresent( readAheadTaskStatus.ledgerId, (lid, ledgerReadAheadTaskStatuses) -> { @@ -474,7 +470,7 @@ private void internalReadAhead(long ledgerId, long entryId, long location, log.debug("[L{} E{}] Entry already exists.", ledgerId, entryId); } readFromCacheBefore = true; - entryId ++; + entryId++; location += (4 + entry.readableBytes()); entry.release(); continue; @@ -513,10 +509,10 @@ private void internalReadAhead(long ledgerId, long entryId, long location, } // update stats - messages ++; + messages++; bytes += entry.readableBytes(); - entryId ++; + entryId++; location += (4 + entry.readableBytes()); } finally { entry.release(); @@ -570,6 +566,9 @@ public void readAheadAsync(long ledgerId, long entryId, long location, int maxMe () -> internalReadAhead(ledgerId, entryId, location, maxMessages, maxBytes, MathUtils.nowInNano()))); } + /** + * A structure that records the transitions of the task status. + */ public static class ReadAheadTaskStatus { private long ledgerId; private long startEntryId; @@ -607,6 +606,10 @@ public void updateEndEntry(long endEntryId) { this.endEntryId = endEntryId; } + public long getEndEntryId() { + return endEntryId; + } + public boolean isExpired() { return readCompletedTimeMs > 0 && (System.currentTimeMillis() - readCompletedTimeMs) > readAheadTaskExpiredTimeMs; From 7b7e7768648100b5eab6526f2e38e5e8fc579bfd Mon Sep 17 00:00:00 2001 From: wuzhanpeng Date: Wed, 16 Mar 2022 20:15:51 +0800 Subject: [PATCH 3/8] add unit tests --- .../ldb/SingleDirectoryDbLedgerStorage.java | 8 + .../ldb/DbLedgerStorageReadAheadTest.java | 359 ++++++++++++++++++ 2 files changed, 367 insertions(+) create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 58f312ff582..8d6c31d61a9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -1053,6 +1053,14 @@ public EntryLocationIndex getEntryLocationIndex() { return entryLocationIndex; } + public ReadCache getReadCache() { + return readCache; + } + + public DbLedgerStorageStats getDbLedgerStorageStats() { + return dbLedgerStorageStats; + } + private void recordSuccessfulEvent(OpStatsLogger logger, long startTimeNanos) { logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java new file mode 100644 index 00000000000..955447f3706 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java @@ -0,0 +1,359 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.bookkeeper.bookie.storage.ldb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.BookieImpl; +import org.apache.bookkeeper.bookie.EntryLogger; +import org.apache.bookkeeper.bookie.TestBookieImpl; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.conf.TestBKConfiguration; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Basic tests for read-ahead module. + */ +@Slf4j +public class DbLedgerStorageReadAheadTest { + private DbLedgerStorage storage; + private File tmpDir; + + @Before + public void setup() throws Exception { + tmpDir = File.createTempFile("bkTest", ".dir"); + tmpDir.delete(); + tmpDir.mkdir(); + File curDir = BookieImpl.getCurrentDirectory(tmpDir); + BookieImpl.checkDirectoryStructure(curDir); + + int gcWaitTime = 1000; + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + conf.setGcWaitTime(gcWaitTime); + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + conf.setLedgerDirNames(new String[] { tmpDir.toString() }); + Bookie bookie = new TestBookieImpl(conf); + + storage = (DbLedgerStorage) bookie.getLedgerStorage(); + } + + @After + public void teardown() throws Exception { + storage.shutdown(); + tmpDir.delete(); + } + + @Test + public void testCanTriggerReadAheadTaskAfterHit() throws Exception { + SingleDirectoryDbLedgerStorage singleDirStorage = ((DbLedgerStorage) storage).getLedgerStorageList().get(0); + EntryLogger entryLogger = singleDirStorage.getEntryLogger(); + EntryLocationIndex entryLocationIndex = singleDirStorage.getEntryLocationIndex(); + ReadCache cache = singleDirStorage.getReadCache(); + + // write some entries + List, Long>> locations = new ArrayList<>(); + final int ledgers = 2, lInit = 3; + final int entries = 300, eInit = 4; + + for (int i = 0; i < ledgers; i++) { + for (int j = 0; j < entries; j++) { + ByteBuf entry = Unpooled.buffer(1024); + long lid = lInit + i; + long eid = eInit + j; + entry.writeLong(lid); // ledger id + entry.writeLong(eid); // entry id + entry.writeBytes(("entry-" + eid).getBytes()); + long location = entryLogger.addEntry(lid, entry, false); + entryLocationIndex.addLocation(lid, eid, location); + locations.add(Pair.of(Pair.of(lid, eid), location)); + } + } + + ReadAheadManager readAheadManager = new ReadAheadManager( + entryLogger, entryLocationIndex, cache, singleDirStorage.getDbLedgerStorageStats()); + + // first pos + Pair, Long> firstEntry = locations.get(0); + long lid = firstEntry.getLeft().getLeft(); + long eid = firstEntry.getLeft().getRight(); + long location = firstEntry.getRight(); + + readAheadManager.addNextReadPosition(lid, eid, lid, eid, location); + assertTrue(readAheadManager.hitInReadAheadPositions(lid, eid)); + } + + @Test + public void testSingleTaskBlockUntilReadAheadTaskComplete() throws Exception { + SingleDirectoryDbLedgerStorage singleDirStorage = ((DbLedgerStorage) storage).getLedgerStorageList().get(0); + EntryLogger entryLogger = singleDirStorage.getEntryLogger(); + EntryLocationIndex entryLocationIndex = singleDirStorage.getEntryLocationIndex(); + ReadCache cache = singleDirStorage.getReadCache(); + + // write some entries + List, Long>> locations = new ArrayList<>(); + final int ledgers = 2, lInit = 3; + final int entries = 300, eInit = 4; + + for (int i = 0; i < ledgers; i++) { + for (int j = 0; j < entries; j++) { + ByteBuf entry = Unpooled.buffer(1024); + long lid = lInit + i; + long eid = eInit + j; + entry.writeLong(lid); // ledger id + entry.writeLong(eid); // entry id + entry.writeBytes(("entry-" + eid).getBytes()); + long location = entryLogger.addEntry(lid, entry, false); + entryLocationIndex.addLocation(lid, eid, location); + locations.add(Pair.of(Pair.of(lid, eid), location)); + } + } + + ReadAheadManager readAheadManager = new ReadAheadManager( + entryLogger, entryLocationIndex, cache, singleDirStorage.getDbLedgerStorageStats()); + + // first pos + Pair, Long> firstEntry = locations.get(0); + long lid = firstEntry.getLeft().getLeft(); + long eid = firstEntry.getLeft().getRight(); + long location = firstEntry.getRight(); + + assertNull(cache.get(lid, eid)); + ByteBuf entry = readAheadManager.readEntryOrWait(lid, eid); // miss + assertEquals(lid, entry.getLong(0)); + assertEquals(eid, entry.getLong(8)); + + assertNotNull(cache.get(lid, eid)); + entry = readAheadManager.readEntryOrWait(lid, eid + 1); // hit + assertEquals(lid, entry.getLong(0)); + assertEquals(eid + 1, entry.getLong(8)); + } + + @Test + public void testMultiTaskBlockUntilReadAheadTaskComplete() throws Exception { + SingleDirectoryDbLedgerStorage singleDirStorage = ((DbLedgerStorage) storage).getLedgerStorageList().get(0); + EntryLogger entryLogger = singleDirStorage.getEntryLogger(); + EntryLocationIndex entryLocationIndex = singleDirStorage.getEntryLocationIndex(); + ReadCache cache = singleDirStorage.getReadCache(); + + // write some entries + List, Long>> locations = new ArrayList<>(); + final int ledgers = 2, lInit = 3; + final int entries = 300, eInit = 4; + + for (int i = 0; i < ledgers; i++) { + for (int j = 0; j < entries; j++) { + ByteBuf entry = Unpooled.buffer(1024); + long lid = lInit + i; + long eid = eInit + j; + entry.writeLong(lid); // ledger id + entry.writeLong(eid); // entry id + entry.writeBytes(("entry-" + eid).getBytes()); + long location = entryLogger.addEntry(lid, entry, false); + entryLocationIndex.addLocation(lid, eid, location); + locations.add(Pair.of(Pair.of(lid, eid), location)); + } + } + + ReadAheadManager readAheadManager = new ReadAheadManager( + entryLogger, entryLocationIndex, cache, singleDirStorage.getDbLedgerStorageStats()); + + // first pos + Pair, Long> firstEntry = locations.get(0); + long lid = firstEntry.getLeft().getLeft(); + long eid = firstEntry.getLeft().getRight(); + long location = firstEntry.getRight(); + + // multi reads + readAheadManager.readEntryOrWait(lid, eid); // miss + + new Thread(() -> { + try { + assertNull(cache.get(lid, eid + 1)); + ByteBuf entry = readAheadManager.readEntryOrWait(lid, eid + 1); + assertEquals(lid, entry.getLong(0)); + assertEquals(eid + 1, entry.getLong(8)); + } catch (Exception e) { + + } + }).start(); + + new Thread(() -> { + // wait until the read-ahead task is actually executed + while (cache.get(lid, eid + 1) != null) { + // spin + } + + try { + ByteBuf entry = readAheadManager.readEntryOrWait(lid, eid + 95); + assertEquals(lid, entry.getLong(0)); + assertEquals(eid + 95, entry.getLong(8)); + } catch (Exception e) { + + } + }).start(); + } + + @Test + public void testReadFewEntryAhead() throws Exception { + SingleDirectoryDbLedgerStorage singleDirStorage = ((DbLedgerStorage) storage).getLedgerStorageList().get(0); + EntryLogger entryLogger = singleDirStorage.getEntryLogger(); + EntryLocationIndex entryLocationIndex = singleDirStorage.getEntryLocationIndex(); + ReadCache cache = singleDirStorage.getReadCache(); + + // write some entries + List, Long>> locations = new ArrayList<>(); + final int ledgers = 2, lInit = 3; + final int entries = 300, eInit = 4; + + for (int i = 0; i < ledgers; i++) { + for (int j = 0; j < entries; j++) { + ByteBuf entry = Unpooled.buffer(1024); + long lid = lInit + i; + long eid = eInit + j; + entry.writeLong(lid); // ledger id + entry.writeLong(eid); // entry id + entry.writeBytes(("entry-" + eid).getBytes()); + long location = entryLogger.addEntry(lid, entry, false); + entryLocationIndex.addLocation(lid, eid, location); + locations.add(Pair.of(Pair.of(lid, eid), location)); + } + } + + ReadAheadManager readAheadManager = new ReadAheadManager( + entryLogger, entryLocationIndex, cache, singleDirStorage.getDbLedgerStorageStats()); + + // first pos + Pair, Long> firstEntry = locations.get(0); + long lid = firstEntry.getLeft().getLeft(); + long eid = firstEntry.getLeft().getRight(); + long location = firstEntry.getRight(); + + ByteBuf entry = readAheadManager.readEntryOrWait(lid, eid + entries - 2); // miss + assertEquals(lid, entry.getLong(0)); + assertEquals(eid + entries - 2, entry.getLong(8)); + + entry = readAheadManager.readEntryOrWait(lid, eid + entries - 1); // hit + assertEquals(lid, entry.getLong(0)); + assertEquals(eid + entries - 1, entry.getLong(8)); + } + + @Test + public void testReadLastEntry() throws Exception { + SingleDirectoryDbLedgerStorage singleDirStorage = ((DbLedgerStorage) storage).getLedgerStorageList().get(0); + EntryLogger entryLogger = singleDirStorage.getEntryLogger(); + EntryLocationIndex entryLocationIndex = singleDirStorage.getEntryLocationIndex(); + ReadCache cache = singleDirStorage.getReadCache(); + + // write some entries + ByteBuf entry = Unpooled.buffer(1024); + long lid = 3; + long eid = 4; + entry.writeLong(lid); // ledger id + entry.writeLong(eid); // entry id + entry.writeBytes(("entry-" + eid).getBytes()); + long location = entryLogger.addEntry(lid, entry, false); + entryLocationIndex.addLocation(lid, eid, location); + + ReadAheadManager readAheadManager = new ReadAheadManager( + entryLogger, entryLocationIndex, cache, singleDirStorage.getDbLedgerStorageStats()); + + assertNull(cache.get(lid, eid)); + entry = readAheadManager.readEntryOrWait(lid, eid); + assertEquals(lid, entry.getLong(0)); + assertEquals(eid, entry.getLong(8)); + } + + @Test + public void testCanSkipCachedEntriesWhenReadingAhead() throws Exception { + SingleDirectoryDbLedgerStorage singleDirStorage = ((DbLedgerStorage) storage).getLedgerStorageList().get(0); + EntryLogger entryLogger = singleDirStorage.getEntryLogger(); + EntryLocationIndex entryLocationIndex = singleDirStorage.getEntryLocationIndex(); + ReadCache cache = singleDirStorage.getReadCache(); + + // write some entries + List, Long>> locations = new ArrayList<>(); + final int ledgers = 2, lInit = 3; + final int entries = 300, eInit = 4; + + for (int i = 0; i < ledgers; i++) { + for (int j = 0; j < entries; j++) { + ByteBuf entry = Unpooled.buffer(1024); + long lid = lInit + i; + long eid = eInit + j; + entry.writeLong(lid); // ledger id + entry.writeLong(eid); // entry id + entry.writeBytes(("entry-" + eid).getBytes()); + long location = entryLogger.addEntry(lid, entry, false); + entryLocationIndex.addLocation(lid, eid, location); + locations.add(Pair.of(Pair.of(lid, eid), location)); + } + } + + final long maxEntries = 100; + ServerConfiguration conf = new ServerConfiguration(); + conf.setProperty(ReadAheadManager.READ_AHEAD_MAX_MESSAGES, maxEntries); + conf.setProperty(ReadAheadManager.READ_AHEAD_TASK_POOL_SIZE, 1); + ReadAheadManager readAheadManager = new ReadAheadManager( + entryLogger, entryLocationIndex, cache, singleDirStorage.getDbLedgerStorageStats(), conf); + + // first pos + Pair, Long> firstEntry = locations.get(0); + long lid = firstEntry.getLeft().getLeft(); + long eid = firstEntry.getLeft().getRight(); + long location = firstEntry.getRight(); + + // set init pos + readAheadManager.readEntryOrWait(lid, eid); // miss + + // read some entries + int skipEntries = 5; + for (int i = 0; i < 5; i++) { + // do not read sequentially + readAheadManager.readEntryOrWait(lid, eid + maxEntries - i); + } + + // start reading ahead + ByteBuf entry = readAheadManager.readEntryOrWait(lid, eid + 1); + assertEquals(lid, entry.getLong(0)); + assertEquals(eid + 1, entry.getLong(8)); + + // check last entry pos + ReadAheadManager.ReadAheadTaskStatus taskStatus = readAheadManager.getNearestTask(lid, eid + 1); + while (taskStatus.getEndEntryId() < 0) { + Thread.sleep(100); + } + assertEquals(eid + 1 + maxEntries + skipEntries, taskStatus.getEndEntryId()); + } +} From a4a4afd2bbc64b9f962dc4ff517e3a6ef0e77268 Mon Sep 17 00:00:00 2001 From: wuzhanpeng Date: Fri, 15 Apr 2022 17:29:21 +0800 Subject: [PATCH 4/8] refine the timeout mechanism --- .../bookie/storage/ldb/ReadAheadManager.java | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java index 03389d904bf..3748020cc9c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java @@ -540,7 +540,7 @@ private void internalReadAhead(long ledgerId, long entryId, long location, } if (log.isDebugEnabled()) { - log.debug("Read-ahead task [L{} E:{} - E{} len={}] eventually collected {} messages and {} bytes in {}ms", + log.debug("Read-ahead task [L{} E{} - E{} len={}] eventually collected {} messages and {} bytes in {}ms", readAheadTaskStatus.ledgerId, readAheadTaskStatus.startEntryId, readAheadTaskStatus.endEntryId, readAheadTaskStatus.readAheadEntries, messages, bytes, TimeUnit.NANOSECONDS.toMillis(MathUtils.elapsedNanos(readAheadStartNanos))); @@ -556,8 +556,15 @@ public void readAheadAsync(long ledgerId, long entryId, long location, int maxMe // add a new ReadAheadTaskStatus before actually read-ahead inProgressReadAheadTaskStatuses.computeIfAbsent(ledgerId, lid -> new TreeMap<>()); inProgressReadAheadTaskStatuses.computeIfPresent(ledgerId, (lid, ledgerReadAheadTaskStatuses) -> { - ledgerReadAheadTaskStatuses.put(entryId, new ReadAheadTaskStatus( - ledgerId, entryId, maxMessages, readAheadTaskExpiredTimeMs, readAheadTimeoutMs)); + // ensure that the read-ahead task is unique + if (ledgerReadAheadTaskStatuses.containsKey(entryId)) { + if (log.isDebugEnabled()) { + log.debug("Read-ahead task of L{} E{} is redundant", ledgerId, entryId); + } + } else { + ledgerReadAheadTaskStatuses.put(entryId, new ReadAheadTaskStatus( + ledgerId, entryId, maxMessages, readAheadTaskExpiredTimeMs, readAheadTimeoutMs)); + } return ledgerReadAheadTaskStatuses; }); @@ -621,10 +628,15 @@ public void waitUntilReadCompleted(DbLedgerStorageStats dbLedgerStorageStats) { try { while (!readCompleted) { try { - condition.await(readAheadTimeoutMs, TimeUnit.MILLISECONDS); + boolean signalled = condition.await(readAheadTimeoutMs, TimeUnit.MILLISECONDS); + if (!signalled) { + log.warn("Failed to read entries={} ahead from L{} E{} due to timeout={}ms", + readAheadEntries, ledgerId, startEntryId, readAheadTimeoutMs); + readCompleted(); + } } catch (InterruptedException e) { - log.warn("Failed to read entries={} ahead from L{} E{} due to timeout={}ms", - readAheadEntries, ledgerId, startEntryId, readAheadTimeoutMs); + log.warn("Failed to read entries={} ahead from L{} E{} due to the interruption of the thread", + readAheadEntries, ledgerId, startEntryId, e); readCompleted(); } } From 3aa07dd07eb3fc1042bc28a3ed37409357e46b0f Mon Sep 17 00:00:00 2001 From: wuzhanpeng Date: Mon, 18 Apr 2022 18:41:20 +0800 Subject: [PATCH 5/8] address comments resolve conflicts --- .../bookie/storage/ldb/ReadAheadManager.java | 277 +++++++++++++----- .../ldb/SingleDirectoryDbLedgerStorage.java | 118 +------- .../ldb/DbLedgerStorageReadAheadTest.java | 25 +- 3 files changed, 237 insertions(+), 183 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java index 3748020cc9c..d2f7ae778a2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java @@ -55,25 +55,27 @@ public class ReadAheadManager { private static final Logger log = LoggerFactory.getLogger(ReadAheadManager.class); - public static final String READ_AHEAD_MAX_MESSAGES = "dbStorage_readAheadMaxMessages"; + public static final String READ_AHEAD_MAX_ENTRIES = "dbStorage_readAheadMaxEntries"; public static final String READ_AHEAD_MAX_BYTES = "dbStorage_readAheadMaxBytes"; public static final String READ_AHEAD_PRE_TRIGGER_RATIO = "dbStorage_readAheadPreTriggerRatio"; public static final String READ_AHEAD_TASK_EXPIRED_TIME_MS = "dbStorage_readAheadTaskExpiredTimeMs"; public static final String READ_AHEAD_TIMEOUT_MS = "dbStorage_readAheadTimeoutMs"; // operation behavior indicator + public static final String ENABLE_READ_AHEAD_ASYNC = "dbStorage_enableReadAheadAsync"; public static final String SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = "dbStorage_submitReadAheadTaskImmediately"; public static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize"; - private static final int DEFAULT_READ_AHEAD_MESSAGES = 1000; - private static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024; - private static final double DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO = 0.75; + public static final int DEFAULT_READ_AHEAD_ENTRIES = 1000; + public static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024; + public static final double DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO = 0.75; - private static final long DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS = 60 * 1000; - private static final long DEFAULT_READ_AHEAD_TIMEOUT_MS = 30 * 1000; + public static final long DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS = 60 * 1000; + public static final long DEFAULT_READ_AHEAD_TIMEOUT_MS = 5 * 1000; - private static final boolean DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = false; - private static final int DEFAULT_READ_AHEAD_TASK_POOL_SIZE = 8; + public static final boolean DEFAULT_ENABLE_READ_AHEAD_ASYNC = false; + public static final boolean DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = false; + public static final int DEFAULT_READ_AHEAD_TASK_POOL_SIZE = 8; private static final class LedgerEntryPosition { @@ -140,28 +142,37 @@ public long getLocation() { } } - private final ExecutorService readAheadExecutor; - private final ScheduledExecutorService cleanupExecutor; - - private final ConcurrentHashMap pendingReadAheadPositions; - private final ConcurrentLinkedQueue pendingDeletePositions; - - private final ConcurrentHashMap> inProgressReadAheadTaskStatuses; - private final ConcurrentLinkedQueue pendingDeleteReadAheadTaskStatuses; - private final EntryLogger entryLogger; private final EntryLocationIndex entryLocationIndex; private final ReadCache cache; private final DbLedgerStorageStats dbLedgerStorageStats; - private final boolean submitReadAheadTaskImmediately; - private final int readAheadMessages; - private final int readAheadBytes; - private final double preTriggerReadAheadRatio; + private final boolean enableReadAheadAsync; - private final long readAheadTaskExpiredTimeMs; - private final long readAheadTimeoutMs; + /** + * The following parameters apply to both sync and async read-ahead mode. + */ + private int readAheadEntries; + private int readAheadBytes; + + /** + * The following parameters only apply to async read-ahead mode. + */ + private ExecutorService readAheadExecutor; + private ScheduledExecutorService cleanupExecutor; + + private ConcurrentHashMap pendingReadAheadPositions; + private ConcurrentLinkedQueue pendingDeletePositions; + + private ConcurrentHashMap> inProgressReadAheadTaskStatuses; + private ConcurrentLinkedQueue pendingDeleteReadAheadTaskStatuses; + + private boolean submitReadAheadTaskImmediately; + private double preTriggerReadAheadRatio; + + private long readAheadTaskExpiredTimeMs; + private long readAheadTimeoutMs; /** * Entrance for test cases. @@ -173,9 +184,9 @@ public long getLocation() { */ public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex, ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats) { - this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats, + this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats, true, DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY, DEFAULT_READ_AHEAD_TASK_POOL_SIZE, - DEFAULT_READ_AHEAD_MESSAGES, DEFAULT_READ_AHEAD_BYTES, DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO, + DEFAULT_READ_AHEAD_ENTRIES, DEFAULT_READ_AHEAD_BYTES, DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO, DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS, DEFAULT_READ_AHEAD_TIMEOUT_MS); } @@ -191,9 +202,10 @@ public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocatio public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex, ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, ServerConfiguration conf) { this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats, + conf.getBoolean(ENABLE_READ_AHEAD_ASYNC, DEFAULT_ENABLE_READ_AHEAD_ASYNC), conf.getBoolean(SUBMIT_READ_AHEAD_TASK_IMMEDIATELY, DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY), conf.getInt(READ_AHEAD_TASK_POOL_SIZE, DEFAULT_READ_AHEAD_TASK_POOL_SIZE), - conf.getInt(READ_AHEAD_MAX_MESSAGES, DEFAULT_READ_AHEAD_MESSAGES), + conf.getInt(READ_AHEAD_MAX_ENTRIES, DEFAULT_READ_AHEAD_ENTRIES), conf.getInt(READ_AHEAD_MAX_BYTES, DEFAULT_READ_AHEAD_BYTES), conf.getDouble(READ_AHEAD_PRE_TRIGGER_RATIO, DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO), conf.getLong(READ_AHEAD_TASK_EXPIRED_TIME_MS, DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS), @@ -201,22 +213,10 @@ public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocatio } public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex, - ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, + ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, boolean enableReadAheadAsync, boolean submitReadAheadTaskImmediately, int readAheadTaskPoolSize, - int readAheadMessages, int readAheadBytes, double preTriggerReadAheadRatio, + int readAheadEntries, int readAheadBytes, double preTriggerReadAheadRatio, long readAheadTaskExpiredTimeMs, long readAheadTimeoutMs) { - // core components initialization - readAheadExecutor = Executors.newFixedThreadPool( - readAheadTaskPoolSize, new DefaultThreadFactory("read-ahead")); - cleanupExecutor = Executors.newSingleThreadScheduledExecutor( - new DefaultThreadFactory("read-ahead-cleanup")); - - pendingReadAheadPositions = new ConcurrentHashMap<>(); - pendingDeletePositions = new ConcurrentLinkedQueue<>(); - - inProgressReadAheadTaskStatuses = new ConcurrentHashMap<>(); - pendingDeleteReadAheadTaskStatuses = new ConcurrentLinkedQueue<>(); - // external assistant components assignment this.entryLogger = entryLogger; this.entryLocationIndex = entryLocationIndex; @@ -225,22 +225,63 @@ public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocatio // metrics this.dbLedgerStorageStats = dbLedgerStorageStats; - // configurable arguments - this.submitReadAheadTaskImmediately = submitReadAheadTaskImmediately; - this.readAheadMessages = readAheadMessages; + // mode + this.enableReadAheadAsync = enableReadAheadAsync; + + // common parameters + this.readAheadEntries = readAheadEntries; this.readAheadBytes = readAheadBytes; - this.preTriggerReadAheadRatio = preTriggerReadAheadRatio; - this.readAheadTaskExpiredTimeMs = readAheadTaskExpiredTimeMs; - this.readAheadTimeoutMs = readAheadTimeoutMs; + if (enableReadAheadAsync) { + + // configurable arguments + this.submitReadAheadTaskImmediately = submitReadAheadTaskImmediately; + this.preTriggerReadAheadRatio = preTriggerReadAheadRatio; + + this.readAheadTaskExpiredTimeMs = readAheadTaskExpiredTimeMs; + this.readAheadTimeoutMs = readAheadTimeoutMs; + + // core components initialization + readAheadExecutor = Executors.newFixedThreadPool( + readAheadTaskPoolSize, new DefaultThreadFactory("read-ahead")); + cleanupExecutor = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("read-ahead-cleanup")); - cleanupExecutor.scheduleAtFixedRate( - SafeRunnable.safeRun(this::removeExpiredReadAheadTasks), 30, 30, TimeUnit.SECONDS); + pendingReadAheadPositions = new ConcurrentHashMap<>(); + pendingDeletePositions = new ConcurrentLinkedQueue<>(); + + inProgressReadAheadTaskStatuses = new ConcurrentHashMap<>(); + pendingDeleteReadAheadTaskStatuses = new ConcurrentLinkedQueue<>(); + + cleanupExecutor.scheduleAtFixedRate( + SafeRunnable.safeRun(this::removeExpiredReadAheadTasks), 30, 30, TimeUnit.SECONDS); + } } + /** + * Shutdown the thread pools used in async read-ahead mode. + */ public void shutdown() { - this.readAheadExecutor.shutdown(); - this.cleanupExecutor.shutdown(); + if (enableReadAheadAsync) { + this.readAheadExecutor.shutdown(); + this.cleanupExecutor.shutdown(); + } + } + + /** + * Trigger read-ahead and return the corresponding entry. + * + * @param ledgerId + * @param entryId + * @return + * @throws IOException + */ + public ByteBuf readEntry(long ledgerId, long entryId) throws IOException { + if (enableReadAheadAsync) { + return readEntryUnderAsyncReadAhead(ledgerId, entryId); + } else { + return readEntryUnderSyncReadAhead(ledgerId, entryId); + } } private static void recordStatsInNano(OpStatsLogger logger, long startTimeNanos) { @@ -327,7 +368,7 @@ public boolean hitInReadAheadPositions(long ledgerId, long entryId) { (lep, rap) -> { isHit.set(true); readAheadAsync(rap.getLedgerId(), rap.getEntryId(), rap.getLocation(), - readAheadMessages, readAheadBytes); + readAheadEntries, readAheadBytes); if (log.isDebugEnabled()) { log.debug("Submitted read-ahead task. Info: hit-pos=[L{} E{}] / actual-start-pos=[L{} E{}]", @@ -367,7 +408,7 @@ private ByteBuf readAndAddNextReadAheadPosition(long ledgerId, long entryId) thr if (submitReadAheadTaskImmediately) { // submit the read-ahead task immediately readAheadAsync(ledgerId, entryId + 1, entryLocation + 4 + entry.readableBytes(), - readAheadMessages, readAheadBytes); + readAheadEntries, readAheadBytes); } else { // actually execute read-ahead task after hitting this position next time addNextReadPosition(ledgerId, entryId + 1, @@ -382,15 +423,113 @@ private ByteBuf readAndAddNextReadAheadPosition(long ledgerId, long entryId) thr return entry; } + private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long firstEntryLocation) { + long readAheadStartNano = MathUtils.nowInNano(); + int count = 0; + long size = 0; + + try { + long firstEntryLogId = (firstEntryLocation >> 32); + long currentEntryLogId = firstEntryLogId; + long currentEntryLocation = firstEntryLocation; + + while (count < readAheadEntries + && size < readAheadBytes + && currentEntryLogId == firstEntryLogId) { + ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, firstEntryId, currentEntryLocation, + false /* validateEntry */); + + try { + long currentEntryLedgerId = entry.getLong(0); + long currentEntryId = entry.getLong(8); + + if (currentEntryLedgerId != orginalLedgerId) { + // Found an entry belonging to a different ledger, stopping read-ahead + break; + } + + // Insert entry in read cache + cache.put(orginalLedgerId, currentEntryId, entry); + + count++; + firstEntryId++; + size += entry.readableBytes(); + + currentEntryLocation += 4 + entry.readableBytes(); + currentEntryLogId = currentEntryLocation >> 32; + } finally { + entry.release(); + } + } + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug("Exception during read ahead for ledger: {}: e", orginalLedgerId, e); + } + } finally { + dbLedgerStorageStats.getReadAheadBatchCountCounter().add(count); + dbLedgerStorageStats.getReadAheadBatchSizeCounter().add(size); + dbLedgerStorageStats.getReadAheadTime().add(MathUtils.elapsedNanos(readAheadStartNano)); + } + } + + /** + * Read an entry under sync mode. + * This method is moved from {@link SingleDirectoryDbLedgerStorage} to here, + * in order to better unify the process of reading an entry. + * + * @param ledgerId + * @param entryId + * @return + * @throws IOException + */ + private ByteBuf readEntryUnderSyncReadAhead(long ledgerId, long entryId) throws IOException { + ByteBuf entry; + // Try reading from read-ahead cache + entry = cache.get(ledgerId, entryId); + if (entry != null) { + dbLedgerStorageStats.getReadCacheHitCounter().inc(); + return entry; + } + + dbLedgerStorageStats.getReadCacheMissCounter().inc(); + + // Read from main storage + long entryLocation; + long locationIndexStartNano = MathUtils.nowInNano(); + try { + entryLocation = entryLocationIndex.getLocation(ledgerId, entryId); + if (entryLocation == 0) { + throw new Bookie.NoEntryException(ledgerId, entryId); + } + } finally { + dbLedgerStorageStats.getReadFromLocationIndexTime().add(MathUtils.elapsedNanos(locationIndexStartNano)); + } + + long readEntryStartNano = MathUtils.nowInNano(); + try { + entry = entryLogger.readEntry(ledgerId, entryId, entryLocation); + } finally { + dbLedgerStorageStats.getReadFromEntryLogTime().add(MathUtils.elapsedNanos(readEntryStartNano)); + } + + cache.put(ledgerId, entryId, entry); + + // Try to read more entries + long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes(); + fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation); + + return entry; + } + /** - * Read an entry or wait until it is ready to be read. + * Read an entry under async mode. * * @param ledgerId * @param entryId * @return * @throws IOException */ - public ByteBuf readEntryOrWait(long ledgerId, long entryId) throws IOException { + private ByteBuf readEntryUnderAsyncReadAhead(long ledgerId, long entryId) throws IOException { // check if we need to read ahead boolean isHit = hitInReadAheadPositions(ledgerId, entryId); if (log.isDebugEnabled()) { @@ -437,7 +576,7 @@ public ByteBuf readEntryOrWait(long ledgerId, long entryId) throws IOException { } private void internalReadAhead(long ledgerId, long entryId, long location, - int maxMessages, int maxBytes, long submitStartNanos) { + int maxEntries, int maxBytes, long submitStartNanos) { // record queue time recordStatsInNano(dbLedgerStorageStats.getReadAheadAsyncQueueTime(), submitStartNanos); long readAheadStartNanos = MathUtils.nowInNano(); @@ -452,7 +591,7 @@ private void internalReadAhead(long ledgerId, long entryId, long location, List entriesPos = new ArrayList<>(); // read from fc - int messages = 0; + int entries = 0; int bytes = 0; // flag to determine whether keeps reading ahead @@ -462,7 +601,7 @@ private void internalReadAhead(long ledgerId, long entryId, long location, boolean readFromCacheBefore = false; try { - while (messages <= maxMessages && bytes <= maxBytes) { + while (entries <= maxEntries && bytes <= maxBytes) { ByteBuf entry = cache.get(ledgerId, entryId); if (entry != null) { @@ -509,7 +648,7 @@ private void internalReadAhead(long ledgerId, long entryId, long location, } // update stats - messages++; + entries++; bytes += entry.readableBytes(); entryId++; @@ -523,8 +662,8 @@ private void internalReadAhead(long ledgerId, long entryId, long location, return; } finally { // update the actual range - if (messages >= 1) { - readAheadTaskStatus.updateEndEntry(entriesPos.get(messages - 1)); + if (entries >= 1) { + readAheadTaskStatus.updateEndEntry(entriesPos.get(entries - 1)); } // notify all waiting threads @@ -533,26 +672,26 @@ private void internalReadAhead(long ledgerId, long entryId, long location, } // set next read-ahead pos - if (preTriggerReadAheadRatio > 0 && !earlyExit && messages >= 1) { - int index = Math.min(messages - 1, (int) (preTriggerReadAheadRatio * messages)); + if (preTriggerReadAheadRatio > 0 && !earlyExit && entries >= 1) { + int index = Math.min(entries - 1, (int) (preTriggerReadAheadRatio * entries)); long nextHitEntryId = entriesPos.get(index); addNextReadPosition(ledgerId, nextHitEntryId, ledgerId, entryId, location); } if (log.isDebugEnabled()) { - log.debug("Read-ahead task [L{} E{} - E{} len={}] eventually collected {} messages and {} bytes in {}ms", + log.debug("Read-ahead task [L{} E{} - E{} len={}] eventually collected {} entries and {} bytes in {}ms", readAheadTaskStatus.ledgerId, readAheadTaskStatus.startEntryId, readAheadTaskStatus.endEntryId, - readAheadTaskStatus.readAheadEntries, messages, bytes, + readAheadTaskStatus.readAheadEntries, entries, bytes, TimeUnit.NANOSECONDS.toMillis(MathUtils.elapsedNanos(readAheadStartNanos))); } // record exec time - dbLedgerStorageStats.getReadAheadBatchCountCounter().add(messages); + dbLedgerStorageStats.getReadAheadBatchCountCounter().add(entries); dbLedgerStorageStats.getReadAheadBatchSizeCounter().add(bytes); recordStatsInNano(dbLedgerStorageStats.getReadAheadAsyncTotalTime(), submitStartNanos); } - public void readAheadAsync(long ledgerId, long entryId, long location, int maxMessages, int maxBytes) { + public void readAheadAsync(long ledgerId, long entryId, long location, int maxEntries, int maxBytes) { // add a new ReadAheadTaskStatus before actually read-ahead inProgressReadAheadTaskStatuses.computeIfAbsent(ledgerId, lid -> new TreeMap<>()); inProgressReadAheadTaskStatuses.computeIfPresent(ledgerId, (lid, ledgerReadAheadTaskStatuses) -> { @@ -563,20 +702,20 @@ public void readAheadAsync(long ledgerId, long entryId, long location, int maxMe } } else { ledgerReadAheadTaskStatuses.put(entryId, new ReadAheadTaskStatus( - ledgerId, entryId, maxMessages, readAheadTaskExpiredTimeMs, readAheadTimeoutMs)); + ledgerId, entryId, maxEntries, readAheadTaskExpiredTimeMs, readAheadTimeoutMs)); } return ledgerReadAheadTaskStatuses; }); // submit the read-ahead task async readAheadExecutor.submit(SafeRunnable.safeRun( - () -> internalReadAhead(ledgerId, entryId, location, maxMessages, maxBytes, MathUtils.nowInNano()))); + () -> internalReadAhead(ledgerId, entryId, location, maxEntries, maxBytes, MathUtils.nowInNano()))); } /** * A structure that records the transitions of the task status. */ - public static class ReadAheadTaskStatus { + public static final class ReadAheadTaskStatus { private long ledgerId; private long startEntryId; private long endEntryId = -1; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 8d6c31d61a9..004270da1d4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -22,6 +22,10 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static org.apache.bookkeeper.bookie.storage.ldb.ReadAheadManager.DEFAULT_ENABLE_READ_AHEAD_ASYNC; +import static org.apache.bookkeeper.bookie.storage.ldb.ReadAheadManager.ENABLE_READ_AHEAD_ASYNC; +import static org.apache.bookkeeper.bookie.storage.ldb.ReadAheadManager.READ_AHEAD_MAX_BYTES; +import static org.apache.bookkeeper.bookie.storage.ldb.ReadAheadManager.READ_AHEAD_MAX_ENTRIES; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; @@ -142,10 +146,6 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage private final Counter flushExecutorTime; - private static final String ENABLE_READ_AHEAD_ASYNC = "dbStorage_enableReadAheadAsync"; - private static final boolean DEFAULT_ENABLE_READ_AHEAD_ASYNC = false; - - private final boolean enableReadAheadAsync; private final ReadAheadManager readAheadManager; public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, @@ -226,15 +226,16 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le indexDirsManager.addLedgerDirsListener(getLedgerDirsListener()); } - enableReadAheadAsync = conf.getBoolean(ENABLE_READ_AHEAD_ASYNC, DEFAULT_ENABLE_READ_AHEAD_ASYNC); - if (enableReadAheadAsync) { - readAheadManager = new ReadAheadManager( - entryLogger, entryLocationIndex, readCache, dbLedgerStorageStats, conf); + if (conf.getBoolean(ENABLE_READ_AHEAD_ASYNC, DEFAULT_ENABLE_READ_AHEAD_ASYNC)) { log.info("Read-ahead running in async mode."); } else { - readAheadManager = null; + // overwrite and convert + conf.setProperty(READ_AHEAD_MAX_ENTRIES, readAheadCacheBatchSize); + conf.setProperty(READ_AHEAD_MAX_BYTES, maxReadAheadBytesSize); log.info("Read-ahead running in sync mode."); } + readAheadManager = new ReadAheadManager( + entryLogger, entryLocationIndex, readCache, dbLedgerStorageStats, conf); } @Override @@ -571,105 +572,18 @@ private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, Book // Get entry from storage and trigger read-ahead long readAheadTotalStartNano = MathUtils.nowInNano(); - if (enableReadAheadAsync) { - // Async mode - entry = readAheadManager.readEntryOrWait(ledgerId, entryId); - } else { - // Sync mode - // Try reading from read-ahead cache - entry = readCache.get(ledgerId, entryId); - if (entry != null) { - dbLedgerStorageStats.getReadCacheHitCounter().inc(); - return entry; - } - - dbLedgerStorageStats.getReadCacheMissCounter().inc(); - - // Read from main storage - long entryLocation; - long locationIndexStartNano = MathUtils.nowInNano(); - try { - entryLocation = entryLocationIndex.getLocation(ledgerId, entryId); - if (entryLocation == 0) { - // Only a negative result while in limbo equates to unknown - throwIfLimbo(ledgerId); - - throw new NoEntryException(ledgerId, entryId); - } - } finally { - dbLedgerStorageStats.getReadFromLocationIndexTime().addLatency( - MathUtils.elapsedNanos(locationIndexStartNano), TimeUnit.NANOSECONDS); - } - - long readEntryStartNano = MathUtils.nowInNano(); - try { - entry = entryLogger.readEntry(ledgerId, entryId, entryLocation); - } finally { - dbLedgerStorageStats.getReadFromEntryLogTime().addLatency( - MathUtils.elapsedNanos(readEntryStartNano), TimeUnit.NANOSECONDS); - } - - readCache.put(ledgerId, entryId, entry); - - // Try to read more entries - long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes(); - fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation); + try { + entry = readAheadManager.readEntry(ledgerId, entryId); + } catch (IOException e) { + // Only a negative result while in limbo equates to unknown + throwIfLimbo(ledgerId); + throw e; } recordSuccessfulEvent(dbLedgerStorageStats.getReadAheadTotalTime(), readAheadTotalStartNano); return entry; } - private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long firstEntryLocation) { - long readAheadStartNano = MathUtils.nowInNano(); - int count = 0; - long size = 0; - - try { - long firstEntryLogId = (firstEntryLocation >> 32); - long currentEntryLogId = firstEntryLogId; - long currentEntryLocation = firstEntryLocation; - - while (count < readAheadCacheBatchSize - && size < maxReadAheadBytesSize - && currentEntryLogId == firstEntryLogId) { - ByteBuf entry = entryLogger.readEntry(orginalLedgerId, - firstEntryId, currentEntryLocation); - - try { - long currentEntryLedgerId = entry.getLong(0); - long currentEntryId = entry.getLong(8); - - if (currentEntryLedgerId != orginalLedgerId) { - // Found an entry belonging to a different ledger, stopping read-ahead - break; - } - - // Insert entry in read cache - readCache.put(orginalLedgerId, currentEntryId, entry); - - count++; - firstEntryId++; - size += entry.readableBytes(); - - currentEntryLocation += 4 + entry.readableBytes(); - currentEntryLogId = currentEntryLocation >> 32; - } finally { - entry.release(); - } - } - } catch (Exception e) { - if (log.isDebugEnabled()) { - log.debug("Exception during read ahead for ledger: {}: e", orginalLedgerId, e); - } - } finally { - dbLedgerStorageStats.getReadAheadBatchCountCounter().add(count); - dbLedgerStorageStats.getReadAheadBatchSizeCounter().add(size); - dbLedgerStorageStats.getReadAheadTime().addLatency( - MathUtils.elapsedNanos(readAheadStartNano), TimeUnit.NANOSECONDS); - } - } - public ByteBuf getLastEntry(long ledgerId) throws IOException, BookieException { throwIfLimbo(ledgerId); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java index 955447f3706..04068475a50 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java @@ -150,12 +150,12 @@ public void testSingleTaskBlockUntilReadAheadTaskComplete() throws Exception { long location = firstEntry.getRight(); assertNull(cache.get(lid, eid)); - ByteBuf entry = readAheadManager.readEntryOrWait(lid, eid); // miss + ByteBuf entry = readAheadManager.readEntry(lid, eid); // miss assertEquals(lid, entry.getLong(0)); assertEquals(eid, entry.getLong(8)); assertNotNull(cache.get(lid, eid)); - entry = readAheadManager.readEntryOrWait(lid, eid + 1); // hit + entry = readAheadManager.readEntry(lid, eid + 1); // hit assertEquals(lid, entry.getLong(0)); assertEquals(eid + 1, entry.getLong(8)); } @@ -196,12 +196,12 @@ public void testMultiTaskBlockUntilReadAheadTaskComplete() throws Exception { long location = firstEntry.getRight(); // multi reads - readAheadManager.readEntryOrWait(lid, eid); // miss + readAheadManager.readEntry(lid, eid); // miss new Thread(() -> { try { assertNull(cache.get(lid, eid + 1)); - ByteBuf entry = readAheadManager.readEntryOrWait(lid, eid + 1); + ByteBuf entry = readAheadManager.readEntry(lid, eid + 1); assertEquals(lid, entry.getLong(0)); assertEquals(eid + 1, entry.getLong(8)); } catch (Exception e) { @@ -216,7 +216,7 @@ public void testMultiTaskBlockUntilReadAheadTaskComplete() throws Exception { } try { - ByteBuf entry = readAheadManager.readEntryOrWait(lid, eid + 95); + ByteBuf entry = readAheadManager.readEntry(lid, eid + 95); assertEquals(lid, entry.getLong(0)); assertEquals(eid + 95, entry.getLong(8)); } catch (Exception e) { @@ -260,11 +260,11 @@ public void testReadFewEntryAhead() throws Exception { long eid = firstEntry.getLeft().getRight(); long location = firstEntry.getRight(); - ByteBuf entry = readAheadManager.readEntryOrWait(lid, eid + entries - 2); // miss + ByteBuf entry = readAheadManager.readEntry(lid, eid + entries - 2); // miss assertEquals(lid, entry.getLong(0)); assertEquals(eid + entries - 2, entry.getLong(8)); - entry = readAheadManager.readEntryOrWait(lid, eid + entries - 1); // hit + entry = readAheadManager.readEntry(lid, eid + entries - 1); // hit assertEquals(lid, entry.getLong(0)); assertEquals(eid + entries - 1, entry.getLong(8)); } @@ -290,7 +290,7 @@ public void testReadLastEntry() throws Exception { entryLogger, entryLocationIndex, cache, singleDirStorage.getDbLedgerStorageStats()); assertNull(cache.get(lid, eid)); - entry = readAheadManager.readEntryOrWait(lid, eid); + entry = readAheadManager.readEntry(lid, eid); assertEquals(lid, entry.getLong(0)); assertEquals(eid, entry.getLong(8)); } @@ -323,7 +323,8 @@ public void testCanSkipCachedEntriesWhenReadingAhead() throws Exception { final long maxEntries = 100; ServerConfiguration conf = new ServerConfiguration(); - conf.setProperty(ReadAheadManager.READ_AHEAD_MAX_MESSAGES, maxEntries); + conf.setProperty(ReadAheadManager.ENABLE_READ_AHEAD_ASYNC, true); + conf.setProperty(ReadAheadManager.READ_AHEAD_MAX_ENTRIES, maxEntries); conf.setProperty(ReadAheadManager.READ_AHEAD_TASK_POOL_SIZE, 1); ReadAheadManager readAheadManager = new ReadAheadManager( entryLogger, entryLocationIndex, cache, singleDirStorage.getDbLedgerStorageStats(), conf); @@ -335,17 +336,17 @@ public void testCanSkipCachedEntriesWhenReadingAhead() throws Exception { long location = firstEntry.getRight(); // set init pos - readAheadManager.readEntryOrWait(lid, eid); // miss + readAheadManager.readEntry(lid, eid); // miss // read some entries int skipEntries = 5; for (int i = 0; i < 5; i++) { // do not read sequentially - readAheadManager.readEntryOrWait(lid, eid + maxEntries - i); + readAheadManager.readEntry(lid, eid + maxEntries - i); } // start reading ahead - ByteBuf entry = readAheadManager.readEntryOrWait(lid, eid + 1); + ByteBuf entry = readAheadManager.readEntry(lid, eid + 1); assertEquals(lid, entry.getLong(0)); assertEquals(eid + 1, entry.getLong(8)); From 935e406c01fd504cf04cf22a2efca4e76c5d9a09 Mon Sep 17 00:00:00 2001 From: wuzhanpeng Date: Tue, 31 May 2022 17:00:03 +0800 Subject: [PATCH 6/8] resolve conflicts --- .../bookie/storage/ldb/ReadAheadManager.java | 7 +++---- .../storage/ldb/DbLedgerStorageReadAheadTest.java | 14 +++++++------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java index d2f7ae778a2..b322033352e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java @@ -39,7 +39,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.apache.bookkeeper.bookie.Bookie; -import org.apache.bookkeeper.bookie.EntryLogger; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -436,8 +436,7 @@ private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long fi while (count < readAheadEntries && size < readAheadBytes && currentEntryLogId == firstEntryLogId) { - ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, firstEntryId, currentEntryLocation, - false /* validateEntry */); + ByteBuf entry = entryLogger.readEntry(orginalLedgerId, firstEntryId, currentEntryLocation); try { long currentEntryLedgerId = entry.getLong(0); @@ -626,7 +625,7 @@ private void internalReadAhead(long ledgerId, long entryId, long location, break; } } else { - entry = entryLogger.internalReadEntry(ledgerId, entryId, location, false); + entry = entryLogger.readEntry(ledgerId, entryId, location); } try { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java index 04068475a50..67a5c5b2f18 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java @@ -34,7 +34,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.BookieImpl; -import org.apache.bookkeeper.bookie.EntryLogger; +import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.bookie.TestBookieImpl; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; @@ -95,7 +95,7 @@ public void testCanTriggerReadAheadTaskAfterHit() throws Exception { entry.writeLong(lid); // ledger id entry.writeLong(eid); // entry id entry.writeBytes(("entry-" + eid).getBytes()); - long location = entryLogger.addEntry(lid, entry, false); + long location = entryLogger.addEntry(lid, entry); entryLocationIndex.addLocation(lid, eid, location); locations.add(Pair.of(Pair.of(lid, eid), location)); } @@ -134,7 +134,7 @@ public void testSingleTaskBlockUntilReadAheadTaskComplete() throws Exception { entry.writeLong(lid); // ledger id entry.writeLong(eid); // entry id entry.writeBytes(("entry-" + eid).getBytes()); - long location = entryLogger.addEntry(lid, entry, false); + long location = entryLogger.addEntry(lid, entry); entryLocationIndex.addLocation(lid, eid, location); locations.add(Pair.of(Pair.of(lid, eid), location)); } @@ -180,7 +180,7 @@ public void testMultiTaskBlockUntilReadAheadTaskComplete() throws Exception { entry.writeLong(lid); // ledger id entry.writeLong(eid); // entry id entry.writeBytes(("entry-" + eid).getBytes()); - long location = entryLogger.addEntry(lid, entry, false); + long location = entryLogger.addEntry(lid, entry); entryLocationIndex.addLocation(lid, eid, location); locations.add(Pair.of(Pair.of(lid, eid), location)); } @@ -245,7 +245,7 @@ public void testReadFewEntryAhead() throws Exception { entry.writeLong(lid); // ledger id entry.writeLong(eid); // entry id entry.writeBytes(("entry-" + eid).getBytes()); - long location = entryLogger.addEntry(lid, entry, false); + long location = entryLogger.addEntry(lid, entry); entryLocationIndex.addLocation(lid, eid, location); locations.add(Pair.of(Pair.of(lid, eid), location)); } @@ -283,7 +283,7 @@ public void testReadLastEntry() throws Exception { entry.writeLong(lid); // ledger id entry.writeLong(eid); // entry id entry.writeBytes(("entry-" + eid).getBytes()); - long location = entryLogger.addEntry(lid, entry, false); + long location = entryLogger.addEntry(lid, entry); entryLocationIndex.addLocation(lid, eid, location); ReadAheadManager readAheadManager = new ReadAheadManager( @@ -315,7 +315,7 @@ public void testCanSkipCachedEntriesWhenReadingAhead() throws Exception { entry.writeLong(lid); // ledger id entry.writeLong(eid); // entry id entry.writeBytes(("entry-" + eid).getBytes()); - long location = entryLogger.addEntry(lid, entry, false); + long location = entryLogger.addEntry(lid, entry); entryLocationIndex.addLocation(lid, eid, location); locations.add(Pair.of(Pair.of(lid, eid), location)); } From 906c288583441e7bd8249654730b7c99ec987a71 Mon Sep 17 00:00:00 2001 From: wuzhanpeng Date: Tue, 31 May 2022 17:03:43 +0800 Subject: [PATCH 7/8] add additional location validation --- .../bookkeeper/bookie/storage/ldb/ReadAheadManager.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java index b322033352e..0269d3d7945 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java @@ -622,6 +622,7 @@ private void internalReadAhead(long ledgerId, long entryId, long location, if (log.isDebugEnabled()) { log.debug("[L{} E{}] Failed to locate entry with location:{}", ledgerId, entryId, location); } + earlyExit = true; break; } } else { @@ -656,6 +657,11 @@ private void internalReadAhead(long ledgerId, long entryId, long location, entry.release(); } } + } catch (IOException e) { + if (log.isDebugEnabled()) { + log.debug("[L{} E{}] Failed to read from file channel", ledgerId, entryId, e); + } + return; } catch (Exception e) { log.error("Exception during reading ahead for L{} E{}", ledgerId, entryId, e); return; @@ -674,6 +680,7 @@ private void internalReadAhead(long ledgerId, long entryId, long location, if (preTriggerReadAheadRatio > 0 && !earlyExit && entries >= 1) { int index = Math.min(entries - 1, (int) (preTriggerReadAheadRatio * entries)); long nextHitEntryId = entriesPos.get(index); + // ensure that the location has been validated addNextReadPosition(ledgerId, nextHitEntryId, ledgerId, entryId, location); } From b25b0f8f6242ae6b38d31ad7dfce80818ce2e7f5 Mon Sep 17 00:00:00 2001 From: wuzhanpeng Date: Tue, 31 May 2022 17:10:04 +0800 Subject: [PATCH 8/8] force signal all waiting threads after doing cleanup --- .../bookie/storage/ldb/ReadAheadManager.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java index 0269d3d7945..62a8b53d47b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java @@ -348,6 +348,9 @@ private void removeExpiredReadAheadTasks() { ledgerReadAheadTaskStatuses.remove(readAheadTaskStatus.startEntryId); return ledgerReadAheadTaskStatuses.isEmpty() ? null : ledgerReadAheadTaskStatuses; }); + + // signal all waiting threads if it has after actually removing from the in-progress map + readAheadTaskStatus.forceSignalAllIfHasWaiters(); } if (log.isDebugEnabled()) { @@ -583,6 +586,7 @@ private void internalReadAhead(long ledgerId, long entryId, long location, // obtain the read-ahead info ReadAheadTaskStatus readAheadTaskStatus = getNearestTask(ledgerId, entryId); if (readAheadTaskStatus == null) { + log.info("The RA-task L{} E{} has already been cleaned", ledgerId, entryId); return; } @@ -762,6 +766,20 @@ public long getEndEntryId() { return endEntryId; } + public void forceSignalAllIfHasWaiters() { + lock.lock(); + try { + final boolean hasWaiters = lock.hasWaiters(condition); + if (hasWaiters) { + log.info("RA-task of L{} E{} - {} still has waiters when doing clean-up", + ledgerId, startEntryId, endEntryId); + condition.signalAll(); + } + } finally { + lock.unlock(); + } + } + public boolean isExpired() { return readCompletedTimeMs > 0 && (System.currentTimeMillis() - readCompletedTimeMs) > readAheadTaskExpiredTimeMs;