%% store.erl
%% Versioned key/value store implemented as a gen_server.
%% The server state is a map of Key => #{Timestamp => Value}, where every
%% write adds a new timestamped version of the key.
-module (store).
-behaviour (gen_server).
%% NOTE(review): SERVER is not referenced anywhere in the visible code.
-define(SERVER, ?MODULE).
%% Age a superseded version must exceed before garbage collection removes
%% it. It is compared against differences of timestamp/0 values, which are
%% expressed in milliseconds.
-define(GARBAGE_DELAY,1000).
%% Public API.
-export([start/0]).
%% gen_server callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% Starts a store server with an empty partition and returns its pid.
%% The server is linked to the calling process (gen_server:start_link/3).
%% Crashes with badmatch if the server cannot be started, instead of
%% handing the error reason back to the caller as if it were a pid.
start() ->
    {ok, Pid} = gen_server:start_link(?MODULE, #{}, []),
    Pid.
%% gen_server callback: the term passed to start_link becomes the initial
%% server state unchanged.
init(InitialPartition) ->
    {ok, InitialPartition}.
% Callback Routines
%% Synchronous requests.
%%   {gc}                      -> prunes old versions of every key, replies ok.
%%   {up, Key, Value}          -> stores Value as a new version of Key, replies ok.
%%   {read, SnapshotTime, Key} -> replies with the newest value of Key written
%%                                at or before SnapshotTime, or nil when the
%%                                key is unknown or has no such version.
%% A read whose SnapshotTime is still in the future is not answered
%% immediately: a {retransmit, From, SnapshotTime, Key} message is scheduled
%% for the moment the snapshot time is reached and the reply is sent later.
handle_call({gc}, _From, Partition) ->
    PrunedPart = gc(maps:keys(Partition), Partition, timestamp()),
    {reply, ok, PrunedPart};
handle_call({up, Key, Value}, _From, Partition) ->
    {reply, ok, update_partition(Partition, Key, Value)};
handle_call({read, SnapshotTime, Key}, Manager, Partition) ->
    %% Sample the clock once: comparing against one reading and computing
    %% the delay from another could yield a negative delay.
    Now = timestamp(),
    case maps:find(Key, Partition) of
        error ->
            {reply, nil, Partition};
        {ok, Submap} when SnapshotTime =< Now ->
            Val =
                %% process_read/2 may raise badkey when no stored version
                %% is old enough for the snapshot; answer nil in that case.
                try
                    process_read(Submap, SnapshotTime)
                catch
                    error:{badkey, _} -> nil
                end,
            {reply, Val, Partition};
        {ok, _Submap} when is_integer(SnapshotTime) ->
            %% SnapshotTime > Now here, so the delay is strictly positive.
            erlang:send_after(SnapshotTime - Now, self(),
                              {retransmit, Manager, SnapshotTime, Key}),
            {noreply, Partition};
        {ok, _Submap} ->
            %% A non-integer snapshot time cannot be scheduled; such
            %% requests are answered with nil.
            {reply, nil, Partition}
    end.
%% Asynchronous requests are not part of the protocol: any cast is ignored
%% and the state is kept as is.
handle_cast(_Request, Partition) ->
    {noreply, Partition}.
%% Out-of-band messages.
%%   {retransmit, Manager, SnapshotTime, Key} is the timer message scheduled
%%   by the read handler for a snapshot that was still in the future. It
%%   performs the deferred read and replies to the waiting caller (Manager
%%   is the From term of the original call).
%% The server state is the partition map itself, so the clause matches the
%% state directly; matching a [Partition, Buffer] list would never succeed
%% and the waiting caller would never receive a reply.
%% Any other message is ignored.
handle_info({retransmit, Manager, SnapshotTime, Key}, Partition) ->
    Val =
        case maps:find(Key, Partition) of
            {ok, Submap} -> process_read(Submap, SnapshotTime);
            error -> nil
        end,
    gen_server:reply(Manager, Val),
    {noreply, Partition};
handle_info(_Info, State) ->
    {noreply, State}.
%% gen_server callback: nothing to clean up on shutdown.
terminate(_Why, _Partition) ->
    ok.
%% gen_server callback: the state needs no conversion across code upgrades
%% and is returned untouched.
code_change(_FromVsn, Partition, _Extra) ->
    {ok, Partition}.
%%----------------------------------------------------------------------
%% Function: process_read/2
%% Purpose:  From the version map of a single key, look up the value of
%%           the most recent version written at or before SnapshotTime.
%% Args:     Map          : version map of one key (timestamp => value)
%%           SnapshotTime : snapshot timestamp of the read request
%% Returns:  The result of read_most_recent_value/3 applied to the
%%           timestamps ordered from newest to oldest.
%%--------------
process_read(Map, SnapshotTime) ->
    %% maps:keys/1 makes no promise about ordering, so sort explicitly
    %% before reversing to get the newest-first list the lookup expects.
    Timestamps = lists:reverse(lists:sort(maps:keys(Map))),
    read_most_recent_value(Map, Timestamps, SnapshotTime).
%%----------------------------------------------------------------------
%% Function: read_most_recent_value/3
%% Purpose:  Read the most recent version of an item that is visible at
%%           SnapshotTime.
%% Args:     Map          : map of Timestamp => Value pairs
%%           Timestamps   : keys of Map, ordered from highest to lowest
%%           SnapshotTime : snapshot timestamp of the read request
%% Returns:  The value stored under the first timestamp =< SnapshotTime,
%%           or nil when no timestamp in the list qualifies (including
%%           an empty list).
%%--------------
read_most_recent_value(Map, [Head | _Tail], SnapshotTime) when Head =< SnapshotTime ->
    maps:get(Head, Map);
read_most_recent_value(Map, [_Head | Tail], SnapshotTime) ->
    read_most_recent_value(Map, Tail, SnapshotTime);
read_most_recent_value(_Map, [], _SnapshotTime) ->
    %% Every version is newer than the snapshot: there is nothing to read.
    nil.
%%----------------------------------------------------------------------
%% Function: update_partition/3
%% Purpose:  Record Value as a new version of Key, stamped with the
%%           current timestamp/0 reading.
%% Args:     Partition : current map of Key => #{Timestamp => Value}
%%           Key       : key to write
%%           Value     : value to store
%% Returns:  Updated partition. A key that was not present yet gets a
%%           fresh version map holding only the new version.
%%--------------
update_partition(Partition, Key, Value) ->
    %% An absent key is an expected case, not an error: start from an
    %% empty version map rather than trapping a failed match.
    Submap = maps:get(Key, Partition, #{}),
    Partition#{Key => Submap#{timestamp() => Value}}.
%%----------------------------------------------------------------------
%% Function: gc/3
%% Purpose:  Garbage-collect the data store. For each listed key, the
%%           newest version is always kept; older versions are handed
%%           to prune_data/3, which decides which of them to drop.
%% Args:     Keys      : keys of the data store to process
%%           Partition : the data store (Key => #{Timestamp => Value})
%%           Now       : timestamp of the collection
%% Returns:  Pruned partition
%%--------------
gc([], Partition, _Now) ->
    Partition;
gc([Key | Rest], Partition, Now) ->
    #{Key := Submap} = Partition,
    PrunedSubmap =
        %% maps:keys/1 is unordered, so sort before singling out the
        %% newest timestamp; otherwise the live version could be pruned.
        case lists:reverse(lists:sort(maps:keys(Submap))) of
            [_Newest | Older] -> prune_data(Submap, Older, Now);
            %% A key without versions has nothing to prune.
            [] -> Submap
        end,
    gc(Rest, Partition#{Key := PrunedSubmap}, Now).
%%----------------------------------------------------------------------
%% Function: prune_data/3
%% Purpose:  Drop the listed versions of an item whose age
%%           (Now - Timestamp) exceeds ?GARBAGE_DELAY.
%% Args:     Submap     : map with timestamps as keys
%%           Timestamps : timestamps that are candidates for removal
%%           Now        : timestamp of the collection
%% Returns:  Pruned submap
%%--------------
prune_data(Submap, Timestamps, Now) ->
    DropIfExpired =
        fun
            (Ts, Acc) when Now - Ts > ?GARBAGE_DELAY -> maps:remove(Ts, Acc);
            (_Ts, Acc) -> Acc
        end,
    lists:foldl(DropIfExpired, Submap, Timestamps).
%%----------------------------------------------------------------------
%% Function: timestamp/0
%% Purpose:  Read the operating system clock.
%% Returns:  OS system time in milliseconds
%%----------------------------------------------------------------------
-spec timestamp() -> integer().
timestamp() ->
    Unit = millisecond,
    os:system_time(Unit).