Skip to content

Commit

Permalink
Merge pull request #36 from kawbo/feature/add_fifo_with_ttl_support
Browse files Browse the repository at this point in the history
Add support for fifo compaction-style options
  • Loading branch information
ieQu1 authored Jan 8, 2025
2 parents d695c6e + a5ef1a7 commit bdca875
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ REBAR ?= $(shell which rebar3 2>/dev/null)
TEST_DIR=$(CURDIR)/test


TEST_MODULES="db,db_range,iterators,batch,snapshot,column_family,batch,cache,blob_db,checkpoint,db_backup,cleanup,in_mem,merge,rate_limiter,sst_file_manager,transaction,transaction_log,ttl,write_buffer_manager,statistics"
TEST_MODULES="db,db_range,iterators,batch,snapshot,column_family,batch,cache,blob_db,checkpoint,db_backup,cleanup,in_mem,merge,rate_limiter,sst_file_manager,transaction,transaction_log,ttl,write_buffer_manager,statistics,fifo_compaction"

TEST_ALL_MODULES="${TEST_MODULES},compression"

Expand Down
9 changes: 9 additions & 0 deletions c_src/atoms.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ extern ERL_NIF_TERM ATOM_STRATEGY;
extern ERL_NIF_TERM ATOM_MAX_DICT_BYTES;
extern ERL_NIF_TERM ATOM_ZSTD_MAX_TRAIN_BYTES;

// Related to FIFO compaction with TTL
extern ERL_NIF_TERM ATOM_TTL;

extern ERL_NIF_TERM ATOM_NUM_LEVELS;
extern ERL_NIF_TERM ATOM_LEVEL0_FILE_NUM_COMPACTION_TRIGGER;
extern ERL_NIF_TERM ATOM_LEVEL0_SLOWDOWN_WRITES_TRIGGER;
Expand All @@ -108,12 +111,18 @@ extern ERL_NIF_TERM ATOM_TABLE_FACTORY_BLOCK_CACHE_SIZE;
extern ERL_NIF_TERM ATOM_IN_MEMORY_MODE;
extern ERL_NIF_TERM ATOM_IN_MEMORY;
extern ERL_NIF_TERM ATOM_BLOCK_BASED_TABLE_OPTIONS;
extern ERL_NIF_TERM ATOM_COMPACTION_OPTIONS_FIFO;
extern ERL_NIF_TERM ATOM_ALLOW_CONCURRENT_MEMTABLE_WRITE;
extern ERL_NIF_TERM ATOM_ENABLE_WRITE_THREAD_ADAPTATIVE_YIELD;
extern ERL_NIF_TERM ATOM_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES;
extern ERL_NIF_TERM ATOM_OPTIMIZE_FILTERS_FOR_HITS;
extern ERL_NIF_TERM ATOM_PREFIX_EXTRACTOR;

// Related to COMPACTION_OPTIONS_FIFO
extern ERL_NIF_TERM ATOM_ALLOW_COMPACTION;
extern ERL_NIF_TERM ATOM_AGE_FOR_WARM;
extern ERL_NIF_TERM ATOM_MAX_TABLE_FILES_SIZE;

// Related to DBOptions
extern ERL_NIF_TERM ATOM_TOTAL_THREADS;
extern ERL_NIF_TERM ATOM_CREATE_IF_MISSING;
Expand Down
17 changes: 17 additions & 0 deletions c_src/erocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ ERL_NIF_TERM ATOM_STRATEGY;
ERL_NIF_TERM ATOM_MAX_DICT_BYTES;
ERL_NIF_TERM ATOM_ZSTD_MAX_TRAIN_BYTES;

// FIFO's compaction style TTL
ERL_NIF_TERM ATOM_TTL;

ERL_NIF_TERM ATOM_NUM_LEVELS;
ERL_NIF_TERM ATOM_LEVEL0_FILE_NUM_COMPACTION_TRIGGER;
ERL_NIF_TERM ATOM_LEVEL0_SLOWDOWN_WRITES_TRIGGER;
Expand All @@ -276,10 +279,16 @@ ERL_NIF_TERM ATOM_TABLE_FACTORY_BLOCK_CACHE_SIZE;
ERL_NIF_TERM ATOM_IN_MEMORY_MODE;
ERL_NIF_TERM ATOM_IN_MEMORY;
ERL_NIF_TERM ATOM_BLOCK_BASED_TABLE_OPTIONS;
ERL_NIF_TERM ATOM_COMPACTION_OPTIONS_FIFO;
ERL_NIF_TERM ATOM_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES;
ERL_NIF_TERM ATOM_OPTIMIZE_FILTERS_FOR_HITS;
ERL_NIF_TERM ATOM_PREFIX_EXTRACTOR;

