Skip to content

Commit

Permalink
Merge pull request #5 from erlcloud/merge_me
Browse files Browse the repository at this point in the history
Merge me - combination of two previous PR
  • Loading branch information
Evgeny Bob authored Dec 26, 2016
2 parents 39a3624 + d6c17a6 commit 476e950
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 96 deletions.
15 changes: 4 additions & 11 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
language: erlang
otp_release:
- 17.0-rc1
- R16B03-1
- R16B03
- R16B02
- R16B01
- R15B03
- R15B02
- R15B01
- R14B04
- R14B03
- R14B02
- R16B03-1
- 17.5
- 18.3
- 19.0
script: "make test"
29 changes: 12 additions & 17 deletions src/lhttpc_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ request(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
{response, self(), {error, Reason}};
error:closed ->
{response, self(), {error, connection_closed}};
error:Reason ->
Class:Reason when Class == error; Class == exit ->
Stack = erlang:get_stacktrace(),
{response, self(), {error, {Reason, Stack}}}
end,
Expand All @@ -119,7 +119,7 @@ request(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
%% socket used is new, it also makes the pool gen_server its controlling process.
%% @end
%%------------------------------------------------------------------------------
execute(From, Host, Port, Ssl, Path, Method, Hdrs0, Body, Options) ->
execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
UploadWindowSize = proplists:get_value(partial_upload, Options),
PartialUpload = proplists:is_defined(partial_upload, Options),
PartialDownload = proplists:is_defined(partial_download, Options),
Expand All @@ -135,7 +135,6 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs0, Body, Options) ->
ProxyUrl when is_list(ProxyUrl) ->
lhttpc_lib:parse_url(ProxyUrl)
end,
Hdrs = lhttpc_lib:canonical_headers(Hdrs0),
{ChunkedUpload, Request} = lhttpc_lib:format_request(Path, NormalizedMethod,
Hdrs, Host, Port, Body, PartialUpload),
%SocketRequest = {socket, self(), Host, Port, Ssl},
Expand Down Expand Up @@ -433,7 +432,7 @@ read_response(State, Vsn, {StatusCode, _} = Status, Hdrs) ->
NewStatus = {NewStatusCode, Reason},
read_response(State, NewVsn, NewStatus, Hdrs);
{ok, {http_header, _, Name, _, Value}} ->
Header = lhttpc_lib:canonical_header({Name, Value}),
Header = {lhttpc_lib:maybe_atom_to_list(Name), Value},
read_response(State, Vsn, Status, [Header | Hdrs]);
{ok, http_eoh} when StatusCode >= 100, StatusCode =< 199 ->
% RFC 2616, section 10.1:
Expand Down Expand Up @@ -529,27 +528,23 @@ has_body(_, _, _) ->
%%------------------------------------------------------------------------------
%% @private
%% @doc Find out how to read the entity body from the request.
% * If Transfer-Encoding is set to chunked, we should read one chunk at
% the time, ignoring a Content-Length header if it is present in error
% * If we have a Content-Length, just use that and read the complete
% entity.
% * If Transfer-Encoding is set to chunked, we should read one chunk at
% the time
% * If neither of this is true, we need to read until the socket is
% closed (AFAIK, this was common in versions before 1.1).
%% @end
%%------------------------------------------------------------------------------
-spec body_type(headers()) -> 'chunked' | 'infinite' | {fixed_length, integer()}.
body_type(Hdrs) ->
case lhttpc_lib:header_value("content-length", Hdrs) of
undefined ->
TransferEncoding = string:to_lower(
lhttpc_lib:header_value("transfer-encoding", Hdrs, "undefined")
),
case TransferEncoding of
"chunked" -> chunked;
_ -> infinite
end;
ContentLength ->
{fixed_length, list_to_integer(ContentLength)}
TransferEncoding = string:tokens(string:to_lower(lhttpc_lib:header_value(
"transfer-encoding", Hdrs, "identity")), ", "),
ContentLength = lhttpc_lib:header_value("content-length", Hdrs),
case {lists:member("chunked", TransferEncoding), ContentLength} of
{true, _} -> chunked;
{false, undefined} -> infinite;
{false, _} -> {fixed_length, list_to_integer(ContentLength)}
end.

%%------------------------------------------------------------------------------
Expand Down
94 changes: 56 additions & 38 deletions src/lhttpc_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@
normalize_method/1,
maybe_atom_to_list/1,
format_hdrs/1,
dec/1,
canonical_headers/1,
canonical_header/1
dec/1
]).

