diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java
index 3748020cc9c..d2f7ae778a2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java
@@ -55,25 +55,27 @@ public class ReadAheadManager {

     private static final Logger log = LoggerFactory.getLogger(ReadAheadManager.class);

-    public static final String READ_AHEAD_MAX_MESSAGES = "dbStorage_readAheadMaxMessages";
+    public static final String READ_AHEAD_MAX_ENTRIES = "dbStorage_readAheadMaxEntries";
     public static final String READ_AHEAD_MAX_BYTES = "dbStorage_readAheadMaxBytes";
     public static final String READ_AHEAD_PRE_TRIGGER_RATIO = "dbStorage_readAheadPreTriggerRatio";
     public static final String READ_AHEAD_TASK_EXPIRED_TIME_MS = "dbStorage_readAheadTaskExpiredTimeMs";
     public static final String READ_AHEAD_TIMEOUT_MS = "dbStorage_readAheadTimeoutMs";

     // operation behavior indicator
+    public static final String ENABLE_READ_AHEAD_ASYNC = "dbStorage_enableReadAheadAsync";
     public static final String SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = "dbStorage_submitReadAheadTaskImmediately";
     public static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize";

-    private static final int DEFAULT_READ_AHEAD_MESSAGES = 1000;
-    private static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024;
-    private static final double DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO = 0.75;
+    public static final int DEFAULT_READ_AHEAD_ENTRIES = 1000;
+    public static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024;
+    public static final double DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO = 0.75;

-    private static final long DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS = 60 * 1000;
-    private static final long DEFAULT_READ_AHEAD_TIMEOUT_MS = 30 * 1000;
+    public static final long DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS = 60 * 1000;
+    public static final long DEFAULT_READ_AHEAD_TIMEOUT_MS = 5 * 1000;

-    private static final boolean DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = false;
-    private static final int DEFAULT_READ_AHEAD_TASK_POOL_SIZE = 8;
+    public static final boolean DEFAULT_ENABLE_READ_AHEAD_ASYNC = false;
+    public static final boolean DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = false;
+    public static final int DEFAULT_READ_AHEAD_TASK_POOL_SIZE = 8;

     private static final class LedgerEntryPosition {

@@ -140,28 +142,37 @@ public long getLocation() {
         }
     }

-    private final ExecutorService readAheadExecutor;
-    private final ScheduledExecutorService cleanupExecutor;
-
-    private final ConcurrentHashMap pendingReadAheadPositions;
-    private final ConcurrentLinkedQueue pendingDeletePositions;
-
-    private final ConcurrentHashMap> inProgressReadAheadTaskStatuses;
-    private final ConcurrentLinkedQueue pendingDeleteReadAheadTaskStatuses;
-
     private final EntryLogger entryLogger;
     private final EntryLocationIndex entryLocationIndex;
     private final ReadCache cache;

     private final DbLedgerStorageStats dbLedgerStorageStats;

-    private final boolean submitReadAheadTaskImmediately;
-    private final int readAheadMessages;
-    private final int readAheadBytes;
-    private final double preTriggerReadAheadRatio;
+    private final boolean enableReadAheadAsync;

-    private final long readAheadTaskExpiredTimeMs;
-    private final long readAheadTimeoutMs;
+    /**
+     * The following parameters apply to both sync and async
+     * read-ahead mode.
+     */
+    private int readAheadEntries;
+    private int readAheadBytes;
+
+    /**
+     * The following parameters only apply to async read-ahead mode.
+     */
+    private ExecutorService readAheadExecutor;
+    private ScheduledExecutorService cleanupExecutor;
+
+    private ConcurrentHashMap pendingReadAheadPositions;
+    private ConcurrentLinkedQueue pendingDeletePositions;
+
+    private ConcurrentHashMap> inProgressReadAheadTaskStatuses;
+    private ConcurrentLinkedQueue pendingDeleteReadAheadTaskStatuses;
+
+    private boolean submitReadAheadTaskImmediately;
+    private double preTriggerReadAheadRatio;
+
+    private long readAheadTaskExpiredTimeMs;
+    private long readAheadTimeoutMs;

     /**
      * Entrance for test cases.
@@ -173,9 +184,9 @@ public long getLocation() {
      */
     public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex,
                             ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats) {
-        this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats,
+        this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats, true,
                 DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY, DEFAULT_READ_AHEAD_TASK_POOL_SIZE,
-                DEFAULT_READ_AHEAD_MESSAGES, DEFAULT_READ_AHEAD_BYTES, DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO,
+                DEFAULT_READ_AHEAD_ENTRIES, DEFAULT_READ_AHEAD_BYTES, DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO,
                 DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS, DEFAULT_READ_AHEAD_TIMEOUT_MS);
     }

