From f1650b63d02a5803214daeb01fb6740ecd89bd55 Mon Sep 17 00:00:00 2001 From: Alan Francis Date: Fri, 3 May 2024 11:18:48 +0530 Subject: [PATCH] fix: multiple sets are processed fhir is created for all sets even if one of the file is missing a column/field --- src/ahc-hrsn-elt/screening/orchestrate.ts | 413 ++++++++++++---------- 1 file changed, 219 insertions(+), 194 deletions(-) diff --git a/src/ahc-hrsn-elt/screening/orchestrate.ts b/src/ahc-hrsn-elt/screening/orchestrate.ts index 0c0c21ff..0a914b57 100644 --- a/src/ahc-hrsn-elt/screening/orchestrate.ts +++ b/src/ahc-hrsn-elt/screening/orchestrate.ts @@ -433,6 +433,7 @@ export class OrchEngine { readonly issue_type: string; readonly issue_message: string; readonly invalid_value: string; + readonly ingest_table_name: string; }[]; }[]; readonly duckdb: ddbo.DuckDbShell; @@ -683,11 +684,12 @@ export class OrchEngine { tableGroupCheckSql.join("\n"), ); ingestSQL.push( - `SELECT session_entry_id, orch_session_issue_id, issue_type, issue_message, invalid_value FROM orch_session_issue WHERE session_id = '${sessionID}'`, + `SELECT osi.session_entry_id, orch_session_issue_id, issue_type, issue_message, invalid_value, ingest_table_name FROM orch_session_issue osi LEFT JOIN orch_session_entry ose ON osi.session_entry_id = ose.orch_session_entry_id WHERE osi.session_id = '${sessionID}'`, ); const ingestResult = await this.duckdb.jsonResult< (typeof this.ingestables)[number]["issues"][number] >(ingestSQL.join("\n"), osc.current.nbCellID); + const errGroupName: string[] = []; if (ingestResult.json) { // if errors were found, put the problems into the proper ingestable issues for (const row of ingestResult.json) { @@ -695,12 +697,22 @@ export class OrchEngine { (i) => i.sessionEntryID == row.session_entry_id, ); if (ingestable) ingestable.issues.push(row); + for (const groupName of uniqueGroups) { + if (row.ingest_table_name.includes(groupName)) { + errGroupName.push(groupName); + } + } } } - // for the next step (ensureContent) we only want to pursue remainder of // ingestion for those ingestables that didn't have errors during construction - return this.ingestables.filter((i) => i.issues.length == 0); + const filteredRes = this.ingestables.filter((i) => { + const hasErrorGroup = errGroupName.some((groupName) => + i.source.tableName.includes(groupName) + ); + return i.issues.length == 0 && !hasErrorGroup; + }); + return filteredRes; } /** @@ -852,200 +864,16 @@ export class OrchEngine { DETACH DATABASE ${rdbSchemaName}; - CREATE VIEW orch_session_fhir_emit AS - WITH ValidEncounters AS ( - SELECT - DISTINCT (CONCAT(scr.ENCOUNTER_ID,scr.FACILITY_ID,'-',scr.PAT_MRN_ID)) AS UNIQUE_ID, - CASE WHEN scr.ENCOUNTER_ID IS NOT NULL THEN scr.ENCOUNTER_ID ELSE CONCAT('encounter-',scr.FACILITY_ID,'-',scr.PAT_MRN_ID)END AS ENCOUNTER_ID , - osic.orch_session_id, - osic.device_id, - osic.version, - osic.orch_session_entry_id, - scr.PAT_MRN_ID , - scr.FACILITY_ID , - osic.disposition, - CASE - WHEN osic.disposition = 'REJECTION' - OR osic.disposition = 'STRUCTURAL ISSUE' THEN 'NO' - ELSE 'YES' - END AS FHIR_EMITTABLE - -- Add name of the fhir json file, session id and other ways to connect this to the proper session - -- Later we might actually store the fhir json in the actual column as well - FROM - orch_session_issue_classification osic - JOIN ${csv.aggrScreeningTableName} scr - ON - osic.ingest_table_name = scr.source_table - WHERE - osic.DISPOSITION IN ('REJECTION', 'STRUCTURAL ISSUE') - - UNION ALL - - SELECT - DISTINCT (CONCAT(scr.ENCOUNTER_ID,scr.FACILITY_ID,'-',scr.PAT_MRN_ID)) AS UNIQUE_ID, - CASE WHEN scr.ENCOUNTER_ID IS NOT NULL THEN scr.ENCOUNTER_ID ELSE CONCAT('encounter-',scr.FACILITY_ID,'-',scr.PAT_MRN_ID)END AS ENCOUNTER_ID , - osic.orch_session_id, - osic.device_id, - osic.version, - osic.orch_session_entry_id, - qad.PAT_MRN_ID , - scr.FACILITY_ID , - osic.DISPOSITION, - CASE WHEN osic.disposition = 'REJECTION' OR osic.disposition = 'STRUCTURAL ISSUE' THEN 'NO' ELSE 'YES' END AS FHIR_EMITTABLE - FROM - orch_session_issue_classification osic - JOIN ${csv.aggrQeAdminData} qad - ON osic.ingest_table_name = qad.source_table - JOIN ${csv.aggrScreeningTableName} scr - ON qad.PAT_MRN_ID = scr.PAT_MRN_ID - WHERE - osic.DISPOSITION IN ('REJECTION', 'STRUCTURAL ISSUE') - - UNION ALL - - SELECT - DISTINCT (CONCAT(scr.ENCOUNTER_ID,scr.FACILITY_ID,'-',scr.PAT_MRN_ID)) AS UNIQUE_ID, - CASE WHEN scr.ENCOUNTER_ID IS NOT NULL THEN scr.ENCOUNTER_ID ELSE CONCAT('encounter-',scr.FACILITY_ID,'-',scr.PAT_MRN_ID)END AS ENCOUNTER_ID , - osic.orch_session_id, - osic.device_id, - osic.version, - osic.orch_session_entry_id, - adt.PAT_MRN_ID , - scr.FACILITY_ID , - osic.DISPOSITION, - CASE - WHEN osic.disposition = 'REJECTION' - OR osic.disposition = 'STRUCTURAL ISSUE' THEN 'NO' - ELSE 'YES' - END AS FHIR_EMITTABLE - FROM - orch_session_issue_classification osic - JOIN ${csv.aggrPatientDemogrTableName} adt - ON - osic.ingest_table_name = adt.source_table - JOIN ${csv.aggrScreeningTableName} scr - ON - adt.PAT_MRN_ID = scr.PAT_MRN_ID - WHERE - osic.DISPOSITION IN ('REJECTION', 'STRUCTURAL ISSUE') - ), - InvalidEncounters AS ( - - SELECT - DISTINCT (CONCAT(scr.ENCOUNTER_ID,scr.FACILITY_ID,'-',scr.PAT_MRN_ID)) AS UNIQUE_ID, - CASE WHEN scr.ENCOUNTER_ID IS NOT NULL THEN scr.ENCOUNTER_ID ELSE CONCAT('encounter-',scr.FACILITY_ID,'-',scr.PAT_MRN_ID)END AS ENCOUNTER_ID , - osic.orch_session_id, - osic.device_id, - osic.version, - osic.orch_session_entry_id, - scr.PAT_MRN_ID , - scr.FACILITY_ID , - osic.disposition, - CASE - WHEN osic.disposition = 'REJECTION' - OR osic.disposition = 'STRUCTURAL ISSUE' THEN 'NO' - ELSE 'YES' - END AS FHIR_EMITTABLE - FROM - orch_session_issue_classification osic - JOIN ${csv.aggrScreeningTableName} scr - ON - osic.ingest_table_name = scr.source_table - - WHERE - osic.DISPOSITION NOT IN ('REJECTION', 'STRUCTURAL ISSUE') - - - UNION ALL - - SELECT - DISTINCT (CONCAT(scr.ENCOUNTER_ID,scr.FACILITY_ID,'-',scr.PAT_MRN_ID)) AS UNIQUE_ID, - CASE WHEN scr.ENCOUNTER_ID IS NOT NULL THEN scr.ENCOUNTER_ID ELSE CONCAT('encounter-',scr.FACILITY_ID,'-',scr.PAT_MRN_ID)END AS ENCOUNTER_ID , - osic.orch_session_id, - osic.device_id, - osic.version, - osic.orch_session_entry_id, - qad.PAT_MRN_ID , - scr.FACILITY_ID , - osic.DISPOSITION, - CASE WHEN osic.disposition = 'REJECTION' OR osic.disposition = 'STRUCTURAL ISSUE' THEN 'NO' ELSE 'YES' END AS FHIR_EMITTABLE - FROM - orch_session_issue_classification osic - JOIN ${csv.aggrQeAdminData} qad - ON osic.ingest_table_name = qad.source_table - JOIN ${csv.aggrScreeningTableName} scr - ON qad.PAT_MRN_ID = scr.PAT_MRN_ID - - WHERE - osic.DISPOSITION NOT IN ('REJECTION', 'STRUCTURAL ISSUE') - - UNION ALL - - SELECT - DISTINCT (CONCAT(scr.ENCOUNTER_ID,scr.FACILITY_ID,'-',scr.PAT_MRN_ID)) AS UNIQUE_ID, - CASE WHEN scr.ENCOUNTER_ID IS NOT NULL THEN scr.ENCOUNTER_ID ELSE CONCAT('encounter-',scr.FACILITY_ID,'-',scr.PAT_MRN_ID)END AS ENCOUNTER_ID , - osic.orch_session_id, - osic.device_id, - osic.version, - osic.orch_session_entry_id, - adt.PAT_MRN_ID , - scr.FACILITY_ID , - osic.DISPOSITION, - CASE - WHEN osic.disposition = 'REJECTION' - OR osic.disposition = 'STRUCTURAL ISSUE' THEN 'NO' - ELSE 'YES' - END AS FHIR_EMITTABLE - FROM - orch_session_issue_classification osic - JOIN ${csv.aggrPatientDemogrTableName} adt - ON - osic.ingest_table_name = adt.source_table - JOIN ${csv.aggrScreeningTableName} scr - ON - adt.PAT_MRN_ID = scr.PAT_MRN_ID - WHERE - osic.DISPOSITION NOT IN ('REJECTION', 'STRUCTURAL ISSUE') - ) - - SELECT - ENCOUNTER_ID, - orch_session_id AS ORCH_SESSION_ID, - device_id AS DEVICE_ID, - version AS VERSION, - orch_session_entry_id AS ORCH_SESSION_ENTRY_ID, - PAT_MRN_ID, - disposition AS DISPOSITION, - FHIR_EMITTABLE, - CONCAT('fhir-',PAT_MRN_ID,'-',ENCOUNTER_ID,'.json') AS FHIR_JSON_FILE - FROM - ValidEncounters - - UNION ALL - - SELECT - ENCOUNTER_ID, - orch_session_id AS ORCH_SESSION_ID, - device_id AS DEVICE_ID, - version AS VERSION, - orch_session_entry_id AS ORCH_SESSION_ENTRY_ID, - PAT_MRN_ID, - disposition AS DISPOSITION, - FHIR_EMITTABLE, - CASE WHEN FHIR_EMITTABLE='YES' THEN CONCAT('fhir-',PAT_MRN_ID,'-',ENCOUNTER_ID,'.json') ELSE '' END AS FHIR_JSON_FILE - FROM - InvalidEncounters - WHERE - ENCOUNTER_ID NOT IN (SELECT ENCOUNTER_ID FROM ValidEncounters); - - ${afterFinalize.length > 0 ? (afterFinalize.join(";\n") + ";") : "-- no after-finalize SQL provided"}` .SQL(this.govn.emitCtx), isc.current.nbCellID, ); - const fhirViewMainQuery = this.createFhirViewQuery(); const tableCount = await this.checkRequiredTables(); if (tableCount === 3) { + console.log(tableCount); + const fhirEmitQuery = this.createFhirEmitQuery(); + await this.duckdb.execute(fhirEmitQuery); + const fhirViewMainQuery = this.createFhirViewQuery(); await this.duckdb.execute(fhirViewMainQuery); fhirGeneratorCheck = true; } @@ -1067,6 +895,196 @@ export class OrchEngine { return 0; } + createFhirEmitQuery(): string { + return ` + CREATE VIEW orch_session_fhir_emit AS + WITH ValidEncounters AS ( + SELECT + DISTINCT (CONCAT(scr.ENCOUNTER_ID,scr.FACILITY_ID,'-',scr.PAT_MRN_ID)) AS UNIQUE_ID, + CASE WHEN scr.ENCOUNTER_ID IS NOT NULL THEN scr.ENCOUNTER_ID ELSE CONCAT('encounter-',scr.FACILITY_ID,'-',scr.PAT_MRN_ID)END AS ENCOUNTER_ID , + osic.orch_session_id, + osic.device_id, + osic.version, + osic.orch_session_entry_id, + scr.PAT_MRN_ID , + scr.FACILITY_ID , + osic.disposition, + CASE + WHEN osic.disposition = 'REJECTION' + OR osic.disposition = 'STRUCTURAL ISSUE' THEN 'NO' + ELSE 'YES' + END AS FHIR_EMITTABLE + -- Add name of the fhir json file, session id and other ways to connect this to the proper session + -- Later we might actually store the fhir json in the actual column as well + FROM + orch_session_issue_classification osic + JOIN ${csv.aggrScreeningTableName} scr + ON + osic.ingest_table_name = scr.source_table + WHERE + osic.DISPOSITION IN ('REJECTION', 'STRUCTURAL ISSUE') + + UNION ALL + + SELECT + DISTINCT (CONCAT(scr.ENCOUNTER_ID,scr.FACILITY_ID,'-',scr.PAT_MRN_ID)) AS UNIQUE_ID, + CASE WHEN scr.ENCOUNTER_ID IS NOT NULL THEN scr.ENCOUNTER_ID ELSE CONCAT('encounter-',scr.FACILITY_ID,'-',scr.PAT_MRN_ID)END AS ENCOUNTER_ID , + osic.orch_session_id, + osic.device_id, + osic.version, + osic.orch_session_entry_id, + qad.PAT_MRN_ID , + scr.FACILITY_ID , + osic.DISPOSITION, + CASE WHEN osic.disposition = 'REJECTION' OR osic.disposition = 'STRUCTURAL ISSUE' THEN 'NO' ELSE 'YES' END AS FHIR_EMITTABLE + FROM + orch_session_issue_classification osic + JOIN ${csv.aggrQeAdminData} qad + ON osic.ingest_table_name = qad.source_table + JOIN ${csv.aggrScreeningTableName} scr + ON qad.PAT_MRN_ID = scr.PAT_MRN_ID + WHERE + osic.DISPOSITION IN ('REJECTION', 'STRUCTURAL ISSUE') + + UNION ALL + + SELECT + DISTINCT (CONCAT(scr.ENCOUNTER_ID,scr.FACILITY_ID,'-',scr.PAT_MRN_ID)) AS UNIQUE_ID, + CASE WHEN scr.ENCOUNTER_ID IS NOT NULL THEN scr.ENCOUNTER_ID ELSE CONCAT('encounter-',scr.FACILITY_ID,'-',scr.PAT_MRN_ID)END AS ENCOUNTER_ID , + osic.orch_session_id, + osic.device_id, + osic.version, + osic.orch_session_entry_id, + adt.PAT_MRN_ID , + scr.FACILITY_ID , + osic.DISPOSITION, + CASE + WHEN osic.disposition = 'REJECTION' + OR osic.disposition = 'STRUCTURAL ISSUE' THEN 'NO' + ELSE 'YES' + END AS FHIR_EMITTABLE + FROM + orch_session_issue_classification osic + JOIN ${csv.aggrPatientDemogrTableName} adt + ON + osic.ingest_table_name = adt.source_table + JOIN ${csv.aggrScreeningTableName} scr + ON + adt.PAT_MRN_ID = scr.PAT_MRN_ID + WHERE + osic.DISPOSITION IN ('REJECTION', 'STRUCTURAL ISSUE') + ), + InvalidEncounters AS ( + + SELECT + DISTINCT (CONCAT(scr.ENCOUNTER_ID,scr.FACILITY_ID,'-',scr.PAT_MRN_ID)) AS UNIQUE_ID, + CASE WHEN scr.ENCOUNTER_ID IS NOT NULL THEN scr.ENCOUNTER_ID ELSE CONCAT('encounter-',scr.FACILITY_ID,'-',scr.PAT_MRN_ID)END AS ENCOUNTER_ID , + osic.orch_session_id, + osic.device_id, + osic.version, + osic.orch_session_entry_id, + scr.PAT_MRN_ID , + scr.FACILITY_ID , + osic.disposition, + CASE + WHEN osic.disposition = 'REJECTION' + OR osic.disposition = 'STRUCTURAL ISSUE' THEN 'NO' + ELSE 'YES' + END AS FHIR_EMITTABLE + FROM + orch_session_issue_classification osic + JOIN ${csv.aggrScreeningTableName} scr + ON + osic.ingest_table_name = scr.source_table + + WHERE + osic.DISPOSITION NOT IN ('REJECTION', 'STRUCTURAL ISSUE') + + + UNION ALL + + SELECT + DISTINCT (CONCAT(scr.ENCOUNTER_ID,scr.FACILITY_ID,'-',scr.PAT_MRN_ID)) AS UNIQUE_ID, + CASE WHEN scr.ENCOUNTER_ID IS NOT NULL THEN scr.ENCOUNTER_ID ELSE CONCAT('encounter-',scr.FACILITY_ID,'-',scr.PAT_MRN_ID)END AS ENCOUNTER_ID , + osic.orch_session_id, + osic.device_id, + osic.version, + osic.orch_session_entry_id, + qad.PAT_MRN_ID , + scr.FACILITY_ID , + osic.DISPOSITION, + CASE WHEN osic.disposition = 'REJECTION' OR osic.disposition = 'STRUCTURAL ISSUE' THEN 'NO' ELSE 'YES' END AS FHIR_EMITTABLE + FROM + orch_session_issue_classification osic + JOIN ${csv.aggrQeAdminData} qad + ON osic.ingest_table_name = qad.source_table + JOIN ${csv.aggrScreeningTableName} scr + ON qad.PAT_MRN_ID = scr.PAT_MRN_ID + + WHERE + osic.DISPOSITION NOT IN ('REJECTION', 'STRUCTURAL ISSUE') + + UNION ALL + + SELECT + DISTINCT (CONCAT(scr.ENCOUNTER_ID,scr.FACILITY_ID,'-',scr.PAT_MRN_ID)) AS UNIQUE_ID, + CASE WHEN scr.ENCOUNTER_ID IS NOT NULL THEN scr.ENCOUNTER_ID ELSE CONCAT('encounter-',scr.FACILITY_ID,'-',scr.PAT_MRN_ID)END AS ENCOUNTER_ID , + osic.orch_session_id, + osic.device_id, + osic.version, + osic.orch_session_entry_id, + adt.PAT_MRN_ID , + scr.FACILITY_ID , + osic.DISPOSITION, + CASE + WHEN osic.disposition = 'REJECTION' + OR osic.disposition = 'STRUCTURAL ISSUE' THEN 'NO' + ELSE 'YES' + END AS FHIR_EMITTABLE + FROM + orch_session_issue_classification osic + JOIN ${csv.aggrPatientDemogrTableName} adt + ON + osic.ingest_table_name = adt.source_table + JOIN ${csv.aggrScreeningTableName} scr + ON + adt.PAT_MRN_ID = scr.PAT_MRN_ID + WHERE + osic.DISPOSITION NOT IN ('REJECTION', 'STRUCTURAL ISSUE') + ) + + SELECT + ENCOUNTER_ID, + orch_session_id AS ORCH_SESSION_ID, + device_id AS DEVICE_ID, + version AS VERSION, + orch_session_entry_id AS ORCH_SESSION_ENTRY_ID, + PAT_MRN_ID, + disposition AS DISPOSITION, + FHIR_EMITTABLE, + CONCAT('fhir-',PAT_MRN_ID,'-',ENCOUNTER_ID,'.json') AS FHIR_JSON_FILE + FROM + ValidEncounters + + UNION ALL + + SELECT + ENCOUNTER_ID, + orch_session_id AS ORCH_SESSION_ID, + device_id AS DEVICE_ID, + version AS VERSION, + orch_session_entry_id AS ORCH_SESSION_ENTRY_ID, + PAT_MRN_ID, + disposition AS DISPOSITION, + FHIR_EMITTABLE, + CASE WHEN FHIR_EMITTABLE='YES' THEN CONCAT('fhir-',PAT_MRN_ID,'-',ENCOUNTER_ID,'.json') ELSE '' END AS FHIR_JSON_FILE + FROM + InvalidEncounters + WHERE + ENCOUNTER_ID NOT IN (SELECT ENCOUNTER_ID FROM ValidEncounters); + `; + } + createFhirViewQuery(): string { const cteFhirPatient = this.createCteFhirPatient(); const cteFhirConsent = this.createCteFhirConsent(); @@ -1506,10 +1524,17 @@ export class OrchEngine { ws.unindentWhitespace(` INSTALL spatial; LOAD spatial; -- TODO: join with orch_session table to give all the results in one sheet - COPY (SELECT * FROM orch_session_issue_classification) TO '${diagsXlsx}' WITH (FORMAT GDAL, DRIVER 'xlsx'); - COPY (SELECT * FROM orch_session_fhir_emit) TO '${diagsFhirXlsx}' WITH (FORMAT GDAL, DRIVER 'xlsx');`), + COPY (SELECT * FROM orch_session_issue_classification) TO '${diagsXlsx}' WITH (FORMAT GDAL, DRIVER 'xlsx');`), "emitDiagnostics" ); + if (fhirGeneratorCheck) { + await this.duckdb.execute( + ws.unindentWhitespace(` + INSTALL spatial; LOAD spatial; + COPY (SELECT * FROM orch_session_fhir_emit) TO '${diagsFhirXlsx}' WITH (FORMAT GDAL, DRIVER 'xlsx');`), + "emitDiagnostics", + ); + } } const stringifiableArgs = JSON.parse( @@ -1535,7 +1560,7 @@ export class OrchEngine { ), ); - if (egress.diagsJsonSupplier) { + if (egress.diagsJsonSupplier && fhirGeneratorCheck) { const diagsJson = egress.diagsJsonSupplier(); const fhirDiags = await this.duckdb.jsonResult( this.govn.SQL`