-include("lhttpc_types.hrl").
Expand Down Expand Up @@ -75,43 +73,30 @@ header_value(Hdr, Hdrs) ->
%% @doc
%% Returns the value associated with the `Header' in `Headers'.
%% `Header' must be a lowercase string, since every header is mangled to
%% check the match. `Headres' must be canonical.
%% If no match is found, `Default' is returned.
%% check the match. If no match is found, `Default' is returned.
%% @end
%%------------------------------------------------------------------------------
-spec header_value(string(), headers(), term()) -> term().
header_value(Hdr, Headers, Default) ->
case lists:keyfind(Hdr, 1, Headers) of
header_value(Hdr, [{Hdr, Value} | _], _) ->
case is_list(Value) of
true -> string:strip(Value);
false -> Value
end;
header_value(Hdr, [{ThisHdr, Value}| Hdrs], Default) when is_atom(ThisHdr) ->
header_value(Hdr, [{atom_to_list(ThisHdr), Value}| Hdrs], Default);
header_value(Hdr, [{ThisHdr, Value}| Hdrs], Default) when is_binary(ThisHdr) ->
header_value(Hdr, [{binary_to_list(ThisHdr), Value}| Hdrs], Default);
header_value(Hdr, [{ThisHdr, Value}| Hdrs], Default) ->
case string:equal(string:to_lower(ThisHdr), Hdr) of
true -> case is_list(Value) of
true -> string:strip(Value);
false -> Value
end;
false ->
Default;
{_, Value} when is_list(Value) ->
string:strip(Value);
{_, Value} ->
%% ransomr: not sure why we only need to strip list values, but
%% but leaving as-is
Value
end.

%%------------------------------------------------------------------------------
%% @doc
%% @end
%%------------------------------------------------------------------------------
canonical_headers(Headers) ->
[canonical_header(Header) || Header <- Headers].

%%------------------------------------------------------------------------------
%% @doc
%% @end
%%------------------------------------------------------------------------------
canonical_header({Name, Value}) ->
{canonical_header_name(Name), Value}.

canonical_header_name(Name) when is_list(Name) ->
string:to_lower(Name);
canonical_header_name(Name) when is_atom(Name) ->
canonical_header_name(atom_to_list(Name));
canonical_header_name(Name) when is_binary(Name) ->
canonical_header_name(binary_to_list(Name)).
header_value(Hdr, Hdrs, Default)
end;
header_value(_, [], Default) ->
Default.

%%------------------------------------------------------------------------------
%% @spec (Item) -> OtherItem
Expand Down Expand Up @@ -205,7 +190,8 @@ dec(Else) -> Else.
%%------------------------------------------------------------------------------
-spec format_hdrs(headers()) -> [string()].
format_hdrs(Headers) ->
format_hdrs(Headers, []).
NormalizedHeaders = normalize_headers(Headers),
format_hdrs(NormalizedHeaders, []).

%%==============================================================================
%% Internal functions
Expand Down Expand Up @@ -299,14 +285,46 @@ split_port(_,[$/ | _] = Path, Port) ->
split_port(Scheme, [P | T], Port) ->
split_port(Scheme, T, [P | Port]).

%%------------------------------------------------------------------------------
%% @private
%% @spec normalize_headers(RawHeaders) -> Headers
%% RawHeaders = [{atom() | binary() | string(), binary() | string()}]
%% Headers = headers()
%% @doc Turns the headers into binaries suitable for inclusion in a HTTP request
%% line.
%% @end
%%------------------------------------------------------------------------------
-spec normalize_headers(raw_headers()) -> headers().
normalize_headers(Headers) ->
normalize_headers(Headers, []).

