From ecd4eb22676d23895ca738c638791d5ce658f2e1 Mon Sep 17 00:00:00 2001 From: lushiji Date: Fri, 16 Sep 2022 16:42:46 +0800 Subject: [PATCH 1/3] SortedLedgerStorage supports multiple dirs --- .../bookie/InterleavedLedgerStorage.java | 9 +- .../bookie/SortedLedgerStorage.java | 205 ++++++++++++------ .../bookkeeper/bookie/LedgerCacheTest.java | 19 +- .../bookie/SlowSortedLedgerStorage.java | 6 +- .../SortedLedgerStorageCheckpointTest.java | 49 +++-- 5 files changed, 184 insertions(+), 104 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java index 6954f65d0a7..b6470f2d5ed 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java @@ -120,6 +120,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry private OpStatsLogger getEntryStats; private OpStatsLogger pageScanStats; private Counter retryCounter; + private StateManager stateManager; public InterleavedLedgerStorage() { activeLedgers = new SnapshotMap<>(); @@ -161,7 +162,13 @@ void initializeWithEntryLogListener(ServerConfiguration conf, } @Override - public void setStateManager(StateManager stateManager) {} + public void setStateManager(StateManager stateManager) { + this.stateManager = stateManager; + } + + public StateManager getStateManager() { + return stateManager; + } @Override public void setCheckpointSource(CheckpointSource checkpointSource) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java index a7080ba4f3e..26476147058 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java @@ -21,11 +21,14 @@ package org.apache.bookkeeper.bookie; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import java.io.IOException; +import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.Optional; @@ -34,12 +37,16 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; +import org.apache.bookkeeper.common.util.MathUtils; +import org.apache.bookkeeper.common.util.ReflectionUtils; import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.IteratorUtility; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,18 +63,22 @@ public class SortedLedgerStorage EntryMemTable memTable; private ScheduledExecutorService scheduler; - private StateManager stateManager; private ServerConfiguration conf; private StatsLogger statsLogger; - private final InterleavedLedgerStorage interleavedLedgerStorage; + protected final List interleavedLedgerStorageList; + private int numberOfDirs; + private String interleavedLedgerStorageClazz = InterleavedLedgerStorage.class.getName(); public SortedLedgerStorage() { - this(new InterleavedLedgerStorage()); + this(null); } @VisibleForTesting - protected SortedLedgerStorage(InterleavedLedgerStorage ils) { - interleavedLedgerStorage = ils; + protected SortedLedgerStorage(String interleavedLedgerStorageClazz) { + interleavedLedgerStorageList = Lists.newArrayList(); + if (StringUtils.isNotBlank(interleavedLedgerStorageClazz)) { + this.interleavedLedgerStorageClazz = interleavedLedgerStorageClazz; + } } @Override @@ -80,17 +91,27 @@ public void initialize(ServerConfiguration conf, throws IOException { this.conf = conf; this.statsLogger = statsLogger; - - interleavedLedgerStorage.initializeWithEntryLogListener( - conf, - ledgerManager, - ledgerDirsManager, - indexDirsManager, - // uses sorted ledger storage's own entry log listener - // since it manages entry log rotations and checkpoints. - this, - statsLogger, - allocator); + this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size(); + + for (int i = 0; i < ledgerDirsManager.getAllLedgerDirs().size(); i++) { + LedgerDirsManager ldm = BookieResources.createLedgerDirsManager( + conf, ledgerDirsManager.getDiskChecker(), NullStatsLogger.INSTANCE); + LedgerDirsManager idm = BookieResources.createIndexDirsManager( + conf, indexDirsManager.getDiskChecker(), NullStatsLogger.INSTANCE, ledgerDirsManager); + + InterleavedLedgerStorage interleavedLedgerStorage = getInterleavedLedgerStorage(); + interleavedLedgerStorage.initializeWithEntryLogListener( + conf, + ledgerManager, + ldm, + idm, + // uses sorted ledger storage's own entry log listener + // since it manages entry log rotations and checkpoints. + this, + statsLogger, + allocator); + interleavedLedgerStorageList.add(interleavedLedgerStorage); + } this.scheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder() @@ -98,14 +119,28 @@ public void initialize(ServerConfiguration conf, .setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) / 2).build()); } + private InterleavedLedgerStorage getInterleavedLedgerStorage() { + if (InterleavedLedgerStorage.class.getName().equals(this.interleavedLedgerStorageClazz)) { + return new InterleavedLedgerStorage(); + } else { + try { + return ReflectionUtils.newInstance(this.interleavedLedgerStorageClazz, InterleavedLedgerStorage.class); + } catch (Throwable t) { + throw new RuntimeException("Failed to instantiate InterleavedLedgerStorage class : " + + this.interleavedLedgerStorageClazz, t); + } + } + } + + @Override public void setStateManager(StateManager stateManager) { - interleavedLedgerStorage.setStateManager(stateManager); - this.stateManager = stateManager; + interleavedLedgerStorageList.forEach(s -> s.setStateManager(stateManager)); } + @Override public void setCheckpointSource(CheckpointSource checkpointSource) { - interleavedLedgerStorage.setCheckpointSource(checkpointSource); + interleavedLedgerStorageList.forEach(s -> s.setCheckpointSource(checkpointSource)); if (conf.isEntryLogPerLedgerEnabled()) { this.memTable = new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger); @@ -115,7 +150,7 @@ public void setCheckpointSource(CheckpointSource checkpointSource) { } @Override public void setCheckpointer(Checkpointer checkpointer) { - interleavedLedgerStorage.setCheckpointer(checkpointer); + interleavedLedgerStorageList.forEach(s -> s.setCheckpointer(checkpointer)); } @VisibleForTesting @@ -130,7 +165,7 @@ public void start() { } catch (IOException e) { LOG.error("Exception thrown while flushing ledger cache.", e); } - interleavedLedgerStorage.start(); + interleavedLedgerStorageList.forEach(s -> s.start()); } @Override @@ -145,17 +180,23 @@ public void shutdown() throws InterruptedException { } catch (Exception e) { LOG.error("Error while closing the memtable", e); } - interleavedLedgerStorage.shutdown(); + for (LedgerStorage ls : interleavedLedgerStorageList) { + ls.shutdown(); + } + } + + private InterleavedLedgerStorage getLedgerStorage(long ledgerId) { + return interleavedLedgerStorageList.get(MathUtils.signSafeMod(ledgerId, numberOfDirs)); } @Override public boolean ledgerExists(long ledgerId) throws IOException { // Done this way because checking the skip list is an O(logN) operation compared to // the O(1) for the ledgerCache. - if (!interleavedLedgerStorage.ledgerExists(ledgerId)) { + if (!getLedgerStorage(ledgerId).ledgerExists(ledgerId)) { EntryKeyValue kv = memTable.getLastEntry(ledgerId); if (null == kv) { - return interleavedLedgerStorage.ledgerExists(ledgerId); + return getLedgerStorage(ledgerId).ledgerExists(ledgerId); } } return true; @@ -169,22 +210,22 @@ public boolean entryExists(long ledgerId, long entryId) throws IOException { @Override public boolean setFenced(long ledgerId) throws IOException { - return interleavedLedgerStorage.setFenced(ledgerId); + return getLedgerStorage(ledgerId).setFenced(ledgerId); } @Override public boolean isFenced(long ledgerId) throws IOException { - return interleavedLedgerStorage.isFenced(ledgerId); + return getLedgerStorage(ledgerId).isFenced(ledgerId); } @Override public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException { - interleavedLedgerStorage.setMasterKey(ledgerId, masterKey); + getLedgerStorage(ledgerId).setMasterKey(ledgerId, masterKey); } @Override public byte[] readMasterKey(long ledgerId) throws IOException, BookieException { - return interleavedLedgerStorage.readMasterKey(ledgerId); + return getLedgerStorage(ledgerId).readMasterKey(ledgerId); } @Override @@ -194,7 +235,7 @@ public long addEntry(ByteBuf entry) throws IOException { long lac = entry.getLong(entry.readerIndex() + 16); memTable.addEntry(ledgerId, entryId, entry.nioBuffer(), this); - interleavedLedgerStorage.ledgerCache.updateLastAddConfirmed(ledgerId, lac); + getLedgerStorage(ledgerId).ledgerCache.updateLastAddConfirmed(ledgerId, lac); return entryId; } @@ -209,7 +250,7 @@ private ByteBuf getLastEntryId(long ledgerId) throws IOException { return kv.getValueAsByteBuffer(); } // If it doesn't exist in the skip list, then fallback to the ledger cache+index. - return interleavedLedgerStorage.getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED); + return getLedgerStorage(ledgerId).getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED); } @Override @@ -219,13 +260,13 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieE } ByteBuf buffToRet; try { - buffToRet = interleavedLedgerStorage.getEntry(ledgerId, entryId); + buffToRet = getLedgerStorage(ledgerId).getEntry(ledgerId, entryId); } catch (Bookie.NoEntryException nee) { EntryKeyValue kv = memTable.getEntry(ledgerId, entryId); if (null == kv) { // The entry might have been flushed since we last checked, so query the ledger cache again. // If the entry truly doesn't exist, then this will throw a NoEntryException - buffToRet = interleavedLedgerStorage.getEntry(ledgerId, entryId); + buffToRet = getLedgerStorage(ledgerId).getEntry(ledgerId, entryId); } else { buffToRet = kv.getValueAsByteBuffer(); } @@ -236,7 +277,7 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieE @Override public long getLastAddConfirmed(long ledgerId) throws IOException { - return interleavedLedgerStorage.getLastAddConfirmed(ledgerId); + return getLedgerStorage(ledgerId).getLastAddConfirmed(ledgerId); } @Override @@ -244,53 +285,57 @@ public boolean waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC, Watcher watcher) throws IOException { - return interleavedLedgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher); + return getLedgerStorage(ledgerId).waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher); } @Override public void cancelWaitForLastAddConfirmedUpdate(long ledgerId, Watcher watcher) throws IOException { - interleavedLedgerStorage.cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher); + getLedgerStorage(ledgerId).cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher); } @Override public void checkpoint(final Checkpoint checkpoint) throws IOException { long numBytesFlushed = memTable.flush(this, checkpoint); - interleavedLedgerStorage.getEntryLogger().prepareSortedLedgerStorageCheckpoint(numBytesFlushed); - interleavedLedgerStorage.checkpoint(checkpoint); + for (InterleavedLedgerStorage s : interleavedLedgerStorageList) { + s.getEntryLogger().prepareSortedLedgerStorageCheckpoint(numBytesFlushed); + s.checkpoint(checkpoint); + } } @Override public void deleteLedger(long ledgerId) throws IOException { - interleavedLedgerStorage.deleteLedger(ledgerId); + getLedgerStorage(ledgerId).deleteLedger(ledgerId); } @Override public void registerLedgerDeletionListener(LedgerDeletionListener listener) { - interleavedLedgerStorage.registerLedgerDeletionListener(listener); + interleavedLedgerStorageList.forEach(s -> s.registerLedgerDeletionListener(listener)); } @Override public void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException { - interleavedLedgerStorage.setExplicitLac(ledgerId, lac); + getLedgerStorage(ledgerId).setExplicitLac(ledgerId, lac); } @Override public ByteBuf getExplicitLac(long ledgerId) { - return interleavedLedgerStorage.getExplicitLac(ledgerId); + return getLedgerStorage(ledgerId).getExplicitLac(ledgerId); } @Override public void process(long ledgerId, long entryId, ByteBuf buffer) throws IOException { - interleavedLedgerStorage.processEntry(ledgerId, entryId, buffer, false); + getLedgerStorage(ledgerId).processEntry(ledgerId, entryId, buffer, false); } @Override public void flush() throws IOException { memTable.flush(this, Checkpoint.MAX); - interleavedLedgerStorage.flush(); + for (LedgerStorage ls : interleavedLedgerStorageList) { + ls.flush(); + } } // CacheCallback functions. @@ -310,16 +355,19 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException { scheduler.execute(new Runnable() { @Override public void run() { - try { - LOG.info("Started flushing mem table."); - interleavedLedgerStorage.getEntryLogger().prepareEntryMemTableFlush(); - memTable.flush(SortedLedgerStorage.this); - if (interleavedLedgerStorage.getEntryLogger().commitEntryMemTableFlush()) { - interleavedLedgerStorage.checkpointer.startCheckpoint(cp); + for (InterleavedLedgerStorage s : interleavedLedgerStorageList) { + try { + LOG.info("Started flushing mem table."); + s.getEntryLogger().prepareEntryMemTableFlush(); + memTable.flush(SortedLedgerStorage.this); + + if (s.getEntryLogger().commitEntryMemTableFlush()) { + s.checkpointer.startCheckpoint(cp); + } + } catch (Exception e) { + s.getStateManager().transitionToReadOnlyMode(); + LOG.error("Exception thrown while flushing skip list cache.", e); } - } catch (Exception e) { - stateManager.transitionToReadOnlyMode(); - LOG.error("Exception thrown while flushing skip list cache.", e); } } }); @@ -333,63 +381,84 @@ public void onRotateEntryLog() { // flushed to the entry log file. } - BookieStateManager getStateManager(){ - return (BookieStateManager) stateManager; - } - - public DefaultEntryLogger getEntryLogger() { - return interleavedLedgerStorage.getEntryLogger(); + public List getEntryLogger() { + List listIt = new ArrayList<>(numberOfDirs); + for (InterleavedLedgerStorage ls : interleavedLedgerStorageList) { + listIt.add(ls.getEntryLogger()); + } + return listIt; } @Override public Iterable getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) throws IOException { - return interleavedLedgerStorage.getActiveLedgersInRange(firstLedgerId, lastLedgerId); + List> listIt = new ArrayList<>(numberOfDirs); + for (InterleavedLedgerStorage ls : interleavedLedgerStorageList) { + listIt.add(ls.getActiveLedgersInRange(firstLedgerId, lastLedgerId)); + } + return Iterables.concat(listIt); } @Override public void updateEntriesLocations(Iterable locations) throws IOException { - interleavedLedgerStorage.updateEntriesLocations(locations); + for (InterleavedLedgerStorage s : interleavedLedgerStorageList) { + s.updateEntriesLocations(locations); + } } @Override public void flushEntriesLocationsIndex() throws IOException { - interleavedLedgerStorage.flushEntriesLocationsIndex(); + for (InterleavedLedgerStorage s : interleavedLedgerStorageList) { + s.flushEntriesLocationsIndex(); + } } @Override public LedgerStorage getUnderlyingLedgerStorage() { - return interleavedLedgerStorage; + return interleavedLedgerStorageList.get(0); } @Override public void forceGC() { - interleavedLedgerStorage.forceGC(); + interleavedLedgerStorageList.forEach(s -> s.forceGC()); } @Override public void forceGC(Boolean forceMajor, Boolean forceMinor) { - interleavedLedgerStorage.forceGC(forceMajor, forceMinor); + interleavedLedgerStorageList.forEach(s -> s.forceGC(forceMajor, forceMinor)); } @Override public List localConsistencyCheck(Optional rateLimiter) throws IOException { - return interleavedLedgerStorage.localConsistencyCheck(rateLimiter); + List listIt = new ArrayList<>(numberOfDirs); + for (InterleavedLedgerStorage ls : interleavedLedgerStorageList) { + listIt.addAll(ls.localConsistencyCheck(rateLimiter)); + } + return listIt; } @Override public boolean isInForceGC() { - return interleavedLedgerStorage.isInForceGC(); + for (InterleavedLedgerStorage s : interleavedLedgerStorageList) { + if (!s.isInForceGC()){ + return false; + } + } + return true; } @Override public List getGarbageCollectionStatus() { - return interleavedLedgerStorage.getGarbageCollectionStatus(); + List listIt = new ArrayList<>(numberOfDirs); + for (InterleavedLedgerStorage ls : interleavedLedgerStorageList) { + listIt.addAll(ls.getGarbageCollectionStatus()); + } + return listIt; } @Override public PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException { PrimitiveIterator.OfLong entriesInMemtableItr = memTable.getListOfEntriesOfLedger(ledgerId); - PrimitiveIterator.OfLong entriesFromILSItr = interleavedLedgerStorage.getListOfEntriesOfLedger(ledgerId); + PrimitiveIterator.OfLong entriesFromILSItr = getLedgerStorage(ledgerId).getListOfEntriesOfLedger(ledgerId); return IteratorUtility.mergePrimitiveLongIterator(entriesInMemtableItr, entriesFromILSItr); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java index 85c3bca8d7c..f022f4e453f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java @@ -706,15 +706,16 @@ public void process(long ledgerId, long entryId, ByteBuf buffer) throws IOExcept public void onSizeLimitReached(final CheckpointSource.Checkpoint cp) throws IOException { LOG.info("Reached size {}", cp); // use synchronous way - try { - LOG.info("Started flushing mem table."); - memTable.flush(FlushTestSortedLedgerStorage.this); - } catch (IOException e) { - getStateManager().doTransitionToReadOnlyMode(); - LOG.error("Exception thrown while flushing skip list cache.", e); - } - } - + for (InterleavedLedgerStorage s : this.interleavedLedgerStorageList) { + try { + LOG.info("Started flushing mem table."); + memTable.flush(FlushTestSortedLedgerStorage.this); + } catch (IOException e) { + s.getStateManager().transitionToReadOnlyMode(); + LOG.error("Exception thrown while flushing skip list cache.", e); + } + } + } } @Test diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java index e12976d04e3..0528b9b6911 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java @@ -27,10 +27,10 @@ public class SlowSortedLedgerStorage extends SortedLedgerStorage { public SlowSortedLedgerStorage() { - this(new SlowInterleavedLedgerStorage()); + this(SlowInterleavedLedgerStorage.class.getName()); } - SlowSortedLedgerStorage(InterleavedLedgerStorage ils) { - super(ils); + SlowSortedLedgerStorage(String interleavedLedgerStorageClazz) { + super(interleavedLedgerStorageClazz); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java index 42cdb943eee..59431e1a351 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java @@ -28,6 +28,7 @@ import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import java.io.IOException; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -223,29 +224,31 @@ public void testCheckpointAfterEntryLogRotated() throws Exception { }); // simulate entry log is rotated (due to compaction) - DefaultEntryLogger elogger = storage.getEntryLogger(); - EntryLogManagerForSingleEntryLog entryLogManager = - (EntryLogManagerForSingleEntryLog) elogger.getEntryLogManager(); - entryLogManager.createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID); - long currentLogId = entryLogManager.getCurrentLogId(); - - readyLatch.countDown(); - assertNull(checkpoints.poll()); - assertEquals(new TestCheckpoint(0), storage.memTable.kvmap.cp); - assertEquals(20, storage.memTable.kvmap.size()); - - // trigger a memtable flush - Assert.assertNotNull("snapshot shouldn't have returned null", storage.memTable.snapshot()); - storage.onSizeLimitReached(checkpointSrc.newCheckpoint()); - assertEquals(new TestCheckpoint(100), checkpoints.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS)); - - // all the entries are flushed out - assertEquals(new TestCheckpoint(100), storage.memTable.kvmap.cp); - assertEquals(0, storage.memTable.kvmap.size()); - assertTrue( - "current log " + currentLogId + " contains entries added from memtable should be forced to disk" - + " but flushed logs are " + elogger.getFlushedLogIds(), - elogger.getFlushedLogIds().contains(currentLogId)); + List eloggers = storage.getEntryLogger(); + for (DefaultEntryLogger elogger : eloggers) { + EntryLogManagerForSingleEntryLog entryLogManager = + (EntryLogManagerForSingleEntryLog) elogger.getEntryLogManager(); + entryLogManager.createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID); + long currentLogId = entryLogManager.getCurrentLogId(); + + readyLatch.countDown(); + assertNull(checkpoints.poll()); + assertEquals(new TestCheckpoint(0), storage.memTable.kvmap.cp); + assertEquals(20, storage.memTable.kvmap.size()); + + // trigger a memtable flush + Assert.assertNotNull("snapshot shouldn't have returned null", storage.memTable.snapshot()); + storage.onSizeLimitReached(checkpointSrc.newCheckpoint()); + assertEquals(new TestCheckpoint(100), checkpoints.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS)); + + // all the entries are flushed out + assertEquals(new TestCheckpoint(100), storage.memTable.kvmap.cp); + assertEquals(0, storage.memTable.kvmap.size()); + assertTrue( + "current log " + currentLogId + " contains entries added from memtable should be forced to disk" + + " but flushed logs are " + elogger.getFlushedLogIds(), + elogger.getFlushedLogIds().contains(currentLogId)); + } } } From 3e01e4d0a0d603997951bc39288e4231041524c3 Mon Sep 17 00:00:00 2001 From: lushiji Date: Mon, 19 Sep 2022 15:13:41 +0800 Subject: [PATCH 2/3] 1.exception handling optimization 2.dir info print --- .../bookkeeper/bookie/LedgerStorage.java | 5 +- .../bookie/SortedLedgerStorage.java | 56 ++++++++++++++++--- .../bookie/DefaultEntryLogTest.java | 4 +- .../bookkeeper/bookie/LedgerCacheTest.java | 11 ++-- .../bookkeeper/bookie/MockLedgerStorage.java | 2 +- .../SortedLedgerStorageCheckpointTest.java | 2 +- 6 files changed, 60 insertions(+), 20 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java index 372cd926c02..534f4544f20 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.bookie; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.RateLimiter; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -217,8 +218,8 @@ interface LedgerDeletionListener { ByteBuf getExplicitLac(long ledgerId) throws IOException, BookieException; // for testability - default LedgerStorage getUnderlyingLedgerStorage() { - return this; + default List getUnderlyingLedgerStorage() { + return Lists.newArrayList((E)this); } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java index 26476147058..ad142d40c12 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java @@ -46,6 +46,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.IteratorUtility; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -165,7 +166,13 @@ public void start() { } catch (IOException e) { LOG.error("Exception thrown while flushing ledger cache.", e); } - interleavedLedgerStorageList.forEach(s -> s.start()); + interleavedLedgerStorageList.forEach(s -> { + try { + s.start(); + } catch (Throwable e) { + LOG.error("ledgerStorage({}) start has error.", getEntryLogDirPath(s.getEntryLogger()), e); + } + }); } @Override @@ -311,7 +318,14 @@ public void deleteLedger(long ledgerId) throws IOException { @Override public void registerLedgerDeletionListener(LedgerDeletionListener listener) { - interleavedLedgerStorageList.forEach(s -> s.registerLedgerDeletionListener(listener)); + interleavedLedgerStorageList.forEach(s -> { + try { + s.registerLedgerDeletionListener(listener); + } catch (Throwable e) { + LOG.error("ledgerStorage({}) registerLedgerDeletionListener has error.", + getEntryLogDirPath(s.getEntryLogger()), e); + } + }); } @Override @@ -338,6 +352,16 @@ public void flush() throws IOException { } } + private String getEntryLogDirPath(DefaultEntryLogger entryLog) { + if (entryLog == null + || entryLog.getLedgerDirsManager() == null + || CollectionUtils.isEmpty(entryLog.getLedgerDirsManager().getAllLedgerDirs())) { + return null; + } + // entryLog and ledger's directory one-to-one + return entryLog.getLedgerDirsManager().getAllLedgerDirs().get(0).getName(); + } + // CacheCallback functions. @Override public void onSizeLimitReached(final Checkpoint cp) throws IOException { @@ -357,7 +381,8 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException { public void run() { for (InterleavedLedgerStorage s : interleavedLedgerStorageList) { try { - LOG.info("Started flushing mem table."); + LOG.info("ledgerStorage({}) started flushing mem table.", + getEntryLogDirPath(s.getEntryLogger())); s.getEntryLogger().prepareEntryMemTableFlush(); memTable.flush(SortedLedgerStorage.this); @@ -366,7 +391,8 @@ public void run() { } } catch (Exception e) { s.getStateManager().transitionToReadOnlyMode(); - LOG.error("Exception thrown while flushing skip list cache.", e); + LOG.error("ledgerStorage ({}) exception thrown while flushing skip list cache.", + getEntryLogDirPath(s.getEntryLogger()), e); } } } @@ -381,7 +407,7 @@ public void onRotateEntryLog() { // flushed to the entry log file. } - public List getEntryLogger() { + public List getDefaultEntryLoggers() { List listIt = new ArrayList<>(numberOfDirs); for (InterleavedLedgerStorage ls : interleavedLedgerStorageList) { listIt.add(ls.getEntryLogger()); @@ -413,18 +439,30 @@ public void flushEntriesLocationsIndex() throws IOException { } @Override - public LedgerStorage getUnderlyingLedgerStorage() { - return interleavedLedgerStorageList.get(0); + public List getUnderlyingLedgerStorage() { + return interleavedLedgerStorageList; } @Override public void forceGC() { - interleavedLedgerStorageList.forEach(s -> s.forceGC()); + interleavedLedgerStorageList.forEach(s -> { + try { + s.forceGC(); + } catch (Throwable e) { + LOG.error("ledgerStorage({}) force gc has error.", getEntryLogDirPath(s.getEntryLogger()), e); + } + }); } @Override public void forceGC(Boolean forceMajor, Boolean forceMinor) { - interleavedLedgerStorageList.forEach(s -> s.forceGC(forceMajor, forceMinor)); + interleavedLedgerStorageList.forEach(s -> { + try { + s.forceGC(forceMajor, forceMinor); + } catch (Throwable e) { + LOG.error("ledgerStorage({}) force gc(2 params) has error.", getEntryLogDirPath(s.getEntryLogger()), e); + } + }); } @Override diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java index cc52b5a3187..3df7db2377e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java @@ -309,8 +309,8 @@ public void testAddEntryFailureOnDiskFull() throws Exception { BookieImpl bookie = new TestBookieImpl(conf); DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, bookie.getLedgerDirsManager()); - InterleavedLedgerStorage ledgerStorage = - ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage()); + List ils = bookie.ledgerStorage.getUnderlyingLedgerStorage(); + InterleavedLedgerStorage ledgerStorage = ils.get(0); ledgerStorage.entryLogger = entryLogger; // Create ledgers ledgerStorage.setMasterKey(1, "key".getBytes()); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java index f022f4e453f..ee401564cc6 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java @@ -88,7 +88,8 @@ public void setUp() throws Exception { bookie = new TestBookieImpl(conf); activeLedgers = new SnapshotMap(); - ledgerCache = ((InterleavedLedgerStorage) bookie.getLedgerStorage().getUnderlyingLedgerStorage()).ledgerCache; + List ils = bookie.getLedgerStorage().getUnderlyingLedgerStorage(); + ledgerCache = ils.get(0).ledgerCache; } @After @@ -115,8 +116,8 @@ private void newLedgerCache() throws IOException { if (ledgerCache != null) { ledgerCache.close(); } - ledgerCache = ((InterleavedLedgerStorage) bookie.getLedgerStorage().getUnderlyingLedgerStorage()) - .ledgerCache = new LedgerCacheImpl(conf, activeLedgers, bookie.getIndexDirsManager()); + List ils = bookie.getLedgerStorage().getUnderlyingLedgerStorage(); + ledgerCache = ils.get(0).ledgerCache = new LedgerCacheImpl(conf, activeLedgers, bookie.getIndexDirsManager()); flushThread = new Thread() { public void run() { while (true) { @@ -274,8 +275,8 @@ public void testLedgerCacheFlushFailureOnDiskFull() throws Exception { conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(), ledgerDir2.getAbsolutePath() }); BookieImpl bookie = new TestBookieImpl(conf); - InterleavedLedgerStorage ledgerStorage = - ((InterleavedLedgerStorage) bookie.getLedgerStorage().getUnderlyingLedgerStorage()); + List ils = bookie.getLedgerStorage().getUnderlyingLedgerStorage(); + InterleavedLedgerStorage ledgerStorage = ils.get(0); LedgerCacheImpl ledgerCache = (LedgerCacheImpl) ledgerStorage.ledgerCache; // Create ledger index file ledgerStorage.setMasterKey(1, "key".getBytes()); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java index 5b49ca7706b..09df1dd67ac 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java @@ -261,7 +261,7 @@ public ByteBuf getExplicitLac(long ledgerId) throws IOException { } @Override - public LedgerStorage getUnderlyingLedgerStorage() { + public List getUnderlyingLedgerStorage() { return CompactableLedgerStorage.super.getUnderlyingLedgerStorage(); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java index 59431e1a351..58a93eb06cd 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java @@ -224,7 +224,7 @@ public void testCheckpointAfterEntryLogRotated() throws Exception { }); // simulate entry log is rotated (due to compaction) - List eloggers = storage.getEntryLogger(); + List eloggers = storage.getDefaultEntryLoggers(); for (DefaultEntryLogger elogger : eloggers) { EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) elogger.getEntryLogManager(); From c8e6a4f69ff4b6d6adc423d9135fdae5c6ba6404 Mon Sep 17 00:00:00 2001 From: lushiji Date: Mon, 19 Sep 2022 15:26:03 +0800 Subject: [PATCH 3/3] 3. update test method:getUnderlyingLedgerStorage --- .../java/org/apache/bookkeeper/bookie/LedgerStorage.java | 6 +++--- .../org/apache/bookkeeper/bookie/SortedLedgerStorage.java | 2 +- .../org/apache/bookkeeper/bookie/DefaultEntryLogTest.java | 2 +- .../java/org/apache/bookkeeper/bookie/LedgerCacheTest.java | 6 +++--- .../org/apache/bookkeeper/bookie/MockLedgerStorage.java | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java index 534f4544f20..926c71f0fb1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java @@ -217,9 +217,9 @@ interface LedgerDeletionListener { ByteBuf getExplicitLac(long ledgerId) throws IOException, BookieException; - // for testability - default List getUnderlyingLedgerStorage() { - return Lists.newArrayList((E)this); + // for InterleavedLedgerStorage's testability + default List getUnderlyingInterleavedLedgerStorage() { + return Lists.newArrayList((InterleavedLedgerStorage) this); } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java index ad142d40c12..403df6c3ac8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java @@ -439,7 +439,7 @@ public void flushEntriesLocationsIndex() throws IOException { } @Override - public List getUnderlyingLedgerStorage() { + public List getUnderlyingInterleavedLedgerStorage() { return interleavedLedgerStorageList; } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java index 3df7db2377e..646648b4d6c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java @@ -309,7 +309,7 @@ public void testAddEntryFailureOnDiskFull() throws Exception { BookieImpl bookie = new TestBookieImpl(conf); DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, bookie.getLedgerDirsManager()); - List ils = bookie.ledgerStorage.getUnderlyingLedgerStorage(); + List ils = bookie.ledgerStorage.getUnderlyingInterleavedLedgerStorage(); InterleavedLedgerStorage ledgerStorage = ils.get(0); ledgerStorage.entryLogger = entryLogger; // Create ledgers diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java index ee401564cc6..02cf2202f02 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java @@ -88,7 +88,7 @@ public void setUp() throws Exception { bookie = new TestBookieImpl(conf); activeLedgers = new SnapshotMap(); - List ils = bookie.getLedgerStorage().getUnderlyingLedgerStorage(); + List ils = bookie.getLedgerStorage().getUnderlyingInterleavedLedgerStorage(); ledgerCache = ils.get(0).ledgerCache; } @@ -116,7 +116,7 @@ private void newLedgerCache() throws IOException { if (ledgerCache != null) { ledgerCache.close(); } - List ils = bookie.getLedgerStorage().getUnderlyingLedgerStorage(); + List ils = bookie.getLedgerStorage().getUnderlyingInterleavedLedgerStorage(); ledgerCache = ils.get(0).ledgerCache = new LedgerCacheImpl(conf, activeLedgers, bookie.getIndexDirsManager()); flushThread = new Thread() { public void run() { @@ -275,7 +275,7 @@ public void testLedgerCacheFlushFailureOnDiskFull() throws Exception { conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(), ledgerDir2.getAbsolutePath() }); BookieImpl bookie = new TestBookieImpl(conf); - List ils = bookie.getLedgerStorage().getUnderlyingLedgerStorage(); + List ils = bookie.getLedgerStorage().getUnderlyingInterleavedLedgerStorage(); InterleavedLedgerStorage ledgerStorage = ils.get(0); LedgerCacheImpl ledgerCache = (LedgerCacheImpl) ledgerStorage.ledgerCache; // Create ledger index file diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java index 09df1dd67ac..df635796508 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java @@ -261,8 +261,8 @@ public ByteBuf getExplicitLac(long ledgerId) throws IOException { } @Override - public List getUnderlyingLedgerStorage() { - return CompactableLedgerStorage.super.getUnderlyingLedgerStorage(); + public List getUnderlyingInterleavedLedgerStorage() { + return CompactableLedgerStorage.super.getUnderlyingInterleavedLedgerStorage(); } @Override