Initial attempt at cleansing via sqlrender #241

Open
wants to merge 12 commits into base: develop
219 changes: 188 additions & 31 deletions R/execution.R
@@ -106,12 +106,12 @@
}
}

result <- DatabaseConnector::querySql(connection = connection, sql = sql,
errorReportFile = errorReportFile)
delta <- difftime(Sys.time(), start, units = "secs")
.recordResult(result = result, check = check, checkDescription = checkDescription, sql = sql,
executionTime = sprintf("%f %s", delta, attr(delta, "units")))
result <- DatabaseConnector::querySql(connection = connection, sql = sql,
errorReportFile = errorReportFile)
delta <- difftime(Sys.time(), start, units = "secs")
.recordResult(result = result, check = check, checkDescription = checkDescription, sql = sql,
executionTime = sprintf("%f %s", delta, attr(delta, "units")))
},
warning = function(w) {
ParallelLogger::logWarn(sprintf("[Level: %s] [Check: %s] [CDM Table: %s] [CDM Field: %s] %s",
@@ -156,6 +156,10 @@
#' @param tableCheckThresholdLoc The location of the threshold file for evaluating the table checks. If not specified the default thresholds will be applied.
#' @param fieldCheckThresholdLoc The location of the threshold file for evaluating the field checks. If not specified the default thresholds will be applied.
#' @param conceptCheckThresholdLoc The location of the threshold file for evaluating the concept checks. If not specified the default thresholds will be applied.
#' @param actions List indicating which combinations of actions to perform. CAPTURE=TRUE instructs DQD to
#' save rows that fail checks to an archive table. CLEANSE=TRUE instructs DQD to remove
#' rows that fail checks from their respective table. EXECUTE=TRUE instructs DQD to run the
#' checks without saving or removing rows that fail checks. The default is actions = list(CAPTURE=FALSE, CLEANSE=FALSE, EXECUTE=TRUE).
#'
#' @return If sqlOnly = FALSE, a list object of results
#'
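As a usage illustration (not taken from this PR), a hypothetical call enabling the new actions argument might look like the sketch below; the connection details, schema names, and folder path are placeholders.

# Hypothetical invocation of executeDqChecks() with the new actions argument.
# Connection details, schema names, and paths below are placeholders, not values from this PR.
connectionDetails <- DatabaseConnector::createConnectionDetails(dbms = "postgresql",
                                                                server = "localhost/ohdsi",
                                                                user = "user",
                                                                password = "password")
results <- DataQualityDashboard::executeDqChecks(connectionDetails = connectionDetails,
                                                 cdmDatabaseSchema = "cdm",
                                                 resultsDatabaseSchema = "results",
                                                 cdmSourceName = "My CDM",
                                                 outputFolder = "output",
                                                 actions = list(CAPTURE = TRUE, CLEANSE = TRUE, EXECUTE = TRUE))

With CAPTURE or CLEANSE enabled, executeDqChecks() first generates and runs the cleansing DML scripts and then resets to a normal execution pass, as shown in the changes below.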
@@ -180,7 +184,8 @@ executeDqChecks <- function(connectionDetails,
cdmVersion = "5.3.1",
tableCheckThresholdLoc = "default",
fieldCheckThresholdLoc = "default",
conceptCheckThresholdLoc = "default") {
conceptCheckThresholdLoc = "default",
actions = list(CAPTURE=FALSE, CLEANSE=FALSE, EXECUTE=TRUE)) {

# Check input -------------------------------------------------------------------------------------------------------------------
if (!("connectionDetails" %in% class(connectionDetails))){
@@ -317,6 +322,57 @@ if (conceptCheckThresholdLoc == "default"){

fieldChecks$cdmFieldName <- toupper(fieldChecks$cdmFieldName)
conceptChecks$cdmFieldName <- toupper(conceptChecks$cdmFieldName)

wasSqlOnly <- sqlOnly # preserve the original setting

if (actions$CAPTURE || actions$CLEANSE) {

# Check if archive tables exist, if not, create them.
prepCleanse(connectionDetails,
cdmDatabaseSchema,
resultsDatabaseSchema,
vocabDatabaseSchema,
cdmSourceName,
outputFolder,
outputFile,
cohortDefinitionId,
cohortDatabaseSchema,
cdmVersion,
tablesToPrep = unique(tableChecks$cdmTableName))

# Create files for cleaning DML and turn off normal execution of dqd.
# This is necessary since we have both SELECT and DML in a single SQL file.

sqlOnly <- TRUE # Set to TRUE to create the DML sql files to execute for capture and cleanse
actions <- list(CAPTURE=actions$CAPTURE, CLEANSE=actions$CLEANSE, EXECUTE=FALSE)

cluster <- ParallelLogger::makeCluster(numberOfThreads = numThreads, singleThreadToMain = TRUE)
resultsList <- ParallelLogger::clusterApply(
cluster = cluster, x = checkDescriptions,
fun = .runCheck,
tableChecks,
fieldChecks,
conceptChecks,
connectionDetails,
connection,
cdmDatabaseSchema,
vocabDatabaseSchema,
cohortDatabaseSchema,
cohortDefinitionId,
outputFolder,
sqlOnly,
actions,
progressBar = TRUE
)
ParallelLogger::stopCluster(cluster = cluster)

# With prep complete and SQL DML files generated, execute the cleanse.
performCleanse(connectionDetails,checksToInclude,outputFolder)
}

# Reset sqlOnly to its original value and turn capture and cleanse off to resume normal (i.e., non-cleanse) execution of DQD
sqlOnly <- wasSqlOnly
actions <- list(CAPTURE=FALSE, CLEANSE=FALSE, EXECUTE=TRUE)

cluster <- ParallelLogger::makeCluster(numberOfThreads = numThreads, singleThreadToMain = TRUE)
resultsList <- ParallelLogger::clusterApply(
@@ -333,6 +389,7 @@ if (conceptCheckThresholdLoc == "default"){
cohortDefinitionId,
outputFolder,
sqlOnly,
actions,
progressBar = TRUE
)
ParallelLogger::stopCluster(cluster = cluster)
@@ -394,7 +451,8 @@ if (conceptCheckThresholdLoc == "default"){
cohortDatabaseSchema,
cohortDefinitionId,
outputFolder,
sqlOnly) {
sqlOnly,
actions) {

library(magrittr)
ParallelLogger::logInfo(sprintf("Processing check description: %s", checkDescription$checkName))
@@ -417,33 +475,60 @@ if (conceptCheckThresholdLoc == "default"){
setNames(check[c], c)
})

params <- c(list(dbms = connectionDetails$dbms),
list(sqlFilename = checkDescription$sqlFile),
list(packageName = "DataQualityDashboard"),
list(warnOnMissingParameters = FALSE),
list(cdmDatabaseSchema = cdmDatabaseSchema),
list(cohortDatabaseSchema = cohortDatabaseSchema),
list(cohortDefinitionId = cohortDefinitionId),
list(vocabDatabaseSchema = vocabDatabaseSchema),
list(cohort = cohort),
unlist(columns, recursive = FALSE))
if (actions$CAPTURE || actions$CLEANSE) {
params <- c(list(dbms = connectionDetails$dbms),
list(sqlFilename = checkDescription$sqlFile),
list(packageName = "DataQualityDashboard"),
list(warnOnMissingParameters = FALSE),
list(cdmDatabaseSchema = cdmDatabaseSchema),
list(cohortDatabaseSchema = cohortDatabaseSchema),
list(cohortDefinitionId = cohortDefinitionId),
list(vocabDatabaseSchema = vocabDatabaseSchema),
list(cohort = cohort),
list(CAPTURE = actions$CAPTURE),
list(CLEANSE = actions$CLEANSE),
list(EXECUTE = FALSE),
unlist(columns, recursive = FALSE))

sql <- do.call(SqlRender::loadRenderTranslateSql, params)

sql <- do.call(SqlRender::loadRenderTranslateSql, params)
write(x = sql, file = file.path(outputFolder, sprintf("cleanse_%s.sql", checkDescription$checkName)), append = TRUE)
data.frame()
}

if (actions$EXECUTE) {
params <- c(list(dbms = connectionDetails$dbms),
list(sqlFilename = checkDescription$sqlFile),
list(packageName = "DataQualityDashboard"),
list(warnOnMissingParameters = FALSE),
list(cdmDatabaseSchema = cdmDatabaseSchema),
list(cohortDatabaseSchema = cohortDatabaseSchema),
list(cohortDefinitionId = cohortDefinitionId),
list(vocabDatabaseSchema = vocabDatabaseSchema),
list(cohort = cohort),
list(CAPTURE = FALSE),
list(CLEANSE = FALSE),
list(EXECUTE = TRUE),
unlist(columns, recursive = FALSE))

if (sqlOnly) {
write(x = sql, file = file.path(outputFolder,
sql <- do.call(SqlRender::loadRenderTranslateSql, params)

if (sqlOnly) {
write(x = sql, file = file.path(outputFolder,
sprintf("%s.sql", checkDescription$checkName)), append = TRUE)
data.frame()
} else {
.processCheck(connection = connection,
connectionDetails = connectionDetails,
check = check,
checkDescription = checkDescription,
sql = sql,
outputFolder = outputFolder)
}
data.frame()
} else {
.processCheck(connection = connection,
connectionDetails = connectionDetails,
check = check,
checkDescription = checkDescription,
sql = sql,
outputFolder = outputFolder)
}
}

})
do.call(rbind, dfs)
if (actions$EXECUTE) { do.call(rbind, dfs) }
} else {
ParallelLogger::logWarn(paste0("Warning: Evaluation resulted in no checks: ", filterExpression))
data.frame()
@@ -747,3 +832,75 @@ writeJsonResultsToTable <- function(connectionDetails,
}
autoCommit
}

# If capturing or cleansing prior to execution, create archive tables if they do not already exist.
# Find the names of the tables that participate in DQD and, for each, create a corresponding "_ARCHIVE"
# table to hold the affected rows before they are deleted or updated.

prepCleanse <- function(connectionDetails,
cdmDatabaseSchema,
resultsDatabaseSchema,
vocabDatabaseSchema,
cdmSourceName,
outputFolder,
outputFile,
cohortDefinitionId,
cohortDatabaseSchema,
cdmVersion,
tablesToPrep)
{
connection <- DatabaseConnector::connect(connectionDetails = connectionDetails)

archiveNames <- paste0(tablesToPrep,"_ARCHIVE")

ParallelLogger::logInfo("Beginning cleanse prep process.")

allTableNames <- DatabaseConnector::getTableNames(connection,cdmDatabaseSchema)

for (k in 1:length(archiveNames)) {
if (!(archiveNames[k] %in% allTableNames)) {
ParallelLogger::logInfo(sprintf("Creating archive table for %s.", tablesToPrep[k]))
sql <- paste0("CREATE TABLE ",cdmDatabaseSchema,".",archiveNames[k],
" AS SELECT * FROM ",cdmDatabaseSchema,".",tablesToPrep[k],
" WHERE 1 = 0;")
DatabaseConnector::executeSql(connection,sql)
sql <- paste0("ALTER TABLE ",cdmDatabaseSchema,".",archiveNames[k],
" ADD COLUMN DQD_ARCHIVE_DATE DATE DEFAULT NULL;")
DatabaseConnector::executeSql(connection,sql)
}
}

ParallelLogger::logInfo("Cleanse prep process complete.")
on.exit(DatabaseConnector::disconnect(connection = connection))
}
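As a concrete illustration of the statements built above, the sketch below (not part of this PR) shows the DDL prepCleanse() would issue for a single table; the schema name my_cdm and the PERSON table are assumed for illustration.

# Sketch of the DDL prepCleanse() issues when PERSON_ARCHIVE does not yet exist.
# "my_cdm" is an assumed schema name; connectionDetails must already be defined.
connection <- DatabaseConnector::connect(connectionDetails = connectionDetails)
DatabaseConnector::executeSql(connection,
                              "CREATE TABLE my_cdm.PERSON_ARCHIVE AS SELECT * FROM my_cdm.PERSON WHERE 1 = 0;")
DatabaseConnector::executeSql(connection,
                              "ALTER TABLE my_cdm.PERSON_ARCHIVE ADD COLUMN DQD_ARCHIVE_DATE DATE DEFAULT NULL;")
DatabaseConnector::disconnect(connection = connection)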

# The cleanse is performed by executing the SQL scripts generated via .runCheck.
# It is expected that prepCleanse() and .runCheck() have already been executed before performCleanse()
# so that the SQL DML scripts exist in the output folder.

performCleanse <- function(connectionDetails,checksToInclude,outputFolder)
{

excludedChecks <- c("measurePersonCompleteness","cdmField")
conn <- DatabaseConnector::connect(connectionDetails)

tryCatch(
expr = {
for (k in 1:length(checksToInclude)) {
cleanseFileName <- paste0(outputFolder,"/cleanse_",checksToInclude[k],".sql")
if (file.exists(cleanseFileName) && !(checksToInclude[k] %in% excludedChecks)) {
ParallelLogger::logInfo(paste0("Executing clease script: ",cleanseFileName))
sql <- SqlRender::readSql(cleanseFileName)
DatabaseConnector::executeSql(conn,sql)
}
}
},
warning = function(w) {
ParallelLogger::logWarn(sprintf("[Cleanse file name: %s] %s",cleanseFileName, w$message))
},
error = function(e) {
ParallelLogger::logError(sprintf("[Cleanse file name: %s] %s",cleanseFileName, e$message))
}
)
on.exit(DatabaseConnector::disconnect(conn))
}
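For reference, a minimal standalone sketch of the file-discovery step performCleanse() relies on; the folder path and check names are illustrative only and no database work is done.

# Illustrative only: locate generated cleanse scripts without executing them.
outputFolder <- "output"                                               # assumed path
checksToInclude <- c("plausibleGender", "measurePersonCompleteness")   # illustrative check names
excludedChecks <- c("measurePersonCompleteness", "cdmField")
for (checkName in setdiff(checksToInclude, excludedChecks)) {
  cleanseFileName <- file.path(outputFolder, sprintf("cleanse_%s.sql", checkName))
  if (file.exists(cleanseFileName)) {
    message("Would execute cleanse script: ", cleanseFileName)
  }
}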
86 changes: 53 additions & 33 deletions inst/sql/sql_server/concept_plausible_gender.sql
@@ -1,4 +1,3 @@

/*********
CONCEPT LEVEL check:
PLAUSIBLE_GENDER - number of records of a given concept which occur in person with implausible gender for that concept
Expand All @@ -15,39 +14,60 @@ cohortDatabaseSchema = @cohortDatabaseSchema
}
**********/

{@CAPTURE} ? {
INSERT INTO @cdmDatabaseSchema.@cdmTableName_archive
SELECT cdmTable.*, getdate()
FROM @cdmDatabaseSchema.@cdmTableName cdmTable
INNER JOIN @cdmDatabaseSchema.person p
ON cdmTable.person_id = p.person_id
WHERE cdmTable.@cdmFieldName = @conceptId
AND p.gender_concept_id <> {@plausibleGender == 'Male'} ? {8507} : {8532};
}

SELECT num_violated_rows, CASE WHEN denominator.num_rows = 0 THEN 0 ELSE 1.0*num_violated_rows/denominator.num_rows END AS pct_violated_rows,
denominator.num_rows as num_denominator_rows
FROM
(
SELECT COUNT_BIG(*) AS num_violated_rows
FROM
(
SELECT cdmTable.*
{@CLEANSE} ? {
DELETE FROM @cdmDatabaseSchema.@cdmTableName WHERE @cdmTableName_ID IN (
SELECT cdmTable.@cdmTableName_ID
FROM @cdmDatabaseSchema.@cdmTableName cdmTable
INNER JOIN @cdmDatabaseSchema.person p
ON cdmTable.person_id = p.person_id

{@cohort}?{
JOIN @cohortDatabaseSchema.COHORT c
ON cdmTable.PERSON_ID = c.SUBJECT_ID
AND c.COHORT_DEFINITION_ID = @cohortDefinitionId
}

INNER JOIN @cdmDatabaseSchema.person p
ON cdmTable.person_id = p.person_id
WHERE cdmTable.@cdmFieldName = @conceptId
AND p.gender_concept_id <> {@plausibleGender == 'Male'} ? {8507} : {8532}
) violated_rows
) violated_row_count,
(
SELECT COUNT_BIG(*) AS num_rows
FROM @cdmDatabaseSchema.@cdmTableName cdmTable

{@cohort}?{
JOIN @cohortDatabaseSchema.COHORT c
ON cdmTable.PERSON_ID = c.SUBJECT_ID
AND c.COHORT_DEFINITION_ID = @cohortDefinitionId
}

WHERE @cdmFieldName = @conceptId
) denominator
;
);
}

{@EXECUTE} ? {
SELECT num_violated_rows, CASE WHEN denominator.num_rows = 0 THEN 0 ELSE 1.0*num_violated_rows/denominator.num_rows END AS pct_violated_rows,
denominator.num_rows as num_denominator_rows
FROM
(
SELECT COUNT_BIG(*) AS num_violated_rows
FROM
(
SELECT cdmTable.*
FROM @cdmDatabaseSchema.@cdmTableName cdmTable
INNER JOIN @cdmDatabaseSchema.person p
ON cdmTable.person_id = p.person_id

{@cohort}?{
JOIN @cohortDatabaseSchema.COHORT c
ON cdmTable.PERSON_ID = c.SUBJECT_ID
AND c.COHORT_DEFINITION_ID = @cohortDefinitionId
}

WHERE cdmTable.@cdmFieldName = @conceptId
AND p.gender_concept_id <> {@plausibleGender == 'Male'} ? {8507} : {8532}
) violated_rows
) violated_row_count,
(
SELECT COUNT_BIG(*) AS num_rows
FROM @cdmDatabaseSchema.@cdmTableName cdmTable

{@cohort}?{
JOIN @cohortDatabaseSchema.COHORT c
ON cdmTable.PERSON_ID = c.SUBJECT_ID
AND c.COHORT_DEFINITION_ID = @cohortDefinitionId
}

WHERE @cdmFieldName = @conceptId
) denominator;
}
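The {@CAPTURE}, {@CLEANSE}, and {@EXECUTE} markers above are resolved by SqlRender before dialect translation, in the same way the existing {@cohort} block is. A minimal sketch of that resolution, using a simplified fragment of the CAPTURE block with illustrative parameter values:

# Simplified, illustrative fragment: shows only how a {@CAPTURE} ? { ... } block
# is kept or dropped by SqlRender::render(); it is not the full check SQL.
library(SqlRender)
sql <- "{@CAPTURE} ? {
INSERT INTO @cdmDatabaseSchema.person_archive
SELECT person.*, getdate() FROM @cdmDatabaseSchema.person WHERE 1 = 0;
}"
rendered <- render(sql,
                   warnOnMissingParameters = FALSE,
                   CAPTURE = TRUE,
                   cdmDatabaseSchema = "my_cdm")
cat(rendered)  # the INSERT is kept when CAPTURE is TRUE and dropped when it is FALSE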