diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index ed37159ee19..290e22d115d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -1932,6 +1932,170 @@ public void testRemoveBookieFromCluster() { repp.onClusterChanged(addrs, new HashSet()); } + @Test + public void testLoadWeightedPlacementAndNewEnsemble() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r1"); + StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r2"); + StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r3"); + StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r3"); + // Update cluster + Set addrs = new HashSet<>(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + + + conf.setLoadWeightBasedPlacementEnabled(true); + conf.setLowLoadBookieRatio(0.5); + conf.setLoadThreshold(70); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + repp.onClusterChanged(addrs, new HashSet<>()); + Map bookieInfoMap = new HashMap<>(); + bookieInfoMap.put(addr1.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024)); + bookieInfoMap.put(addr2.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024)); + bookieInfoMap.put(addr3.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024)); + bookieInfoMap.put(addr4.toBookieId(), new BookieInfo(100L, 100L, 75, 60, 60, 1024 * 1024)); + repp.updateBookieInfo(bookieInfoMap); + + + // low-load-bookie's number <= all-selected-bookie's number * ratio + // fallback to random select in LoadWeightedSelectionImpl + // so addr4 still can be selected + Map selectionCounts = new HashMap<>(); + for (BookieId b : addrs) { + selectionCounts.put(b, 0L); + } + int numTries = 50000; + Set excludeList = new HashSet<>(); + EnsemblePlacementPolicy.PlacementResult> ensembleResponse; + List ensemble; + int ensembleSize = 3; + int writeQuorumSize = 2; + int acqQuorumSize = 2; + for (int i = 0; i < numTries; i++) { + ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, null, excludeList); + ensemble = ensembleResponse.getResult(); + assertTrue( + "Rackaware selection not happening " + + getNumCoveredWriteQuorums(ensemble, writeQuorumSize, + conf.getMinNumRacksPerWriteQuorum(), repp.bookieAddressResolver), + getNumCoveredWriteQuorums(ensemble, writeQuorumSize, + conf.getMinNumRacksPerWriteQuorum(), repp.bookieAddressResolver) >= 2); + for (BookieId b : ensemble) { + selectionCounts.put(b, selectionCounts.get(b) + 1); + } + } + assertTrue(selectionCounts.get(addr3.toBookieId()) != 0); + assertTrue(selectionCounts.get(addr4.toBookieId()) != 0); + + + // add addr5, then enough bookie in the same rack + // so LoadWeightedSelectionImpl would exclude addr4 which is high load + BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181); + StaticDNSResolver.addNodeToRack(addr5.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r3"); + repp.handleBookiesThatJoined(Collections.singleton(addr5.toBookieId())); + bookieInfoMap.put(addr5.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024)); + repp.updateBookieInfo(bookieInfoMap); + + addrs.add(addr5.toBookieId()); + selectionCounts = new HashMap<>(); + for (BookieId b : addrs) { + selectionCounts.put(b, 0L); + } + for (int i = 0; i < numTries; i++) { + ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, null, excludeList); + ensemble = ensembleResponse.getResult(); + assertTrue( + "Rackaware selection not happening " + + getNumCoveredWriteQuorums(ensemble, writeQuorumSize, + conf.getMinNumRacksPerWriteQuorum(), repp.bookieAddressResolver), + getNumCoveredWriteQuorums(ensemble, writeQuorumSize, + conf.getMinNumRacksPerWriteQuorum(), repp.bookieAddressResolver) >= 2); + for (BookieId b : ensemble) { + selectionCounts.put(b, selectionCounts.get(b) + 1); + } + } + assertTrue(selectionCounts.get(addr3.toBookieId()) != 0); + assertTrue(selectionCounts.get(addr4.toBookieId()) == 0); + assertTrue(selectionCounts.get(addr5.toBookieId()) != 0); + } + + @Test + public void testLoadWeightedPlacementAndReplaceBookie() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r1"); + StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r3"); + StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r3"); + StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_REGION + "/r3"); + // Update cluster + Set addrs = new HashSet<>(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + + conf.setLoadWeightBasedPlacementEnabled(true); + conf.setLowLoadBookieRatio(0.5); + conf.setLoadThreshold(70); + repp.initialize(conf, Optional.empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + repp.onClusterChanged(addrs, new HashSet<>()); + Map bookieInfoMap = new HashMap<>(); + bookieInfoMap.put(addr1.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024)); + bookieInfoMap.put(addr2.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024)); + bookieInfoMap.put(addr3.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024)); + bookieInfoMap.put(addr4.toBookieId(), new BookieInfo(100L, 100L, 75, 60, 60, 1024 * 1024)); + repp.updateBookieInfo(bookieInfoMap); + + // although addr2 is in excludedBookies + // since it can be added to LoadWeightedSelectionImpl + // low load bookie's number is enough and addr4 would be excluded + Map selectionCounts = new HashMap<>(); + for (BookieId b : addrs) { + selectionCounts.put(b, 0L); + } + int numTries = 50000; + EnsemblePlacementPolicy.PlacementResult replaceBookieResponse; + PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy; + BookieId replacedBookie; + for (int i = 0; i < numTries; i++) { + // replace node under r3 + replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new ArrayList<>(), + addr2.toBookieId(), new HashSet<>()); + replacedBookie = replaceBookieResponse.getResult(); + isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy(); + assertTrue("replaced : " + replacedBookie, addr3.toBookieId().equals(replacedBookie)); + assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy); + selectionCounts.put(replacedBookie, selectionCounts.get(replacedBookie) + 1); + } + assertTrue(selectionCounts.get(addr3.toBookieId()) != 0); + assertTrue(selectionCounts.get(addr4.toBookieId()) == 0); + } + @Test public void testWeightedPlacementAndReplaceBookieWithEnoughBookiesInSameRack() throws Exception { BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);