Skip to content

Commit

Permalink
add watch and list_keys API to kv
Browse files Browse the repository at this point in the history
  • Loading branch information
RoadRunnr committed Oct 22, 2024
1 parent daee867 commit 3bb2b0e
Show file tree
Hide file tree
Showing 4 changed files with 361 additions and 31 deletions.
135 changes: 104 additions & 31 deletions src/nats_kv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

%% K/V store API
-export([
watch/4,
list_keys/3, list_keys/4,
get/3, get/4, get/5, get_msg/4,
put/4,
create/4, create/5,
Expand All @@ -51,6 +53,7 @@

-include_lib("kernel/include/logger.hrl").
-include_lib("enats/include/nats_stream.hrl").
-include("nats_kv.hrl").

-export_type([config/0]).

Expand Down Expand Up @@ -107,35 +110,12 @@
compression => boolean()
}.

%% one minute in nanoseconds (10^-9)
-define(MINUTE_NS, 60 * 1_000_000_000).
-define(BUCKET_NAME(Bucket), <<"KV_", Bucket/binary>>).
-define(SUBJECT_NAME(Bucket), <<"$KV.", Bucket/binary>>).
-define(SUBJECT_NAME(Bucket, KeyPart), <<"$KV.", Bucket/binary, $., KeyPart/binary>>).

%% Headers for published messages.
-define(MSG_ID_HDR, ~"Nats-Msg-Id").
-define(EXPECTED_STREAM_HDR, ~"Nats-Expected-Stream").
-define(EXPECTED_LAST_SEQ_HDR, ~"Nats-Expected-Last-Sequence").
-define(EXPECTED_LAST_SUBJ_SEQ_HDR, ~"Nats-Expected-Last-Subject-Sequence").
-define(EXPECTED_LAST_MSG_IDu_Hdr, ~"Nats-Expected-Last-Msg-Id").
-define(MSG_ROLLUP, ~"Nats-Rollup").

%% Rollups, can be subject only or all messages.
-define(MSG_ROLLUP_ALL, ~"all").
-define(MSG_ROLLUP_SUBJECT, ~"sub").

%% K/V operations
-define(KV_OP, ~"KV-Operation").
-define(KV_DEL, ~"DEL").
-define(KV_PURGE, ~"PURGE").

%%%===================================================================
%%% API
%%%===================================================================

get_bucket(Conn, Bucket)
when is_binary(Bucket) ->
when is_binary(Bucket) ->
get_bucket(Conn, Bucket, #{}).

get_bucket(Conn, Bucket, Opts)
Expand Down Expand Up @@ -199,6 +179,62 @@ delete_bucket(Conn, Bucket, Opts)
when is_binary(Bucket), is_map(Opts) ->
nats_stream:delete(Conn, ?BUCKET_NAME(Bucket), Opts).

-doc """
Watch for any updates to keys that match the keys argument which
could include wildcards. By default, the watcher will send the latest
value for each key and all future updates. Watch will send a nil
entry when it has received all initial values. There are a few ways
to configure the watcher:
- IncludeHistory will have the key watcher send all historical values
for each key (up to KeyValueMaxHistory).
- IgnoreDeletes will have the key watcher not pass any keys with
delete markers.
- UpdatesOnly will have the key watcher only pass updates on values
(without latest values when started).
- MetaOnly will have the key watcher retrieve only the entry meta
data, not the entry value.
- ResumeFromRevision instructs the key watcher to resume from a
specific revision number.
""".
watch(Conn, Bucket, WatchOpts, Opts)
when is_binary(Bucket), is_map(WatchOpts), is_map(Opts) ->
watch(Conn, Bucket, WatchOpts, Opts, [{spawn_opt, [link]}]).

watch(Conn, Bucket, WatchOpts, Opts, StartOpts)
when is_binary(Bucket), is_map(WatchOpts), is_map(Opts) ->
nats_kv_watch:start(Conn, Bucket, WatchOpts, Opts, StartOpts).

-doc """
ListKeys will return KeyLister, allowing to retrieve all keys from
the key value store in a streaming fashion (on a channel).
""".
list_keys(Conn, Bucket, WatchOpts) ->
list_keys(Conn, Bucket, WatchOpts, #{}).

list_keys(Conn, Bucket, WatchOpts0, Opts) ->
WatchOpts = WatchOpts0#{ignore_deletes => true,
cb => fun list_keys_watch_cb/3},
{ok, Pid} = watch(Conn, Bucket, WatchOpts, Opts),
list_keys_loop(Pid, []).

list_keys_watch_cb({init, Owner}, _Conn, _) ->
{continue, Owner};
list_keys_watch_cb(init_done, _Conn, Owner) ->
Owner ! {done, self()},
{stop, normal};
list_keys_watch_cb({msg, _, _, _} = Msg, _Conn, Owner) ->
Owner ! {'WATCH', self(), Msg},
{continue, Owner}.

list_keys_loop(Pid, Acc) ->
receive
{done, Pid} ->
{ok, lists:reverse(Acc)};
{'WATCH', Pid, {msg, Key, _Value, _Opts}} ->
list_keys_loop(Pid, [Key | Acc])
end.

-doc """
Get returns the latest value for the key. If the key does not exist,
ErrKeyNotFound will be returned.
Expand Down Expand Up @@ -231,8 +267,7 @@ get_msg(Conn, Bucket, Key, last, Opts) ->
get_last_msg_for_subject(Conn, Bucket, Key, Opts);
get_msg(Conn, Bucket, Key, SeqNo, Opts)
when is_integer(SeqNo) ->
get_msg(Conn, Bucket, #{last_by_subject => ?SUBJECT_NAME(Bucket, Key),
seq => SeqNo}, Opts).
get_msg(Conn, Bucket, #{last_by_subject => Key, seq => SeqNo}, Opts).

get_last_msg_for_subject(Conn, Bucket, Key, #{allow_direct := true} = Opts) ->
GetStr = <<?BUCKET_NAME(Bucket)/binary, $., ?SUBJECT_NAME(Bucket, Key)/binary>>,
Expand All @@ -244,20 +279,21 @@ get_last_msg_for_subject(Conn, Bucket, Key, #{allow_direct := true} = Opts) ->
Other
end;
get_last_msg_for_subject(Conn, Bucket, Key, Opts) ->
Req = #{last_by_subj => ?SUBJECT_NAME(Bucket, Key)},
get_msg(Conn, Bucket, Req, Opts).
get_msg(Conn, Bucket, #{last_by_subj => Key}, Opts).

get_msg(Conn, Bucket, Req, #{allow_direct := true} = Opts) ->
Topic = make_js_direct_api_topic(~"GET", ?BUCKET_NAME(Bucket), Opts),
case nats:request(Conn, Topic, json:encode(Req), Opts) of
JSON = json:encode(marshal_get_request(Bucket, Req)),
case nats:request(Conn, Topic, JSON, Opts) of
{ok, Response} ->
direct_msg_response(Response);
Other ->
Other
end;
get_msg(Conn, Bucket, Req, Opts) ->
Name = ?BUCKET_NAME(Bucket),
case nats_stream:msg_get(Conn, Name, Req, Opts) of
JSON = marshal_get_request(Bucket, Req),
case nats_stream:msg_get(Conn, Name, JSON, Opts) of
{ok, #{message := Msg} = Response} ->
{ok, Response#{message := get_response_msg(Msg)}};
Other ->
Expand Down Expand Up @@ -387,6 +423,29 @@ make_js_direct_api_topic(Op, Stream, #{domain := Domain}) ->
make_js_direct_api_topic(Op, Stream, _) ->
<<"$JS.API.DIRECT.", Op/binary, $., Stream/binary>>.

marshal_get_request(Bucket, Req) ->
maps:map(
fun(K, V)
when K =:= last_by_subj; K =:= next_by_subj ->
?SUBJECT_NAME(Bucket, V);
(K, V)
when K =:= start_time; K =:= up_to_time ->
if V < 0 ->
%% NATS uses this mean `never`
~"0001-01-01T00:00:00Z";
V >= 0 ->
iolist_to_binary(
calendar:system_time_to_rfc3339(V, [{unit, nanosecond}, {offset, "Z"}]))
end;
(multi_last, Subjects) ->
lists:map(fun(S) -> ?SUBJECT_NAME(Bucket, S) end, Subjects);
(batch, V) when V > 1 ->
?LOG(critical, "enats: batch get requests are not supported"),
V;
(_K, V) ->
V
end, Req).

to_atom(Bin) when is_binary(Bin) ->
try binary_to_existing_atom(Bin) catch _:_ -> Bin end.

Expand Down Expand Up @@ -419,9 +478,23 @@ unmarshal_response({Response, _Opts}) ->

direct_msg_response({#{error := Error}, _}) ->
{error, Error};

direct_msg_response({_Content,
#{header := <<"NATS/1.0 ", Code:3/bytes, " ", Rest/binary>>}}) ->
{Pos, _} = binary:match(Rest, <<"\r">>),
<<Msg:Pos/binary, "\r\n", _HdrStr/binary>> = Rest,

%% Headers = nats_hd:parse_headers(HdrStr),
Error0 = #{code => binary_to_integer(Code), description => Msg},
Error =
case Msg of
<<"Message Not Found">> -> Error0#{err_code => ?JS_ERR_CODE_MESSAGE_NOT_FOUND};
_ -> Error0
end,
{error, Error};

direct_msg_response({Content, #{header := <<"NATS/1.0\r\n", HdrStr/binary>>}}) ->
Headers = nats_hd:parse_headers(HdrStr),
?LOG(debug, "Headers: ~p", [Headers]),
Msg = lists:foldl(fun direct_msg_response_f/2,
#{data => Content, hdrs => Headers}, Headers),
{ok, #{message => Msg}}.
Expand Down
22 changes: 22 additions & 0 deletions src/nats_kv.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
%% one minute in nanoseconds (10^-9)
-define(MINUTE_NS, 60 * 1_000_000_000).
-define(BUCKET_NAME(Bucket), <<"KV_", Bucket/binary>>).
-define(SUBJECT_NAME(Bucket), <<"$KV.", Bucket/binary>>).
-define(SUBJECT_NAME(Bucket, KeyPart), <<"$KV.", Bucket/binary, $., KeyPart/binary>>).

%% Headers for published messages.
-define(MSG_ID_HDR, ~"Nats-Msg-Id").
-define(EXPECTED_STREAM_HDR, ~"Nats-Expected-Stream").
-define(EXPECTED_LAST_SEQ_HDR, ~"Nats-Expected-Last-Sequence").
-define(EXPECTED_LAST_SUBJ_SEQ_HDR, ~"Nats-Expected-Last-Subject-Sequence").
-define(EXPECTED_LAST_MSG_IDu_Hdr, ~"Nats-Expected-Last-Msg-Id").
-define(MSG_ROLLUP, ~"Nats-Rollup").

%% Rollups, can be subject only or all messages.
-define(MSG_ROLLUP_ALL, ~"all").
-define(MSG_ROLLUP_SUBJECT, ~"sub").

%% K/V operations
-define(KV_OP, ~"KV-Operation").
-define(KV_DEL, ~"DEL").
-define(KV_PURGE, ~"PURGE").
Loading

0 comments on commit 3bb2b0e

Please sign in to comment.