// Related to COMPACTION_OPTIONS_FIFO
ERL_NIF_TERM ATOM_ALLOW_COMPACTION;
ERL_NIF_TERM ATOM_AGE_FOR_WARM;
ERL_NIF_TERM ATOM_MAX_TABLE_FILES_SIZE;

// Related to DBOptions
ERL_NIF_TERM ATOM_TOTAL_THREADS;
ERL_NIF_TERM ATOM_CREATE_IF_MISSING;
Expand Down Expand Up @@ -606,6 +615,8 @@ try
ATOM(erocksdb::ATOM_MAX_DICT_BYTES, "max_dict_bytes");
ATOM(erocksdb::ATOM_ZSTD_MAX_TRAIN_BYTES, "zstd_max_train_bytes");

ATOM(erocksdb::ATOM_TTL, "ttl");

ATOM(erocksdb::ATOM_NUM_LEVELS, "num_levels");
ATOM(erocksdb::ATOM_LEVEL0_FILE_NUM_COMPACTION_TRIGGER, "level0_file_num_compaction_trigger");
ATOM(erocksdb::ATOM_LEVEL0_SLOWDOWN_WRITES_TRIGGER, "level0_slowdown_writes_trigger");
Expand All @@ -627,10 +638,16 @@ try
ATOM(erocksdb::ATOM_IN_MEMORY_MODE, "in_memory_mode");
ATOM(erocksdb::ATOM_IN_MEMORY, "in_memory");
ATOM(erocksdb::ATOM_BLOCK_BASED_TABLE_OPTIONS, "block_based_table_options");
ATOM(erocksdb::ATOM_COMPACTION_OPTIONS_FIFO, "compaction_options_fifo");
ATOM(erocksdb::ATOM_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES, "level_compaction_dynamic_level_bytes");
ATOM(erocksdb::ATOM_OPTIMIZE_FILTERS_FOR_HITS, "optimize_filters_for_hits");
ATOM(erocksdb::ATOM_PREFIX_EXTRACTOR, "prefix_extractor");

// Related to COMPACTION_OPTIONS_FIFO
ATOM(erocksdb::ATOM_ALLOW_COMPACTION, "allow_compaction");
ATOM(erocksdb::ATOM_AGE_FOR_WARM, "age_for_warm");
ATOM(erocksdb::ATOM_MAX_TABLE_FILES_SIZE, "max_table_files_size");

// Related to DBOptions
ATOM(erocksdb::ATOM_TOTAL_THREADS, "total_threads");
ATOM(erocksdb::ATOM_CREATE_IF_MISSING, "create_if_missing");
Expand Down
43 changes: 43 additions & 0 deletions c_src/erocksdb_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,38 @@
#include "bitset_merge_operator.h"
#include "counter_merge_operator.h"

ERL_NIF_TERM parse_fifo_option(ErlNifEnv* env, ERL_NIF_TERM item, rocksdb::CompactionOptionsFIFO& fifo_opts)
{
int arity;
const ERL_NIF_TERM* tuple;

if (enif_get_tuple(env, item, &arity, &tuple) && arity == 2)
{
if (tuple[0] == erocksdb::ATOM_MAX_TABLE_FILES_SIZE)
{
ErlNifUInt64 max_size;
if (enif_get_uint64(env, tuple[1], &max_size))
{
fifo_opts.max_table_files_size = max_size;
}
}
else if (tuple[0] == erocksdb::ATOM_ALLOW_COMPACTION)
{
fifo_opts.allow_compaction = (tuple[1] == erocksdb::ATOM_TRUE);
}
else if (tuple[0] == erocksdb::ATOM_AGE_FOR_WARM)
{
ErlNifUInt64 age_for_warm;
if (enif_get_uint64(env, tuple[1], &age_for_warm))
{
fifo_opts.age_for_warm = static_cast<int64_t>(age_for_warm);
}
}
}

return erocksdb::ATOM_OK;
}

