Skip to content

Commit

Permalink
Inline shared memory accessors (#1498)
Browse files Browse the repository at this point in the history
* Add comment on relaxed load

* Refactor the gaia::db::memory_manager namespace into the gaia::common and gaia::db namespaces and remove the gaia_memory_manager static lib

* Don't use CAS to mark allocation bits

* Revert "Increase hash bucket count and misc. cleanup (#1425)"

This reverts commit 0d924f6.

* Fix ID index bucket overflow bug

* Restore memory_manager namespace

* Inline shared memory accessors

* Factor out chunk allocation slow path from allocate_object()

* Fix typo

* Add missing file
  • Loading branch information
senderista authored May 2, 2022
1 parent 9db6e8b commit 8ed7f51
Show file tree
Hide file tree
Showing 19 changed files with 236 additions and 189 deletions.
2 changes: 1 addition & 1 deletion production/catalog/src/ddl_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
#include "gaia_internal/common/hash.hpp"
#include "gaia_internal/common/logger.hpp"
#include "gaia_internal/common/system_table_types.hpp"
#include "gaia_internal/db/db.hpp"
#include "gaia_internal/db/gaia_ptr.hpp"

#include "db_helpers.hpp"
#include "fbs_generator.hpp"
#include "json_generator.hpp"

Expand Down
10 changes: 7 additions & 3 deletions production/db/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ add_library(gaia_db_client STATIC
src/db_client_config.cpp
src/db_client.cpp
src/db_server_instance.cpp
src/db_shared_data_client.cpp
src/exceptions.cpp
src/gaia_ptr_api.cpp
src/gaia_ptr_client.cpp
Expand Down Expand Up @@ -102,6 +101,8 @@ add_library(rocks_wrapper STATIC
src/rdb_wrapper.cpp
src/rdb_object_converter.cpp
src/persistent_store_manager.cpp)
# Add GAIA_DB_SERVER preprocessor definition for conditional includes.
target_compile_definitions(rocks_wrapper PUBLIC GAIA_DB_SERVER=1)
configure_gaia_target(rocks_wrapper)
target_include_directories(rocks_wrapper PRIVATE
"${GAIA_DB_CORE_PUBLIC_INCLUDES}"
Expand All @@ -127,6 +128,8 @@ add_library(gaia_db_persistence STATIC
src/log_file.cpp
src/async_disk_writer.cpp
src/async_write_batch.cpp)
# Add GAIA_DB_SERVER preprocessor definition for conditional includes.
target_compile_definitions(gaia_db_persistence PUBLIC GAIA_DB_SERVER=1)
configure_gaia_target(gaia_db_persistence)
target_include_directories(gaia_db_persistence PRIVATE
"${GAIA_DB_CORE_PUBLIC_INCLUDES}"
Expand All @@ -142,7 +145,6 @@ set(GAIA_DB_SERVER_SOURCES
src/catalog_core.cpp
src/chunk_manager.cpp
src/db_server.cpp
src/db_shared_data_server.cpp
src/exceptions.cpp
src/gaia_ptr_server.cpp
src/gaia_ptr.cpp
Expand All @@ -154,6 +156,8 @@ set(GAIA_DB_SERVER_SOURCES
)

add_library(gaia_db_server ${GAIA_DB_SERVER_SOURCES})
# Add GAIA_DB_SERVER preprocessor definition for conditional includes.
target_compile_definitions(gaia_db_server PUBLIC GAIA_DB_SERVER=1)
add_dependencies(gaia_db_server ${FBS_SOURCE_FILENAMES})
configure_gaia_target(gaia_db_server)
target_include_directories(gaia_db_server PUBLIC
Expand Down Expand Up @@ -263,7 +267,7 @@ target_link_libraries(gaia_db_catalog_test PRIVATE gaia_common gaia_db_client ga
add_gtest(test_type_index tests/test_type_index.cpp "${GAIA_DB_CORE_TEST_INCLUDES}" "gaia_common;gaia_db_client;${LIB_CAP};${LIB_EXPLAIN}")
add_gtest(test_db_client tests/test_db_client.cpp "${GAIA_DB_CORE_TEST_INCLUDES}" "gaia_db_client")
add_gtest(test_db_caches tests/test_db_caches.cpp "${GAIA_DB_CORE_TEST_INCLUDES}" "gaia_db_client")
add_gtest(test_concurrent_db_client tests/test_concurrent_db_client.cpp "${GAIA_DB_CORE_TEST_INCLUDES}" "gaia_db_client")
add_gtest(test_concurrent_db_client tests/test_concurrent_db_client.cpp "${GEN_DIR};${GAIA_DB_CORE_TEST_INCLUDES}" "gaia_db_client")
add_gtest(test_gaia_ptr_api tests/test_gaia_ptr_api.cpp "${GAIA_DB_CORE_TEST_INCLUDES}" "gaia_common;gaia_db_client;gaia_direct;gaia_catalog")
add_gtest(test_env_instance_name tests/test_db_server_env.cpp "${GAIA_DB_CORE_TEST_INCLUDES}" "gaia_common;gaia_db_client;gaia_direct")

Expand Down
5 changes: 5 additions & 0 deletions production/db/core/src/db_client_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,8 @@ void gaia::db::clear_shared_memory()
{
gaia::db::client_t::clear_shared_memory();
}

gaia::db::gaia_txn_id_t gaia::db::get_current_txn_id()
{
return gaia::db::client_t::get_current_txn_id();
}
16 changes: 16 additions & 0 deletions production/db/core/src/db_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3360,3 +3360,19 @@ void server_t::run(server_config_t server_conf)
}
}
}

bool server_t::acquire_txn_log_reference_from_commit_ts(gaia_txn_id_t commit_ts)
{
ASSERT_PRECONDITION(transactions::txn_metadata_t::is_commit_ts(commit_ts), "Not a commit timestamp!");
gaia_txn_id_t begin_ts = transactions::txn_metadata_t::get_begin_ts_from_commit_ts(commit_ts);
log_offset_t log_offset = transactions::txn_metadata_t::get_txn_log_offset(commit_ts);
return acquire_txn_log_reference(log_offset, begin_ts);
}

void server_t::release_txn_log_reference_from_commit_ts(gaia_txn_id_t commit_ts)
{
ASSERT_PRECONDITION(transactions::txn_metadata_t::is_commit_ts(commit_ts), "Not a commit timestamp!");
gaia_txn_id_t begin_ts = transactions::txn_metadata_t::get_begin_ts_from_commit_ts(commit_ts);
log_offset_t log_offset = transactions::txn_metadata_t::get_txn_log_offset(commit_ts);
release_txn_log_reference(log_offset, begin_ts);
}
1 change: 1 addition & 0 deletions production/db/core/src/db_server_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "gaia_internal/common/config.hpp"
#include "gaia_internal/common/gaia_version.hpp"
#include "gaia_internal/db/db.hpp"

#include "db_server.hpp"

Expand Down
6 changes: 3 additions & 3 deletions production/db/core/src/gaia_ptr_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace db
void gaia_ptr_t::reset()
{
locators_t* locators = get_locators();
client_t::txn_log(m_locator, to_offset(), c_invalid_gaia_offset);
log_txn_operation(m_locator, to_offset(), c_invalid_gaia_offset);

// TODO[GAIAPLAT-445]: We don't expose delete events.
// if (client_t::is_valid_event(to_ptr()->type))
Expand All @@ -62,13 +62,13 @@ gaia_offset_t gaia_ptr_t::to_offset() const
void gaia_ptr_t::finalize_create()
{
WRITE_PROTECT(to_offset());
client_t::txn_log(m_locator, c_invalid_gaia_offset, to_offset());
log_txn_operation(m_locator, c_invalid_gaia_offset, to_offset());
}

void gaia_ptr_t::finalize_update(gaia_offset_t old_offset)
{
WRITE_PROTECT(to_offset());
client_t::txn_log(m_locator, old_offset, to_offset());
log_txn_operation(m_locator, old_offset, to_offset());
}

gaia_ptr_t gaia_ptr_t::create(gaia_id_t id, gaia_type_t type, reference_offset_t references_count, size_t data_size, const void* data)
Expand Down
3 changes: 2 additions & 1 deletion production/db/core/src/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "data_holder.hpp"
#include "db_helpers.hpp"
#include "db_shared_data.hpp"
#include "field_access.hpp"
#include "hash_index.hpp"
#include "range_index.hpp"
Expand Down Expand Up @@ -88,7 +89,7 @@ index_record_t index_builder_t::make_record(
operation != index_record_operation_t::not_set,
"A valid operation should be set in each index record!");

return index_record_t{get_current_txn_id(), locator, offset, operation, 0};
return index_record_t{get_txn_id(), locator, offset, operation, 0};
}

bool index_builder_t::index_exists(common::gaia_id_t index_id)
Expand Down
1 change: 1 addition & 0 deletions production/db/core/src/persistent_store_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "db_helpers.hpp"
#include "db_internal_types.hpp"
#include "db_shared_data.hpp"
#include "rdb_object_converter.hpp"
#include "rdb_wrapper.hpp"

Expand Down
32 changes: 13 additions & 19 deletions production/db/inc/core/db_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@

#include "chunk_manager.hpp"
#include "client_messenger.hpp"
#include "db_shared_data.hpp"
#include "mapped_data.hpp"
#include "memory_manager.hpp"
#include "messages_generated.h"
#include "type_index.hpp"

namespace gaia
{
Expand All @@ -39,14 +40,17 @@ namespace query_processor
class db_client_proxy_t;
} // namespace query_processor

// For declarations of friend functions.
#include "db_shared_data_interface.inc"

class client_t
{
/**
* @throws no_open_transaction_internal if there is no open transaction.
*/
friend gaia::db::locators_t* gaia::db::get_locators();
friend gaia_txn_id_t gaia::db::get_current_txn_id();
friend gaia::db::txn_log_t* gaia::db::get_txn_log();
friend gaia::db::txn_log_t* get_txn_log_from_offset(gaia::db::log_offset_t offset);

/**
* @throws no_open_session_internal if there is no open session.
Expand All @@ -59,24 +63,20 @@ class client_t
friend gaia::db::index::indexes_t* gaia::db::get_indexes();
friend gaia::db::memory_manager::memory_manager_t* gaia::db::get_memory_manager();
friend gaia::db::memory_manager::chunk_manager_t* gaia::db::get_chunk_manager();
friend gaia::db::gaia_txn_id_t gaia::db::get_txn_id();

friend class gaia::db::query_processor::db_client_proxy_t;

public:
// These functions are exported from gaia_internal/db/db.hpp.
static inline gaia_txn_id_t get_current_txn_id();
static void clear_shared_memory();
static inline void set_commit_trigger(triggers::commit_trigger_fn trigger_fn);

// These functions are exported from and documented in db.hpp.
static inline bool is_session_open();
static inline bool is_ddl_session_open();
static inline bool is_transaction_open();

/**
* Called by the rules engine only during initialization and
* shutdown.
*/
static inline void set_commit_trigger(triggers::commit_trigger_fn trigger_fn);

// This test-only function is exported from gaia_internal/db/db.hpp.
static void clear_shared_memory();

// These public functions are exported from and documented in db.hpp.
static void begin_session(config::session_options_t session_options);
static void end_session();
static void begin_transaction();
Expand All @@ -102,12 +102,6 @@ class client_t
static inline void verify_session_active();
static inline void verify_no_session();

// Called by internal code to log transactional updates.
static inline void txn_log(
gaia_locator_t locator,
gaia_offset_t old_offset,
gaia_offset_t new_offset);

// Called by internal code to log events for rule invocations.
static inline bool is_valid_event(common::gaia_type_t type);
static inline void log_event(
Expand Down
25 changes: 5 additions & 20 deletions production/db/inc/core/db_client.inc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ inline bool client_t::is_transaction_open()
return (s_private_locators.is_set());
}

inline gaia_txn_id_t client_t::get_current_txn_id()
{
return s_txn_id;
}

inline void client_t::set_commit_trigger(triggers::commit_trigger_fn trigger_fn)
{
s_txn_commit_trigger = trigger_fn;
Expand Down Expand Up @@ -65,26 +70,6 @@ inline void client_t::verify_no_session()
}
}

inline void client_t::txn_log(
gaia_locator_t locator,
gaia_offset_t old_offset,
gaia_offset_t new_offset)
{
txn_log_t* txn_log = get_txn_log();
if (txn_log->record_count == c_max_log_records)
{
throw transaction_object_limit_exceeded_internal();
}

// Initialize the new record and increment the record count.
auto& lr = txn_log->log_records[txn_log->record_count++];
// The log record sequence should start at 0.
lr.sequence = txn_log->record_count - 1;
lr.locator = locator;
lr.old_offset = old_offset;
lr.new_offset = new_offset;
}

// This generator wraps a socket which reads a stream of values of `T_element_type` from the server.
template <typename T_element_type>
std::function<std::optional<T_element_type>()>
Expand Down
86 changes: 55 additions & 31 deletions production/db/inc/core/db_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,6 @@ inline void apply_log_to_locators(locators_t* locators, txn_log_t* txn_log, size
}
}

inline gaia::db::txn_log_t* get_txn_log_from_offset(log_offset_t offset)
{
DEBUG_ASSERT_PRECONDITION(offset.is_valid(), "Txn log offset is invalid!");
gaia::db::logs_t* logs = gaia::db::get_logs();
return &((*logs)[offset]);
}

inline void apply_log_from_offset(locators_t* locators, log_offset_t log_offset, size_t starting_log_record_index = 0)
{
txn_log_t* txn_log = get_txn_log_from_offset(log_offset);
Expand All @@ -179,6 +172,36 @@ inline index::db_index_t id_to_index(common::gaia_id_t index_id)
return (it != get_indexes()->end()) ? it->second : nullptr;
}

// This method exists purely to isolate the chunk allocation slow path from
// allocate_object(), so that it can be more easily inlined.
inline void allocate_new_chunk(
memory_manager::memory_manager_t* memory_manager,
memory_manager::chunk_manager_t* chunk_manager)
{
if (chunk_manager->initialized())
{
// The current chunk is out of memory, so retire it and allocate a new chunk.
// In case it is already empty, try to deallocate it after retiring it.

// Get the session's chunk version for safe deallocation.
chunk_version_t version = chunk_manager->get_version();
// Now retire the chunk.
chunk_manager->retire_chunk(version);
// Release ownership of the chunk.
chunk_manager->release();
}

// Allocate a new chunk.
chunk_offset_t new_chunk_offset = memory_manager->allocate_chunk();
if (!new_chunk_offset.is_valid())
{
throw memory_allocation_error_internal();
}

// Initialize the new chunk.
chunk_manager->initialize(new_chunk_offset);
}

// Allocate an object from the "data" shared memory segment.
// The `size` argument *does not* include the object header size!
inline void allocate_object(
Expand All @@ -193,30 +216,10 @@ inline void allocate_object(
gaia_offset_t object_offset = chunk_manager->allocate(size + c_db_object_header_size);
if (!object_offset.is_valid())
{
if (chunk_manager->initialized())
{
// The current chunk is out of memory, so retire it and allocate a new chunk.
// In case it is already empty, try to deallocate it after retiring it.

// Get the session's chunk version for safe deallocation.
chunk_version_t version = chunk_manager->get_version();
// Now retire the chunk.
chunk_manager->retire_chunk(version);
// Release ownership of the chunk.
chunk_manager->release();
}

// Allocate a new chunk.
chunk_offset_t new_chunk_offset = memory_manager->allocate_chunk();
if (!new_chunk_offset.is_valid())
{
throw memory_allocation_error_internal();
}

// Initialize the new chunk.
chunk_manager->initialize(new_chunk_offset);

// Allocate from new chunk.
// Initialize the chunk manager with a new chunk.
allocate_new_chunk(memory_manager, chunk_manager);

// Allocate from the new chunk.
object_offset = chunk_manager->allocate(size + c_db_object_header_size);
}

Expand All @@ -228,6 +231,27 @@ inline void allocate_object(
update_locator(locator, object_offset);
}

// Record a transactional operation in the txn log.
inline void log_txn_operation(
gaia_locator_t locator,
gaia_offset_t old_offset,
gaia_offset_t new_offset)
{
txn_log_t* txn_log = get_txn_log();
if (txn_log->record_count == c_max_log_records)
{
throw transaction_object_limit_exceeded_internal();
}

// Initialize the new record and increment the record count.
auto& lr = txn_log->log_records[txn_log->record_count++];
// The log record sequence should start at 0.
lr.sequence = txn_log->record_count - 1;
lr.locator = locator;
lr.old_offset = old_offset;
lr.new_offset = new_offset;
}

inline bool acquire_txn_log_reference(log_offset_t log_offset, gaia_txn_id_t begin_ts)
{
txn_log_t* txn_log = get_txn_log_from_offset(log_offset);
Expand Down
Loading

0 comments on commit 8ed7f51

Please sign in to comment.