Support caching of bookkeeping files locally for cloud workspaces/arrays (#177)

* Support caching of bookkeeping files locally for cloud workspaces/arrays
* remove azurite for now, needs new certificates
* Codecov now needs token even for public repositories
* Test TILEDB_CACHE and TILEDB_BOOKKEEPING_STATS flows
* Add tests for code coverage
* Refactor and expose cache_fragment_metadata() as api (#178)
* Add cache_fragment_metadata to TileDBUtils to expose this functionality as api
* Don't leak file handles in error conditions
* Copyright update
nalinigans authored Nov 23, 2024
1 parent 62a27ad commit 41f88bb
Showing 16 changed files with 286 additions and 149 deletions.
5 changes: 3 additions & 2 deletions .github/scripts/run_dfs_tests.sh
@@ -53,7 +53,7 @@ run_azure_tests() {
echo "az schema storage buffer test" &&
$CMAKE_BUILD_DIR/test/test_storage_buffer --test-dir "az://$AZURE_CONTAINER_NAME@$AZURE_STORAGE_ACCOUNT.blob.core.windows.net/$TEST" &&
echo "az schema examples" &&
time $GITHUB_WORKSPACE/examples/run_examples.sh "az://$AZURE_CONTAINER_NAME@$AZURE_STORAGE_ACCOUNT.blob.core.windows.net/$TEST" &&
TILEDB_CACHE=1 time $GITHUB_WORKSPACE/examples/run_examples.sh "az://$AZURE_CONTAINER_NAME@$AZURE_STORAGE_ACCOUNT.blob.core.windows.net/$TEST" &&
echo "az schema small size storage test" &&
(TILEDB_MAX_STREAM_SIZE=32 $CMAKE_BUILD_DIR/test/test_azure_blob_storage --test-dir "az://$AZURE_CONTAINER_NAME@$AZURE_STORAGE_ACCOUNT.blob.core.windows.net/$TEST" [read-write-small] || echo "az schema small size storage test failed") &&
echo "Running with $1 DONE" &&
@@ -119,7 +119,8 @@ elif [[ $INSTALL_TYPE == azurite ]]; then
$CMAKE_BUILD_DIR/test/test_storage_buffer --test-dir "az://[email protected]/$TEST" &&
AZURE_STORAGE_ACCOUNT=$TEMP_VAR
$GITHUB_WORKSPACE/examples/run_examples.sh "az://[email protected]/$TEST" &&
$GITHUB_WORKSPACE/examples/run_examples.sh "azb://test/$TEST?account=devstoreaccount1"
$GITHUB_WORKSPACE/examples/run_examples.sh "azb://test/$TEST?account=devstoreaccount1" &&
TILEDB_CACHE=1 $GITHUB_WORKSPACE/examples/run_examples.sh "az://test/$TEST?account=devstoreaccount1"

elif [[ $INSTALL_TYPE == aws ]]; then
TILEDB_BENCHMARK=1
35 changes: 15 additions & 20 deletions .github/workflows/basic.yml
@@ -51,24 +51,19 @@ jobs:
matrix:
os: [ubuntu-20.04]
# All supported types
type: [basic, basic-no-hdfs, basic-codec, hdfs, gcs, azure, azurite, aws]
# type: [basic, basic-no-hdfs, basic-codec, hdfs, gcs, azure, azurite, aws]
type: [basic, basic-no-hdfs, basic-codec, hdfs, gcs, azure, aws]
include:
- os: macos-11
- os: macos-13
type: basic
openssl-version: 1.1
- os: macos-11
type: basic
openssl-version: 3
- os: macos-12
type: basic
openssl-version: 1.1
- os: macos-12
- os: macos-13
type: basic
openssl-version: 3
- os: ubuntu-22.04
type: basic
- os: ubuntu-22.04
type: azurite
#- os: ubuntu-22.04
#type: azurite
- os: ubuntu-22.04
type: aws
- os: ubuntu-22.04
@@ -79,7 +74,7 @@ jobs:
runs-on: ${{ matrix.os }}

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Create Build Environment
shell: bash
@@ -90,27 +85,27 @@
mkdir -p /usr/local/share/vcpkg/ports
- name: Cache AWS SDK
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: ~/awssdk-install
key: awssdk-${{env.AWSSDK_VER}}-${{matrix.os}}-openssl-${{matrix.openssl-version}}-v1

- name: Cache GCS SDK
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: ~/gcssdk-install
key: gcssdk-${{env.GCSSDK_VER}}-${{matrix.os}}-openssl-${{matrix.openssl-version}}-v1

- name: Cache Catch2 artifacts
if: startsWith(matrix.os,'ubuntu')
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: ~/catch2-install
key: catch2-${{env.CATCH2_VER}}-v0-${{matrix.os}}

- name: Cache Distributed FileSystems
if: matrix.type == 'hdfs' || matrix.type == 'gcs'
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: ${{runner.workspace}}/hadoop-${{env.HADOOP_VER}}
key: dfs-${{env.HADOOP_VER}}-v1-${{matrix.os}}-openssl-v1
@@ -164,7 +159,7 @@ jobs:
make -j4
make tests -j 4
export TILEDB_BENCHMARK=1
time test/test_sparse_array_benchmark
time TILEDB_BOOKKEEPING_STATS=1 TILEDB_CACHE=1 test/test_sparse_array_benchmark
export TILEDB_DISABLE_FILE_LOCKING=1
time test/test_sparse_array_benchmark
export TILEDB_KEEP_FILE_HANDLES_OPEN=1
@@ -198,6 +193,6 @@ jobs:
AWS_TAR: ${{secrets.AWS_TAR}}

- name: Upload Report to CodeCov
uses: codecov/codecov-action@v3
with:
gcov: true
uses: codecov/codecov-action@v4
env:
CODECOV_TOKEN: ${{secrets.CODECOV_TOKEN}}
7 changes: 7 additions & 0 deletions core/EnvironmentVariables.md
@@ -15,3 +15,10 @@ The use of these Environment Variables will alter the behavior of TileDB. These

* TILEDB_MAX_STREAM_SIZE
For azure blob storage, use download_blob_to_stream to read lengths < TILEDB_MAX_STREAM_SIZE. If this is not set, the default is 1024 bytes defined in core/include/storage_manager/storage_azure_blob.h.


* TILEDB_CACHE
Cache bookkeeping and other files locally as necessary.

* TILEDB_BOOKKEEPING_STATS
Print memory and time statistics for reading and loading bookkeeping files.
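
For reference, a minimal sketch of how a caller can consult flags like these from C++. The env_flag_set() helper below is written for this example only (the library's own lookup goes through is_env_set() in core/include/misc/utils.h and may use different semantics); it simply treats a non-empty value as set:

#include <cstdlib>
#include <iostream>
#include <string>

// Example-only stand-in: a flag counts as set when the variable exists
// with a non-empty value.
static bool env_flag_set(const std::string& name) {
  const char* value = std::getenv(name.c_str());
  return value != nullptr && *value != '\0';
}

int main() {
  if (env_flag_set("TILEDB_CACHE")) {
    std::cout << "bookkeeping files will be cached locally" << std::endl;
  }
  if (env_flag_set("TILEDB_BOOKKEEPING_STATS")) {
    std::cout << "bookkeeping read/load statistics will be printed" << std::endl;
  }
  return 0;
}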
4 changes: 3 additions & 1 deletion core/include/c_api/tiledb_utils.h
@@ -7,7 +7,7 @@
*
* @copyright Copyright (c) 2018 Omics Data Automation Inc. and Intel Corporation
* @copyright Copyright (c) 2020-2021 Omics Data Automation Inc.
* @copyright Copyright (c) 2023 dātma, inc™
* @copyright Copyright (c) 2023-2024 dātma, inc™
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -61,6 +61,8 @@ std::vector<std::string> get_array_names(const std::string& workspace);

std::vector<std::string> get_fragment_names(const std::string& workspace);

int cache_fragment_metadata(const std::string& workspace, const std::string& array_name);

bool is_dir(const std::string& dirpath);

std::string real_dir(const std::string& dirpath);
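
A minimal usage sketch for the new entry point, assuming the TILEDB_OK/TILEDB_ERR return convention used elsewhere in this commit; the workspace URI and array name are placeholders, and this is not taken from the repository's tests:

#include <iostream>
#include <string>

#include "tiledb_utils.h"  // declares TileDBUtils::cache_fragment_metadata()

int main() {
  // Placeholder cloud workspace and array name; substitute real paths.
  const std::string workspace = "az://container@account.blob.core.windows.net/ws";
  const std::string array_name = "my_array";

  // Copy each fragment's compressed bookkeeping file into the local
  // fragment-metadata cache so later opens can skip the cloud read.
  if (TileDBUtils::cache_fragment_metadata(workspace, array_name) != TILEDB_OK) {
    std::cerr << "caching fragment metadata failed" << std::endl;
    return 1;
  }
  return 0;
}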
3 changes: 3 additions & 0 deletions core/include/misc/mem_utils.h
@@ -7,6 +7,7 @@
*
* @copyright Copyright (c) 2016 MIT and Intel Corporation
* @copyright Copyright (c) 2021 Omics Data Automation Inc.
* @copyright Copyright (c) 2024 dātma, inc™
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -33,6 +34,8 @@

#pragma once

void print_rusage(const std::string& msg);

void print_memory_stats(const std::string& msg);

void trim_memory();
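
One plausible shape for the newly declared print_rusage(), sketched with getrusage(); the actual implementation in mem_utils.cc is not shown in this diff and may report different counters or use different formatting:

#include <iostream>
#include <string>
#include <sys/resource.h>

// Example-only sketch: report peak RSS and CPU time for the current process.
void print_rusage_sketch(const std::string& msg) {
  struct rusage usage;
  if (getrusage(RUSAGE_SELF, &usage) == 0) {
    std::cerr << msg
              << " maxrss(KB)=" << usage.ru_maxrss  // KB on Linux, bytes on macOS
              << " user(s)=" << usage.ru_utime.tv_sec
              << " sys(s)=" << usage.ru_stime.tv_sec << std::endl;
  }
}

int main() {
  print_rusage_sketch("after loading bookkeeping:");
  return 0;
}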
14 changes: 14 additions & 0 deletions core/include/misc/utils.h
@@ -7,6 +7,7 @@
*
* @copyright Copyright (c) 2016 MIT and Intel Corporation
* @copyright Copyright (c) 2021 Omics Data Automation Inc.
* @copyright Copyright (c) 2024 dātma, inc™
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -246,6 +247,19 @@ bool is_hdfs_path(const std::string& pathURL);
*/
bool is_env_set(const std::string& name);

/**
* Given a path, retrieve the last segment (the filename)
* @param path to file
* @return last segment of path
*/
std::string get_filename_from_path(const std::string& path);

/**
* Get the book-keeping cache directory, generally used with cloud paths
* @return the path to the book-keeping cache
*/
std::string get_fragment_metadata_cache_dir();

/**
* Creates a new directory.
*
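
A brief sketch of how these two helpers are meant to combine when naming a locally cached bookkeeping file. last_segment() is an example-only stand-in mirroring the documented contract of get_filename_from_path(), and the URI is a placeholder:

#include <iostream>
#include <string>

// Example-only stand-in for get_filename_from_path(): return the last
// segment of a path (the filename).
static std::string last_segment(const std::string& path) {
  const size_t pos = path.find_last_of("\\/");
  return pos == std::string::npos ? path : path.substr(pos + 1);
}

int main() {
  // A fragment directory under a cloud array (placeholder URI).
  const std::string fragment =
      "az://container@account.blob.core.windows.net/ws/my_array/frag_1";
  // The cached copy would live under get_fragment_metadata_cache_dir(),
  // keyed by this last segment.
  std::cout << last_segment(fragment) << std::endl;  // prints "frag_1"
  return 0;
}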
118 changes: 116 additions & 2 deletions core/src/c_api/tiledb_utils.cc
@@ -7,7 +7,7 @@
*
* @copyright Copyright (c) 2018 Omics Data Automation Inc. and Intel Corporation
* @copyright Copyright (c) 2019-2021 Omics Data Automation Inc.
* @copyright Copyright (c) 2023 dātma, inc™
* @copyright Copyright (c) 2023-2024 dātma, inc™
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -37,14 +37,17 @@
#include "tiledb_utils.h"

#include "codec.h"
#include "error.h"
#include "tiledb_storage.h"
#include "storage_fs.h"
#include "storage_posixfs.h"
#include "utils.h"

#include <cstring>
#include <error.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/file.h>
#include <trace.h>

namespace TileDBUtils {
@@ -139,6 +142,35 @@ int initialize_workspace(TileDB_CTX **ptiledb_ctx, const std::string& workspace,
} \
} while(false)

#define RETURN_ERRMSG(MSG) \
TILEDB_ERROR(TILEDB_UT_ERRMSG, MSG, tiledb_ut_errmsg); \
strcpy(tiledb_errmsg, tiledb_ut_errmsg.c_str()); \
if (tiledb_ctx) { \
finalize(tiledb_ctx); \
} \
return TILEDB_ERR

#define RETURN_ERRMSG_PATH(MSG, PATH) \
SYSTEM_ERROR(TILEDB_UT_ERRMSG, MSG, PATH, tiledb_ut_errmsg); \
strcpy(tiledb_errmsg, tiledb_ut_errmsg.c_str()); \
if (tiledb_ctx) { \
finalize(tiledb_ctx); \
} \
return TILEDB_ERR

#define RETURN_ERRMSG_RELEASE_ARTIFACTS(MSG, PATH) \
free(buffer); \
unlock_file(fd); \
if (posix_fs.is_file(bookkeeping_path)) { \
posix_fs.delete_file(bookkeeping_path); \
} \
SYSTEM_ERROR(TILEDB_UT_ERRMSG, MSG, PATH, tiledb_ut_errmsg); \
strcpy(tiledb_errmsg, tiledb_ut_errmsg.c_str()); \
if (tiledb_ctx) { \
finalize(tiledb_ctx); \
} \
return TILEDB_ERR;

int create_workspace(const std::string& workspace, bool replace)
{
TileDB_CTX *tiledb_ctx;
@@ -193,6 +225,87 @@ std::vector<std::string> get_array_names(const std::string& workspace)
return array_names;
}

int lock_file(const std::string& filename) {
int fd;
fd = open(filename.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC | O_SYNC, S_IRWXU);
if (fd > 0 && flock(fd, LOCK_EX)) {
close(fd);
return TILEDB_ERR;
}
return fd;
}

int unlock_file(int fd) {
flock(fd, LOCK_UN);
return close(fd);
}

int cache_fragment_metadata(const std::string& workspace, const std::string& array_name)
{
TileDB_CTX *tiledb_ctx;
if (setup(&tiledb_ctx, workspace)) {
FINALIZE;
}

std::vector<std::string> dirs = get_dirs(tiledb_ctx, StorageFS::slashify(workspace)+array_name);
for (std::vector<std::string>::iterator dir = dirs.begin(); dir != dirs.end(); dir++) {
std::string fragment(*dir);
auto bookkeeping_path = StorageFS::slashify(fragment)+TILEDB_BOOK_KEEPING_FILENAME+TILEDB_FILE_SUFFIX+TILEDB_GZIP_SUFFIX;
if (is_file(tiledb_ctx, bookkeeping_path)) {
size_t pos = fragment.find_last_of("\\/");
if (pos != std::string::npos) {
fragment = fragment.substr(pos+1);
}
auto cache = get_fragment_metadata_cache_dir();
auto cached_file = cache + fragment;
PosixFS posix_fs;
if (!posix_fs.is_file(cached_file)) {
if (!posix_fs.is_dir(cache)) {
if (posix_fs.create_dir(cache)) {
RETURN_ERRMSG_PATH("Could not create directory in temp_directory_path", cache);
}
}
int fd = lock_file(cached_file);
if (fd > 0) {
auto chunk = TILEDB_UT_MAX_WRITE_COUNT;
auto buffer = malloc(chunk);
if (!buffer) {
unlock_file(fd);
RETURN_ERRMSG("Out-of-memory exception while allocating memory");
}
auto size = file_size(tiledb_ctx, bookkeeping_path);
if (size == TILEDB_FS_ERR) {
RETURN_ERRMSG_RELEASE_ARTIFACTS("Could not get filesize", bookkeeping_path);
}
auto remaining = size;
size_t nbytes;
while (remaining > 0) {
nbytes = remaining<chunk?remaining:chunk;
if (read_file(tiledb_ctx, bookkeeping_path, size-remaining, buffer, nbytes)) {
RETURN_ERRMSG_RELEASE_ARTIFACTS("Could not read from file", bookkeeping_path);
}
auto written = write(fd, buffer, nbytes);
if (written == -1 || (size_t)written != nbytes) {
RETURN_ERRMSG_RELEASE_ARTIFACTS("Could not write to file", cached_file);
}
remaining-=nbytes;
}
free(buffer);
if (unlock_file(fd)) {
if (posix_fs.is_file(bookkeeping_path)) {
posix_fs.delete_file(bookkeeping_path);
}
RETURN_ERRMSG_PATH("Could not close file", bookkeeping_path);
}
assert(posix_fs.file_size(cached_file) == size);
}
}
}
}
finalize(tiledb_ctx);
return TILEDB_OK;
}

std::vector<std::string> get_fragment_names(const std::string& workspace)
{
TileDB_CTX *tiledb_ctx;
@@ -470,6 +583,7 @@ int move_across_filesystems(const std::string& src, const std::string& dest)
rc = write_file(tiledb_ctx, dest, buffer, size);
rc |= close_file(tiledb_ctx, dest);
finalize(tiledb_ctx);
free(buffer);
return rc;
}
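
For reference, the advisory-locking pattern used by the lock_file()/unlock_file() helpers earlier in this file, reduced to a self-contained sketch; the path is a placeholder and the error handling is simplified relative to the library code:

#include <fcntl.h>
#include <string>
#include <sys/file.h>
#include <sys/stat.h>
#include <unistd.h>

// Open (creating if needed) and take an exclusive advisory lock, mirroring
// the open-then-flock ordering used when populating a cached bookkeeping file.
static int lock_path(const std::string& path) {
  int fd = open(path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, S_IRWXU);
  if (fd < 0) return -1;
  if (flock(fd, LOCK_EX) != 0) {
    close(fd);
    return -1;
  }
  return fd;
}

int main() {
  int fd = lock_path("/tmp/bookkeeping.cache");  // placeholder path
  if (fd >= 0) {
    // ... write the cached bytes while holding the lock ...
    flock(fd, LOCK_UN);
    close(fd);
  }
  return 0;
}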

