Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP) Auditor.checkAllLedgers should use Auditor's instance variables #1588

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
Expand All @@ -59,7 +58,6 @@
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
Expand All @@ -79,6 +77,7 @@
public class Auditor {
private static final Logger LOG = LoggerFactory.getLogger(Auditor.class);
private final ServerConfiguration conf;
private ZooKeeper zkc;
private BookKeeper bkc;
private BookKeeperAdmin admin;
private BookieLedgerIndexer bookieLedgerIndexer;
Expand Down Expand Up @@ -106,6 +105,7 @@ public Auditor(final String bookieIdentifier, ServerConfiguration conf,
this.conf = conf;
this.bookieIdentifier = bookieIdentifier;
this.statsLogger = statsLogger;
this.zkc = zkc;

numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
uRLPublishTimeForLostBookies = this.statsLogger
Expand All @@ -120,7 +120,7 @@ public Auditor(final String bookieIdentifier, ServerConfiguration conf,
numDelayedBookieAuditsCancelled = this.statsLogger
.getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED);

initialize(conf, zkc);
initialize(conf);

executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
Expand All @@ -132,7 +132,7 @@ public Thread newThread(Runnable r) {
});
}

private void initialize(ServerConfiguration conf, ZooKeeper zkc)
private void initialize(ServerConfiguration conf)
throws UnavailableException {
try {
ClientConfiguration clientConfiguration = new ClientConfiguration(conf);
Expand Down Expand Up @@ -608,104 +608,89 @@ public void operationComplete(int rc, Set<LedgerFragment> fragments) {
* be run very often.
*/
void checkAllLedgers() throws BKAuditException, BKException,
IOException, InterruptedException, KeeperException {
ZooKeeper newzk = ZooKeeperClient.newBuilder()
.connectString(ZKMetadataDriverBase.resolveZkServers(conf))
.sessionTimeoutMs(conf.getZkTimeout())
.build();
IOException, InterruptedException, KeeperException {
final LedgerChecker checker = new LedgerChecker(bkc);

final BookKeeper client = new BookKeeper(new ClientConfiguration(conf),
newzk);
final BookKeeperAdmin admin = new BookKeeperAdmin(client, statsLogger);
final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
final CountDownLatch processDone = new CountDownLatch(1);

try {
final LedgerChecker checker = new LedgerChecker(client);

final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
final CountDownLatch processDone = new CountDownLatch(1);

Processor<Long> checkLedgersProcessor = new Processor<Long>() {
@Override
public void process(final Long ledgerId,
final AsyncCallback.VoidCallback callback) {
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
LOG.info("Ledger rereplication has been disabled, aborting periodic check");
processDone.countDown();
return;
}
} catch (ReplicationException.UnavailableException ue) {
LOG.error("Underreplication manager unavailable running periodic check", ue);
Processor<Long> checkLedgersProcessor = new Processor<Long>() {
@Override
public void process(final Long ledgerId,
final AsyncCallback.VoidCallback callback) {
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
LOG.info("Ledger rereplication has been disabled, aborting periodic check");
processDone.countDown();
return;
}
} catch (ReplicationException.UnavailableException ue) {
LOG.error("Underreplication manager unavailable running periodic check", ue);
processDone.countDown();
return;
}

LedgerHandle lh = null;
try {
lh = admin.openLedgerNoRecovery(ledgerId);
checker.checkLedger(lh,
LedgerHandle lh = null;
try {
lh = admin.openLedgerNoRecovery(ledgerId);
checker.checkLedger(lh,
new ProcessLostFragmentsCb(lh, callback),
conf.getAuditorLedgerVerificationPercentage());
// we collect the following stats to get a measure of the
// distribution of a single ledger within the bk cluster
// the higher the number of fragments/bookies, the more distributed it is
numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
numLedgersChecked.inc();
} catch (BKException.BKNoSuchLedgerExistsException bknsle) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ledger was deleted before we could check it", bknsle);
}
callback.processResult(BKException.Code.OK,
null, null);
return;
} catch (BKException bke) {
LOG.error("Couldn't open ledger " + ledgerId, bke);
callback.processResult(BKException.Code.BookieHandleNotAvailableException,
null, null);
return;
} catch (InterruptedException ie) {
LOG.error("Interrupted opening ledger", ie);
Thread.currentThread().interrupt();
callback.processResult(BKException.Code.InterruptedException, null, null);
return;
} finally {
if (lh != null) {
try {
lh.close();
} catch (BKException bke) {
LOG.warn("Couldn't close ledger " + ledgerId, bke);
} catch (InterruptedException ie) {
LOG.warn("Interrupted closing ledger " + ledgerId, ie);
Thread.currentThread().interrupt();
}
// we collect the following stats to get a measure of the
// distribution of a single ledger within the bk cluster
// the higher the number of fragments/bookies, the more distributed it is
numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
numLedgersChecked.inc();
} catch (BKException.BKNoSuchLedgerExistsException bknsle) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ledger was deleted before we could check it", bknsle);
}
callback.processResult(BKException.Code.OK,
null, null);
return;
} catch (BKException bke) {
LOG.error("Couldn't open ledger " + ledgerId, bke);
callback.processResult(BKException.Code.BookieHandleNotAvailableException,
null, null);
return;
} catch (InterruptedException ie) {
LOG.error("Interrupted opening ledger", ie);
Thread.currentThread().interrupt();
callback.processResult(BKException.Code.InterruptedException, null, null);
return;
} finally {
if (lh != null) {
try {
lh.close();
} catch (BKException bke) {
LOG.warn("Couldn't close ledger " + ledgerId, bke);
} catch (InterruptedException ie) {
LOG.warn("Interrupted closing ledger " + ledgerId, ie);
Thread.currentThread().interrupt();
}
}
}
};

ledgerManager.asyncProcessLedgers(checkLedgersProcessor,
new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String s, Object obj) {
returnCode.set(rc);
processDone.countDown();
}
}, null, BKException.Code.OK, BKException.Code.ReadException);
try {
processDone.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BKAuditException(
"Exception while checking ledgers", e);
}
if (returnCode.get() != BKException.Code.OK) {
throw BKException.create(returnCode.get());
};

ledgerManager.asyncProcessLedgers(checkLedgersProcessor,
new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String s, Object obj) {
returnCode.set(rc);
processDone.countDown();
}
} finally {
admin.close();
client.close();
newzk.close();
}, null, BKException.Code.OK, BKException.Code.ReadException);
try {
processDone.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BKAuditException(
"Exception while checking ledgers", e);
}
if (returnCode.get() != BKException.Code.OK) {
throw BKException.create(returnCode.get());
}
}

Expand Down