Skip to content

Commit

Permalink
Merge pull request #71 from OHDSI/duckdb2
Browse files Browse the repository at this point in the history
Merge duckdb2 branch into develop
  • Loading branch information
schuemie authored Jan 15, 2025
2 parents a853ae7 + 0d1a7a8 commit 4ce9fdc
Show file tree
Hide file tree
Showing 22 changed files with 317 additions and 285 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/R_CMD_check_Hades.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,13 @@ jobs:
- uses: r-lib/actions/check-r-package@v2
with:
args: 'c("--no-manual", "--as-cran")'
build_args: 'c("--compact-vignettes=both")'
error-on: '"warning"'
check-dir: '"check"'

- name: Upload source package
if: success() && runner.os == 'macOS' && github.event_name != 'pull_request' && github.ref == 'refs/heads/main'
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: package_tarball
path: check/*.tar.gz
Expand Down Expand Up @@ -168,7 +169,7 @@ jobs:
- name: Download package tarball
if: ${{ env.new_version != '' }}
uses: actions/download-artifact@v2
uses: actions/download-artifact@v4.1.7
with:
name: package_tarball

Expand All @@ -181,3 +182,4 @@ jobs:
if: ${{ env.new_version != '' }}
run: |
curl --data "build=true" -X POST https://registry.hub.docker.com/u/ohdsi/broadsea-methodslibrary/trigger/f0b51cec-4027-4781-9383-4b38b42dd4f5/
19 changes: 9 additions & 10 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ BugReports: https://github.com/OHDSI/Andromeda/issues
Depends:
dplyr
Imports:
RSQLite,
DBI,
zip,
methods,
dbplyr,
tidyselect,
cli,
rlang,
pillar,
hms
DBI,
zip,
methods,
dbplyr,
tidyselect,
cli,
rlang,
pillar,
duckdb
Suggests:
testthat,
stringr,
Expand Down
4 changes: 2 additions & 2 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ exportMethods(close)
exportMethods(length)
exportMethods(names)
exportMethods(show)
import(RSQLite)
import(dplyr)
import(hms)
import(duckdb)
importClassesFrom(DBI,DBIConnection)
importClassesFrom(DBI,DBIObject)
importClassesFrom(duckdb,duckdb_connection)
importFrom(methods,slotNames)
importFrom(rlang,abort)
importFrom(rlang,inform)
Expand Down
74 changes: 26 additions & 48 deletions R/Indices.R
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ createIndex <- function(tbl, columnNames, unique = FALSE, indexName = NULL) {
as.character(dbplyr::remote_name(tbl)),
paste(columnNames, collapse = ", "))

RSQLite::dbExecute(conn = dbplyr::remote_con(tbl), statement = statement)
DBI::dbExecute(conn = dbplyr::remote_con(tbl), statement = statement)
invisible(tbl)
}

Expand Down Expand Up @@ -91,38 +91,13 @@ createIndex <- function(tbl, columnNames, unique = FALSE, indexName = NULL) {
#'
#' @export
listIndices <- function(tbl) {
if (!inherits(tbl, "tbl_dbi"))
abort("Argument must be an Andromeda (or DBI) table")
if (!inherits(tbl, "tbl_dbi")) abort("Argument must be an Andromeda (or DBI) table")

tableName <- as.character(dbplyr::remote_name(tbl))
connection <- dbplyr::remote_con(tbl)
indices <- RSQLite::dbGetQuery(conn = connection,
statement = sprintf("PRAGMA index_list('%s');", tableName)) %>%
conn <- dbplyr::remote_con(tbl)
sql <- sprintf("select * from duckdb_indexes where table_name = '%s';", tableName)
DBI::dbGetQuery(conn, sql) %>%
dplyr::as_tibble()
if (nrow(indices) == 0) {
return(dplyr::tibble())
}
getIndexInfo <- function(indexName) {
indexInfo <- RSQLite::dbGetQuery(conn = connection,
statement = sprintf("PRAGMA index_info('%s');", indexName)) %>%
dplyr::as_tibble()
indexInfo$indexName <- indexName
return(indexInfo)
}
indexInfo <- lapply(indices$name, getIndexInfo)
indexInfo <- bind_rows(indexInfo) %>%
select(indexName = "indexName",
columnSequenceId = "seqno",
columnName = "name")

result <- indices %>%
mutate(unique = case_when(.data$unique == 1 ~ TRUE, TRUE ~ FALSE)) %>%
select(indexSequenceId = "seq",
indexName = "name",
unique = "unique") %>%
inner_join(indexInfo, by = "indexName")

return(result)
}

#' Removes an index from an Andromeda table
Expand Down Expand Up @@ -158,34 +133,37 @@ listIndices <- function(tbl) {
#'
#' @export
removeIndex <- function(tbl, columnNames = NULL, indexName = NULL) {
if (!inherits(tbl, "tbl_dbi"))
if (!inherits(tbl, "tbl_dbi"))
abort("First argument must be an Andromeda (or DBI) table")

if (!((is.character(columnNames) && length(columnNames) > 0) || (is.character(indexName) && length(indexName) > 0)))
abort("Either columnNames or indexName must be supplied.")

tableName <- as.character(dbplyr::remote_name(tbl))
connection <- dbplyr::remote_con(tbl)
indices <- RSQLite::dbGetQuery(conn = connection,
statement = sprintf("PRAGMA index_list('%s');", tableName))
indices <- listIndices(tbl)

if (is.null(indexName)) {
for (indexName in indices$name) {
indexInfo <- RSQLite::dbGetQuery(conn = connection,
statement = sprintf("PRAGMA index_info('%s');", indexName))
if (all(columnNames %in% indexInfo$name)) {
indexName <- indexName
break;
}
}
if (is.null(indexName)) {
if (is.character(columnNames) && length(columnNames) > 0) {
# get the index name that matches the columnNames
indexName <- indices %>%
# filter to indices with the number of columns passed in
filter(stringr::str_count(.data$sql, ",") == (length(columnNames) - 1)) %>%
# get index that matches all column names
filter(all(stringr::str_detect(.data$sql, columnNames))) %>%
pull(.data$index_name)

if (length(indexName) == 0) {
abort(sprintf("Could not find an index on column(s) %s", paste(columnNames, collapse = ", ")))
}
} else {
if (!indexName %in% indices$name) {
abort(sprintf("Index with name '%s' not found", indexName))
for(i in indexName) {
if (!(indexName %in% indices$index_name)) abort(sprintf("Index with name '%s' not found", i))
}
}

statement <- sprintf("DROP INDEX %s;", indexName)

RSQLite::dbExecute(conn = dbplyr::remote_con(tbl), statement = statement)
for(i in indexName) {
statement <- sprintf("DROP INDEX IF EXISTS %s;", i)
DBI::dbExecute(conn = dbplyr::remote_con(tbl), statement = statement)
}
invisible(tbl)
}
63 changes: 35 additions & 28 deletions R/LoadingSaving.R
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#' @description
#' Saves the [`Andromeda`] object in a zipped file. Note that by default the [`Andromeda`] object is
#' automatically closed by saving it to disk. This is due to a limitation of the underlying technology
#' ('RSQLite'). To keep the connection open, use `maintainConnection = TRUE`. This will first
#' ('duckdb'). To keep the connection open, use `maintainConnection = TRUE`. This will first
#' create a temporary copy of the [`Andromeda`] object. Note that this can be substantially slower.
#'
#' @return
Expand Down Expand Up @@ -78,16 +78,19 @@ saveAndromeda <- function(andromeda, fileName, maintainConnection = FALSE, overw
attributesFileName <- tempfile(tmpdir = andromedaTempFolder, fileext = ".rds")
saveRDS(attribs, attributesFileName)


if (maintainConnection) {
# Can't zip while connected, so make copy:
tempFileName <- tempfile(tmpdir = andromedaTempFolder, fileext = ".sqlite")
RSQLite::sqliteCopyDatabase(andromeda, tempFileName)
# Can't zip while connected so make a copy
andromedaCopy <- copyAndromeda(andromeda)
tempFileName <- andromedaCopy@driver@dbdir
duckdb::dbDisconnect(andromedaCopy, shutdown = TRUE)
file.exists(tempFileName)
zip::zipr(fileName, c(attributesFileName, tempFileName), compression_level = 2)
unlink(tempFileName)
close(andromedaCopy)
} else {
RSQLite::dbDisconnect(andromeda)
duckdb::dbDisconnect(andromeda, shutdown = TRUE)
zip::zipr(fileName, c(attributesFileName, andromeda@dbname), compression_level = 2)
unlink(andromeda@dbname)
close(andromeda)
inform("Disconnected Andromeda. This data object can no longer be used")
}
unlink(attributesFileName)
Expand All @@ -96,6 +99,8 @@ saveAndromeda <- function(andromeda, fileName, maintainConnection = FALSE, overw
#' Load Andromeda from file
#'
#' @param fileName The path where the object was saved using [`saveAndromeda()`].
#' @param options A list containing Andromeda options. Currently the only supported option is 'threads'.
#' Setting `options = list(threads = 10)` will set the database used by Andromeda to use 10 threads.
#'
#' @seealso
#' [`saveAndromeda()`]
Expand All @@ -120,13 +125,12 @@ saveAndromeda <- function(andromeda, fileName, maintainConnection = FALSE, overw
#' unlink(fileName)
#'
#' @export
#' @import hms
loadAndromeda <- function(fileName) {
loadAndromeda <- function(fileName, options = list()) {
if (!file.exists(fileName)) {
abort(sprintf("File %s does not exist", fileName))
}
fileNamesInZip <- utils::unzip(fileName, list = TRUE)$Name
sqliteFilenameInZip <- fileNamesInZip[grepl(".sqlite$", fileNamesInZip)]
duckdbFilenameInZip <- fileNamesInZip[grepl(".duckdb$", fileNamesInZip)]
rdsFilenameInZip <- fileNamesInZip[grepl(".rds$", fileNamesInZip)]

andromedaTempFolder <- .getAndromedaTempFolder()
Expand All @@ -139,23 +143,16 @@ loadAndromeda <- function(fileName) {
zip::unzip(fileName, exdir = tempDir)

# Rename unzipped files:
newFileName <- tempfile(tmpdir = andromedaTempFolder, fileext = ".sqlite")
file.rename(file.path(tempDir, sqliteFilenameInZip), newFileName)
newFileName <- tempfile(tmpdir = andromedaTempFolder, fileext = ".duckdb")
file.rename(file.path(tempDir, duckdbFilenameInZip), newFileName)
attributes <- readRDS(file.path(tempDir, rdsFilenameInZip))
andromeda <- RSQLite::dbConnect(RSQLite::SQLite(), newFileName, extended_types = TRUE)
finalizer <- function(ptr) {
# Suppress R Check note:
missing(ptr)
close(andromeda)
}
reg.finalizer(andromeda@ptr, finalizer, onexit = TRUE)
for (name in names(attributes)) {

andromeda <- .createAndromeda(dbdir = newFileName, options = options)

# Restore user defined attributes that don't overwrite Andromeda's reserved attributes
for (name in dplyr::setdiff(names(attributes), names(attributes(andromeda)))) {
attr(andromeda, name) <- attributes[[name]]
}
RSQLite::dbExecute(andromeda, "PRAGMA journal_mode = OFF")
RSQLite::dbExecute(andromeda, sprintf("PRAGMA temp_store_directory = '%s'", andromedaTempFolder))
class(andromeda) <- "Andromeda"
attr(class(andromeda), "package") <- "Andromeda"
return(andromeda)
}

Expand All @@ -169,7 +166,7 @@ loadAndromeda <- function(fileName) {
if (is.null(andromeda)) {
folder <- .getAndromedaTempFolder()
} else {
folder <- dirname(andromeda@dbname)
folder <- dirname(andromeda@driver@dbdir)
}
if (exists("lowDiskWarnings", envir = andromedaGlobalEnv)) {
lowDiskWarnings <- get("lowDiskWarnings", envir = andromedaGlobalEnv)
Expand Down Expand Up @@ -224,8 +221,8 @@ loadAndromeda <- function(fileName) {
#' #123.456
#'
#' @export
getAndromedaTempDiskSpace <- function(andromeda = NULL) {
if (!is.null(andromeda) && !inherits(andromeda, "SQLiteConnection"))
getAndromedaTempDiskSpace <- function(andromeda = NULL) {
if (!is.null(andromeda) && !inherits(andromeda, "duckdb_connection"))
abort("Andromeda argument must be of type 'Andromeda'.")

# Using Java because no cross-platform functions available in R:
Expand All @@ -235,7 +232,7 @@ getAndromedaTempDiskSpace <- function(andromeda = NULL) {
if (is.null(andromeda)) {
folder <- .getAndromedaTempFolder()
} else {
folder <- dirname(andromeda@dbname)
folder <- dirname(andromeda@driver@dbdir)
}
space <- tryCatch({
rJava::.jinit()
Expand All @@ -256,3 +253,13 @@ getAndromedaTempDiskSpace <- function(andromeda = NULL) {
error = function(e) NA)
return(!is.na(installedVersion))
}

# get the user defined attributes of an andromeda object as a named list
.userDefinedAttributes <- function(andromeda) {
attribs <- attributes(andromeda)
for (name in slotNames(andromeda)) {
attribs[[name]] <- NULL
}
attribs[["class"]] <- NULL
attribs
}
Loading

0 comments on commit 4ce9fdc

Please sign in to comment.