Skip to content

Commit

Permalink
Added use of central cache
Browse files Browse the repository at this point in the history
  • Loading branch information
jkiddo committed Jun 11, 2024
1 parent 641febe commit a923b1c
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,7 @@ default IBase resolveReference(@Nonnull IIdType theReference, @Nullable IBase th
* @param beforeContext
* @return
*/
default List<IBase> resolveConstant(Object appContext, String name, boolean beforeContext) {return null; }
default List<IBase> resolveConstant(Object appContext, String name, boolean beforeContext) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
import ca.uhn.fhir.jpa.subscription.config.SubscriptionConfig;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionQueryValidator;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
Expand All @@ -33,11 +34,12 @@
@Import(SubscriptionConfig.class)
public class SubscriptionTopicConfig {
@Bean
SubscriptionTopicMatchingSubscriber subscriptionTopicMatchingSubscriber(FhirContext theFhirContext) {
SubscriptionTopicMatchingSubscriber subscriptionTopicMatchingSubscriber(
FhirContext theFhirContext, MemoryCacheService memoryCacheService) {
switch (theFhirContext.getVersion().getVersion()) {
case R5:
case R4B:
return new SubscriptionTopicMatchingSubscriber(theFhirContext);
return new SubscriptionTopicMatchingSubscriber(theFhirContext, memoryCacheService);
default:
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.SubscriptionTopic;

Expand All @@ -29,10 +30,15 @@
public class SubscriptionTopicMatcher {
private final SubscriptionTopicSupport mySubscriptionTopicSupport;
private final SubscriptionTopic myTopic;
private final MemoryCacheService myMemoryCacheService;

public SubscriptionTopicMatcher(SubscriptionTopicSupport theSubscriptionTopicSupport, SubscriptionTopic theTopic) {
public SubscriptionTopicMatcher(
SubscriptionTopicSupport theSubscriptionTopicSupport,
SubscriptionTopic theTopic,
MemoryCacheService memoryCacheService) {
mySubscriptionTopicSupport = theSubscriptionTopicSupport;
myTopic = theTopic;
myMemoryCacheService = memoryCacheService;
}

public InMemoryMatchResult match(ResourceModifiedMessage theMsg) {
Expand All @@ -43,7 +49,7 @@ public InMemoryMatchResult match(ResourceModifiedMessage theMsg) {
for (SubscriptionTopic.SubscriptionTopicResourceTriggerComponent next : triggers) {
if (resourceName.equals(next.getResource())) {
SubscriptionTriggerMatcher matcher =
new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, theMsg, next);
new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, theMsg, next, myMemoryCacheService);
InMemoryMatchResult result = matcher.match();
if (result.matched()) {
// as soon as one trigger matches, we're done
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.topic.filter.InMemoryTopicFilterMatcher;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import ca.uhn.fhir.util.Logs;
Expand Down Expand Up @@ -67,8 +68,11 @@ public class SubscriptionTopicMatchingSubscriber implements MessageHandler {
@Autowired
private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;

public SubscriptionTopicMatchingSubscriber(FhirContext theFhirContext) {
private MemoryCacheService myMemoryCacheService;

public SubscriptionTopicMatchingSubscriber(FhirContext theFhirContext, MemoryCacheService memoryCacheService) {
myFhirContext = theFhirContext;
this.myMemoryCacheService = memoryCacheService;
}

@Override
Expand Down Expand Up @@ -110,7 +114,8 @@ private void matchActiveSubscriptionTopicsAndDeliver(ResourceModifiedMessage the

Collection<SubscriptionTopic> topics = mySubscriptionTopicRegistry.getAll();
for (SubscriptionTopic topic : topics) {
SubscriptionTopicMatcher matcher = new SubscriptionTopicMatcher(mySubscriptionTopicSupport, topic);
SubscriptionTopicMatcher matcher =
new SubscriptionTopicMatcher(mySubscriptionTopicSupport, topic, myMemoryCacheService);
InMemoryMatchResult result = matcher.match(theMsg);
if (result.matched()) {
int deliveries = deliverToTopicSubscriptions(theMsg, topic, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import ca.uhn.fhir.storage.PreviousVersionReader;
Expand All @@ -50,11 +51,13 @@ public class SubscriptionTriggerMatcher {
private final IFhirResourceDao myDao;
private final PreviousVersionReader myPreviousVersionReader;
private final SystemRequestDetails mySrd;
private final MemoryCacheService myMemoryCacheService;

public SubscriptionTriggerMatcher(
SubscriptionTopicSupport theSubscriptionTopicSupport,
ResourceModifiedMessage theMsg,
SubscriptionTopic.SubscriptionTopicResourceTriggerComponent theTrigger) {
SubscriptionTopic.SubscriptionTopicResourceTriggerComponent theTrigger,
MemoryCacheService theMemoryCacheService) {
mySubscriptionTopicSupport = theSubscriptionTopicSupport;
myOperation = theMsg.getOperationType();
myResource = theMsg.getPayload(theSubscriptionTopicSupport.getFhirContext());
Expand All @@ -63,6 +66,7 @@ public SubscriptionTriggerMatcher(
myTrigger = theTrigger;
myPreviousVersionReader = new PreviousVersionReader(myDao);
mySrd = new SystemRequestDetails();
myMemoryCacheService = theMemoryCacheService;
}

public InMemoryMatchResult match() {
Expand Down Expand Up @@ -114,39 +118,51 @@ private InMemoryMatchResult match(
}
// WIP STR5 implement resultForCreate and resultForDelete
if (theQueryCriteria.getRequireBoth()) {
return InMemoryMatchResult.and(InMemoryMatchResult.and(previousMatches, currentMatches), fhirPathCriteriaEvaluationResult);
return InMemoryMatchResult.and(
InMemoryMatchResult.and(previousMatches, currentMatches), fhirPathCriteriaEvaluationResult);
} else {
return InMemoryMatchResult.and(InMemoryMatchResult.or(previousMatches, currentMatches), fhirPathCriteriaEvaluationResult);
return InMemoryMatchResult.and(
InMemoryMatchResult.or(previousMatches, currentMatches), fhirPathCriteriaEvaluationResult);
}
}


private InMemoryMatchResult evaluateFhirPathCriteria(String theFhirPathCriteria) {
if (!Strings.isNullOrEmpty(theFhirPathCriteria)) {
IFhirPath fhirPathEngine = mySubscriptionTopicSupport.getFhirContext().newFhirPath();
fhirPathEngine.setEvaluationContext(new IFhirPathEvaluationContext(){
IFhirPath fhirPathEngine =
mySubscriptionTopicSupport.getFhirContext().newFhirPath();
fhirPathEngine.setEvaluationContext(new IFhirPathEvaluationContext() {

@Override
public List<IBase> resolveConstant(Object appContext, String name, boolean beforeContext) {
if("current".equalsIgnoreCase(name))
return List.of(myResource);
if ("current".equalsIgnoreCase(name)) return List.of(myResource);

if("previous".equalsIgnoreCase(name))
{
if ("previous".equalsIgnoreCase(name)) {
Optional previousResource = myPreviousVersionReader.readPreviousVersion(myResource);
if(previousResource.isPresent())
return List.of((IBase) previousResource.get());
if (previousResource.isPresent()) return List.of((IBase) previousResource.get());
}

return null;
}
});
try {
IFhirPath.IParsedExpression expression = fhirPathEngine.parse(theFhirPathCriteria);
IFhirPath.IParsedExpression expression = myMemoryCacheService.get(
MemoryCacheService.CacheEnum.FHIRPATH_EXPRESSION, theFhirPathCriteria, exp -> {
try {
return fhirPathEngine.parse(exp);
} catch (Exception e) {
throw new RuntimeException(e);
}
});

List<IBase> result = fhirPathEngine.evaluate(myResource, expression, IBase.class);
return InMemoryMatchResult.fromBoolean( result != null && result.size() == 1 && ((BooleanType)result.get(0)).booleanValue());
return InMemoryMatchResult.fromBoolean(
result != null && result.size() == 1 && ((BooleanType) result.get(0)).booleanValue());
} catch (Exception e) {
ourLog.warn("Subscription topic {} has a fhirPathCriteria that is not valid: {}", myTrigger.getId(), theFhirPathCriteria, e);
ourLog.warn(
"Subscription topic {} has a fhirPathCriteria that is not valid: {}",
myTrigger.getId(),
theFhirPathCriteria,
e);
return InMemoryMatchResult.unsupportedFromReason(e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package ca.uhn.fhir.jpa.topic;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import org.hl7.fhir.r5.model.Encounter;
import org.hl7.fhir.r5.model.Enumerations;
import org.hl7.fhir.r5.model.IdType;
Expand All @@ -32,11 +34,14 @@ class SubscriptionTriggerMatcherTest {
@Mock
SearchParamMatcher mySearchParamMatcher;

MemoryCacheService myMemoryCacheService;

private SubscriptionTopicSupport mySubscriptionTopicSupport;
private Encounter myEncounter;

@BeforeEach
public void before() {
myMemoryCacheService = new MemoryCacheService(new JpaStorageSettings());
mySubscriptionTopicSupport = new SubscriptionTopicSupport(ourFhirContext, myDaoRegistry, mySearchParamMatcher);
myEncounter = new Encounter();
myEncounter.setIdElement(new IdType("Encounter", "123", "2"));
Expand All @@ -50,7 +55,7 @@ public void testCreateEmptryTriggerNoMatch() {
SubscriptionTopic.SubscriptionTopicResourceTriggerComponent trigger = new SubscriptionTopic.SubscriptionTopicResourceTriggerComponent();

// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();

// verify
Expand All @@ -67,7 +72,7 @@ public void testCreateSimpleTriggerMatches() {
trigger.addSupportedInteraction(SubscriptionTopic.InteractionTrigger.CREATE);

// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();

// verify
Expand All @@ -84,7 +89,7 @@ public void testCreateWrongOpNoMatch() {
trigger.addSupportedInteraction(SubscriptionTopic.InteractionTrigger.UPDATE);

// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();

// verify
Expand All @@ -101,7 +106,7 @@ public void testUpdateMatch() {
trigger.addSupportedInteraction(SubscriptionTopic.InteractionTrigger.UPDATE);

// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();

// verify
Expand All @@ -126,7 +131,7 @@ public void testUpdateWithPrevCriteriaMatch() {
when(mySearchParamMatcher.match(any(), any(), any())).thenReturn(InMemoryMatchResult.successfulMatch());

// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();

// verify
Expand All @@ -144,7 +149,7 @@ public void testFalseFhirPathCriteriaEvaluation() {
trigger.setFhirPathCriteria("false");

// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();

// verify
Expand All @@ -162,7 +167,7 @@ public void testInvalidFhirPathCriteriaEvaluation() {
trigger.setFhirPathCriteria("random text");

// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();

// verify
Expand All @@ -180,7 +185,7 @@ public void testInvalidBooleanOutcomeOfFhirPathCriteriaEvaluation() {
trigger.setFhirPathCriteria("id");

// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();

// verify
Expand All @@ -198,7 +203,7 @@ public void testValidFhirPathCriteriaEvaluation() {
trigger.setFhirPathCriteria("id = " + myEncounter.getIdElement().getIdPart());

// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();

// verify
Expand All @@ -216,7 +221,7 @@ public void testValidFhirPathCriteriaEvaluationUsingCurrent() {
trigger.setFhirPathCriteria("%current.id = " + myEncounter.getIdElement().getIdPart());

// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();

// verify
Expand All @@ -234,7 +239,7 @@ public void testValidFhirPathCriteriaEvaluationReturningNonBoolean() {
trigger.setFhirPathCriteria("%current.id");

// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();

// verify
Expand All @@ -257,7 +262,7 @@ public void testValidFhirPathReturningCollection() {
when(mockEncounterDao.read(any(), any(), eq(false))).thenReturn(encounterPreviousVersion);

// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();

// verify
Expand All @@ -283,7 +288,7 @@ public void testUpdateWithPrevCriteriaMatchAndFailingFhirPathCriteria() {
when(mySearchParamMatcher.match(any(), any(), any())).thenReturn(InMemoryMatchResult.successfulMatch());

// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();

// verify
Expand All @@ -310,7 +315,7 @@ public void testUpdateWithPrevCriteriaMatchAndFhirPathCriteriaUsingPreviousVersi
when(mySearchParamMatcher.match(any(), any(), any())).thenReturn(InMemoryMatchResult.successfulMatch());

// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();

// verify
Expand All @@ -335,10 +340,37 @@ public void testUpdateOnlyFhirPathCriteriaUsingPreviousVersion() {
when(mockEncounterDao.read(any(), any(), eq(false))).thenReturn(encounterPreviousVersion);

// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();

// verify
assertTrue(result.matched());
}

@Test
public void testCacheUsage() {
myEncounter.setStatus(Enumerations.EncounterStatus.INPROGRESS);
ResourceModifiedMessage msg = new ResourceModifiedMessage(ourFhirContext, myEncounter, ResourceModifiedMessage.OperationTypeEnum.UPDATE);

// setup
SubscriptionTopic.SubscriptionTopicResourceTriggerComponent trigger = new SubscriptionTopic.SubscriptionTopicResourceTriggerComponent();
trigger.setResource("Encounter");
trigger.addSupportedInteraction(SubscriptionTopic.InteractionTrigger.UPDATE);
String fhirPathCriteria = "%current.status='in-progress'";
trigger.setFhirPathCriteria(fhirPathCriteria);


IFhirResourceDao mockEncounterDao = mock(IFhirResourceDao.class);
when(myDaoRegistry.getResourceDao("Encounter")).thenReturn(mockEncounterDao);

assertFalse(myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.FHIRPATH_EXPRESSION, fhirPathCriteria));
// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger, myMemoryCacheService);
InMemoryMatchResult result = svc.match();


// verify
assertTrue(result.matched());
assertTrue(myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.FHIRPATH_EXPRESSION, fhirPathCriteria) != null);
}
}
Loading

0 comments on commit a923b1c

Please sign in to comment.