%%------------------------------------------------------------------------------
%% @private
%% @doc
%% @end
%%------------------------------------------------------------------------------
-spec normalize_headers(raw_headers(), headers()) -> headers().
normalize_headers([{Header, Value} | T], Acc) when is_list(Header) ->
NormalizedHeader = try list_to_existing_atom(Header)
catch
error:badarg -> Header
end,
NewAcc = [{NormalizedHeader, Value} | Acc],
normalize_headers(T, NewAcc);
normalize_headers([{Header, Value} | T], Acc) ->
NewAcc = [{Header, Value} | Acc],
normalize_headers(T, NewAcc);
normalize_headers([], Acc) ->
Acc.

%%------------------------------------------------------------------------------
%% @private
%% @doc
%% @end
%%------------------------------------------------------------------------------
format_hdrs([{Hdr, Value} | T], Acc) ->
NewAcc =
[Hdr, ": ", Value, "\r\n" | Acc],
[maybe_atom_to_list(Hdr), ": ", maybe_atom_to_list(Value), "\r\n" | Acc],
format_hdrs(T, NewAcc);
format_hdrs([], Acc) ->
[Acc, "\r\n"].
Expand Down
18 changes: 13 additions & 5 deletions src/lhttpc_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ client_done(Pool, Host, Port, Ssl, Socket) ->
%%------------------------------------------------------------------------------
-spec init(any()) -> {ok, #httpc_man{}}.
init(Options) ->
process_flag(trap_exit, true),
process_flag(priority, high),
case lists:member({seed,1}, ssl:module_info(exports)) of
true ->
Expand Down Expand Up @@ -313,11 +314,16 @@ handle_call({connection_count, Destination}, _, State) ->
handle_call({done, Host, Port, Ssl, Socket}, {Pid, _} = From, State) ->
gen_server:reply(From, ok),
Dest = {Host, Port, Ssl},
{Dest, MonRef} = dict:fetch(Pid, State#httpc_man.clients),
true = erlang:demonitor(MonRef, [flush]),
Clients2 = dict:erase(Pid, State#httpc_man.clients),
State2 = deliver_socket(Socket, Dest, State#httpc_man{clients = Clients2}),
{noreply, State2};
case dict:find(Pid, State#httpc_man.clients) of
{ok, {Dest, MonRef}} ->
true = erlang:demonitor(MonRef, [flush]),
Clients2 = dict:erase(Pid, State#httpc_man.clients),
State2 = deliver_socket(Socket, Dest, State#httpc_man{clients = Clients2}),
{noreply, State2};
error ->
lhttpc_sock:close(Socket, Ssl),
{noreply, State}
end;
handle_call(_, _, State) ->
{reply, {error, unknown_request}, State}.

Expand Down Expand Up @@ -361,6 +367,8 @@ handle_info({'DOWN', MonRef, process, Pid, _Reason}, State) ->
State2 = State#httpc_man{queues = Queues2, clients = Clients2},
{noreply, monitor_client(Dest, From, State2)}
end;
handle_info({'EXIT', Pid, Reason}, State) ->
{stop, {received_exit, Pid, Reason}, State};
handle_info(_, State) ->
{noreply, State}.

Expand Down
33 changes: 29 additions & 4 deletions src/lhttpc_sock.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
close/2
]).

-define(OTP_TCP_LIMIT, 67108864).

-include("lhttpc_types.hrl").

%%==============================================================================
Expand Down Expand Up @@ -103,11 +105,34 @@ recv(Socket, false) ->
%% @end
%%------------------------------------------------------------------------------
-spec recv(socket(), integer(), boolean()) -> {ok, any()} | {error, atom()}.
recv(_, 0, _) ->
{ok, <<>>};
recv(Socket, Length, true) ->
recv(Socket, Length, SslFlag) ->
recv(Socket, Length, SslFlag, <<>>).
-spec recv(socket(), integer(), boolean(), binary()) ->
{ok, any()} | {error, atom()}.
recv(_, 0, _, Accum) ->
{ok, Accum};
%% HACK: gen_tcp:recv limits transfers at 64M for some reason
recv(Socket, Length, SslFlag, Accum) when (Length > ?OTP_TCP_LIMIT) ->
case recv_len(Socket, ?OTP_TCP_LIMIT, SslFlag) of
{ok, Part} ->
Accum0 = <<Accum/binary, Part/binary>>,
Length0 = Length-?OTP_TCP_LIMIT,
recv(Socket, Length0, SslFlag, Accum0);
Error ->
Error
end;
recv(Socket, Length, SslFlag, Accum) ->
case recv_len(Socket, Length, SslFlag) of
{ok, Part} ->
Accum0 = <<Accum/binary, Part/binary>>,
{ok, Accum0};
Error ->
Error
end.
-spec recv_len(socket(), integer(), boolean()) -> {ok, any()} | {error, atom()}.
recv_len(Socket, Length, true) ->
ssl:recv(Socket, Length);
recv(Socket, Length, false) ->
recv_len(Socket, Length, false) ->
gen_tcp:recv(Socket, Length).

%%------------------------------------------------------------------------------
Expand Down
33 changes: 22 additions & 11 deletions test/lhttpc_manager_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,10 @@
%%% Eunit setup stuff

start_app() ->
application:start(crypto),
application:start(asn1),
application:start(public_key),
ok = application:start(ssl),
_ = application:load(lhttpc),
ok = application:set_env(lhttpc, pool_size, 3),
ok = application:start(lhttpc).
{ok, _} = application:ensure_all_started(lhttpc),
ok.

stop_app(_) ->
ok = application:stop(lhttpc),
Expand All @@ -56,7 +53,8 @@ manager_test_() ->
?_test(one_socket()),
{timeout, 60, ?_test(connection_timeout())},
{timeout, 60, ?_test(many_sockets())},
{timeout, 60, ?_test(closed_race_cond())}
{timeout, 60, ?_test(closed_race_cond(true))},
{timeout, 60, ?_test(closed_race_cond(false))}
]}
}.

Expand Down Expand Up @@ -280,7 +278,7 @@ many_sockets() ->
unlink(whereis(lhttpc_manager)),
ok.

closed_race_cond() ->
closed_race_cond(CloseBeforeDispatch) ->
LS = socket_server:listen(),
link(whereis(lhttpc_manager)), % want to make sure it doesn't crash
?assertEqual(0, lhttpc_manager:connection_count(lhttpc_manager)),
Expand All @@ -290,7 +288,7 @@ closed_race_cond() ->

Result1 = connect_client(Client),
?assertMatch({ok, _}, Result1),
{ok, Socket} = Result1,
{ok, _} = Result1,
?assertEqual(ok, ping_client(Client)),

?assertEqual(0, lhttpc_manager:connection_count(lhttpc_manager)),
Expand All @@ -301,13 +299,27 @@ closed_race_cond() ->
ManagerPid = whereis(lhttpc_manager),
true = erlang:suspend_process(ManagerPid),

case CloseBeforeDispatch of
true ->
gen_tcp:close(LS), % a closed message should be sent to the manager
timer:sleep(1000);
false -> ok
end,

Pid = self(),
spawn_link(fun() ->
Pid ! {result, client_peek_socket(Client)}
end),

erlang:yield(), % make sure that the spawned process has run
gen_tcp:close(Socket), % a closed message should be sent to the manager
timer:sleep(1000),

case CloseBeforeDispatch of
true -> ok;
false ->
gen_tcp:close(LS), % a closed message should be sent to the client
timer:sleep(1000)
end,

true = erlang:resume_process(ManagerPid),

Result2 = receive
Expand All @@ -319,7 +331,6 @@ closed_race_cond() ->
?assertEqual(0, lhttpc_manager:connection_count(lhttpc_manager)),

?assertEqual(ok, stop_client(Client)),
catch gen_tcp:close(LS),
unlink(whereis(lhttpc_manager)),
ok.

Expand Down
Loading

0 comments on commit 476e950

Please sign in to comment.