Skip to content

Commit

Permalink
Skipping local storage update when controller is not the leader
Browse files Browse the repository at this point in the history
  • Loading branch information
santanusinha committed Sep 25, 2024
1 parent d75ba90 commit 64454aa
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 31 deletions.
53 changes: 31 additions & 22 deletions drove-controller/configs/config-dr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,17 @@ server:
port: 10011
applicationContextPath: /
requestLog:
appenders:
- type: console
timeZone: IST
gzip:
syncFlush: true
appenders: []
# appenders:
# - type: console
# timeZone: IST

options:
staleCheckInterval: 1m
staleAppAge: 7d
staleInstanceAge: 1m
staleTaskAge: 1d

logging:
level: INFO
loggers:
com.phonepe.drove: DEBUG
com.phonepe.drove.controller.managed.DroveEventLogger: INFO

appenders:
- type: console
Expand All @@ -49,12 +44,13 @@ clusterAuth:

# By default user auth is turned off
# Uncomment the following to enable olympus
#olympusIM:
# olympusIM:
# httpConfig:
# clientId: olympus
# usingZookeeper: false
# host: olympus-im.traefik.stg.phonepe.nb6
# port: 80
# host: olympus-im-stage.phonepe.com
# port: 443
# secure: true
# serviceName: olympusIM
# environment: stage
# connections: 10
Expand All @@ -66,18 +62,23 @@ clusterAuth:
# clientKey: 4ca1c0ad-2995-4c10-9e48-c41ab5427f30
# publicEndpoint: http://localhost:10000
# authEndpoint: https://olympus-im-stage.phonepe.com
# resourcePrefix: /apis

# Uncomment the following to enable basic auth. Only one of basic auth or olympus can be active at any given time
#userAuth:
# enabled: true
# users:
# - username: admin
# password: admin
# role: EXTERNAL_READ_WRITE
# - username: guest
# password: guest
# role: EXTERNAL_READ_ONLY
# enabled: true
# encoding: CRYPT
# users:
# - username: admin
## password: admin
# password: "$2a$12$7ADQPo.ahYCksfRggvn6cut/BpwjahoNkvwPARIjCSbIvPdyTbEEK"
# role: EXTERNAL_READ_WRITE
# - username: guest
# password: "$2a$12$YHtK/Rc52bjEmVOF7Tz6jeKZ62kFZP98jJF0Ml9Cuwv4lW8w9ZbNm"
## password: guest
# role: EXTERNAL_READ_ONLY
# - username: noread
# password: "$2a$12$HO2EbvWNs.WOd9Csv3xtOuPGN.cEl/Co16oeL6oyk5stgKExXS6SO"
## password: noread

instanceAuth:
secret: RandomSecret
Expand All @@ -88,3 +89,11 @@ options:
staleAppAge: 2d
staleInstanceAge: 1d
staleTaskAge: 1d
clusterOpParallelism: 4
allowedMountDirs:
- /tmp
- /home
# Turn the following flag off or remove it to enable read access check
# This is useful only if some kind of auth is enabled
disableReadAuth: true
enableRawDeviceAccess: true
10 changes: 4 additions & 6 deletions drove-controller/configs/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ server:
port: 10001
applicationContextPath: /
requestLog:
# appenders: []
appenders:
- type: console
timeZone: IST
gzip:
syncFlush: true
appenders: []
# appenders:
# - type: console
# timeZone: IST

logging:
level: INFO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class ExecutorObserver implements Managed {
private final CuratorFramework curatorFramework;
private final ObjectMapper mapper;
private final StateUpdater updater;
private final LeadershipEnsurer leadershipEnsurer;
private final DroveEventBus eventBus;
private final Lock refreshLock = new ReentrantLock();
private final ScheduledSignal dataRefresher = new ScheduledSignal(Duration.ofSeconds(10));
Expand All @@ -63,17 +64,25 @@ public ExecutorObserver(
CuratorFramework curatorFramework,
ObjectMapper mapper,
StateUpdater updater,
LeadershipEnsurer leadershipEnsurer,
DroveEventBus eventBus) {
this.curatorFramework = curatorFramework;
this.mapper = mapper;
this.updater = updater;
this.leadershipEnsurer = leadershipEnsurer;
this.eventBus = eventBus;
}

/**
 * Starts the observer by attempting an initial refresh from ZooKeeper and wiring
 * {@code refreshDataFromZK} to both the periodic refresh signal and leadership change
 * notifications.
 *
 * @throws Exception declared by the overridden lifecycle method
 */
@Override
public void start() throws Exception {
// Initial refresh attempt; refreshDataFromZK returns early when this controller is not the leader
refreshDataFromZK(new Date());
// Hook the refresh routine to the dataRefresher signal (constructed with a 10 second duration)
dataRefresher.connect(this::refreshDataFromZK);
// Also refresh when leadership is acquired, so cluster metadata is rebuilt right away
leadershipEnsurer.onLeadershipStateChanged()
.connect(leader -> {
// Boolean.TRUE.equals(...) evaluates to false for null, so only an explicit TRUE triggers the refresh
if (Boolean.TRUE.equals(leader)) {
log.info("Became leader, doing an emergency update to rebuild cluster metadata");
refreshDataFromZK(new Date());
}
});
}

@Override
Expand All @@ -84,6 +93,10 @@ public void stop() throws Exception {
}

private void refreshDataFromZK(final Date currentDate) {
if (!leadershipEnsurer.isLeader()) {
log.info("Skipping data refresh from zk as i'm not the leader");
return;
}
if (refreshLock.tryLock()) {
try {
val currentExecutors = fetchNodes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.phonepe.drove.controller.engine.StateUpdater;
import com.phonepe.drove.controller.event.DroveEventBus;
import com.phonepe.drove.models.info.nodedata.ExecutorNodeData;
import io.appform.signals.signals.ConsumingSyncSignal;
import lombok.SneakyThrows;
import lombok.val;
import org.apache.curator.test.TestingCluster;
Expand Down Expand Up @@ -82,9 +83,13 @@ void testObserver() {
removedIds.addAll(invocationOnMock.getArgument(0));
return null;
}).when(updater).remove(anyCollection());

val obs = new ExecutorObserver(curator, MAPPER, updater, eventBus);
val le = mock(LeadershipEnsurer.class);
when(le.isLeader()).thenReturn(true);
val s = new ConsumingSyncSignal<Boolean>();
when(le.onLeadershipStateChanged()).thenReturn(s);
val obs = new ExecutorObserver(curator, MAPPER, updater, le, eventBus);
obs.start();
s.dispatch(true);
waitForTest(testExecuted, 1);
assertFalse(ids.isEmpty());
assertEquals(100, ids.size());
Expand Down

0 comments on commit 64454aa

Please sign in to comment.