Skip to content

Commit

Permalink
Merge pull request #31 from jembi/allow-golden-id-updates
Browse files Browse the repository at this point in the history
Allow publishing of golden ID updates from JeMPI to Kafka
  • Loading branch information
bradsawadye authored Dec 5, 2023
2 parents d017560 + 728b755 commit e775845
Show file tree
Hide file tree
Showing 13 changed files with 309 additions and 11 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
{
"name": "mpi-mediator",
"version": "v1.1.0",
"version": "v1.2.0",
"description": "An OpenHIM mediator to handle all interactions with an MPI component",
"main": "index.ts",
"scripts": {
"dev": "nodemon --config restart.json",
"start": "ts-node -r dotenv/config src/index.ts",
"format": "prettier --check --write '{src,tests}/**/*.ts'",
"lint": "eslint . --ext .ts,.js --fix",
"docker:build": "npm run build; docker build -t jembi/mpi-mediator:v1.1.0 .",
"docker:build": "npm run build; docker build -t jembi/mpi-mediator:latest .",
"build": "npm install; tsc -p tsconfig.build.json",
"test:unit": "LOG_LEVEL='fatal' MODE='testing' mocha -r ts-node/register 'tests/unit/*.ts' --timeout 15000",
"test:cucumber": "CUCUMBER_DEFAULT_TIMEOUT=20000 MODE='testing' cucumber-js 'tests/cucumber/features/**/*.feature' --no-color --exit --logLevel='error' --require 'tests/cucumber/step-definitions/*.ts' --require-module 'ts-node/register' --require-module 'dotenv/config' dotenv_config_path=.env.test --format-options \"{\\\"snippetInterface\\\": \\\"async-await\\\"}\" --format summary --format progress-bar",
Expand Down Expand Up @@ -57,11 +57,11 @@
},
"dependencies": {
"@types/sinon": "^10.0.13",
"date-fns": "^2.29.3",
"express": "^4.18.2",
"express-async-handler": "^1.2.0",
"kafkajs": "^2.2.2",
"date-fns": "^2.29.3",
"http-proxy-middleware": "^2.0.6",
"kafkajs": "^2.2.2",
"node-fetch": "^2.6.7",
"openhim-mediator-utils": "^0.2.4",
"pino": "^8.7.0",
Expand Down
7 changes: 6 additions & 1 deletion src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export const getConfig = () => {
kafkaBundleTopic: process.env.KAFKA_BUNDLE_TOPIC || '2xx',
kafkaAsyncBundleTopic: process.env.KAFKA_ASYNC_BUNDLE_TOPIC || '2xx-async',
kafkaErrorTopic: process.env.KAFKA_ERROR_TOPIC || 'errors',
kafkaPatientTopic: process.env.KAFKA_PATIENT_TOPIC ?? 'patient',
mpiKafkaClientId: process.env.MPI_KAFKA_CLIENT_ID || 'mpi-mediator',
runningMode: process.env.MODE || '',
mpiHost: process.env.MPI_HOST || 'santedb-mpi',
Expand All @@ -36,7 +37,11 @@ export const getConfig = () => {
mpiAuthEnabled: process.env.MPI_AUTH_ENABLED === 'false' ? false : true,
mpiClientId: process.env.MPI_CLIENT_ID || '',
mpiClientSecret: process.env.MPI_CLIENT_SECRET || '',
mpiProxyUrl: process.env.MPI_PROXY_URL || '',
cucumberDefaultTimeout: process.env.CUCUMBER_DEFAULT_TIMEOUT || 20000,
disableValidation: process.env.DISABLE_VALIDATION == 'true' ? true : false
disableValidation: process.env.DISABLE_VALIDATION == 'true' ? true : false,
enableJempiGoldenIdUpdate:
process.env.ENABLE_JEMPI_GOLDEN_ID_UPDATE == 'true' ? true : false,
kafkaJempiAuditTopic: process.env.KAFKA_JEMPI_AUDIT_TOPIC ?? 'JeMPI-audit-trail',
});
};
6 changes: 6 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import logger from './logger';
import routes from './routes/index';
import { asyncPatientMatchHandler } from './routes/handlers/kafkaAsyncPatientHandler';
import { setupMediator } from './openhim/openhim';
import { asyncGoldenIdUpdater } from './routes/handlers/kafkaAsyncGoldenIdUpdater';

const config = getConfig();
const app = express();
Expand All @@ -18,6 +19,11 @@ if (config.runningMode !== 'testing') {
if (config.registerMediator) {
setupMediator(path.resolve(__dirname, './openhim/mediatorConfig.json'));
}

asyncPatientMatchHandler();

if (config.enableJempiGoldenIdUpdate) {
asyncGoldenIdUpdater();
}
});
}
2 changes: 1 addition & 1 deletion src/openhim/mediatorConfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"urn": "urn:mediator:mpi-mediator",
"version": "v1.1.0",
"version": "v1.2.0",
"name": "MPI mediator",
"description": "A mediator handling interactions between the OpenHIM Core service, Sante MPI, Hapi-FHIR, and Kafka",
"defaultChannelConfig": [
Expand Down
104 changes: 104 additions & 0 deletions src/routes/handlers/kafkaAsyncGoldenIdUpdater.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { Kafka, logLevel } from 'kafkajs';

import { getConfig } from '../../config/config';
import logger from '../../logger';
import { JempiAudit } from '../../types/jempiAudit';
import { fetchMpiResourceByRef } from '../../utils/mpi';
import { Patient } from 'fhir/r3';

export const asyncGoldenIdUpdater = async (): Promise<void> => {
const config = getConfig();

const kafka = new Kafka({
logLevel: logLevel.ERROR,
clientId: config.mpiKafkaClientId,
brokers: config.kafkaBrokers.split(','),
});

const consumer = kafka.consumer({ groupId: 'mpi-mediator-golden-id' });
const producer = kafka.producer();

await consumer.connect();
await consumer.subscribe({
topic: config.kafkaJempiAuditTopic,
});

await producer.connect();

logger.info('Kafka golden ID consumer started');

await consumer.run({
eachMessage: async ({ message }) => {
logger.info('JeMPI audit received from queue');

if (message.value === null) {
logger.info('JeMPI audit message is null');

return;
}

let audit: JempiAudit;

try {
audit = JSON.parse(message.value.toString());
} catch (error) {
logger.error('Error parsing JeMPI audit message', error);

return;
}

if (
audit.event.startsWith('Interaction -> update GoldenID') ||
audit.event.startsWith('Interaction -> new GoldenID')
) {
logger.info(
`Received JeMPI audit for a GoldenID change: Updating patientId ${audit.interactionID} with goldenId ${audit.goldenID}`
);

try {
const resource = await fetchMpiResourceByRef<Patient>(
`Patient/${audit.interactionID}`
);

if (!resource) {
logger.error(`Patient with id ${audit.interactionID} not found in MPI`);

return;
}

if (!resource.link) {
resource.link = [];
}

const goldenIdLink = resource.link.find((link) => link.type === 'refer');

if (goldenIdLink) {
goldenIdLink.other = {
reference: `Patient/${audit.goldenID}`,
};
} else {
resource.link.push({
type: 'refer',
other: {
reference: `Patient/${audit.goldenID}`,
},
});
}

await producer.send({
topic: config.kafkaPatientTopic,
messages: [
{
value: JSON.stringify({
resource,
}),
},
],
});
} catch (err) {
logger.error(`Error sending patient to '${config.kafkaPatientTopic}' topic`, err);
}
}
},
});
};
1 change: 0 additions & 1 deletion src/routes/handlers/kafkaAsyncPatientHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ export const asyncPatientMatchHandler = async (): Promise<void> => {
await consumer.connect();
await consumer.subscribe({
topic: config.kafkaAsyncBundleTopic,
fromBeginning: true,
});

logger.info('Kafka consumer started');
Expand Down
6 changes: 6 additions & 0 deletions src/types/jempiAudit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export interface JempiAudit {
createdAt: string;
interactionID: string;
goldenID: string;
event: string;
}
2 changes: 2 additions & 0 deletions src/utils/kafkaFhir.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,14 @@ export const processBundle = async (bundle: Bundle): Promise<MpiMediatorResponse

//Add the patient's managing organization and extensions
let transformedPatient = Object.assign({}, clientRegistryResponse.body);

if (patientResource) {
if (patientMpiTransformResult.extension?.length) {
transformedPatient = Object.assign({}, transformedPatient, {
extension: patientMpiTransformResult.extension,
});
}

if (patientMpiTransformResult.managingOrganization) {
transformedPatient = Object.assign({}, transformedPatient, {
managingOrganization: patientMpiTransformResult.managingOrganization,
Expand Down
4 changes: 4 additions & 0 deletions src/utils/mpi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import { ClientOAuth2, OAuth2Token } from './client-oauth2';
// Singleton instance of MPI Token stored in memory
export let mpiToken: OAuth2Token | null = null;

export const resetMpiToken = () => {
mpiToken = null;
};

/**
* Returns an instance of MPI token, it does renew the token when expired.
*/
Expand Down
6 changes: 5 additions & 1 deletion src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ export const modifyBundle = (
* This method removes the patient's extension and managing organization. The Client registry only stores Patient resources. Extensions and Managing Organization
* references will result in validation errors.
* The function returns a tranformed patient, extension and managing organization.
*/
*/
export const transformPatientResourceForMPI = (patient: Resource): MpiTransformResult => {
const transformedPatient = JSON.parse(JSON.stringify(patient));

Expand All @@ -203,6 +203,10 @@ export const transformPatientResourceForMPI = (patient: Resource): MpiTransformR
};

export const createNewPatientRef = (patientId: string): string => {
if (config.mpiProxyUrl) {
return `${config.mpiProxyUrl}/fhir/Patient/${patientId}`;
}

return `${config.mpiProtocol}://${config.mpiHost}:${config.mpiPort}/fhir/Patient/${patientId}`;
};

Expand Down
Loading

0 comments on commit e775845

Please sign in to comment.