From 8313e1828da0a931f32a1a888daf6db672817112 Mon Sep 17 00:00:00 2001 From: Shpaky Date: Wed, 6 Jan 2021 19:02:37 +0300 Subject: [PATCH] lets go --- LICENCE | 29 +++ include/lru.hrl | 30 +++ lru.app | 18 ++ lru.config | 13 ++ priv/init | 52 +++++ priv/lru.rel | 13 ++ priv/stop | 8 + src/lru.erl | 458 +++++++++++++++++++++++++++++++++++++++++++ src/lru_app.erl | 21 ++ src/lru_protocol.erl | 233 ++++++++++++++++++++++ src/lru_sup.erl | 95 +++++++++ src/lru_utils.erl | 88 +++++++++ 12 files changed, 1058 insertions(+) create mode 100644 LICENCE create mode 100644 include/lru.hrl create mode 100644 lru.app create mode 100644 lru.config create mode 100755 priv/init create mode 100644 priv/lru.rel create mode 100644 priv/stop create mode 100644 src/lru.erl create mode 100644 src/lru_app.erl create mode 100644 src/lru_protocol.erl create mode 100644 src/lru_sup.erl create mode 100644 src/lru_utils.erl diff --git a/LICENCE b/LICENCE new file mode 100644 index 0000000..808cabc --- /dev/null +++ b/LICENCE @@ -0,0 +1,29 @@ +Copyright (c) 2020 Vladimir Solenkov +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY +THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND +CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A +PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER +IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. diff --git a/include/lru.hrl b/include/lru.hrl new file mode 100644 index 0000000..c2a6b66 --- /dev/null +++ b/include/lru.hrl @@ -0,0 +1,30 @@ +-define(SERIE_SIZE,10000000). +-define(MAX_COUNTER,1000000000000000). + +-define(TIMEOUT_STATE_DELETE,90000). + +-define(ETS_KEYS_STORE_TABLE_NAME,lru_key_store). +-define(ETS_KEYS_FETCH_TABLE_NAME,lfu_key_fetch). +-define(ETS_KEYS_FETCH_TABLE_OPTS,[ + public,bag,{write_concurrency,true}, + {decentralized_counters,true} +]). + +-define(MAX_KEY_SIZE,fun() -> application:get_env(lru,max_key_size,72) end). + +%% +%% following settings in progress develop +%% +-define(SPAWN_OPT_LRU,[ +% {max_heap_size,0}, +% {message_queue_data,off_heap}, + {fullsweep_after,65535} +]). + +-ifdef(support). + -define(SUPPORT,true). + -define(AUXILIARY,any). +-else. + -define(SUPPORT,false). + -define(AUXILIARY,any). +-endif. diff --git a/lru.app b/lru.app new file mode 100644 index 0000000..3e7fcb2 --- /dev/null +++ b/lru.app @@ -0,0 +1,18 @@ +{application,lru,[ + {description,"Least Recently Used Algorithm"}, + {vsn,"1.0.0"}, + {modules,[ + lru_app,lru_sup,lru, + lru_protocol,lru_utils + ]}, + {registered,[ + lru_sup,lru + ]}, + {applications,[kernel,stdlib]}, + {included_applications,[ranch]}, + {mod,{lru_app,[]}}, + {start_phases,[]}, + {env,[]}, + {maintainers,["Vladimir Solenkov"]}, + {links,[{"Github","https://github.com/Algorithms-Lab/LRU.git"}]} +]}. diff --git a/lru.config b/lru.config new file mode 100644 index 0000000..a556432 --- /dev/null +++ b/lru.config @@ -0,0 +1,13 @@ +[{lru,[ + {ets_dir,"priv"}, %% !!! must be atom string !!!!! + {ets_sync_reset,true}, %% !!! must be atom type !!!!! + {ets_recovery,true}, %% !!! must be atom type !!!!! + {tcp,on}, %% !!! must be atom type !!!!! + {mode,inet}, %% !!! must be atom type !!!!! + {port,7777}, %% !!! must be atom type !!!!! + {ip,{127,0,0,1}}, %% !!! must be tuple type !!!!! + {unix,"/tmp/lru_socket"}, %% !!! must by string type !!!!! + {num_acceptors,100}, %% !!! must by integer type !!!!! + {max_connections,1024}, %% !!! must by integer type !!!!! + {max_size_key,72} %% !!! must by integer type !!!!! +]}]. diff --git a/priv/init b/priv/init new file mode 100755 index 0000000..6f6a166 --- /dev/null +++ b/priv/init @@ -0,0 +1,52 @@ +#!/bin/bash + +NAME="lru" +APP=$NAME"_app" +TNODE=$NAME"@"`hostname -s` +CNODE=$NAME`date +_nodeclt_%H_%M_%S_%N` + +export ERL_MAX_ETS_TABLES=140000 + +ACTION=$1 + +if [ "" = "$ACTION" ]; +then + echo " + USAGE: + $0 [ ...] + MANAGE + start - start up node - 'attached' + startd - start up node - 'detached' + attach - attach to running node + stop - stops application and halts the node + " +else + echo " + INFO: + NAME: $NAME + TNODE: $TNODE + CNODE: $CNODE + PWD: $PWD + APP: $APP + " + cd $PWD + case "$ACTION" in + "start") + bin/start + sleep 3 + bin/to_erl pipe/ + ;; + "startd") + bin/start + ;; + "attach") + bin/to_erl pipe/ + ;; + "stop") + escript "stop" $CNODE $TNODE + ;; + *) + echo "UNKNOW COMMAND!" + ;; + esac +fi; diff --git a/priv/lru.rel b/priv/lru.rel new file mode 100644 index 0000000..c8a9f1d --- /dev/null +++ b/priv/lru.rel @@ -0,0 +1,13 @@ +{release, + {"lru","1"}, + {erts,"11.1"}, + [{kernel,"7.1"}, + {stdlib,"3.13.2"}, + {sasl, "4.0.1"}, + {ranch, "2.0.0"}, + {ssl,"10.1"}, + {crypto,"4.8"}, + {public_key,"1.9"}, + {asn1,"5.0.14"}, + {lru, "1.0.0"}] +}. diff --git a/priv/stop b/priv/stop new file mode 100644 index 0000000..ba0db11 --- /dev/null +++ b/priv/stop @@ -0,0 +1,8 @@ +#!/usr/bin/env escript +%% -*- erlang -*- + +main([CNode,TNode]) -> + {ok, _} = net_kernel:start([list_to_atom(CNode),shortnames]), + Res = rpc:call(list_to_atom(TNode),init,stop,[]), + io:fwrite("==> ~p~n",[Res]). + diff --git a/src/lru.erl b/src/lru.erl new file mode 100644 index 0000000..a768c8f --- /dev/null +++ b/src/lru.erl @@ -0,0 +1,458 @@ +-module('lru'). +-author('VSolenkov'). + +-behaviour('gen_statem'). + +-export([ + start_link/0 +]). +-export([ + point/1, + cheat/1, + count/1, + state/0, + store/0, + fetch/0, + clean/0, + clean/1, + clean/2 +]). +-export([ + reset/1 +]). +-export([ + init/1, + callback_mode/0 +]). +-export([ + common/3, + delete/3 +]). +-include("include/lru.hrl"). + + + +start_link() -> + gen_statem:start_link( + {local,?MODULE}, + ?MODULE,[0,0,0], + [{spawn_opt,?SPAWN_OPT_LRU}]). + + + +init([_,_,_]) -> + [O,C,Q] = restorage(?ETS_KEYS_STORE_TABLE_NAME), + {ok,common,[O,C,Q]}. + +callback_mode() -> + state_functions. + + +point(K) -> + case lru_utils:key_validation(K) of + BK when is_binary(BK) -> + gen_statem:cast(?MODULE,{point,BK}); + -1 -> + "type_error"; + -2 -> + "size_key_error" + end. +cheat(KVL) -> + case is_list(KVL) of + true -> + VKVL = lists:filtermap( + fun({K,V}) -> + case lru_utils:key_validation(K) of + BK when is_binary(BK) -> + {true,{BK,V}}; + _ -> + false + end + end, + KVL), + if + length(VKVL) > 0 -> + gen_statem:cast(?MODULE,{cheat,VKVL}); + true -> + "data_error" + end; + false -> + "type_error" + end. +count(K) -> + case lru_utils:key_validation(K) of + BK when is_binary(BK) -> + gen_statem:call(?MODULE,{count,BK}); + -1 -> + "type_error"; + -2 -> + "size_key_error" + end. +state() -> + gen_statem:call(?MODULE,state). +store() -> + gen_statem:cast(?MODULE,store). +fetch() -> + gen_statem:call(?MODULE,fetch). +clean() -> + gen_statem:call(?MODULE,{clean,async}). +clean(async) -> + gen_statem:call(?MODULE,{clean,async}); +clean(sync) -> + gen_statem:call(?MODULE,{clean,sync}). + +clean(R,K) -> + gen_statem:cast(?MODULE,{{clean,R},K}). + +reset(D) -> + gen_statem:cast(?MODULE,{reset,D}). + + +common(cast,{point,K},[O,C,Q]) -> + [NO,NC,NQ] = point_handler(K,O,C,Q), + {keep_state,[NO,NC,NQ]}; +common(cast,{cheat,KVL},[O,C,Q]) -> + [NO,NC,NQ] = cheat_handler(KVL,O,C,Q), + {keep_state,[NO,NC,NQ]}; +common(cast,store,[O,C,Q]) -> + lru_utils:ets_reset([?ETS_KEYS_STORE_TABLE_NAME]), + {keep_state,[O,C,Q]}; +common(cast,{reset,D},[O,C,Q]) -> + [NO,NC,NQ] = reset_handler(D,O,C,Q), + {keep_state,[NO,NC,NQ]}; +common({call,F},{count,K},[O,C,Q]) -> + {keep_state,[O,C,Q],[{reply,F,get(K)}]}; +common({call,F},state,[O,C,Q]) -> + {keep_state,[O,C,Q],[{reply,F,[O,C,Q]}]}; +common({call,F},fetch,[O,C,Q]) -> + {keep_state,[O,C,Q],[{reply,F,fetch_handler(O,Q)}]}; +common({call,F},{clean,async},[O,C,Q]) -> + {next_state,delete,[O,C,Q,#{from => F}],[{next_event,internal,clean}]}; +common({call,F},{clean,sync},[O,C,Q]) -> + K = fetch_handler(O,Q), + R = make_ref(), + {next_state,delete,[O,C,Q,#{key => K, ref => R}],[{reply,F,{K,R}},{state_timeout,?TIMEOUT_STATE_DELETE,K}]}; +common(cast,{{clean,_R},_K},_StateData) -> + keep_state_and_data; +common(cast,EventContent,_StateData) -> + io:format('State: common~nEventType: cast~nEventContent: ~p~n',[EventContent]), + keep_state_and_data; +common({call,_F},EventContent,_StateData) -> + io:format('State: common~nEventType: cast~nEventContent: ~p~n',[EventContent]), + keep_state_and_data; +common(info,_EventContent,_StateData) -> + keep_state_and_data. + +delete(internal,clean,[O,C,Q,#{from := F}]) -> + [K,NO,NC,NQ] = clean_handler(O,C,Q), + {next_state,common,[NO,NC,NQ],[{reply,F,K}]}; +delete(state_timeout,K,[O,C,Q,#{key := K, ref := _}]) -> + {next_state,common,[O,C,Q]}; +delete(cast,{{clean,R},K},[O,C,Q,#{key := K, ref := R}]) -> + [K,NO,NC,NQ] = clean_handler(O,C,Q), + {next_state,common,[NO,NC,NQ]}; +delete(cast,{{clean,_R},_K},_StateData) -> + keep_state_and_data; +delete(cast,{point,_K},_StateData) -> + {keep_state_and_data,[postpone]}; +delete(cast,{cheat,_KVL},_StateData) -> + {keep_state_and_data,[postpone]}; +delete(cast,store,_StateData) -> + {keep_state_and_data,[postpone]}; +delete(cast,reset,_StateData) -> + {keep_state_and_data,[postpone]}; +delete({call,_F},{count,_K},_StateData) -> + {keep_state_and_data,[postpone]}; +delete({call,_F},state,_StateData) -> + {keep_state_and_data,[postpone]}; +delete({call,_F},fetch,_StateData) -> + {keep_state_and_data,[postpone]}; +delete({call,_F},{clean,async},_StateData) -> + {keep_state_and_data,[postpone]}; +delete({call,_F},{clean,sync},_StateData) -> + {keep_state_and_data,[postpone]}; +delete(cast,_EventContent,_StateData) -> + keep_state_and_data; +delete({call,_F},_EventContent,_StateData) -> + keep_state_and_data; +delete(info,_EventContent,_StateData) -> + keep_state_and_data. + + +point_handler(K,O,C,Q) -> + if + C =< ?MAX_COUNTER -> + [NO,NC,NQ] = + if + C rem ?SERIE_SIZE == 0 -> + case get(K) of + undefined -> + put(K,C+1), + put(C div ?SERIE_SIZE,<<"1">>), + ets:insert(?ETS_KEYS_STORE_TABLE_NAME,{K,C+1}), + [if Q == 0 -> C+1; true -> O end,C+1,Q+1]; + OC -> + case get((OC-1) div ?SERIE_SIZE) of + <<"1">> -> erase((OC-1) div ?SERIE_SIZE); + QS -> put((OC-1) div ?SERIE_SIZE,integer_to_binary(binary_to_integer(QS)-1)) + end, + put(K,C+1), + put(C div ?SERIE_SIZE,<<"1">>), + ets:insert(?ETS_KEYS_STORE_TABLE_NAME,{K,C+1}), + if + OC == O -> + [compute_oldest_key((O-1) div ?SERIE_SIZE,C div ?SERIE_SIZE),C+1,Q]; + true -> + [O,C+1,Q] + end + end; + true -> + get(C div ?SERIE_SIZE) =:= undefined andalso put(C div ?SERIE_SIZE,<<"0">>), + + case get(K) of + undefined -> + put(K,C+1), + put(C div ?SERIE_SIZE,integer_to_binary(binary_to_integer(get(C div ?SERIE_SIZE))+1)), + ets:insert(?ETS_KEYS_STORE_TABLE_NAME,{K,C+1}), + [O,C+1,Q+1]; + OC -> + put(K,C+1), + (OC-1) div ?SERIE_SIZE /= C div ?SERIE_SIZE andalso + case get((OC-1) div ?SERIE_SIZE) of + <<"1">> -> erase((OC-1) div ?SERIE_SIZE),true; + QS -> put((OC-1) div ?SERIE_SIZE,integer_to_binary(binary_to_integer(QS)-1)),true + end andalso put(C div ?SERIE_SIZE,integer_to_binary(binary_to_integer(get(C div ?SERIE_SIZE)) + 1)), + ets:insert(?ETS_KEYS_STORE_TABLE_NAME,{K,C+1}), + if + OC == O -> + [compute_oldest_key((O-1) div ?SERIE_SIZE,(C-1) div ?SERIE_SIZE),C+1,Q]; + true -> + [O,C+1,Q] + end + end + end, + ?SUPPORT andalso erlang:apply(?AUXILIARY,point,[K]), + [NO,NC,NQ]; + true -> + ?SUPPORT andalso erlang:apply(?AUXILIARY,point,[K]), + [O,C,Q] + end. + +cheat_handler(KVL,O,C,Q) -> + [NO,NC,NQ] = lists:foldl( + fun({K,V},[NO,NC,NQ]) when V =< ?MAX_COUNTER -> + case get(K) of + undefined -> + if + V > 0 -> + put(K,V), + put((V-1) div ?SERIE_SIZE, + case get((V-1) div ?SERIE_SIZE) of + undefined -> <<"1">>; + QS -> integer_to_binary(binary_to_integer(QS) + 1) + end + ), + ets:insert(?ETS_KEYS_STORE_TABLE_NAME,{K,V}), + if + NQ == 0 -> + [V,V,1]; + NC < V -> + [NO,V,NQ+1]; + NO > V -> + [V,NC,NQ+1]; + true -> + [NO,NC,NQ+1] + end; + true -> + [NO,NC,NQ] + end; + OV -> + if + V > 0 -> + put(K,V), + get((V-1) div ?SERIE_SIZE) /= get((OV-1) div ?SERIE_SIZE) andalso + case get((OV-1) div ?SERIE_SIZE) of + <<"1">> -> erase((OV-1) div ?SERIE_SIZE),true; + QS1 -> put((OV-1) div ?SERIE_SIZE,integer_to_binary(binary_to_integer(QS1)-1)),true + end andalso + case get((V-1) div ?SERIE_SIZE) of + undefined -> put((V-1) div ?SERIE_SIZE,<<"1">>); + QS2 -> put((V-1) div ?SERIE_SIZE,integer_to_binary(binary_to_integer(QS2)-1)) + end, + ets:insert(?ETS_KEYS_STORE_TABLE_NAME,{K,V}), + if + NC < V -> + if + NO == OV -> + [compute_oldest_key((OV-1) div ?SERIE_SIZE,(V-1) div ?SERIE_SIZE),V,NQ]; + true -> + [NO,V,NQ] + end; + NO > V -> + [V,NC,NQ]; + NO == OV -> + [compute_oldest_key((OV-1) div ?SERIE_SIZE,(NC-1) div ?SERIE_SIZE),NC,NQ]; + true -> + [NO,NC,NQ] + end; + true -> + erase(K), + case get((OV-1) div ?SERIE_SIZE) of + <<"1">> -> erase((OV-1) div ?SERIE_SIZE); + QS3 -> put((OV-1) div ?SERIE_SIZE,integer_to_binary(binary_to_integer(QS3)-1)) + end, + ets:delete(?ETS_KEYS_STORE_TABLE_NAME,K), + if + NO == OV -> + [compute_oldest_key((OV-1) div ?SERIE_SIZE,(NC-1) div ?SERIE_SIZE),NC,NQ-1]; + true -> + [NO,NC,NQ-1] + end + end + end; + (_,[NO,NC,NQ]) -> + [NO,NC,NQ] + end, + [O,C,Q],KVL), + [NO,if NQ == 0 -> 0; true -> NC end,NQ]. + +fetch_handler(O,Q) -> + if + Q =:= 0 -> + {O,[]}; %% that is: {0,[]}; + true -> + [K|_] = get_keys(O), %% in case if command 'cheat' sets duplicate counters for different keys + {O,[K]} + end. + +clean_handler(O,C,Q) -> + if + Q =:= 0 -> + [{O,[]},O,C,Q]; %% that is: [{0,[]},0,0,0]; + true -> + [K|_] = get_keys(O), %% in case if command 'cheat' sets duplicate counters for different keys + erase(K), + case get((O-1) div ?SERIE_SIZE) of + <<"1">> -> erase((O-1) div ?SERIE_SIZE); + QS -> put((O-1) div ?SERIE_SIZE,integer_to_binary(binary_to_integer(QS)-1)) + end, + ets:delete(?ETS_KEYS_STORE_TABLE_NAME,K), + ?SUPPORT andalso erlang:apply(?AUXILIARY,reset,[[{O,[K]}]]), + [ + {O,[K]}, + compute_oldest_key((O-1) div ?SERIE_SIZE,(C-1) div ?SERIE_SIZE), + if + Q-1 == 0 -> 0; + true -> C + end, + Q-1 + ] + end. + +reset_handler({_,KL},O,C,Q) -> + [NO,NQ] = lists:fold1( + fun(K,[NO,NQ]) -> + case erase(K) of + undefined -> [NO,NQ]; + OC -> + case get((OC-1) div ?SERIE_SIZE) of + <<"1">> -> erase((OC-1) div ?SERIE_SIZE); + QS -> put((OC-1) div ?SERIE_SIZE,integer_to_binary(binary_to_integer(QS)-1)) + end, + ets:delete(?ETS_KEYS_STORE_TABLE_NAME,K), + if + OC == O -> + [compute_oldest_key((OC-1) div ?SERIE_SIZE,(C-1) div ?SERIE_SIZE),NQ-1]; + true -> + [NO,NQ-1] + end + end + end, + [O,Q],KL), + [NO,if NQ == 0 -> 0; true -> C end,NQ]; +reset_handler(T,O,C,Q) -> + [NO,NQ] = case ets:info(T) of + undefined -> [O,Q]; + _ -> + ets:foldl( + fun({_,KL},[NO,NQ]) -> + lists:foldl( + fun(K,[NO,NQ]) -> + case erase(K) of + undefined -> [NO,NQ]; + OC -> + case get((OC-1) div ?SERIE_SIZE) of + <<"1">> -> erase((OC-1) div ?SERIE_SIZE); + QS -> put((OC-1) div ?SERIE_SIZE,integer_to_binary(binary_to_integer(QS)-1)) + end, + ets:delete(?ETS_KEYS_STORE_TABLE_NAME,K), + if + OC == O -> + [compute_oldest_key((OC-1) div ?SERIE_SIZE,(C-1) div ?SERIE_SIZE),NQ-1]; + true -> + [NO,NQ-1] + end + end + end, + [NO,NQ],KL) + end, + [O,Q],T) + end, + [NO,if NQ == 0 -> 0; true -> C end,NQ]. + + +compute_oldest_key(U,U) -> + case get(U) of + undefined -> 0; + _ -> check_keys_serie((U*?SERIE_SIZE)+1,(U+1)*?SERIE_SIZE) + end; +compute_oldest_key(L,U) -> + case get(L) of + undefined -> compute_oldest_key(L+1,U); + _ -> check_keys_serie((L*?SERIE_SIZE)+1,(L+1)*?SERIE_SIZE) + end. + +check_keys_serie(U,U) -> + case get_keys(U) of + [] -> 0; + [_|_] -> U %% in case if command 'cheat' sets duplicate counters for different keys + end; +check_keys_serie(L,U) -> + case get_keys(L) of + [] -> check_keys_serie(L+1,U); + [_|_] -> L %% in case if command 'cheat' sets duplicate counters for different keys + end. + + +restorage(T) -> + case ets:whereis(T) of + undefined -> [0,0,0]; + _ -> + ets:foldl( + fun({K,C},[NO,NC,NQ]) -> + if + C =< ?MAX_COUNTER andalso C > 0 -> + QS = get((C-1) div ?SERIE_SIZE), + put((C-1) div ?SERIE_SIZE, + if + QS == undefined -> <<"1">>; + true -> integer_to_binary(binary_to_integer(QS) + 1) + end + ), + put(K,C), + if + NQ == 0 -> + [C,C,1]; + C < NO -> + [C,NC,NQ+1]; + C > NC -> + [NO,C,NQ+1]; + true -> + [NO,NC,NQ+1] + end; + true -> + [NO,NC,NQ] + end + end, + [0,0,0],T) + end. diff --git a/src/lru_app.erl b/src/lru_app.erl new file mode 100644 index 0000000..f99a530 --- /dev/null +++ b/src/lru_app.erl @@ -0,0 +1,21 @@ +-module('lru_app'). +-author('VSolenkov'). + +-behavior(application). + +-export([ + start/2, + stop/1, + prep_stop/1 +]). + + +start(_StartType,_StartArgs) -> + lru_sup:start_link(). + +stop(_ETS_TABLES) -> + ok. + +prep_stop(ETS_TABLES) -> + lru_utils:ets_reset(ETS_TABLES), + ok. diff --git a/src/lru_protocol.erl b/src/lru_protocol.erl new file mode 100644 index 0000000..2024fde --- /dev/null +++ b/src/lru_protocol.erl @@ -0,0 +1,233 @@ +-module(lru_protocol). +-author('VSolenkov'). + +-behavior('gen_statem'). +-behavior('ranch_protocol'). + +-export([ + start_link/3 +]). +-export([ + init/1, + callback_mode/0 +]). +-export([ + common/3, + delete/3 +]). +-include("include/lru.hrl"). + + + +start_link(R,T,_) -> + {ok,proc_lib:spawn_link(?MODULE,init,[[R,T]])}. + + +init([R,T]) -> + {ok,S} = ranch:handshake(R), + T:setopts(S,[{active,once},{packet,line}]), + gen_statem:enter_loop(?MODULE,[],common,[S,T]). + +callback_mode() -> + state_functions. + + +common(info,{tcp,S,<<"POINT:",P/binary>>},[S,T]) -> + T:setopts(S,[{active,once}]), + K = break_binary_string(byte_size(P),P), + case lru:point(K) of + ok -> + T:send(S,<<"OK">>); + "type_error" -> + T:send(S,<<"{","ERROR",":","TYPE_ERROR","}">>); + "size_key_error" -> + T:send(S,<<"{","ERROR",":","MAX_SIZE_KEY","}">>); + _ -> + T:send(S,<<"{","ERROR",":","UNKNOW_ERROR","}">>) + end, + keep_state_and_data; +common(info,{tcp,S,<<"CHEAT:",P/binary>>},[S,T]) -> + T:setopts(S,[{active,once}]), + KVL = lists:filtermap( + fun(KV) -> + case binary:split(KV,<<",">>,[global]) of + [K,V] -> + case catch binary_to_integer(V) of + {'EXIT',_} -> false; + I -> {true,{K,I}} + end; + _ -> + false + end + end, + binary:split(break_binary_string(byte_size(P),P),<<";">>,[global])), + case lru:cheat(KVL) of + ok -> + T:send(S,<<"OK">>); + "type_error" -> + T:send(S,<<"{","ERROR",":","TYPE_ERROR","}">>); + "data_error" -> + T:send(S,<<"{","ERROR",":","DATA_ERROR","}">>); + _ -> + T:send(S,<<"{","ERROR",":","UNKNOW_ERROR","}">>) + end, + keep_state_and_data; +common(info,{tcp,S,<<"COUNT:",P/binary>>},[S,T]) -> + T:setopts(S,[{active,once}]), + K = break_binary_string(byte_size(P),P), + case lru:count(K) of + "type_error" -> + T:send(S,<<"{","ERROR",":","TYPE_ERROR","}">>); + "size_key_error" -> + T:send(S,<<"{","ERROR",":","MAX_SIZE_KEY","}">>); + undefined -> + T:send(S,<<"{","ERROR",":","UNDEFINED_KEY","}">>); + C when is_integer(C) -> + BC = integer_to_binary(C), + T:send(S,<>); + _ -> + T:send(S,<<"{","ERROR",":","UNKNOW_ERROR","}">>) + end, + keep_state_and_data; +common(info,{tcp,S,<<"STATE",E/binary>>},[S,T]) when E =:= <<>> orelse E =:= <<"\r\n">> orelse E =:= <<"\n">> orelse E =:= <<"\r">> -> + T:setopts(S,[{active,once}]), + case lru:state() of + [O,C,Q] -> + BO = integer_to_binary(O), + BC = integer_to_binary(C), + BQ = integer_to_binary(Q), + T:send(S,<<"{","O",":",BO/binary,",","C",":",BC/binary,",","Q",":",BQ/binary,"}">>); + _ -> + T:send(S,<<"{","ERROR",":","UNKNOW_ERROR","}">>) + end, + keep_state_and_data; +common(info,{tcp,S,<<"STORE",E/binary>>},[S,T]) when E =:= <<>> orelse E =:= <<"\r\n">> orelse E =:= <<"\n">> orelse E =:= <<"\r">> -> + T:setopts(S,[{active,once}]), + case lru:store() of + ok -> + T:send(S,<<"OK">>); + _ -> + T:send(S,<<"{","ERROR",":","UNKNOW_ERROR","}">>) + end, + keep_state_and_data; +common(info,{tcp,S,<<"FETCH",E/binary>>},[S,T]) when E =:= <<>> orelse E =:= <<"\r\n">> orelse E =:= <<"\n">> orelse E =:= <<"\r">> -> + T:setopts(S,[{active,once}]), + case lru:fetch() of + {C,KL} when is_list(KL) -> + BC = integer_to_binary(C), + BK = pack_list_to_binary(KL,<<>>), + BD = <<"{",BC/binary,":",BK/binary,"}">>, + T:send(S,BD); + _ -> + T:send(S,<<"{","ERROR",":","UNKNOW_ERROR","}">>) + end, + keep_state_and_data; +common(info,{tcp,S,<<"CLEAN:SYNC",E/binary>>},[S,T]) when E =:= <<>> orelse E =:= <<"\r\n">> orelse E =:= <<"\n">> orelse E =:= <<"\r">> -> + T:setopts(S,[{active,once}]), + case lru:clean(sync) of + {{C,KL},R} when is_list(KL) andalso is_reference(R) -> + BC = integer_to_binary(C), + BK = pack_list_to_binary(KL,<<>>), + BD = <<"{",BC/binary,":",BK/binary,"}">>, + BR = list_to_binary(ref_to_list(R)), + T:send(S,<<"{","[",BD/binary,"]",":",BR/binary,"}">>), + {next_state,delete,[S,T,#{ref => BR, key => {C,KL}}],[{state_timeout,?TIMEOUT_STATE_DELETE,BR}]}; + _ -> + T:send(S,<<"{","ERROR",":","UNKNOW_ERROR","}">>), + keep_state_and_data + end; +common(info,{tcp,S,<<"CLEAN:ASYNC",E/binary>>},[S,T]) when E =:= <<>> orelse E =:= <<"\r\n">> orelse E =:= <<"\n">> orelse E =:= <<"\r">> -> + T:setopts(S,[{active,once}]), + case lru:clean() of + {C,KL} when is_list(KL) -> + BC = integer_to_binary(C), + BK = pack_list_to_binary(KL,<<>>), + BD = <<"{",BC/binary,":",BK/binary,"}">>, + T:send(S,BD); + _ -> + T:send(S,<<"{","ERROR",":","UNKNOW_ERROR","}">>) + end, + keep_state_and_data; +common(info,{tcp,S,<<"CLEAN",":",_P/binary>>},[S,T]) -> + T:setopts(S,[{active,once}]), + T:send(S,<<"{","ERROR",":","EXPIRED_REF","}">>), + keep_state_and_data; +common(info,{tcp,S,<<"CLEAN",E/binary>>},[S,T]) when E =:= <<>> orelse E =:= <<"\r\n">> orelse E =:= <<"\n">> orelse E =:= <<"\r">> -> + T:setopts(S,[{active,once}]), + case lru:clean() of + {C,KL} when is_list(KL) -> + BC = integer_to_binary(C), + BK = pack_list_to_binary(KL,<<>>), + BD = <<"{",BC/binary,":",BK/binary,"}">>, + T:send(S,BD); + _ -> + T:send(S,<<"{","ERROR",":","UNKNOW_ERROR","}">>) + end, + keep_state_and_data; +common(info,{tcp,S,_B},[S,T]) -> + T:setopts(S,[{active,once}]), + T:send(S,<<"{","ERROR",":","UNKNOW_COMMAND","}">>), + keep_state_and_data. + +delete(state_timeout,BR,[S,T,#{ref := BR, key := _K}]) -> + {next_state,common,[S,T]}; +delete(info,{tcp,_S,<<"CLEAN:ASYNC",E/binary>>},_StateData) when E =:= <<>> orelse E =:= <<"\r\n">> orelse E =:= <<"\n">> orelse E =:= <<"\r">> -> + {keep_state_and_data,[postpone]}; +delete(info,{tcp,_S,<<"CLEAN:SYNC",E/binary>>},_StateData) when E =:= <<>> orelse E =:= <<"\r\n">> orelse E =:= <<"\n">> orelse E =:= <<"\r">> -> + {keep_state_and_data,[postpone]}; +delete(info,{tcp,S,<<"CLEAN",":",P/binary>>},[S,T,#{ref := BR, key := K}]) -> + T:setopts(S,[{active,once}]), + case break_binary_string(byte_size(P),P) =:= BR of + true -> + case lru:clean(list_to_ref(binary_to_list(BR)),K) of + ok -> + T:send(S,<<"OK">>), + {next_state,common,[S,T]}; + _ -> + T:send(S,<<"{","ERROR",":","UNKNOW_ERROR","}">>), + {next_state,common,[S,T]} + end; + false -> + T:send(S,<<"{","ERROR",":","UNKNOW_REF","}">>), + keep_state_and_data + end; +delete(info,{tcp,_S,<<"POINT:",E/binary>>},_StateData) when E =:= <<>> orelse E =:= <<"\r\n">> orelse E =:= <<"\n">> orelse E =:= <<"\r">> -> + {keep_state_and_data,[postpone]}; +delete(info,{tcp,_S,<<"CHEAT:",E/binary>>},_StateData) when E =:= <<>> orelse E =:= <<"\r\n">> orelse E =:= <<"\n">> orelse E =:= <<"\r">> -> + {keep_state_and_data,[postpone]}; +delete(info,{tcp,_S,<<"COUNT:",E/binary>>},_StateData) when E =:= <<>> orelse E =:= <<"\r\n">> orelse E =:= <<"\n">> orelse E =:= <<"\r">> -> + {keep_state_and_data,[postpone]}; +delete(info,{tcp,_S,<<"STATE",E/binary>>},_StateData) when E =:= <<>> orelse E =:= <<"\r\n">> orelse E =:= <<"\n">> orelse E =:= <<"\r">> -> + {keep_state_and_data,[postpone]}; +delete(info,{tcp,_S,<<"STORE",E/binary>>},_StateData) when E =:= <<>> orelse E =:= <<"\r\n">> orelse E =:= <<"\n">> orelse E =:= <<"\r">> -> + {keep_state_and_data,[postpone]}; +delete(info,{tcp,_S,<<"FETCH",E/binary>>},_StateData) when E =:= <<>> orelse E =:= <<"\r\n">> orelse E =:= <<"\n">> orelse E =:= <<"\r">> -> + {keep_state_and_data,[postpone]}; +delete(info,{tcp,_S,<<"CLEAN",E/binary>>},_StateData) when E =:= <<>> orelse E =:= <<"\r\n">> orelse E =:= <<"\n">> orelse E =:= <<"\r">> -> + {keep_state_and_data,[postpone]}; +delete(info,{tcp,S,_B},[S,T,_MD]) -> + T:setopts(S,[{active,once}]), + T:send(S,<<"{","ERROR",":","UNKNOW_COMMAND","}">>), + keep_state_and_data. + + +pack_list_to_binary([],B) -> <<"[",B/binary,"]">>; +pack_list_to_binary([H|T],B) -> + case B of + <<>> -> + pack_list_to_binary(T,<>); + B -> + pack_list_to_binary(T,<>) + end. + +break_binary_string(S,B) -> + case B of + <> -> + B1; + <> -> + B2; + <> -> + B3; + _ -> + B + end. diff --git a/src/lru_sup.erl b/src/lru_sup.erl new file mode 100644 index 0000000..eadbdab --- /dev/null +++ b/src/lru_sup.erl @@ -0,0 +1,95 @@ +-module(lru_sup). +-author('VSolenkov'). + +-behavior('supervisor'). + +-export([ + start_link/0 +]). +-export([ + stop/0 +]). + +-export([ + init/1 +]). +-include("include/lru.hrl"). + + +start_link() -> + {ok,PID} = supervisor:start_link({local,?MODULE},?MODULE,[?ETS_KEYS_STORE_TABLE_NAME]), + {ok,PID,[?ETS_KEYS_STORE_TABLE_NAME]}. + + +init(ETS_TABLES) -> + init_tables(ETS_TABLES), + {ok,{ + {rest_for_one,5,300},[ + { + lru,{lru,start_link,[]}, + permanent,5000,worker,[lru] + } + ] ++ + case application:get_env(lru,tcp,off) of + on -> + [ + { + ranch,{ranch_app,start,[0,0]}, + permanent,5000,worker,[ranch_app] + } + ] ++ + [ranch:child_spec( + {?MODULE,lru_protocol},ranch_tcp, + #{ + socket_opts => init_socket_opts(), + num_acceptors => init_acceptors(), + max_connections => init_connections() + },lru_protocol, + [])]; + _ -> + [] + end + }}. + + +stop() -> + exit(whereis(?MODULE),shutdown). + + +init_tables(ETS_TABLES) -> + case application:get_env(lru,ets_recovery,false) of + true -> + lists:foreach( + fun(T) -> + F = element(2,file:get_cwd()) ++ "/" ++ application:get_env(lru,ets_dir,"priv") ++ "/" ++ atom_to_list(T), + case filelib:is_file(F) of + true -> + ets:file2tab(element(2,file:get_cwd()) ++ "/" ++ application:get_env(lru,ets_dir,"priv") ++ "/" ++ atom_to_list(T)); + false -> + ets:new(T,[named_table,set,public]) + end + end, + ETS_TABLES); + _ -> + lists:foreach( + fun(T) -> + ets:new(T,[named_table,set,public]) + end, + ETS_TABLES) + end. + +init_socket_opts() -> + init_socket(). +init_socket() -> + case application:get_env(lru,mode,inet) of + unix -> init_unix(); + inet -> init_port() + end. +init_unix() -> + [{ip,{local,application:get_env(lru,unix)}},{port,0}]. +init_port() -> + [{ip,application:get_env(lru,ip,{127,0,0,1})},{port,application:get_env(lru,port,7777)}]. +init_acceptors() -> + application:get_env(lru,num_acceptors,100). +init_connections() -> + application:get_env(lru,max_connections,1024). diff --git a/src/lru_utils.erl b/src/lru_utils.erl new file mode 100644 index 0000000..5a440dc --- /dev/null +++ b/src/lru_utils.erl @@ -0,0 +1,88 @@ +-module('lru_utils'). +-author('VSolenkov'). + +-export([ + for/3, + key_validation/1, + ets_create/0, + ets_re_create/0, + ets_reset/1 +]). +-include("include/lru.hrl"). + + + +ets_reset(TL) -> + lists:foreach( + fun(T) -> + ets:whereis(T) =/= undefined andalso + ets:tab2file( + T, + element(2,file:get_cwd()) ++ "/" ++ application:get_env(lru,ets_dir,"priv") ++ "/" ++ atom_to_list(T), + [{sync,application:get_env(lru,ets_sync_reset,true)}]) + end, + TL). + +ets_delete(T) -> + catch ets:delete(T). + +ets_create() -> + NT = ets:new(?ETS_KEYS_FETCH_TABLE_NAME,?ETS_KEYS_FETCH_TABLE_OPTS), + case get(?ETS_KEYS_FETCH_TABLE_NAME) of + undefined -> skip; + OT -> + ets:info(OT) =/= undefined andalso ets_delete(OT) + end, + put(?ETS_KEYS_FETCH_TABLE_NAME,NT), + NT. + +ets_re_create() -> + NT = ets:new(?ETS_KEYS_FETCH_TABLE_NAME,?ETS_KEYS_FETCH_TABLE_OPTS), + case get(?ETS_KEYS_FETCH_TABLE_NAME) of + undefined -> skip; + OT -> + case ets:info(OT) of + undefined -> skip; + _ -> + ets:foldl( + fun({K,V},[]) -> + ets:insert(NT,{K,V}),[] + end, + [],OT), + catch ets:delete(OT) + end + end, + put(?ETS_KEYS_FETCH_TABLE_NAME,NT), + NT. + +key_validation(K) -> + BK = + if + is_integer(K) -> + integer_to_binary(K); + is_atom(K) -> + atom_to_binary(K); + is_list(K) -> + list_to_binary(K); + is_tuple(K) -> + term_to_binary(K); + is_binary(K) -> + K; + true -> + -1 + end, + case BK of + -1 -> + -1; + BK -> + case size(BK) > ?MAX_KEY_SIZE() of + true -> + -2; + false -> + BK + end + end. + +for(N,N,F) -> F(N); +for(I,N,_) when I > N -> null; +for(I,N,F) -> F(I),for(I+1,N,F).