@@ -191,9 +202,10 @@ public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocatio
     public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex,
                             ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, ServerConfiguration conf) {
         this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats,
+                conf.getBoolean(ENABLE_READ_AHEAD_ASYNC, DEFAULT_ENABLE_READ_AHEAD_ASYNC),
                 conf.getBoolean(SUBMIT_READ_AHEAD_TASK_IMMEDIATELY, DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY),
                 conf.getInt(READ_AHEAD_TASK_POOL_SIZE, DEFAULT_READ_AHEAD_TASK_POOL_SIZE),
-                conf.getInt(READ_AHEAD_MAX_MESSAGES, DEFAULT_READ_AHEAD_MESSAGES),
+                conf.getInt(READ_AHEAD_MAX_ENTRIES, DEFAULT_READ_AHEAD_ENTRIES),
                 conf.getInt(READ_AHEAD_MAX_BYTES, DEFAULT_READ_AHEAD_BYTES),
                 conf.getDouble(READ_AHEAD_PRE_TRIGGER_RATIO, DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO),
                 conf.getLong(READ_AHEAD_TASK_EXPIRED_TIME_MS, DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS),
@@ -201,22 +213,10 @@ public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocatio
     }

     public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex,
-                            ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats,
+                            ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, boolean enableReadAheadAsync,
                             boolean submitReadAheadTaskImmediately, int readAheadTaskPoolSize,
-                            int readAheadMessages, int readAheadBytes, double preTriggerReadAheadRatio,
+                            int readAheadEntries, int readAheadBytes, double preTriggerReadAheadRatio,
                             long readAheadTaskExpiredTimeMs, long readAheadTimeoutMs) {
-        // core components initialization
-        readAheadExecutor = Executors.newFixedThreadPool(
-                readAheadTaskPoolSize, new DefaultThreadFactory("read-ahead"));
-        cleanupExecutor = Executors.newSingleThreadScheduledExecutor(
-                new DefaultThreadFactory("read-ahead-cleanup"));
-
-        pendingReadAheadPositions = new ConcurrentHashMap<>();
-        pendingDeletePositions = new ConcurrentLinkedQueue<>();
-
-        inProgressReadAheadTaskStatuses = new ConcurrentHashMap<>();
-        pendingDeleteReadAheadTaskStatuses = new ConcurrentLinkedQueue<>();
-
         // external assistant components assignment
         this.entryLogger = entryLogger;
         this.entryLocationIndex = entryLocationIndex;
@@ -225,22 +225,63 @@ public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocatio
         // metrics
         this.dbLedgerStorageStats = dbLedgerStorageStats;

-        // configurable arguments
-        this.submitReadAheadTaskImmediately = submitReadAheadTaskImmediately;
-        this.readAheadMessages = readAheadMessages;
+        // mode
+        this.enableReadAheadAsync = enableReadAheadAsync;
+
+        // common parameters
+        this.readAheadEntries = readAheadEntries;
         this.readAheadBytes = readAheadBytes;
-        this.preTriggerReadAheadRatio = preTriggerReadAheadRatio;
-
-        this.readAheadTaskExpiredTimeMs = readAheadTaskExpiredTimeMs;
-        this.readAheadTimeoutMs = readAheadTimeoutMs;
+        if (enableReadAheadAsync) {
+
+            // configurable arguments
+            this.submitReadAheadTaskImmediately = submitReadAheadTaskImmediately;
+            this.preTriggerReadAheadRatio = preTriggerReadAheadRatio;
+
+            this.readAheadTaskExpiredTimeMs = readAheadTaskExpiredTimeMs;
+            this.readAheadTimeoutMs = readAheadTimeoutMs;
+
+            // core components initialization
+            readAheadExecutor = Executors.newFixedThreadPool(
+                    readAheadTaskPoolSize, new DefaultThreadFactory("read-ahead"));
+            cleanupExecutor = Executors.newSingleThreadScheduledExecutor(
+                    new DefaultThreadFactory("read-ahead-cleanup"));

-        cleanupExecutor.scheduleAtFixedRate(
-                SafeRunnable.safeRun(this::removeExpiredReadAheadTasks), 30, 30, TimeUnit.SECONDS);
+            pendingReadAheadPositions = new ConcurrentHashMap<>();
+            pendingDeletePositions = new ConcurrentLinkedQueue<>();
+
+            inProgressReadAheadTaskStatuses = new ConcurrentHashMap<>();
+            pendingDeleteReadAheadTaskStatuses = new ConcurrentLinkedQueue<>();
+
+            cleanupExecutor.scheduleAtFixedRate(
+                    SafeRunnable.safeRun(this::removeExpiredReadAheadTasks), 30, 30, TimeUnit.SECONDS);
+        }
     }

