Skip to content

Commit

Permalink
fix regionAwarePolicy can not update rackInfo between bookie left and…
Browse files Browse the repository at this point in the history
… join
  • Loading branch information
fanjianye committed Nov 1, 2023
1 parent 3221aa3 commit 765b7a7
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public void handleBookiesThatJoined(Set<BookieId> joinedBookies) {
topology.add(node);
knownBookies.put(addr, node);
historyBookies.put(addr, node);
String region = getLocalRegion(node);
String region = parseBookieRegion(addr);
address2Region.put(addr, region);
if (null == perRegionPlacement.get(region)) {
perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy()
.initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.lang.reflect.Modifier;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -58,6 +59,7 @@
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.StaticDNSResolver;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1930,4 +1932,108 @@ public void testNotifyRackChangeWithNewRegion() throws Exception {
assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
assertEquals("region3", repp.address2Region.get(addr4.toBookieId()));
}

public void testNotifyRackChangeBetweenBookieLeftAndJoin() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181);

// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/rack-1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/rack-2");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region2/rack-3");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region2/rack-4");

// Update cluster
Set<BookieId> addrs = Sets.newHashSet(addr1.toBookieId(),
addr2.toBookieId(), addr3.toBookieId(), addr4.toBookieId());
repp.onClusterChanged(addrs, new HashSet<>());

assertEquals(4, repp.knownBookies.size());
assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
assertEquals("/region1/rack-2", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-3", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-4", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());

assertEquals(2, repp.perRegionPlacement.size());
TopologyAwareEnsemblePlacementPolicy region1Placement = repp.perRegionPlacement.get("region1");
assertEquals(2, region1Placement.knownBookies.keySet().size());
assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
assertEquals("/region1/rack-2", region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());

TopologyAwareEnsemblePlacementPolicy region2Placement = repp.perRegionPlacement.get("region2");
assertEquals(2, region2Placement.knownBookies.keySet().size());
assertEquals("/region2/rack-3", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-4", region2Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());

assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
assertEquals("region1", repp.address2Region.get(addr2.toBookieId()));
assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
assertEquals("region2", repp.address2Region.get(addr4.toBookieId()));

// addr1 bookie shutdown and left
addrs.remove(addr1.toBookieId());
repp.onClusterChanged(addrs, new HashSet<>());

assertEquals(3, repp.knownBookies.size());
assertNull(repp.knownBookies.get(addr1.toBookieId()));
assertEquals("/region1/rack-2", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-3", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-4", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());

assertEquals(2, repp.perRegionPlacement.size());
region1Placement = repp.perRegionPlacement.get("region1");
assertEquals(1, region1Placement.knownBookies.keySet().size());
assertNull(region1Placement.knownBookies.get(addr1.toBookieId()));
assertEquals("/region1/rack-2", region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());

region2Placement = repp.perRegionPlacement.get("region2");
assertEquals(2, region2Placement.knownBookies.keySet().size());
assertEquals("/region2/rack-3", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-4", region2Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());

assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
assertEquals("region1", repp.address2Region.get(addr2.toBookieId()));
assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
assertEquals("region2", repp.address2Region.get(addr4.toBookieId()));

// Update the rack.
// change addr1 rack info. /region1/rack-1 -> /region3/rack-1.
List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
List<String> rackList = new ArrayList<>();
bookieAddressList.add(addr1);
rackList.add("/region3/rack-1");
StaticDNSResolver.changeRack(bookieAddressList, rackList);

// addr1 bookie start and join
addrs.add(addr1.toBookieId());
repp.onClusterChanged(addrs, new HashSet<>());

assertEquals(4, repp.knownBookies.size());
assertEquals("/region3/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
assertEquals("/region1/rack-2", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-3", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-4", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());

assertEquals(3, repp.perRegionPlacement.size());
region1Placement = repp.perRegionPlacement.get("region1");
assertEquals(1, region1Placement.knownBookies.keySet().size());
assertNull(region1Placement.knownBookies.get(addr1.toBookieId()));
assertEquals("/region1/rack-2", region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());

region2Placement = repp.perRegionPlacement.get("region2");
assertEquals(2, region2Placement.knownBookies.keySet().size());
assertEquals("/region2/rack-3", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-4", region2Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());

TopologyAwareEnsemblePlacementPolicy region3Placement = repp.perRegionPlacement.get("region3");
assertEquals(1, region3Placement.knownBookies.keySet().size());
assertEquals("/region3/rack-1", region3Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());

assertEquals("region3", repp.address2Region.get(addr1.toBookieId()));
assertEquals("region1", repp.address2Region.get(addr2.toBookieId()));
assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
assertEquals("region2", repp.address2Region.get(addr4.toBookieId()));
}
}

0 comments on commit 765b7a7

Please sign in to comment.