ERL_NIF_TERM parse_bbt_option(ErlNifEnv* env, ERL_NIF_TERM item, rocksdb::BlockBasedTableOptions& opts) {
int arity;
const ERL_NIF_TERM* option;
Expand Down Expand Up @@ -515,6 +547,12 @@ ERL_NIF_TERM parse_cf_option(ErlNifEnv* env, ERL_NIF_TERM item, rocksdb::ColumnF
else
opts.bottommost_compression_opts = compression_opts;
}
else if (option[0] == erocksdb::ATOM_TTL)
{
ErlNifUInt64 ttl;
if (enif_get_uint64(env, option[1], &ttl))
opts.ttl = ttl;
}
else if (option[0] == erocksdb::ATOM_NUM_LEVELS)
{
int num_levels;
Expand Down Expand Up @@ -627,6 +665,11 @@ ERL_NIF_TERM parse_cf_option(ErlNifEnv* env, ERL_NIF_TERM item, rocksdb::ColumnF
fold(env, option[1], parse_bbt_option, bbtOpts);
opts.table_factory = std::shared_ptr<rocksdb::TableFactory>(rocksdb::NewBlockBasedTableFactory(bbtOpts));
}
else if (option[0] == erocksdb::ATOM_COMPACTION_OPTIONS_FIFO) {
rocksdb::CompactionOptionsFIFO fifo_opts;
fold(env, option[1], parse_fifo_option, fifo_opts);
opts.compaction_options_fifo = fifo_opts;
}
else if (option[0] == erocksdb::ATOM_IN_MEMORY_MODE)
{
if (option[1] == erocksdb::ATOM_TRUE)
Expand Down
9 changes: 8 additions & 1 deletion doc/rocksdb.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ block_based_table_options() = [{no_block_cache, boolean()} | {block_size, pos_in
</code></pre>


### <a name="type-compaction_options_fifo">compaction_options_fifo()</a> ###


<pre><code>
compaction_options_fifo() = [{allow_compaction, boolean()} | {age_for_warm, non_neg_integer()} | {max_table_files_size, pos_integer()}]
</code></pre>



### <a name="type-cache_handle">cache_handle()</a> ###
Expand Down Expand Up @@ -101,7 +108,7 @@ __abstract datatype__: `cf_handle()`


<pre><code>
cf_options() = [{block_cache_size_mb_for_point_lookup, non_neg_integer()} | {memtable_memory_budget, pos_integer()} | {write_buffer_size, pos_integer()} | {max_write_buffer_number, pos_integer()} | {min_write_buffer_number_to_merge, pos_integer()} | {enable_blob_files, boolean()} | {min_blob_size, non_neg_integer()} | {blob_file_size, non_neg_integer()} | {blob_compression_type, <a href="#type-compression_type">compression_type()</a>} | {enable_blob_garbage_collection, boolean()} | {blob_garbage_collection_age_cutoff, float()} | {blob_garbage_collection_force_threshold, float()} | {blob_compaction_readahead_size, non_neg_integer()} | {blob_file_starting_level, non_neg_integer()} | {blob_cache, <a href="#type-cache_handle">cache_handle()</a>} | {prepopulate_blob_cache, <a href="#type-prepopulate_blob_cache">prepopulate_blob_cache()</a>} | {compression, <a href="#type-compression_type">compression_type()</a>} | {bottommost_compression, <a href="#type-compression_type">compression_type()</a>} | {compression_opts, <a href="#type-compression_opts">compression_opts()</a>} | {bottommost_compression_opts, <a href="#type-compression_opts">compression_opts()</a>} | {num_levels, pos_integer()} | {level0_file_num_compaction_trigger, integer()} | {level0_slowdown_writes_trigger, integer()} | {level0_stop_writes_trigger, integer()} | {target_file_size_base, pos_integer()} | {target_file_size_multiplier, pos_integer()} | {max_bytes_for_level_base, pos_integer()} | {max_bytes_for_level_multiplier, pos_integer()} | {max_compaction_bytes, pos_integer()} | {arena_block_size, integer()} | {disable_auto_compactions, boolean()} | {compaction_style, <a href="#type-compaction_style">compaction_style()</a>} | {compaction_pri, <a href="#type-compaction_pri">compaction_pri()</a>} | {filter_deletes, boolean()} | {max_sequential_skip_in_iterations, pos_integer()} | {inplace_update_support, boolean()} | {inplace_update_num_locks, pos_integer()} | {table_factory_block_cache_size, pos_integer()} | {in_memory_mode, boolean()} | {block_based_table_options, <a href="#type-block_based_table_options">block_based_table_options()</a>} | {level_compaction_dynamic_level_bytes, boolean()} | {optimize_filters_for_hits, boolean()} | {prefix_extractor, {fixed_prefix_transform, integer()} | {capped_prefix_transform, integer()}} | {merge_operator, <a href="#type-merge_operator">merge_operator()</a>}]
cf_options() = [{ttl, non_neg_integer()} | {block_cache_size_mb_for_point_lookup, non_neg_integer()} | {memtable_memory_budget, pos_integer()} | {write_buffer_size, pos_integer()} | {max_write_buffer_number, pos_integer()} | {min_write_buffer_number_to_merge, pos_integer()} | {enable_blob_files, boolean()} | {min_blob_size, non_neg_integer()} | {blob_file_size, non_neg_integer()} | {blob_compression_type, <a href="#type-compression_type">compression_type()</a>} | {enable_blob_garbage_collection, boolean()} | {blob_garbage_collection_age_cutoff, float()} | {blob_garbage_collection_force_threshold, float()} | {blob_compaction_readahead_size, non_neg_integer()} | {blob_file_starting_level, non_neg_integer()} | {blob_cache, <a href="#type-cache_handle">cache_handle()</a>} | {prepopulate_blob_cache, <a href="#type-prepopulate_blob_cache">prepopulate_blob_cache()</a>} | {compression, <a href="#type-compression_type">compression_type()</a>} | {bottommost_compression, <a href="#type-compression_type">compression_type()</a>} | {compression_opts, <a href="#type-compression_opts">compression_opts()</a>} | {bottommost_compression_opts, <a href="#type-compression_opts">compression_opts()</a>} | {num_levels, pos_integer()} | {level0_file_num_compaction_trigger, integer()} | {level0_slowdown_writes_trigger, integer()} | {level0_stop_writes_trigger, integer()} | {target_file_size_base, pos_integer()} | {target_file_size_multiplier, pos_integer()} | {max_bytes_for_level_base, pos_integer()} | {max_bytes_for_level_multiplier, pos_integer()} | {max_compaction_bytes, pos_integer()} | {arena_block_size, integer()} | {disable_auto_compactions, boolean()} | {compaction_style, <a href="#type-compaction_style">compaction_style()</a>} | {compaction_pri, <a href="#type-compaction_pri">compaction_pri()</a>} | {filter_deletes, boolean()} | {max_sequential_skip_in_iterations, pos_integer()} | {inplace_update_support, boolean()} | {inplace_update_num_locks, pos_integer()} | {table_factory_block_cache_size, pos_integer()} | {in_memory_mode, boolean()} | {compaction_options_fifo, <a href="#type-compaction_options_fifo">compaction_options_fifo()</a>} | {block_based_table_options, <a href="#type-block_based_table_options">block_based_table_options()</a>} | {level_compaction_dynamic_level_bytes, boolean()} | {optimize_filters_for_hits, boolean()} | {prefix_extractor, {fixed_prefix_transform, integer()} | {capped_prefix_transform, integer()}} | {merge_operator, <a href="#type-merge_operator">merge_operator()</a>}]
</code></pre>


Expand Down
6 changes: 6 additions & 0 deletions src/rocksdb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@
{format_version, 0 | 1 | 2 | 3 | 4 | 5} |
{cache_index_and_filter_blocks, boolean()}].

-type compaction_options_fifo() :: [{allow_compaction, boolean()} |
{age_for_warm, non_neg_integer()} |
{max_table_files_size, pos_integer()}].

-type merge_operator() :: erlang_merge_operator |
bitset_merge_operator |
{bitset_merge_operator, non_neg_integer()} |
Expand All @@ -298,6 +302,7 @@
{enable_blob_files, boolean()} |
{min_blob_size, non_neg_integer()} |
{blob_file_size, non_neg_integer()} |
{ttl, non_neg_integer()} |
{blob_compression_type, compression_type()} |
{enable_blob_garbage_collection, boolean()} |
{blob_garbage_collection_age_cutoff, float()} |
Expand Down Expand Up @@ -330,6 +335,7 @@
{table_factory_block_cache_size, pos_integer()} |
{in_memory_mode, boolean()} |
{block_based_table_options, block_based_table_options()} |
{compaction_options_fifo, compaction_options_fifo()} |
{level_compaction_dynamic_level_bytes, boolean()} |
{optimize_filters_for_hits, boolean()} |
{prefix_extractor, {fixed_prefix_transform, integer()} | 
Expand Down
17 changes: 5 additions & 12 deletions test/blob_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

basic_test() ->
?rm_rf("test_blobdb"),
{ok, Db} =
{ok, Db} =
rocksdb:open(
"test_blobdb",
"test_blobdb",
[{create_if_missing, true}
,{enable_blob_files, true}
,{min_blob_size, 0}
Expand All @@ -28,9 +28,9 @@ basic_test() ->
cache_test() ->
?rm_rf("test_cacheblobdb"),
{ok, CHandle} = rocksdb:new_cache(lru, 2097152),
{ok, Db} =
{ok, Db} =
rocksdb:open(
"test_cacheblobdb",
"test_cacheblobdb",
[{create_if_missing, true}
,{enable_blob_files, true}
,{blob_cache, CHandle}]
Expand All @@ -41,13 +41,6 @@ cache_test() ->
{ok, <<"blob_value">>} = rocksdb:get(Db, <<"key">>, [])
after
ok = rocksdb:close(Db),
?rm_rf("test_blobdb")
?rm_rf("test_cacheblobdb")
end,
ok.







96 changes: 96 additions & 0 deletions test/fifo_compaction.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
%% https://github.com/facebook/rocksdb/wiki/FIFO-compaction-style

-module(fifo_compaction).
-include_lib("eunit/include/eunit.hrl").

-define(rm_rf(Dir), rocksdb_test_util:rm_rf(Dir)).

no_fifo_compaction_test() ->
DbName = "erocksdb.fifo.default",
?rm_rf(DbName),
{ok, Db} = rocksdb:open(
DbName,
[
{create_if_missing, true}
]
),
%% Check LOG file to see if the compaction_style is not FIFO
{ok, Log0} = file:read_file(lists:concat([DbName, "/LOG"])),
?assertMatch(nomatch, re:run(Log0, "Options.compaction_style: kCompactionStyleFIFO")),
ok = rocksdb:close(Db),
?rm_rf(DbName).

fifo_compaction_with_ttl_4_test() ->
test_with_ttl(4).

fifo_compaction_with_ttl_8_test() ->
test_with_ttl(4).

fifo_compaction_options_allow_compaction_settings_1_test() ->
DbName = "erocksdb.fifo.allow_compaction_settings_1",
?rm_rf(DbName),
{ok, Db} = rocksdb:open(
DbName,
[
{create_if_missing, true},
{compaction_style, fifo},
{compaction_options_fifo, [
{allow_compaction, true},
{age_for_warm, 10},
{max_table_files_size, 20}
]}
]
),
%% Check LOG file to see if the compaction_style is not FIFO
{ok, Log0} = file:read_file(lists:concat([DbName, "/LOG"])),
?assertMatch({match, _}, re:run(Log0, "Options.compaction_style: kCompactionStyleFIFO")),
?assertMatch({match, _}, re:run(Log0, "Options.compaction_options_fifo.allow_compaction: 1")),
?assertMatch({match, _}, re:run(Log0, "Options.compaction_options_fifo.max_table_files_size: 20")),

ok = rocksdb:close(Db),
?rm_rf(DbName).

fifo_compaction_options_allow_compaction_settings_2_test() ->
DbName = "erocksdb.fifo.allow_compaction_settings_2",
?rm_rf(DbName),
{ok, Db} = rocksdb:open(
DbName,
[
{create_if_missing, true},
{compaction_style, fifo},
{compaction_options_fifo, [
{allow_compaction, false},
{age_for_warm, 40},
{max_table_files_size, 50}
]}
]
),
%% Check LOG file to see if the compaction_style is not FIFO
{ok, Log0} = file:read_file(lists:concat([DbName, "/LOG"])),
?assertMatch({match, _}, re:run(Log0, "Options.compaction_style: kCompactionStyleFIFO")),
?assertMatch({match, _}, re:run(Log0, "Options.compaction_options_fifo.allow_compaction: 0")),
?assertMatch({match, _}, re:run(Log0, "Options.compaction_options_fifo.max_table_files_size: 50")),

ok = rocksdb:close(Db),
?rm_rf(DbName).


test_with_ttl(TTL) ->
TTLAsList = integer_to_list(TTL),
DbName = lists:concat(["erocksdb.fifo",".",TTLAsList]),
?rm_rf(DbName),
{ok, Db} = rocksdb:open(
DbName,
[
{create_if_missing, true},
{compaction_style, fifo},
{ttl, TTL}
]
),
%% Check LOG file to see if the compaction_style fifo and ttl was correctly passed
{ok, Log0} = file:read_file(lists:concat([DbName, "/LOG"])),
?assertMatch({match, _}, re:run(Log0, "Options.compaction_style: kCompactionStyleFIFO")),
?assertMatch({match, _}, re:run(Log0, lists:concat(["Options.ttl: ",TTLAsList]))),
ok = rocksdb:close(Db),
?rm_rf(DbName).

0 comments on commit bdca875

Please sign in to comment.