Skip to content

Commit

Permalink
More fixes in the engine, correctly updating the schedule based on state
Browse files Browse the repository at this point in the history
  • Loading branch information
Tushar-Naik authored and santanusinha committed Oct 18, 2024
1 parent fde8fd3 commit 9f9963f
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import java.util.Optional;
import java.util.function.Predicate;

import static com.phonepe.epoch.server.utils.EpochUtils.updateTopologySchedule;
import static com.phonepe.epoch.server.utils.EpochUtils.scheduleTopology;
import static com.phonepe.epoch.server.utils.EpochUtils.scheduleUpdatedTopology;
import static com.phonepe.epoch.server.utils.EpochUtils.removeScheduledTopology;
import static com.phonepe.epoch.server.utils.EpochUtils.topologyId;

@RequiredArgsConstructor(onConstructor_ = {@Inject})
Expand Down Expand Up @@ -70,7 +72,7 @@ public Optional<EpochTopologyDetails> createSimpleTopology(final SimpleTopologyC
}
val saved = topologyStore.save(topology);
saved.ifPresent(epochTopologyDetails -> {
updateTopologySchedule(epochTopologyDetails, scheduler);
scheduleTopology(epochTopologyDetails, scheduler);
eventBus.publish(EpochStateChangeEvent.builder()
.type(EpochEventType.TOPOLOGY_STATE_CHANGED)
.metadata(Map.of(StateChangeEventDataTag.TOPOLOGY_ID, topologyId,
Expand Down Expand Up @@ -104,21 +106,54 @@ public Optional<EpochTopologyDetails> updateTopology(@PathParam("topologyId") St
new EpochTaskTriggerCron(request.getCron()),
new MailNotificationSpec(List.of(request.getNotifyEmail().split(","))));

val previousTopology = topologyDetails.get();
val updated = topologyStore.update(topologyId, topology);
updated.ifPresent(epochTopologyDetails -> {
updateScheduleAndRaiseEvent(topologyId, epochTopologyDetails, previousTopology, scheduler, eventBus);
updated.ifPresent(updatedTopologyDetails -> {
scheduleUpdatedTopology(topologyDetails.get(), updatedTopologyDetails, scheduler);
eventBus.publish(EpochStateChangeEvent.builder()
.type(EpochEventType.TOPOLOGY_UPDATED)
.metadata(Map.of(StateChangeEventDataTag.TOPOLOGY_ID, topologyId,
StateChangeEventDataTag.NEW_STATE, updatedTopologyDetails.getState(),
StateChangeEventDataTag.NEW_TRIGGER, updatedTopologyDetails.getTopology().getTrigger()))
.build());

});
return updated;
}

public Optional<EpochTopologyDetails> save(final EpochTopology topology) {
val stored = topologyStore.save(topology);
stored.ifPresent(epochTopologyDetails -> updateTopologySchedule(epochTopologyDetails, scheduler));
stored.ifPresent(epochTopologyDetails -> {
scheduleTopology(epochTopologyDetails, scheduler);
eventBus.publish(EpochStateChangeEvent.builder()
.type(EpochEventType.TOPOLOGY_STATE_CHANGED)
.metadata(Map.of(StateChangeEventDataTag.TOPOLOGY_ID, epochTopologyDetails.getId(),
StateChangeEventDataTag.NEW_STATE, epochTopologyDetails.getState()))
.build());
});
return stored;
}

public Optional<EpochTopologyDetails> get(String topologyId) {
return topologyStore.get(topologyId);
}

public List<EpochTopologyDetails> list(Predicate<EpochTopologyDetails> filter) {
return topologyStore.list(filter);
}

public boolean delete(String topologyId) {
val deleted = topologyStore.delete(topologyId);
if (deleted) {
scheduler.delete(topologyId);
eventBus.publish(EpochStateChangeEvent.builder()
.type(EpochEventType.TOPOLOGY_STATE_CHANGED)
.metadata(Map.of(StateChangeEventDataTag.TOPOLOGY_ID, topologyId,
StateChangeEventDataTag.NEW_STATE, EpochTopologyState.DELETED))
.build());
}
return deleted;
}

public Optional<String> scheduleNow(String topologyId) {
return get(topologyId)
.flatMap(topology -> scheduler.scheduleNow(topologyId));
Expand All @@ -132,19 +167,17 @@ public Optional<EpochTopologyDetails> update(String topologyId, final EpochTopol
val previousTopology = topologyDetails.get();
val updated = topologyStore.update(topologyId, topology);
updated.ifPresent(updatedTopologyDetails -> {
updateScheduleAndRaiseEvent(topologyId, updatedTopologyDetails, previousTopology, scheduler, eventBus);
scheduleUpdatedTopology(previousTopology, updatedTopologyDetails, scheduler);
eventBus.publish(EpochStateChangeEvent.builder()
.type(EpochEventType.TOPOLOGY_UPDATED)
.metadata(Map.of(StateChangeEventDataTag.TOPOLOGY_ID, topologyId,
StateChangeEventDataTag.NEW_STATE, updatedTopologyDetails.getState(),
StateChangeEventDataTag.NEW_TRIGGER, updatedTopologyDetails.getTopology().getTrigger()))
.build());
});
return updated;
}

public Optional<EpochTopologyDetails> get(String topologyId) {
return topologyStore.get(topologyId);
}

public List<EpochTopologyDetails> list(Predicate<EpochTopologyDetails> filter) {
return topologyStore.list(filter);
}

public Optional<EpochTopologyDetails> updateState(String topologyId, EpochTopologyState state) {
val topologyDetails = get(topologyId);
if (topologyDetails.isEmpty()) {
Expand All @@ -153,39 +186,21 @@ public Optional<EpochTopologyDetails> updateState(String topologyId, EpochTopolo
val previousTopology = topologyDetails.get();
val updated = topologyStore.updateState(topologyId, state);
updated.ifPresent(updatedTopologyDetails -> {
updateScheduleAndRaiseEvent(topologyId, updatedTopologyDetails, previousTopology, scheduler, eventBus);
if (updatedTopologyDetails.getState() == EpochTopologyState.ACTIVE) {
scheduleTopology(updatedTopologyDetails, scheduler);
} else {
removeScheduledTopology(previousTopology, scheduler);
}
eventBus.publish(EpochStateChangeEvent.builder()
.type(EpochEventType.TOPOLOGY_UPDATED)
.metadata(Map.of(StateChangeEventDataTag.TOPOLOGY_ID, topologyId,
StateChangeEventDataTag.NEW_STATE, updatedTopologyDetails.getState(),
StateChangeEventDataTag.NEW_TRIGGER, updatedTopologyDetails.getTopology().getTrigger()))
.build());
});
return updated;
}

public boolean delete(String topologyId) {
val deleted = topologyStore.delete(topologyId);
if (deleted) {
scheduler.delete(topologyId);
eventBus.publish(EpochStateChangeEvent.builder()
.type(EpochEventType.TOPOLOGY_STATE_CHANGED)
.metadata(Map.of(StateChangeEventDataTag.TOPOLOGY_ID, topologyId,
StateChangeEventDataTag.NEW_STATE, EpochTopologyState.DELETED))
.build());
}
return deleted;
}

private static void updateScheduleAndRaiseEvent(final String topologyId,
final EpochTopologyDetails updatedTopologyDetails,
final EpochTopologyDetails previousTopology,
final Scheduler scheduler,
final EpochEventBus eventBus) {
updateTopologySchedule(previousTopology, scheduler);
updateTopologySchedule(updatedTopologyDetails, scheduler);
eventBus.publish(EpochStateChangeEvent.builder()
.type(EpochEventType.TOPOLOGY_UPDATED)
.metadata(Map.of(StateChangeEventDataTag.TOPOLOGY_ID, topologyId,
StateChangeEventDataTag.NEW_STATE, updatedTopologyDetails.getState(),
StateChangeEventDataTag.NEW_TRIGGER, updatedTopologyDetails.getTopology().getTrigger()))
.build());
}

private static void validateCronExpression(final String cronExpression) {
if (!QuartzCronUtility.isValidCronExpression(cronExpression)) {
throw EpochError.raise(EpochErrorCode.INPUT_VALIDATION_ERROR, Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.phonepe.epoch.server.utils.EpochUtils.updateTopologySchedule;
import static com.phonepe.epoch.server.utils.EpochUtils.scheduleTopology;

/**
*
Expand Down Expand Up @@ -100,7 +100,7 @@ private void recoverTopologyRuns() {
}
});
try {
updateTopologySchedule(topologyDetails, scheduler);
scheduleTopology(topologyDetails, scheduler);
}
catch (Exception e) {
log.error("Could not reschedule topology " + topologyDetails.getId() + ". Error: " + EpochUtils.errorMessage(e), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,25 @@ public static String topologyId(final String topologyName) {
public static EpochTopologyDetails detailsFrom(final EpochTopology topology) {
return new EpochTopologyDetails(topologyId(topology), topology, EpochTopologyState.ACTIVE, new Date(), new Date());
}
public static void updateTopologySchedule(final EpochTopologyDetails topologyDetails,

public static void scheduleUpdatedTopology(final EpochTopologyDetails previousTopologyDetails,
final EpochTopologyDetails newTopologyDetails,
final Scheduler scheduler) {
removeScheduledTopology(previousTopologyDetails, scheduler);
scheduleTopology(newTopologyDetails, scheduler);
}

public static void removeScheduledTopology(final EpochTopologyDetails topologyDetails,
final Scheduler scheduler) {
val scheduleId = EpochUtils.scheduleId(topologyDetails);
scheduler.delete(scheduleId);
log.info("Removed schedule with id: {}", scheduleId);
}

public static void scheduleTopology(final EpochTopologyDetails topologyDetails,
final Scheduler scheduler) {
if (topologyDetails.getState() != EpochTopologyState.ACTIVE) {
removeScheduledTopology(topologyDetails, scheduler);
/* double check here for recovery flows */
log.info("Not scheduling topology {} as it is not active", topologyDetails.getId());
return;
}
Expand All @@ -94,13 +109,6 @@ public static void updateTopologySchedule(final EpochTopologyDetails topologyDet
}
}

private static void removeScheduledTopology(final EpochTopologyDetails topologyDetails,
final Scheduler scheduler) {
val scheduleId = EpochUtils.scheduleId(topologyDetails);
scheduler.delete(scheduleId);
log.info("Removed schedule with id: {}", scheduleId);
}

@IgnoreInJacocoGeneratedReport(reason = "Not possible to simulate properly")
private static String readHostname() {
try {
Expand Down

0 comments on commit 9f9963f

Please sign in to comment.