Skip to content

Commit

Permalink
refactor CoordinationPlace to improve extensibility and reduce comple…
Browse files Browse the repository at this point in the history
…xity (#877)
  • Loading branch information
fbruton authored Aug 7, 2024
1 parent 511b047 commit dc903b5
Showing 1 changed file with 98 additions and 27 deletions.
125 changes: 98 additions & 27 deletions src/main/java/emissary/place/CoordinationPlace.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,22 @@ protected boolean shouldContinue(IBaseDataObject d, IServiceProviderPlace p) {
return true;
}

/**
* Return whether to continue traversing the list of coordinated places when an error occurs in one of them
*
* @param p place that is currently processing the ibdo
* @param errorOccurred true if an error occurred
*
* @return false if processing should not continue
*/
protected boolean shouldContinue(IServiceProviderPlace p, boolean errorOccurred) {
if (!continueOnError() && errorOccurred) {
logger.info("Error terminating coordination step at {}", p);
return false;
}
return true;
}

/**
* Evaluate whether to skip processing. This will allow the coordination place to continue to the next configured place.
* Note that shouldContinue method takes precedence. Use one or the other and be cautious when using both Classes can
Expand All @@ -178,9 +194,9 @@ protected void sproutHook(List<IBaseDataObject> sproutList, IBaseDataObject pare

/**
* Consume a data object and coordinate its processing
*
*
* @param d the payload to process
* @param hd true if doing heavy duty processing
* @param hd true if doing heavy-duty processing
* @return the list of sprouted data objects
*/
protected List<IBaseDataObject> coordinate(IBaseDataObject d, boolean hd) {
Expand All @@ -197,12 +213,7 @@ protected List<IBaseDataObject> coordinate(IBaseDataObject d, boolean hd) {
continue;
}

if (updateTransformHistory) {
DirectoryEntry de = p.getDirectoryEntry();
de.setDataType(d.currentForm());
// append to the transform history, with flag indicating that the visit was coordinated
d.appendTransformHistory(de.getKey(), true);
}
updateTransformHistory(d, p);

// Collect attachments for hd processing
List<IBaseDataObject> sprouts = null;
Expand All @@ -220,16 +231,14 @@ protected List<IBaseDataObject> coordinate(IBaseDataObject d, boolean hd) {
}
errorOccurred = d.currentForm().equals(Form.ERROR);
} catch (Exception ex) {
logger.warn("agentProcess {} called from Coordinate problem", (hd ? "HeavyDuty" : "Call"), ex);
errorOccurred = true;
errorOccurred = handlePlaceException(p, hd, ex);
} finally {
if (Thread.interrupted()) {
logger.warn("Place {} was interrupted during execution.", p);
}
}

if (errorOccurred) {
logger.info("Error terminating coordination step at {}", p);
if (!shouldContinue(p, errorOccurred)) {
break;
}

Expand All @@ -239,30 +248,92 @@ protected List<IBaseDataObject> coordinate(IBaseDataObject d, boolean hd) {
}
}

if (!errorOccurred) {
// Process the ouptut form according to configuration
if (outputForm != null) {
if (pushForm) {
d.pushCurrentForm(outputForm);
} else {
d.setCurrentForm(outputForm);
}
}
applyForm(d, errorOccurred);

// Clean up my proxies
nukeMyProxies(d);
sproutHook(d, hd, sproutCollection);

}
// Allow derived classes a shot to clean up the parent
cleanUpHook(d);

return sproutCollection;
}

private void sproutHook(IBaseDataObject d, boolean hd, List<IBaseDataObject> sproutCollection) {
// Allow derived classes a shot at the sprouts
if (hd) {
sproutHook(sproutCollection, d);
}
}

// Allow derived classes a shot to clean up the parent
cleanUpHook(d);
/**
* Allow derived classes a shot to handle a place exception
*
* @param p place that was processing when the exception was thrown
* @param hd true if doing heavy-duty processing
* @param ex exception thrown by the place
*
* @return if an error occurred
*/
protected boolean handlePlaceException(IServiceProviderPlace p, boolean hd, Exception ex) {
logger.warn("agentProcess{} called from Coordinate problem", (hd ? "HeavyDuty" : "Call"), ex);
return true;
}

return sproutCollection;
private void updateTransformHistory(IBaseDataObject d, IServiceProviderPlace p) {
if (updateTransformHistory) {
DirectoryEntry de = p.getDirectoryEntry();
de.setDataType(d.currentForm());
// append to the transform history, with flag indicating that the visit was coordinated
d.appendTransformHistory(de.getKey(), true);
}
}

/**
* How to handle applying the output form if an error occurred during processing
*
* @param d the ibdo to process
* @param errorOccurred true if an error occurred
*/
protected void applyForm(IBaseDataObject d, boolean errorOccurred) {
if (!errorOccurred || shouldApplyOutputFormOnError()) {
applyOutputForm(d);

// Clean up my proxies
nukeMyProxies(d);
}
}

/**
* If true, process the output form the same for the default and error condition
*
* @return boolean to allow processing of output for when an error occurs
*/
protected boolean shouldApplyOutputFormOnError() {
return false;
}

/**
* Apply the output form according to configuration
*
* @param d the ibdo to process
*/
protected void applyOutputForm(IBaseDataObject d) {
if (outputForm != null) {
if (pushForm) {
d.pushCurrentForm(outputForm);
} else {
d.setCurrentForm(outputForm);
}
}
}

/**
* If false, do not continue processing other places after an error occurs
*
* @return boolean to not continue processing after an error
*/
protected boolean continueOnError() {
return false;
}

protected TimedResource resourceWatcherStart(final IServiceProviderPlace place) {
Expand Down

0 comments on commit dc903b5

Please sign in to comment.