+    /**
+     * Shut down the thread pools used in async read-ahead mode.
+     */
     public void shutdown() {
-        this.readAheadExecutor.shutdown();
-        this.cleanupExecutor.shutdown();
+        if (enableReadAheadAsync) {
+            this.readAheadExecutor.shutdown();
+            this.cleanupExecutor.shutdown();
+        }
+    }
+
+    /**
+     * Read the requested entry and trigger read-ahead for the entries that follow,
+     * in either sync or async mode.
+     *
+     * @param ledgerId
+     * @param entryId
+     * @return
+     * @throws IOException
+     */
+    public ByteBuf readEntry(long ledgerId, long entryId) throws IOException {
+        if (enableReadAheadAsync) {
+            return readEntryUnderAsyncReadAhead(ledgerId, entryId);
+        } else {
+            return readEntryUnderSyncReadAhead(ledgerId, entryId);
+        }
     }

     private static void recordStatsInNano(OpStatsLogger logger, long startTimeNanos) {
@@ -327,7 +368,7 @@ public boolean hitInReadAheadPositions(long ledgerId, long entryId) {
                 (lep, rap) -> {
                     isHit.set(true);
                     readAheadAsync(rap.getLedgerId(), rap.getEntryId(), rap.getLocation(),
-                            readAheadMessages, readAheadBytes);
+                            readAheadEntries, readAheadBytes);

                     if (log.isDebugEnabled()) {
                         log.debug("Submitted read-ahead task. Info: hit-pos=[L{} E{}] / actual-start-pos=[L{} E{}]",
@@ -367,7 +408,7 @@ private ByteBuf readAndAddNextReadAheadPosition(long ledgerId, long entryId) thr
         if (submitReadAheadTaskImmediately) {
             // submit the read-ahead task immediately
             readAheadAsync(ledgerId, entryId + 1, entryLocation + 4 + entry.readableBytes(),
-                    readAheadMessages, readAheadBytes);
+                    readAheadEntries, readAheadBytes);
         } else {
             // actually execute read-ahead task after hitting this position next time
             addNextReadPosition(ledgerId, entryId + 1,
@@ -382,15 +423,113 @@ private ByteBuf readAndAddNextReadAheadPosition(long ledgerId, long entryId) thr
         return entry;
     }

+    private void fillReadAheadCache(long originalLedgerId, long firstEntryId, long firstEntryLocation) {
+        long readAheadStartNano = MathUtils.nowInNano();
+        int count = 0;
+        long size = 0;
+
+        try {
+            long firstEntryLogId = (firstEntryLocation >> 32);
+            long currentEntryLogId = firstEntryLogId;
+            long currentEntryLocation = firstEntryLocation;
+
+            while (count < readAheadEntries
+                    && size < readAheadBytes
+                    && currentEntryLogId == firstEntryLogId) {
+                ByteBuf entry = entryLogger.internalReadEntry(originalLedgerId, firstEntryId, currentEntryLocation,
+                        false /* validateEntry */);
+
+                try {
+                    long currentEntryLedgerId = entry.getLong(0);
+                    long currentEntryId = entry.getLong(8);
+
+                    if (currentEntryLedgerId != originalLedgerId) {
+                        // Found an entry belonging to a different ledger, stopping read-ahead
+                        break;
+                    }
+
+                    // Insert entry in read cache
+                    cache.put(originalLedgerId, currentEntryId, entry);
+
+                    count++;
+                    firstEntryId++;
+                    size += entry.readableBytes();
+
+                    currentEntryLocation += 4 + entry.readableBytes();
+                    currentEntryLogId = currentEntryLocation >> 32;
+                } finally {
+                    entry.release();
+                }
+            }
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Exception during read ahead for ledger: {}", originalLedgerId, e);
+            }
+        } finally {
+            dbLedgerStorageStats.getReadAheadBatchCountCounter().add(count);
+            dbLedgerStorageStats.getReadAheadBatchSizeCounter().add(size);
+            dbLedgerStorageStats.getReadAheadTime().add(MathUtils.elapsedNanos(readAheadStartNano));
+        }
+    }
+
+    /**
+     * Read an entry under sync read-ahead mode.
+     * This method was moved here from {@link SingleDirectoryDbLedgerStorage}
+     * in order to unify the entry read path.
+     *
+     * @param ledgerId
+     * @param entryId
+     * @return
+     * @throws IOException
+     */
+    private ByteBuf readEntryUnderSyncReadAhead(long ledgerId, long entryId) throws IOException {
+        ByteBuf entry;
+        // Try reading from read-ahead cache
+        entry = cache.get(ledgerId, entryId);
+        if (entry != null) {
+            dbLedgerStorageStats.getReadCacheHitCounter().inc();
+            return entry;
+        }
+
+        dbLedgerStorageStats.getReadCacheMissCounter().inc();
+
+        // Read from main storage
+        long entryLocation;
+        long locationIndexStartNano = MathUtils.nowInNano();
+        try {
+            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
+            if (entryLocation == 0) {
+                throw new Bookie.NoEntryException(ledgerId, entryId);
+            }
+        } finally {
+            dbLedgerStorageStats.getReadFromLocationIndexTime().add(MathUtils.elapsedNanos(locationIndexStartNano));
+        }
+
+        long readEntryStartNano = MathUtils.nowInNano();
+        try {
+            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
+        } finally {
+            dbLedgerStorageStats.getReadFromEntryLogTime().add(MathUtils.elapsedNanos(readEntryStartNano));
+        }
+
+        cache.put(ledgerId, entryId, entry);
+
+        // Try to read more entries
+        long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
+        fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
+
+        return entry;
+    }
+
     /**
-     * Read an entry or wait until it is ready to be read.
+     * Read an entry under async mode.
      *
      * @param ledgerId
      * @param entryId
      * @return
      * @throws IOException
      */
-    public ByteBuf readEntryOrWait(long ledgerId, long entryId) throws IOException {
+    private ByteBuf readEntryUnderAsyncReadAhead(long ledgerId, long entryId) throws IOException {
         // check if we need to read ahead
         boolean isHit = hitInReadAheadPositions(ledgerId, entryId);
         if (log.isDebugEnabled()) {
@@ -437,7 +576,7 @@ public ByteBuf readEntryOrWait(long ledgerId, long entryId) throws IOException {
     }

     private void internalReadAhead(long ledgerId, long entryId, long location,
-                                   int maxMessages, int maxBytes, long submitStartNanos) {
+                                   int maxEntries, int maxBytes, long submitStartNanos) {
         // record queue time
         recordStatsInNano(dbLedgerStorageStats.getReadAheadAsyncQueueTime(), submitStartNanos);
         long readAheadStartNanos = MathUtils.nowInNano();
@@ -452,7 +591,7 @@ private void internalReadAhead(long ledgerId, long entryId, long location,
         List entriesPos = new ArrayList<>();

         // read from fc
-        int messages = 0;
+        int entries = 0;
         int bytes = 0;

         // flag to determine whether keeps reading ahead
@@ -462,7 +601,7 @@ private void internalReadAhead(long ledgerId, long entryId, long location,
         boolean readFromCacheBefore = false;

         try {
-            while (messages <= maxMessages && bytes <= maxBytes) {
+            while (entries <= maxEntries && bytes <= maxBytes) {
                 ByteBuf entry = cache.get(ledgerId, entryId);

                 if (entry != null) {
@@ -509,7 +648,7 @@ private void internalReadAhead(long ledgerId, long entryId, long location,
                 }

                 // update stats
-                messages++;
+                entries++;
                 bytes += entry.readableBytes();

                 entryId++;
@@ -523,8 +662,8 @@ private void internalReadAhead(long ledgerId, long entryId, long location,
             return;
         } finally {
             // update the actual range
-            if (messages >= 1) {
-                readAheadTaskStatus.updateEndEntry(entriesPos.get(messages - 1));
+            if (entries >= 1) {
+                readAheadTaskStatus.updateEndEntry(entriesPos.get(entries - 1));
             }

             // notify all waiting threads
@@ -533,26 +672,26 @@ private void internalReadAhead(long ledgerId, long entryId, long location,
         }

         // set next read-ahead pos
-        if (preTriggerReadAheadRatio > 0 && !earlyExit && messages >= 1) {
-            int index = Math.min(messages - 1, (int) (preTriggerReadAheadRatio * messages));
+        if (preTriggerReadAheadRatio > 0 && !earlyExit && entries >= 1) {
+            int index = Math.min(entries - 1, (int) (preTriggerReadAheadRatio * entries));
             long nextHitEntryId = entriesPos.get(index);
             addNextReadPosition(ledgerId, nextHitEntryId, ledgerId, entryId, location);
         }

         if (log.isDebugEnabled()) {
-            log.debug("Read-ahead task [L{} E{} - E{} len={}] eventually collected {} messages and {} bytes in {}ms",
+            log.debug("Read-ahead task [L{} E{} - E{} len={}] eventually collected {} entries and {} bytes in {}ms",
                     readAheadTaskStatus.ledgerId, readAheadTaskStatus.startEntryId, readAheadTaskStatus.endEntryId,
-                    readAheadTaskStatus.readAheadEntries, messages, bytes,
+                    readAheadTaskStatus.readAheadEntries, entries, bytes,
                     TimeUnit.NANOSECONDS.toMillis(MathUtils.elapsedNanos(readAheadStartNanos)));
         }

         // record exec time
-        dbLedgerStorageStats.getReadAheadBatchCountCounter().add(messages);
+        dbLedgerStorageStats.getReadAheadBatchCountCounter().add(entries);
         dbLedgerStorageStats.getReadAheadBatchSizeCounter().add(bytes);
         recordStatsInNano(dbLedgerStorageStats.getReadAheadAsyncTotalTime(), submitStartNanos);
     }

-    public void readAheadAsync(long ledgerId, long entryId, long location, int maxMessages, int maxBytes) {
+    public void readAheadAsync(long ledgerId, long entryId, long location, int maxEntries, int maxBytes) {
         // add a new ReadAheadTaskStatus before actually read-ahead
         inProgressReadAheadTaskStatuses.computeIfAbsent(ledgerId, lid -> new TreeMap<>());
         inProgressReadAheadTaskStatuses.computeIfPresent(ledgerId, (lid, ledgerReadAheadTaskStatuses) -> {
@@ -563,20 +702,20 @@ public void readAheadAsync(long ledgerId, long entryId, long location, int maxMe
                 }
             } else {
                 ledgerReadAheadTaskStatuses.put(entryId, new ReadAheadTaskStatus(
-                        ledgerId, entryId, maxMessages, readAheadTaskExpiredTimeMs, readAheadTimeoutMs));
+                        ledgerId, entryId, maxEntries, readAheadTaskExpiredTimeMs, readAheadTimeoutMs));
             }
             return ledgerReadAheadTaskStatuses;
         });

         // submit the read-ahead task async
         readAheadExecutor.submit(SafeRunnable.safeRun(
-                () -> internalReadAhead(ledgerId, entryId, location, maxMessages, maxBytes, MathUtils.nowInNano())));
+                () -> internalReadAhead(ledgerId, entryId, location, maxEntries, maxBytes, MathUtils.nowInNano())));
     }

     /**
      * A structure that records the transitions of the task status.
      */
-    public static class ReadAheadTaskStatus {
+    public static final class ReadAheadTaskStatus {
         private long ledgerId;
         private long startEntryId;
         private long endEntryId = -1;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index d866d356b56..b5bfdbbfa82 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -22,6 +22,10 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.bookkeeper.bookie.storage.ldb.ReadAheadManager.DEFAULT_ENABLE_READ_AHEAD_ASYNC;
+import static org.apache.bookkeeper.bookie.storage.ldb.ReadAheadManager.ENABLE_READ_AHEAD_ASYNC;
+import static org.apache.bookkeeper.bookie.storage.ldb.ReadAheadManager.READ_AHEAD_MAX_BYTES;
+import static org.apache.bookkeeper.bookie.storage.ldb.ReadAheadManager.READ_AHEAD_MAX_ENTRIES;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
@@ -143,10 +147,6 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage

     private final Counter flushExecutorTime;

-    private static final String ENABLE_READ_AHEAD_ASYNC = "dbStorage_enableReadAheadAsync";
-    private static final boolean DEFAULT_ENABLE_READ_AHEAD_ASYNC = false;
-
-    private final boolean enableReadAheadAsync;
     private final ReadAheadManager readAheadManager;

     public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
@@ -212,15 +212,16 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le

         ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());

-        enableReadAheadAsync = conf.getBoolean(ENABLE_READ_AHEAD_ASYNC, DEFAULT_ENABLE_READ_AHEAD_ASYNC);
-        if (enableReadAheadAsync) {
-            readAheadManager = new ReadAheadManager(
-                    entryLogger, entryLocationIndex, readCache, dbLedgerStorageStats, conf);
+        if (conf.getBoolean(ENABLE_READ_AHEAD_ASYNC, DEFAULT_ENABLE_READ_AHEAD_ASYNC)) {
             log.info("Read-ahead running in async mode.");
         } else {
-            readAheadManager = null;
+            // overwrite the read-ahead settings with the legacy sync-mode limits
+            conf.setProperty(READ_AHEAD_MAX_ENTRIES, readAheadCacheBatchSize);
+            conf.setProperty(READ_AHEAD_MAX_BYTES, maxReadAheadBytesSize);
             log.info("Read-ahead running in sync mode.");
         }
+        readAheadManager = new ReadAheadManager(
+                entryLogger, entryLocationIndex, readCache, dbLedgerStorageStats, conf);
     }

     @Override
@@ -549,102 +550,18 @@ private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, Book

         // Get entry from storage and trigger read-ahead
         long readAheadTotalStartNano = MathUtils.nowInNano();
-        if (enableReadAheadAsync) {
-            // Async mode
-            entry = readAheadManager.readEntryOrWait(ledgerId, entryId);
-        } else {
-            // Sync mode
-            // Try reading from read-ahead cache
-            entry = readCache.get(ledgerId, entryId);
-            if (entry != null) {
-                dbLedgerStorageStats.getReadCacheHitCounter().inc();
-                return entry;
-            }
-
-            dbLedgerStorageStats.getReadCacheMissCounter().inc();
-
-            // Read from main storage
-            long entryLocation;
-            long locationIndexStartNano = MathUtils.nowInNano();
-            try {
-                entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
-                if (entryLocation == 0) {
-                    // Only a negative result while in limbo equates to unknown
-                    throwIfLimbo(ledgerId);
-
-                    throw new NoEntryException(ledgerId, entryId);
-                }
-            } finally {
-                dbLedgerStorageStats.getReadFromLocationIndexTime().add(MathUtils.elapsedNanos(locationIndexStartNano));
-            }
-
-            long readEntryStartNano = MathUtils.nowInNano();
-            try {
-                entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
-            } finally {
-                dbLedgerStorageStats.getReadFromEntryLogTime().add(MathUtils.elapsedNanos(readEntryStartNano));
-            }
-
-            readCache.put(ledgerId, entryId, entry);
-
-            // Try to read more entries
-            long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
-            fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
+        try {
+            entry = readAheadManager.readEntry(ledgerId, entryId);
+        } catch (IOException e) {
+            // Only a negative result while in limbo equates to unknown
+            throwIfLimbo(ledgerId);
+            throw e;
         }

         recordSuccessfulEvent(dbLedgerStorageStats.getReadAheadTotalTime(), readAheadTotalStartNano);
         return entry;
     }

-    private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long firstEntryLocation) {
-        long readAheadStartNano = MathUtils.nowInNano();
-        int count = 0;
-        long size = 0;
-
-        try {
-            long firstEntryLogId = (firstEntryLocation >> 32);
-            long currentEntryLogId = firstEntryLogId;
-            long currentEntryLocation = firstEntryLocation;
-
-            while (count < readAheadCacheBatchSize
-                    && size < maxReadAheadBytesSize
-                    && currentEntryLogId == firstEntryLogId) {
-                ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, firstEntryId, currentEntryLocation,
-                        false /* validateEntry */);
-
-                try {
-                    long currentEntryLedgerId = entry.getLong(0);
-                    long currentEntryId = entry.getLong(8);
-
-                    if (currentEntryLedgerId != orginalLedgerId) {
-                        // Found an entry belonging to a different ledger, stopping read-ahead
-                        break;
-                    }
-
-                    // Insert entry in read cache
-                    readCache.put(orginalLedgerId, currentEntryId, entry);
-
-                    count++;
-                    firstEntryId++;
-                    size += entry.readableBytes();
-
-                    currentEntryLocation += 4 + entry.readableBytes();
-                    currentEntryLogId = currentEntryLocation >> 32;
-                } finally {
-                    entry.release();
-                }
-            }
-        } catch (Exception e) {
-            if (log.isDebugEnabled()) {
-                log.debug("Exception during read ahead for ledger: {}: e", orginalLedgerId, e);
-            }
-        } finally {
-            dbLedgerStorageStats.getReadAheadBatchCountCounter().add(count);
-            dbLedgerStorageStats.getReadAheadBatchSizeCounter().add(size);
-            dbLedgerStorageStats.getReadAheadTime().add(MathUtils.elapsedNanos(readAheadStartNano));
-        }
-    }
-
     public ByteBuf getLastEntry(long ledgerId) throws IOException, BookieException {
         throwIfLimbo(ledgerId);

diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java
index 955447f3706..04068475a50 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadAheadTest.java
@@ -150,12 +150,12 @@ public void testSingleTaskBlockUntilReadAheadTaskComplete() throws Exception {
         long location = firstEntry.getRight();

         assertNull(cache.get(lid, eid));
-        ByteBuf entry = readAheadManager.readEntryOrWait(lid, eid);    // miss
+        ByteBuf entry = readAheadManager.readEntry(lid, eid);    // miss
         assertEquals(lid, entry.getLong(0));
         assertEquals(eid, entry.getLong(8));

         assertNotNull(cache.get(lid, eid));
-        entry = readAheadManager.readEntryOrWait(lid, eid + 1);    // hit
+        entry = readAheadManager.readEntry(lid, eid + 1);    // hit
         assertEquals(lid, entry.getLong(0));
         assertEquals(eid + 1, entry.getLong(8));
     }
@@ -196,12 +196,12 @@ public void testMultiTaskBlockUntilReadAheadTaskComplete() throws Exception {
         long location = firstEntry.getRight();

         // multi reads
-        readAheadManager.readEntryOrWait(lid, eid);    // miss
+        readAheadManager.readEntry(lid, eid);    // miss

         new Thread(() -> {
             try {
                 assertNull(cache.get(lid, eid + 1));
-                ByteBuf entry = readAheadManager.readEntryOrWait(lid, eid + 1);
+                ByteBuf entry = readAheadManager.readEntry(lid, eid + 1);
                 assertEquals(lid, entry.getLong(0));
                 assertEquals(eid + 1, entry.getLong(8));
             } catch (Exception e) {
@@ -216,7 +216,7 @@ public void testMultiTaskBlockUntilReadAheadTaskComplete() throws Exception {
         }

         try {
-            ByteBuf entry = readAheadManager.readEntryOrWait(lid, eid + 95);
+            ByteBuf entry = readAheadManager.readEntry(lid, eid + 95);
             assertEquals(lid, entry.getLong(0));
             assertEquals(eid + 95, entry.getLong(8));
         } catch (Exception e) {
@@ -260,11 +260,11 @@ public void testReadFewEntryAhead() throws Exception {
         long eid = firstEntry.getLeft().getRight();
         long location = firstEntry.getRight();

-        ByteBuf entry = readAheadManager.readEntryOrWait(lid, eid + entries - 2);    // miss
+        ByteBuf entry = readAheadManager.readEntry(lid, eid + entries - 2);    // miss
         assertEquals(lid, entry.getLong(0));
         assertEquals(eid + entries - 2, entry.getLong(8));

-        entry = readAheadManager.readEntryOrWait(lid, eid + entries - 1);    // hit
+        entry = readAheadManager.readEntry(lid, eid + entries - 1);    // hit
         assertEquals(lid, entry.getLong(0));
         assertEquals(eid + entries - 1, entry.getLong(8));
     }
@@ -290,7 +290,7 @@ public void testReadLastEntry() throws Exception {
                 entryLogger, entryLocationIndex, cache, singleDirStorage.getDbLedgerStorageStats());

         assertNull(cache.get(lid, eid));
-        entry = readAheadManager.readEntryOrWait(lid, eid);
+        entry = readAheadManager.readEntry(lid, eid);
         assertEquals(lid, entry.getLong(0));
         assertEquals(eid, entry.getLong(8));
     }
@@ -323,7 +323,8 @@ public void testCanSkipCachedEntriesWhenReadingAhead() throws Exception {
         final long maxEntries = 100;

         ServerConfiguration conf = new ServerConfiguration();
-        conf.setProperty(ReadAheadManager.READ_AHEAD_MAX_MESSAGES, maxEntries);
+        conf.setProperty(ReadAheadManager.ENABLE_READ_AHEAD_ASYNC, true);
+        conf.setProperty(ReadAheadManager.READ_AHEAD_MAX_ENTRIES, maxEntries);
         conf.setProperty(ReadAheadManager.READ_AHEAD_TASK_POOL_SIZE, 1);
         ReadAheadManager readAheadManager = new ReadAheadManager(
                 entryLogger, entryLocationIndex, cache, singleDirStorage.getDbLedgerStorageStats(), conf);
@@ -335,17 +336,17 @@ public void testCanSkipCachedEntriesWhenReadingAhead() throws Exception {
         long location = firstEntry.getRight();

         // set init pos
-        readAheadManager.readEntryOrWait(lid, eid);    // miss
+        readAheadManager.readEntry(lid, eid);    // miss

         // read some entries
         int skipEntries = 5;
         for (int i = 0; i < 5; i++) {
             // do not read sequentially
-            readAheadManager.readEntryOrWait(lid, eid + maxEntries - i);
+            readAheadManager.readEntry(lid, eid + maxEntries - i);
         }

         // start reading ahead
-        ByteBuf entry = readAheadManager.readEntryOrWait(lid, eid + 1);
+        ByteBuf entry = readAheadManager.readEntry(lid, eid + 1);
         assertEquals(lid, entry.getLong(0));
         assertEquals(eid + 1, entry.getLong(8));