-
Notifications
You must be signed in to change notification settings - Fork 95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add latency stats for riak pb client operations [JIRA: RCS-243] #1189
Changes from all commits
dc637e1
3874b3b
1e2164d
374d8f4
6c6735b
bfe6871
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -194,19 +194,20 @@ handle_cast({put_block, ReplyPid, Bucket, Key, UUID, BlockNumber, Value, BCSum}, | |
[Bucket, Key, UUID, BlockNumber, Error]) | ||
end, | ||
%% TODO: Handle put failure here. | ||
ok = do_put_block(FullBucket, FullKey, <<>>, Value, MD, RcPid, FailFun), | ||
ok = do_put_block(FullBucket, FullKey, <<>>, Value, MD, | ||
RcPid, [riakc, put_block], FailFun), | ||
riak_cs_put_fsm:block_written(ReplyPid, BlockNumber), | ||
dt_return(<<"put_block">>, [BlockNumber], [Bucket, Key]), | ||
{noreply, State}; | ||
handle_cast({delete_block, ReplyPid, Bucket, Key, UUID, BlockNumber}, State=#state{riak_client=RcPid}) -> | ||
dt_entry(<<"delete_block">>, [BlockNumber], [Bucket, Key]), | ||
{FullBucket, FullKey} = full_bkey(Bucket, Key, UUID, BlockNumber), | ||
StartTime = os:timestamp(), | ||
Timeout = riak_cs_config:get_block_timeout(), | ||
|
||
%% do a get first to get the vclock (only do a head request though) | ||
GetOptions = [{r, 1}, {notfound_ok, false}, {basic_quorum, false}, head], | ||
_ = case riakc_pb_socket:get(block_pbc(RcPid), FullBucket, FullKey, GetOptions, Timeout) of | ||
GetOptions = [head | r_one_options()], | ||
_ = case riak_cs_pbc:get(block_pbc(RcPid), FullBucket, FullKey, | ||
GetOptions, Timeout, [riakc, head_block]) of | ||
{ok, RiakObject} -> | ||
ok = delete_block(RcPid, ReplyPid, RiakObject, {UUID, BlockNumber}); | ||
{error, notfound} -> | ||
|
@@ -215,7 +216,6 @@ handle_cast({delete_block, ReplyPid, Bucket, Key, UUID, BlockNumber}, State=#sta | |
%% move on to the next block. | ||
riak_cs_delete_fsm:block_deleted(ReplyPid, {ok, {UUID, BlockNumber}}) | ||
end, | ||
ok = riak_cs_stats:update_with_start([block, delete], StartTime), | ||
dt_return(<<"delete_block">>, [BlockNumber], [Bucket, Key]), | ||
{noreply, State}; | ||
handle_cast(_Msg, State) -> | ||
|
@@ -229,11 +229,11 @@ get_block(ReplyPid, Bucket, Key, ClusterId, BagId, UUID, BlockNumber, RcPid) -> | |
|
||
case riak_cs_utils:n_val_1_get_requests() of | ||
true -> | ||
do_get_block(ReplyPid, Bucket, Key, ClusterId, UseProxyGet, ProxyActive, UUID, | ||
BlockNumber, RcPid); | ||
do_get_block(ReplyPid, Bucket, Key, ClusterId, UseProxyGet, ProxyActive, | ||
UUID, BlockNumber, RcPid); | ||
false -> | ||
normal_nval_block_get(ReplyPid, Bucket, Key, ClusterId, | ||
UseProxyGet, UUID, BlockNumber, RcPid) | ||
get_block_legacy(ReplyPid, Bucket, Key, ClusterId, | ||
UseProxyGet, UUID, BlockNumber, RcPid) | ||
end. | ||
|
||
do_get_block(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, ProxyActive, | ||
|
@@ -264,21 +264,17 @@ do_get_block(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, ProxyActive, | |
dt_entry(<<"get_block">>, [BlockNumber], [Bucket, Key]), | ||
{FullBucket, FullKey} = full_bkey(Bucket, Key, UUID, BlockNumber), | ||
|
||
StartTime = os:timestamp(), | ||
GetOptions1 = n_val_one_options(), | ||
GetOptions2 = r_one_options(), | ||
|
||
ProceedFun = fun(OkReply) -> | ||
ok = riak_cs_stats:update_with_start([block, get, retry], StartTime), | ||
ok = riak_cs_get_fsm:chunk(ReplyPid, {UUID, BlockNumber}, OkReply), | ||
dt_return(<<"get_block">>, [BlockNumber], [Bucket, Key]) | ||
end, | ||
RetryFun = fun(NewPause) -> | ||
ok = riak_cs_stats:update_with_start([block, get, retry], StartTime), | ||
do_get_block(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, | ||
ProxyActive, UUID, BlockNumber, RcPid, MaxRetries, NewPause) | ||
end, | ||
|
||
Timeout = riak_cs_config:local_get_block_timeout(), | ||
try_local_get(RcPid, FullBucket, FullKey, GetOptions1, GetOptions2, | ||
Timeout, ProceedFun, RetryFun, ErrorReasons, UseProxyGet, | ||
|
@@ -287,7 +283,8 @@ do_get_block(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, ProxyActive, | |
try_local_get(RcPid, FullBucket, FullKey, GetOptions1, GetOptions2, | ||
Timeout, ProceedFun, RetryFun, ErrorReasons, UseProxyGet, | ||
ProxyActive, ClusterID) -> | ||
case get_block_local(RcPid, FullBucket, FullKey, GetOptions1, Timeout) of | ||
case get_block_local(RcPid, FullBucket, FullKey, GetOptions1, Timeout, | ||
[riakc, get_block_n_one]) of | ||
{ok, _} = Success -> | ||
ProceedFun(Success); | ||
{error, {insufficient_vnodes,_,need,_} = Reason} -> | ||
|
@@ -311,18 +308,19 @@ handle_local_notfound(RcPid, FullBucket, FullKey, GetOptions2, | |
ProxyActive, ClusterID) -> | ||
|
||
Timeout = riak_cs_config:get_block_timeout(), | ||
case get_block_local(RcPid, FullBucket, FullKey, GetOptions2, Timeout) of | ||
case get_block_local(RcPid, FullBucket, FullKey, GetOptions2, Timeout, | ||
[riakc, get_block_n_all]) of | ||
{ok, _} = Success -> | ||
ProceedFun(Success); | ||
|
||
{error, Why} when Why == disconnected; | ||
Why == timeout -> | ||
_ = lager:debug("get_block_local/5 failed: {error, ~p}", [Why]), | ||
_ = lager:debug("get_block_local() failed: {error, ~p}", [Why]), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this changed to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The arity does not have information. I like log to be minimal but sufficient. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be honest, I want to use standard or de-fact notation to refer a function that does not specify (or emphasize) its arity. Sometimes, Parenthesis with empty args may be not familiar.. Hmm... I saw the notation in exometer_core README. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I won't make change about this. |
||
RetryFun([{local_quorum, Why}|ErrorReasons]); | ||
|
||
{error, notfound} when UseProxyGet andalso ProxyActive-> | ||
case get_block_remote(RcPid, FullBucket, FullKey, | ||
ClusterID, GetOptions2) of | ||
case get_block_remote(RcPid, FullBucket, FullKey, ClusterID, GetOptions2, | ||
[riakc, get_block_remote]) of | ||
{ok, _} = Success -> | ||
ProceedFun(Success); | ||
{error, Reason} -> | ||
|
@@ -339,26 +337,27 @@ handle_local_notfound(RcPid, FullBucket, FullKey, GetOptions2, | |
RetryFun({failure, [{local_quorum, Other}|ErrorReasons]}) | ||
end. | ||
|
||
get_block_local(RcPid, FullBucket, FullKey, GetOptions, Timeout) -> | ||
case riakc_pb_socket:get(block_pbc(RcPid), FullBucket, FullKey, GetOptions, Timeout) of | ||
-spec get_block_local(riak_client(), binary(), binary(), list(), | ||
timeout(), riak_cs_stats:key()) -> | ||
{ok, binary()} | {error, term()}. | ||
get_block_local(RcPid, FullBucket, FullKey, GetOptions, Timeout, StatsKey) -> | ||
case riak_cs_pbc:get(block_pbc(RcPid), FullBucket, FullKey, | ||
GetOptions, Timeout, StatsKey) of | ||
{ok, RiakObject} -> | ||
resolve_block_object(RiakObject, RcPid); | ||
%% %% Corrupted siblings hack: just add another.... | ||
%% [{MD,V}] = riakc_obj:get_contents(RiakObject), | ||
%% RiakObject2 = setelement(5, RiakObject, [{MD, <<"foobar">>}, {MD, V}]), | ||
%% resolve_block_object(RiakObject2, RcPid); | ||
Else -> | ||
Else | ||
end. | ||
|
||
-spec get_block_remote(riak_client(), binary(), binary(), binary(), get_options()) -> | ||
-spec get_block_remote(riak_client(), binary(), binary(), binary(), get_options(), | ||
riak_cs_stats:key()) -> | ||
{ok, binary()} | {error, term()}. | ||
get_block_remote(RcPid, FullBucket, FullKey, ClusterID, GetOptions0) -> | ||
get_block_remote(RcPid, FullBucket, FullKey, ClusterID, GetOptions0, StatsKey) -> | ||
%% replace get_block_timeout with proxy_get_block_timeout | ||
GetOptions = proplists:delete(timeout, GetOptions0), | ||
Timeout = riak_cs_config:proxy_get_block_timeout(), | ||
case riak_repl_pb_api:get(block_pbc(RcPid), FullBucket, FullKey, | ||
ClusterID, GetOptions, Timeout) of | ||
case riak_cs_pbc:repl_get(block_pbc(RcPid), FullBucket, FullKey, | ||
ClusterID, GetOptions, Timeout, StatsKey) of | ||
{ok, RiakObject} -> | ||
resolve_block_object(RiakObject, RcPid); | ||
Else -> | ||
|
@@ -367,30 +366,24 @@ get_block_remote(RcPid, FullBucket, FullKey, ClusterID, GetOptions0) -> | |
|
||
%% @doc This is the 'legacy' block get, before we introduced the ability | ||
%% to modify n-val per GET request. | ||
normal_nval_block_get(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, UUID, | ||
BlockNumber, RcPid) -> | ||
dt_entry(<<"get_block">>, [BlockNumber], [Bucket, Key]), | ||
|
||
get_block_legacy(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, UUID, | ||
BlockNumber, RcPid) -> | ||
dt_entry(<<"get_block_legacy">>, [BlockNumber], [Bucket, Key]), | ||
{FullBucket, FullKey} = full_bkey(Bucket, Key, UUID, BlockNumber), | ||
StartTime = os:timestamp(), | ||
GetOptions = [{r, 1}, {notfound_ok, false}, {basic_quorum, false}], | ||
Object = case UseProxyGet of | ||
false -> | ||
LocalTimeout = riak_cs_config:get_block_timeout(), | ||
riakc_pb_socket:get(block_pbc(RcPid), FullBucket, FullKey, GetOptions, LocalTimeout); | ||
true -> | ||
RemoteTimeout = riak_cs_config:proxy_get_block_timeout(), | ||
riak_repl_pb_api:get(block_pbc(RcPid), FullBucket, FullKey, ClusterID, GetOptions, RemoteTimeout) | ||
end, | ||
ChunkValue = case Object of | ||
{ok, RiakObject} -> | ||
{ok, riakc_obj:get_value(RiakObject)}; | ||
{error, notfound}=NotFound -> | ||
NotFound | ||
end, | ||
ok = riak_cs_stats:update_with_start([block, get], StartTime), | ||
GetOptions = r_one_options(), | ||
ChunkValue = | ||
case UseProxyGet of | ||
false -> | ||
LocalTimeout = riak_cs_config:get_block_timeout(), | ||
StatsKey = [riakc, get_block_legacy], | ||
get_block_local(block_pbc(RcPid), FullBucket, FullKey, GetOptions, | ||
LocalTimeout, StatsKey); | ||
true -> | ||
get_block_remote(RcPid, FullBucket, FullKey, ClusterID, GetOptions, | ||
[riakc, get_block_legacy_remote]) | ||
end, | ||
ok = riak_cs_get_fsm:chunk(ReplyPid, {UUID, BlockNumber}, ChunkValue), | ||
dt_return(<<"get_block">>, [BlockNumber], [Bucket, Key]). | ||
dt_return(<<"get_block_legacy">>, [BlockNumber], [Bucket, Key]). | ||
|
||
delete_block(RcPid, ReplyPid, RiakObject, BlockId) -> | ||
Result = constrained_delete(RcPid, RiakObject, BlockId), | ||
|
@@ -400,14 +393,17 @@ delete_block(RcPid, ReplyPid, RiakObject, BlockId) -> | |
|
||
constrained_delete(RcPid, RiakObject, BlockId) -> | ||
DeleteOptions = [{r, all}, {pr, all}, {w, all}, {pw, all}], | ||
StatsKey = [riakc, delete_block_constrained], | ||
Timeout = riak_cs_config:delete_block_timeout(), | ||
format_delete_result( | ||
riakc_pb_socket:delete_obj(block_pbc(RcPid), RiakObject, DeleteOptions, Timeout), | ||
BlockId). | ||
riak_cs_pbc:delete_obj(block_pbc(RcPid), RiakObject, DeleteOptions, | ||
Timeout, StatsKey), | ||
BlockId). | ||
|
||
secondary_delete_check({error, {unsatisfied_constraint, _, _}}, RcPid, RiakObject) -> | ||
Timeout = riak_cs_config:delete_block_timeout(), | ||
riakc_pb_socket:delete_obj(block_pbc(RcPid), RiakObject, [], Timeout); | ||
StatsKey = [riakc, delete_block_secondary], | ||
riak_cs_pbc:delete_obj(block_pbc(RcPid), RiakObject, [], Timeout, StatsKey); | ||
secondary_delete_check({error, Reason} = E, _, _) -> | ||
_ = lager:warning("Constrained block deletion failed. Reason: ~p", [Reason]), | ||
E; | ||
|
@@ -497,7 +493,7 @@ find_md_usermeta(MD) -> | |
{ok, binary()} | {error, notfound}. | ||
resolve_block_object(RObj, RcPid) -> | ||
{{MD, Value}, NeedRepair} = | ||
riak_cs_utils:resolve_robj_siblings(riakc_obj:get_contents(RObj)), | ||
riak_cs_utils:resolve_robj_siblings(riakc_obj:get_contents(RObj)), | ||
_ = if NeedRepair andalso is_binary(Value) -> | ||
RBucket = riakc_obj:bucket(RObj), | ||
RKey = riakc_obj:key(RObj), | ||
|
@@ -513,13 +509,12 @@ resolve_block_object(RObj, RcPid) -> | |
Bucket = proplists:get_value(<<?USERMETA_BUCKET>>, S3Info), | ||
Key = proplists:get_value(<<?USERMETA_KEY>>, S3Info), | ||
VClock = riakc_obj:vclock(RObj), | ||
FailFun = | ||
fun(Error) -> | ||
_ = lager:error("Put S3 ~p ~p Riak ~p ~p failed: ~p\n", | ||
[Bucket, Key, RBucket, RKey, Error]) | ||
FailFun = fun(Error) -> | ||
_ = lager:error("Put S3 ~p ~p Riak ~p ~p failed: ~p\n", | ||
[Bucket, Key, RBucket, RKey, Error]) | ||
end, | ||
do_put_block(RBucket, RKey, VClock, Value, MD, RcPid, | ||
FailFun); | ||
[riakc, put_block_resolved], FailFun); | ||
NeedRepair andalso not is_binary(Value) -> | ||
_ = lager:error("All checksums fail: ~P\n", [RObj, 200]); | ||
true -> | ||
|
@@ -534,15 +529,13 @@ resolve_block_object(RObj, RcPid) -> | |
make_md_usermeta(Props) -> | ||
dict:from_list([{?MD_USERMETA, Props}]). | ||
|
||
do_put_block(FullBucket, FullKey, VClock, Value, MD, RcPid, FailFun) -> | ||
do_put_block(FullBucket, FullKey, VClock, Value, MD, RcPid, StatsKey, FailFun) -> | ||
RiakObject0 = riakc_obj:new(FullBucket, FullKey, Value), | ||
RiakObject = riakc_obj:set_vclock( | ||
riakc_obj:update_metadata(RiakObject0, MD), VClock), | ||
Timeout = riak_cs_config:put_block_timeout(), | ||
StartTime = os:timestamp(), | ||
case riakc_pb_socket:put(block_pbc(RcPid), RiakObject, Timeout) of | ||
case riak_cs_pbc:put(block_pbc(RcPid), RiakObject, Timeout, StatsKey) of | ||
ok -> | ||
ok = riak_cs_stats:update_with_start([block, put], StartTime), | ||
ok; | ||
Else -> | ||
_ = FailFun(Else), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: basho/riak-erlang-client#219
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is still open. Should be reviewed.