diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 711f989..8a59854 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -48,6 +48,6 @@ jobs: - name: Generate coverage reports run: rebar3 covertool generate - - uses: codecov/codecov-action@v1 + - uses: codecov/codecov-action@v3 with: file: _build/test/covertool/cuckoo_filter.covertool.xml diff --git a/rebar.config b/rebar.config index 9176668..50ab687 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,5 @@ {erl_opts, [debug_info]}. +{deps, [{spinlock, "0.2.1"}]}. {profiles, [ {test, [{deps, [{xxh3, "0.3.5"}]}]} @@ -11,3 +12,11 @@ {plugins, [covertool]}. {covertool, [{coverdata_files, ["ct.coverdata"]}]}. + +{hex, [{doc, ex_doc}]}. + +{ex_doc, [ + {source_url, <<"https://github.com/farhadi/cuckoo_filter">>}, + {extras, [<<"README.md">>, <<"LICENSE">>]}, + {main, <<"readme">>} +]}. diff --git a/rebar.lock b/rebar.lock index 57afcca..7d48896 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1 +1,8 @@ -[]. +{"1.2.0", +[{<<"spinlock">>,{pkg,<<"spinlock">>,<<"0.2.1">>},0}]}. +[ +{pkg_hash,[ + {<<"spinlock">>, <<"0B19C946EE7C3F005B1FF856F712352540095DD823EE03F5352D9818602E9D60">>}]}, +{pkg_hash_ext,[ + {<<"spinlock">>, <<"1A7FA7EFD642C1D36EC253D3C6EF6A50863B3E934318404A29F418564D68981B">>}]} +]. diff --git a/src/cuckoo_filter.app.src b/src/cuckoo_filter.app.src index 9149e87..bc6b51a 100644 --- a/src/cuckoo_filter.app.src +++ b/src/cuckoo_filter.app.src @@ -5,7 +5,8 @@ {registered, []}, {applications, [ kernel, - stdlib + stdlib, + spinlock ]}, {env, []}, {modules, []}, diff --git a/src/cuckoo_filter.erl b/src/cuckoo_filter.erl index 440fddd..a9fdb7f 100644 --- a/src/cuckoo_filter.erl +++ b/src/cuckoo_filter.erl @@ -10,12 +10,12 @@ new/1, new/2, add/2, add/3, contains/2, - delete/2, delete/3, + delete/2, hash/2, add_hash/2, add_hash/3, contains_hash/2, contains_fingerprint/3, - delete_hash/2, delete_hash/3, + delete_hash/2, capacity/1, size/1, whereis/1, @@ -47,6 +47,19 @@ -define(DEFAULT_EVICTIONS, 100). -define(FILTER(FilterName), persistent_term:get({?MODULE, FilterName})). +-define(TRANSACTION(Filter, Expr), + case Filter of + #cuckoo_filter{max_evictions = 0} -> + Expr; + #cuckoo_filter{lock = Lock} -> + LockId = spinlock:acquire(Lock), + try Expr of + Res -> Res + after + spinlock:release(Lock, LockId) + end + end +). %% @equiv new(Capacity, []) -spec new(pos_integer()) -> cuckoo_filter(). @@ -100,9 +113,14 @@ new(Capacity, Opts) -> ), NumBuckets = 1 bsl ceil(math:log2(ceil(Capacity / BucketSize))), MaxHash = NumBuckets bsl FingerprintSize - 1, - AtomicsSize = ceil(NumBuckets * BucketSize * FingerprintSize / 64) + 2, + AtomicsSize = ceil(NumBuckets * BucketSize * FingerprintSize / 64) + 3, + AtomicsRef = atomics:new(AtomicsSize, [{signed, false}]), Filter = #cuckoo_filter{ - buckets = atomics:new(AtomicsSize, [{signed, false}]), + buckets = AtomicsRef, + lock = spinlock:new([ + {atomics_ref, AtomicsRef}, + {max_retry, max(100_000, MaxEvictions * 10_000)} + ]), num_buckets = NumBuckets, max_hash = MaxHash, bucket_size = BucketSize, @@ -118,52 +136,50 @@ new(Capacity, Opts) -> Filter end. -%% @equiv add(Filter, Element, infinity) +%% @equiv add(Filter, Element, false) -spec add(cuckoo_filter() | filter_name(), term()) -> ok | {error, not_enough_space}. add(Filter = #cuckoo_filter{}, Element) -> - add_hash(Filter, hash(Filter, Element), infinity); + add_hash(Filter, hash(Filter, Element), false); add(FilterName, Element) -> add(?FILTER(FilterName), Element). -%% @equiv add_hash(Filter, Element, infinity) +%% @equiv add_hash(Filter, Element, false) -spec add_hash(cuckoo_filter() | filter_name(), hash()) -> ok | {error, not_enough_space}. add_hash(Filter = #cuckoo_filter{}, Hash) -> - add_hash(Filter, Hash, infinity); + add_hash(Filter, Hash, false); add_hash(FilterName, Hash) -> - add_hash(?FILTER(FilterName), Hash, infinity). + add_hash(?FILTER(FilterName), Hash, false). %% @doc Adds an element to a filter. %% %% Returns `ok' if the insertion was successful, but could return %% `{error, not_enough_space}', when the filter is nearing its capacity. %% -%% When `LockTimeout' is given, it could return `{error, timeout}', if it -%% can not acquire the lock within `LockTimeout' milliseconds. -%% -%% If `force' is given as the 3rd argument, and there is no room for the element -%% to be inserted, another random element is removed, and the removed element -%% is returned as `{ok, {Index, Fingerprint}}'. In this case, elements are not -%% relocated, and no lock is acquired. +%% `Forced' argument is a boolean that indicates whether the insertion should +%% be forced or not. A forced insertion will never return a `not_enough_space' +%% error, instead when the filter is full, another random element is removed, +%% and the removed element is returned as `{ok, {Index, Fingerprint}}'. +%% In this case, elements are not relocated, and no lock is acquired. %% %% Forced insertion can only be used with `max_evictions' set to 0. -spec add - (cuckoo_filter() | filter_name(), term(), timeout()) -> - ok | {error, not_enough_space | timeout}; - (cuckoo_filter() | filter_name(), term(), force) -> + (cuckoo_filter() | filter_name(), term(), false) -> + ok | {error, not_enough_space}; + (cuckoo_filter() | filter_name(), term(), true) -> ok | {ok, Evicted :: {index(), fingerprint()}}. -add(Filter = #cuckoo_filter{}, Element, LockTimeout) -> - add_hash(Filter, hash(Filter, Element), LockTimeout); -add(FilterName, Element, LockTimeout) -> - add(?FILTER(FilterName), Element, LockTimeout). +add(Filter = #cuckoo_filter{}, Element, Forced) -> + add_hash(Filter, hash(Filter, Element), Forced); +add(FilterName, Element, Forced) -> + add(?FILTER(FilterName), Element, Forced). %% @doc Adds an element to a filter by its hash. %% %% Same as {@link add/3} except that it accepts the hash of the element instead %% of the element. -spec add_hash - (cuckoo_filter() | filter_name(), hash(), timeout()) -> - ok | {error, not_enough_space | timeout}; - (cuckoo_filter() | filter_name(), hash(), force) -> + (cuckoo_filter() | filter_name(), hash(), false) -> + ok | {error, not_enough_space}; + (cuckoo_filter() | filter_name(), hash(), true) -> ok | {ok, Evicted :: {index(), fingerprint()}}. add_hash( Filter = #cuckoo_filter{ @@ -172,7 +188,7 @@ add_hash( hash_function = HashFunction }, Hash, - LockTimeout + Forced ) -> {Index, Fingerprint} = index_and_fingerprint(Hash, FingerprintSize), case insert_at_index(Filter, Index, Fingerprint) of @@ -187,11 +203,16 @@ add_hash( RState = rand:mwc59_seed(), Rand = rand:mwc59_value32(RState) bsr 31 + 1, RandIndex = element(Rand, {Index, AltIndex}), - try_insert(Filter, RandIndex, Fingerprint, RState, LockTimeout) + case Forced of + true -> + force_insert(Filter, RandIndex, Fingerprint, RState); + false -> + try_insert(Filter, RandIndex, Fingerprint, RState) + end end end; -add_hash(FilterName, Hash, LockTimeout) -> - add_hash(?FILTER(FilterName), Hash, LockTimeout). +add_hash(FilterName, Hash, Forced) -> + add_hash(?FILTER(FilterName), Hash, Forced). %% @doc Checks if an element is in a filter. -spec contains(cuckoo_filter() | filter_name(), term()) -> boolean(). @@ -210,77 +231,65 @@ contains_hash(FilterName, Hash) -> %% @doc Checks whether a filter contains a fingerprint at the given index or its alternative index. -spec contains_fingerprint(cuckoo_filter() | filter_name(), index(), fingerprint()) -> boolean(). -contains_fingerprint(Filter = #cuckoo_filter{max_evictions = 0}, Index, Fingerprint) -> - contains_fingerprint(Filter, Index, undefined, Fingerprint, 1); -contains_fingerprint(Filter = #cuckoo_filter{}, Index, Fingerprint) -> - contains_fingerprint(Filter, Index, undefined, Fingerprint, 2); +contains_fingerprint( + Filter = #cuckoo_filter{ + num_buckets = NumBuckets, + hash_function = HashFunction, + max_evictions = MaxEvictions + }, + Index, + Fingerprint +) -> + Retry = + case MaxEvictions of + 0 -> 1; + _ -> 2 + end, + AltIndex = fun() -> alt_index(Index, Fingerprint, NumBuckets, HashFunction) end, + contains_fingerprint(Filter, {Index, AltIndex}, Fingerprint, Retry); contains_fingerprint(FilterName, Index, Fingerprint) -> contains_fingerprint(?FILTER(FilterName), Index, Fingerprint). -%% @equiv delete(Filter, Element, infinity) --spec delete(cuckoo_filter() | filter_name(), term()) -> ok | {error, not_found}. -delete(Filter = #cuckoo_filter{}, Element) -> - delete_hash(Filter, hash(Filter, Element), infinity); -delete(FilterName, Element) -> - delete(?FILTER(FilterName), Element). - -%% @equiv delete_hash(Filter, Element, infinity) --spec delete_hash(cuckoo_filter() | filter_name(), hash()) -> ok | {error, not_found}. -delete_hash(Filter = #cuckoo_filter{}, Hash) -> - delete_hash(Filter, Hash, infinity); -delete_hash(FilterName, Hash) -> - delete_hash(?FILTER(FilterName), Hash, infinity). - %% @doc Deletes an element from a filter. %% %% Returns `ok' if the deletion was successful, and returns {error, not_found} %% if the element could not be found in the filter. %% -%% When `LockTimeout' is given, it could return `{error, timeout}', if it -%% can not acquire the lock within `LockTimeout' milliseconds. -%% %% Note: A cuckoo filter can only delete items that are known to be -%% inserted before. Deleting of non inserted items might lead to deletion of -%% another random element. --spec delete(cuckoo_filter() | filter_name(), term(), timeout()) -> - ok | {error, not_found | timeout}. -delete(Filter = #cuckoo_filter{}, Element, LockTimeout) -> - delete_hash(Filter, hash(Filter, Element), LockTimeout); -delete(FilterName, Element, LockTimeout) -> - delete(?FILTER(FilterName), Element, LockTimeout). +%% inserted before. Deleting non inserted items might lead to deletion of +%% another element in case of a hash collision. +-spec delete(cuckoo_filter() | filter_name(), term()) -> ok | {error, not_found}. +delete(Filter = #cuckoo_filter{}, Element) -> + delete_hash(Filter, hash(Filter, Element)); +delete(FilterName, Element) -> + delete(?FILTER(FilterName), Element). %% @doc Deletes an element from a filter by its hash. %% -%% Same as {@link delete/3} except that it uses the hash of the element instead +%% Same as {@link delete/2} except that it uses the hash of the element instead %% of the element. --spec delete_hash(cuckoo_filter() | filter_name(), hash(), timeout()) -> - ok | {error, not_found | timeout}. +-spec delete_hash(cuckoo_filter() | filter_name(), hash()) -> ok | {error, not_found}. delete_hash( Filter = #cuckoo_filter{ fingerprint_size = FingerprintSize, num_buckets = NumBuckets, hash_function = HashFunction }, - Hash, - LockTimeout + Hash ) -> {Index, Fingerprint} = index_and_fingerprint(Hash, FingerprintSize), - case write_lock(Filter, LockTimeout) of - ok -> - case delete_fingerprint(Filter, Fingerprint, Index) of - ok -> - release_write_lock(Filter); - {error, not_found} -> - AltIndex = alt_index(Index, Fingerprint, NumBuckets, HashFunction), - Result = delete_fingerprint(Filter, Fingerprint, AltIndex), - release_write_lock(Filter), - Result - end; - {error, timeout} -> - {error, timeout} - end; -delete_hash(FilterName, Hash, LockTimeout) -> - delete_hash(?FILTER(FilterName), Hash, LockTimeout). + ?TRANSACTION( + Filter, + case delete_fingerprint(Filter, Fingerprint, Index) of + ok -> + ok; + {error, not_found} -> + AltIndex = alt_index(Index, Fingerprint, NumBuckets, HashFunction), + delete_fingerprint(Filter, Fingerprint, AltIndex) + end + ); +delete_hash(FilterName, Hash) -> + delete_hash(?FILTER(FilterName), Hash). %% @doc Returns the hash value of an element using the hash function of the filter. -spec hash(cuckoo_filter() | filter_name(), term()) -> hash(). @@ -305,7 +314,7 @@ capacity(FilterName) -> %% @doc Returns number of items in a filter. -spec size(cuckoo_filter() | filter_name()) -> non_neg_integer(). size(#cuckoo_filter{buckets = Buckets}) -> - atomics:get(Buckets, 2); + atomics:get(Buckets, 3); size(FilterName) -> ?MODULE:size(?FILTER(FilterName)). @@ -320,14 +329,14 @@ whereis(FilterName) -> %% {@link import/2} function. -spec export(cuckoo_filter() | filter_name()) -> binary(). export(Filter = #cuckoo_filter{buckets = Buckets}) -> - ok = write_lock(Filter, infinity), AtomicsSize = maps:get(size, atomics:info(Buckets)), - Result = << - <<(atomics:get(Buckets, I)):64/big-unsigned-integer>> - || I <- lists:seq(2, AtomicsSize) - >>, - release_write_lock(Filter), - Result; + ?TRANSACTION( + Filter, + << + <<(atomics:get(Buckets, I)):64/big-unsigned-integer>> + || I <- lists:seq(3, AtomicsSize) + >> + ); export(FilterName) -> export(?FILTER(FilterName)). @@ -337,12 +346,10 @@ export(FilterName) -> %% if the size of the given binary does not match the size of the filter. -spec import(cuckoo_filter() | filter_name(), binary()) -> ok | {error, invalid_data_size}. import(Filter = #cuckoo_filter{buckets = Buckets}, Data) when is_binary(Data) -> - ByteSize = (maps:get(size, atomics:info(Buckets)) - 1) * 8, + ByteSize = (maps:get(size, atomics:info(Buckets)) - 2) * 8, case byte_size(Data) of ByteSize -> - ok = write_lock(Filter, infinity), - import(Buckets, Data, 2), - release_write_lock(Filter); + ?TRANSACTION(Filter, import(Buckets, Data, 3)); _ -> {error, invalid_data_size} end; @@ -370,26 +377,16 @@ import(Buckets, <>, Index) -> atomics:put(Buckets, Index, Atomic), import(Buckets, Data, Index + 1). -contains_fingerprint( - Filter = #cuckoo_filter{num_buckets = NumBuckets, hash_function = HashFunction}, - undefined, - Index, - Fingerprint, - Retry -) -> - AltIndex = alt_index(Index, Fingerprint, NumBuckets, HashFunction), - contains_fingerprint(Filter, AltIndex, Index, Fingerprint, Retry); -contains_fingerprint(Filter, Index, _AltIndex, Fingerprint, 0) -> - Bucket = read_bucket(Index, Filter), - lists:member(Fingerprint, Bucket); -contains_fingerprint(Filter, Index, AltIndex, Fingerprint, Retry) -> +contains_fingerprint(Filter, {Index, AltIndex}, Fingerprint, Retry) -> Bucket = read_bucket(Index, Filter), - case lists:member(Fingerprint, Bucket) of - true -> - true; - false -> - contains_fingerprint(Filter, AltIndex, Index, Fingerprint, Retry - 1) - end. + lists:member(Fingerprint, Bucket) orelse + Retry > 0 andalso + contains_fingerprint( + Filter, + {AltIndex(), fun() -> Index end}, + Fingerprint, + Retry - 1 + ). delete_fingerprint(Filter, Fingerprint, Index) -> Bucket = read_bucket(Index, Filter), @@ -412,7 +409,7 @@ alt_index(Index, Fingerprint, NumBuckets, HashFunction) -> Index bxor HashFunction(Fingerprint) band (NumBuckets - 1). atomic_index(BitIndex) -> - BitIndex bsr 6 + 3. + BitIndex bsr 6 + 4. insert_at_index(Filter, Index, Fingerprint) -> Bucket = read_bucket(Index, Filter), @@ -428,31 +425,7 @@ insert_at_index(Filter, Index, Fingerprint) -> {error, full} end. -write_lock(#cuckoo_filter{max_evictions = 0}, _) -> - ok; -write_lock(#cuckoo_filter{buckets = Buckets}, infinity) -> - write_lock(Buckets, infinity); -write_lock(#cuckoo_filter{buckets = Buckets}, Timeout) -> - write_lock(Buckets, erlang:monotonic_time(microsecond) + Timeout * 1000); -write_lock(Buckets, Timeout) -> - case atomics:compare_exchange(Buckets, 1, 0, 1) of - ok -> - ok; - 1 -> - case erlang:monotonic_time(microsecond) > Timeout of - true -> - {error, timeout}; - false -> - write_lock(Buckets, Timeout) - end - end. - -release_write_lock(#cuckoo_filter{max_evictions = 0}) -> - ok; -release_write_lock(#cuckoo_filter{buckets = Buckets}) -> - atomics:put(Buckets, 1, 0). - -try_insert(Filter = #cuckoo_filter{bucket_size = BucketSize}, Index, Fingerprint, RState, force) -> +force_insert(Filter = #cuckoo_filter{bucket_size = BucketSize}, Index, Fingerprint, RState) -> Filter#cuckoo_filter.max_evictions == 0 orelse error(badarg), Bucket = read_bucket(Index, Filter), UpdatedRState = rand:mwc59(RState), @@ -461,35 +434,31 @@ try_insert(Filter = #cuckoo_filter{bucket_size = BucketSize}, Index, Fingerprint 0 -> case update_in_bucket(Filter, Index, SubIndex, 0, Fingerprint) of ok -> ok; - {error, outdated} -> try_insert(Filter, Index, Fingerprint, UpdatedRState, force) + {error, outdated} -> force_insert(Filter, Index, Fingerprint, UpdatedRState) end; Fingerprint -> {ok, {Index, Fingerprint}}; Evicted -> case update_in_bucket(Filter, Index, SubIndex, Evicted, Fingerprint) of ok -> {ok, {Index, Evicted}}; - {error, outdated} -> try_insert(Filter, Index, Fingerprint, UpdatedRState, force) + {error, outdated} -> force_insert(Filter, Index, Fingerprint, UpdatedRState) end - end; -try_insert(Filter, Index, Fingerprint, RState, LockTimeout) -> - case write_lock(Filter, LockTimeout) of - ok -> - Result = - try_insert( - Filter, - Index, - Fingerprint, - RState, - #{}, - [], - Filter#cuckoo_filter.bucket_size - ), - release_write_lock(Filter), - Result; - {error, timeout} -> - {error, timeout} end. +try_insert(Filter, Index, Fingerprint, RState) -> + ?TRANSACTION( + Filter, + try_insert( + Filter, + Index, + Fingerprint, + RState, + #{}, + [], + Filter#cuckoo_filter.bucket_size + ) + ). + try_insert(_Filter, _Index, _Fingerprint, _RState, _Evictions, _EvictionsList, 0) -> {error, not_enough_space}; try_insert( @@ -618,8 +587,8 @@ update_in_bucket( case atomics:compare_exchange(Buckets, AtomicIndex, AtomicValue, UpdatedAtomic) of ok -> case {OldValue, Value} of - {0, _} -> atomics:add(Buckets, 2, 1); - {_, 0} -> atomics:sub(Buckets, 2, 1); + {0, _} -> atomics:add(Buckets, 3, 1); + {_, 0} -> atomics:sub(Buckets, 3, 1); {_, _} -> ok end; _ -> diff --git a/src/cuckoo_filter.hrl b/src/cuckoo_filter.hrl index 69c885c..8710940 100644 --- a/src/cuckoo_filter.hrl +++ b/src/cuckoo_filter.hrl @@ -1,5 +1,6 @@ -record(cuckoo_filter, { buckets :: atomics:atomics_ref(), + lock :: spinlock:spinlock(), num_buckets :: pos_integer(), max_hash :: pos_integer(), bucket_size :: pos_integer(), diff --git a/test/cuckoo_filter_SUITE.erl b/test/cuckoo_filter_SUITE.erl index ab798f3..43a5dba 100644 --- a/test/cuckoo_filter_SUITE.erl +++ b/test/cuckoo_filter_SUITE.erl @@ -28,13 +28,19 @@ all() -> concurrent_add_delete, concurrent_add_delete_with_0_evictions, concurrent_add_delete_forced, - concurrent_add_same_item, - lock_timeout + concurrent_add_same_item ]. random_items(N) -> [rand:uniform() || _ <- lists:seq(1, N)]. +receive_msg() -> + receive + Message -> Message + after 1000 -> + error(timeout) + end. + new_badargs(_Config) -> ?assertError(badarg, cuckoo_filter:new(0)), ?assertError(badarg, cuckoo_filter:new(8, [{bucket_size, 0}])), @@ -128,21 +134,20 @@ named_filter(_Config) -> Element = an_element, ?assertEqual(cuckoo_filter:whereis(Name), Filter), ?assertEqual(ok, cuckoo_filter:add(Name, Element)), - ?assertEqual(ok, cuckoo_filter:add(Name, Element, infinity)), ?assert(cuckoo_filter:contains(Name, Element)), ?assertEqual(ok, cuckoo_filter:delete(Name, Element)), - ?assertEqual(ok, cuckoo_filter:delete(Name, Element, infinity)), Hash = cuckoo_filter:hash(Name, Element), ?assertEqual(ok, cuckoo_filter:add_hash(Name, Hash)), - ?assertEqual(ok, cuckoo_filter:add_hash(Name, Hash, infinity)), ?assert(cuckoo_filter:contains_hash(Name, Hash)), ?assertEqual(ok, cuckoo_filter:delete_hash(Name, Hash)), - ?assertEqual(ok, cuckoo_filter:delete_hash(Name, Hash, infinity)), + ?assertEqual(ok, cuckoo_filter:add_hash(Name, Hash, false)), + ?assert(cuckoo_filter:contains_hash(Name, Hash)), + ?assertEqual(ok, cuckoo_filter:delete_hash(Name, Hash)), ?assertEqual(0, cuckoo_filter:size(Name)), ?assert(cuckoo_filter:capacity(Name) >= Capacity), cuckoo_filter:new(Capacity, [{name, Name}, {max_evictions, 0}]), - [cuckoo_filter:add(Name, Element, force) || _ <- lists:seq(1, 100)], - {ok, {Index, Fingerprint}} = cuckoo_filter:add(Name, Element, force), + [cuckoo_filter:add(Name, Element, true) || _ <- lists:seq(1, 100)], + {ok, {Index, Fingerprint}} = cuckoo_filter:add(Name, Element, true), ?assert(cuckoo_filter:contains_fingerprint(Name, Index, Fingerprint)), Data = cuckoo_filter:export(Name), ?assertEqual(ok, cuckoo_filter:import(Name, Data)). @@ -219,7 +224,7 @@ adding_to_a_full_filter_by_force(_Config) -> Items = random_items(Capacity * 10), lists:foreach(fun(I) -> cuckoo_filter:add(Filter, I) end, Items), ?assertEqual(cuckoo_filter:size(Filter), Capacity), - {ok, {Index, Fingerprint}} = cuckoo_filter:add(Filter, extra, force), + {ok, {Index, Fingerprint}} = cuckoo_filter:add(Filter, extra, true), ?assert(not cuckoo_filter:contains_fingerprint(Filter, Index, Fingerprint)), ?assert(cuckoo_filter:contains(Filter, extra)). @@ -268,19 +273,16 @@ import_export(_Config) -> ?assertEqual({error, invalid_data_size}, cuckoo_filter:import(Filter3, Exported)). concurrent_add(_Config) -> - Filter = cuckoo_filter:new(100 + rand:uniform(1000)), + Filter = cuckoo_filter:new(1000 + rand:uniform(1000)), Capacity = cuckoo_filter:capacity(Filter), - ItemsGroup = [random_items(Capacity div 8) || _ <- lists:seq(1, 8)], + ItemsGroup = [random_items(Capacity div 100) || _ <- lists:seq(1, 100)], Parent = self(), [ spawn(fun() -> Parent ! [I || I <- Items, cuckoo_filter:add(Filter, I) == ok] end) || Items <- ItemsGroup ], [ - receive - Added -> - ?assert(lists:all(fun(I) -> cuckoo_filter:contains(Filter, I) end, Added)) - end + ?assert(lists:all(fun(I) -> cuckoo_filter:contains(Filter, I) end, receive_msg())) || _ <- ItemsGroup ]. @@ -293,10 +295,8 @@ concurrent_delete(_Config) -> {Added1, Added2} = lists:split(length(Added) div 2, Added), spawn(fun() -> Parent ! length([I || I <- Added1, cuckoo_filter:delete(Filter, I) == ok]) end), Deleted2 = length([I || I <- Added2, cuckoo_filter:delete(Filter, I) == ok]), - receive - Deleted1 -> - ?assertEqual(Deleted1 + Deleted2, length(Added)) - end. + Deleted1 = receive_msg(), + ?assertEqual(Deleted1 + Deleted2, length(Added)). concurrent_add_delete(_Config) -> Capacity = 1024, @@ -311,6 +311,8 @@ concurrent_add_delete(_Config) -> I -> cuckoo_filter:delete(Filter, I), F() + after 1000 -> + error(timeout) end end), spawn(fun() -> @@ -321,9 +323,7 @@ concurrent_add_delete(_Config) -> I || I <- Added, _ <- lists:seq(1, 250), not cuckoo_filter:contains(Filter, I) ]), - receive - {'DOWN', Ref, process, Pid, normal} -> ok - end, + {'DOWN', Ref, process, Pid, normal} = receive_msg(), ?assertEqual(cuckoo_filter:size(Filter), length(Added)). concurrent_add_delete_with_0_evictions(_Config) -> @@ -339,6 +339,8 @@ concurrent_add_delete_with_0_evictions(_Config) -> I -> cuckoo_filter:delete(Filter, I), F() + after 1000 -> + error(timeout) end end), spawn(fun() -> @@ -349,9 +351,7 @@ concurrent_add_delete_with_0_evictions(_Config) -> I || I <- Added, _ <- lists:seq(1, 250), not cuckoo_filter:contains(Filter, I) ]), - receive - {'DOWN', Ref, process, Pid, normal} -> ok - end, + {'DOWN', Ref, process, Pid, normal} = receive_msg(), ?assertEqual(cuckoo_filter:size(Filter), length(Added)). concurrent_add_delete_forced(_Config) -> @@ -361,7 +361,7 @@ concurrent_add_delete_forced(_Config) -> Parent = self(), [ spawn(fun() -> - Parent ! length([I || I <- Items, cuckoo_filter:add(Filter, I, force) == ok]) + Parent ! length([I || I <- Items, cuckoo_filter:add(Filter, I, true) == ok]) end) || _ <- lists:seq(1, 10) ], @@ -371,12 +371,7 @@ concurrent_add_delete_forced(_Config) -> end) || _ <- lists:seq(1, 10) ], - AddsAndRemoves = [ - receive - N -> N - end - || _ <- lists:seq(1, 20) - ], + AddsAndRemoves = [receive_msg() || _ <- lists:seq(1, 20)], ?assertEqual(lists:sum(AddsAndRemoves), cuckoo_filter:size(Filter)). concurrent_add_same_item(_Config) -> @@ -387,24 +382,6 @@ concurrent_add_same_item(_Config) -> spawn(fun() -> Parent ! length([I || I <- Items, cuckoo_filter:add(Filter, I) == ok]) end), spawn(fun() -> Parent ! length([I || I <- Items, cuckoo_filter:add(Filter, I) == ok]) end), Deleted = length([I || I <- Items, cuckoo_filter:delete(Filter, I) == ok]), - receive - Added1 -> Added1 - end, - receive - Added2 -> Added2 - end, + Added1 = receive_msg(), + Added2 = receive_msg(), ?assertEqual(Deleted + cuckoo_filter:size(Filter), Added1 + Added2). - -lock_timeout(_Config) -> - Filter = cuckoo_filter:new(100 + rand:uniform(1000)), - Capacity = cuckoo_filter:capacity(Filter), - Items1 = random_items(Capacity), - Items2 = random_items(Capacity), - {Pid, Ref} = spawn_monitor(fun() -> [I || I <- Items1, cuckoo_filter:add(Filter, I) == ok] end), - ?assert(lists:any(fun(I) -> cuckoo_filter:add(Filter, I, 0) == {error, timeout} end, Items2)), - ?assert( - lists:any(fun(I) -> cuckoo_filter:delete(Filter, I, 0) == {error, timeout} end, Items2) - ), - receive - {'DOWN', Ref, process, Pid, normal} -> ok - end.