diff --git a/lib/meetup.ex b/lib/meetup.ex index 392771a..0c0029e 100644 --- a/lib/meetup.ex +++ b/lib/meetup.ex @@ -10,6 +10,7 @@ defmodule Meetup do # Start the endpoint when the application starts supervisor(Meetup.Endpoint, []), supervisor(MeetupApi.Supervisor, []), + supervisor(Meetup.ChannelClient.Supervisor, []), worker(MeetupApi.V3.RequestCache, []) ] diff --git a/lib/meetup/channel_client/cache.ex b/lib/meetup/channel_client/cache.ex new file mode 100644 index 0000000..94b2e19 --- /dev/null +++ b/lib/meetup/channel_client/cache.ex @@ -0,0 +1,36 @@ +defmodule Meetup.ChannelClient.Cache do + @moduledoc """ + This module is used to lookup for running Meetup.ChannelClient.Server for a given client +and group channel or start one + """ + use GenServer + require Logger + + def start_link do + Logger.info("#{__MODULE__} started") + GenServer.start_link(__MODULE__, nil, name: __MODULE__) + end + + def server_process(client, channel) do + case Meetup.ChannelClient.Server.whereis(client, channel) do + :undefined -> + GenServer.call(__MODULE__, {:server_process, client, channel}) + pid -> pid + end + end + + def init(_) do + {:ok, nil} + end + + def handle_call({:server_process, client, channel}, _, state) do + server_pid = + case Meetup.ChannelClient.Server.whereis(client, channel) do + :undefined -> + {:ok, pid} = Meetup.ChannelClient.ServerSupervisor.start_child(client, channel) + pid + pid -> pid + end + {:reply, server_pid, state} + end +end diff --git a/lib/meetup/channel_client/server.ex b/lib/meetup/channel_client/server.ex new file mode 100644 index 0000000..77e1365 --- /dev/null +++ b/lib/meetup/channel_client/server.ex @@ -0,0 +1,122 @@ +defmodule Meetup.ChannelClient.Server do + @moduledoc false + use GenServer + require Logger + + alias MeetupApi.V3.Profile + + defmodule State do + @moduledoc false + defstruct [:socket, :member_ids, :members, :token, :channel] + + def put_socket(state, socket), do: %State{state | socket: socket} + def put_member_ids(state, ids), do: %State{state | member_ids: ids} + def add_member(state, m), do: %State{state | members: [m | state.members]} + end + + @user_expiration_time Application.get_env(:meetup, :user_expiration_time) * 1000 + + # Client API + + def start_link(client, channel) do + Logger.info("#{__MODULE__} for client='#{client}' and channel='#{channel}' started") + initial_state = %State{socket: nil, member_ids: [], members: [], token: client, channel: channel} + GenServer.start_link(__MODULE__, initial_state, name: via_tuple(client, channel)) + end + + defp via_tuple(client, channel) do + {:via, :gproc, {:n, :l, {:meetup_channel_client_server, client, channel}}} + end + + def whereis(client, channel) do + :gproc.whereis_name({:n, :l, {:meetup_channel_client_server, client, channel}}) + end + + def join(pid, socket) do + GenServer.cast(pid, {:join, socket}) + end + + def member_result(pid, result) do + GenServer.cast(pid, {:member_result, result}) + end + + # Genserver callbacks + + def init(state) do + send(self(), :after_init) + Process.send_after(self(), :clean_up, @user_expiration_time) + {:ok, state} + end + + def handle_cast({:join, socket}, state) do + Logger.info(":join received in #{__MODULE__} with token=#{state.token} and members=#{inspect state.member_ids}") + state = + state + |> State.put_socket(socket) + |> send_total_members() + |> send_all_members() + {:noreply, state} + end + def handle_cast({:member_result, member}, state) do + Logger.info(":member_result received in #{__MODULE__} with token=#{state.token} and members=#{inspect state.member_ids}") + state = + state + |> State.add_member(member) + |> send_member(member) + {:noreply, state} + end + + defp send_total_members(%State{socket: s, member_ids: m} = state) do + if s, do: Phoenix.Channel.push(s, "total_members", %{total_members: length(m)}) + state + end + + def send_all_members(%State{socket: s, members: members} = state) do + if s do + for m <- Enum.chunk_every(members, 10) do + Phoenix.Channel.push(s, "members", %{members: m}) + end + end + state + end + + def send_member(%State{socket: s} = state, m) do + if s, do: Phoenix.Channel.push(s, "member", %{member: m}) + state + end + + def handle_info(:after_init, state) do + {:noreply, start_requests(state)} + end + def handle_info(:clean_up, state) do + {:stop, :normal, state} + end + def handle_info(_, state) do + {:noreply, state} + end + + def start_requests(state) do + state + |> all_member_ids() + |> send_total_members() + |> all_members() + end + + def all_member_ids(%State{channel: c, token: t} = state) do + member_ids = Profile.all(c, t) + State.put_member_ids(state, member_ids) + end + + def all_members(%State{member_ids: ids, token: token} = state) do + pid = self() + + Task.start_link(fn -> + Enum.each(ids, fn(%{"id" => id}) -> + {:ok, %{result: member}} = Profile.one(id, token) + member_result(pid, member) + end) + end) + + state + end +end diff --git a/lib/meetup/channel_client/server_supervisor.ex b/lib/meetup/channel_client/server_supervisor.ex new file mode 100644 index 0000000..d72a137 --- /dev/null +++ b/lib/meetup/channel_client/server_supervisor.ex @@ -0,0 +1,18 @@ +defmodule Meetup.ChannelClient.ServerSupervisor do + @moduledoc false + use Supervisor + require Logger + + def start_link do + Logger.info("#{__MODULE__} started") + Supervisor.start_link(__MODULE__, nil, name: __MODULE__) + end + + def start_child(client, channel) do + Supervisor.start_child(__MODULE__, [client, channel]) + end + + def init(_) do + supervise([worker(Meetup.ChannelClient.Server, [])], strategy: :simple_one_for_one) + end +end diff --git a/lib/meetup/channel_client/supervisor.ex b/lib/meetup/channel_client/supervisor.ex new file mode 100644 index 0000000..5eb227c --- /dev/null +++ b/lib/meetup/channel_client/supervisor.ex @@ -0,0 +1,18 @@ +defmodule Meetup.ChannelClient.Supervisor do + @moduledoc false + use Supervisor + require Logger + + def start_link do + Logger.info("#{__MODULE__} started") + Supervisor.start_link(__MODULE__, nil) + end + + def init(_) do + processes = [ + supervisor(Meetup.ChannelClient.ServerSupervisor, []), + worker(Meetup.ChannelClient.Cache, []) + ] + supervise(processes, strategy: :one_for_one) + end +end diff --git a/web/channels/group_channel.ex b/web/channels/group_channel.ex index 5b473be..89e3c64 100644 --- a/web/channels/group_channel.ex +++ b/web/channels/group_channel.ex @@ -2,29 +2,18 @@ defmodule Meetup.GroupChannel do @moduledoc false use Meetup.Web, :channel - alias MeetupApi.V3.Profile - def join("group:" <> group, params, socket) do send(self(), {:after_join, params}) {:ok, assign(socket, :group, group)} end def handle_info({:after_join, _params}, socket) do - token = socket.assigns.access_token.access_token + channel = socket.assigns.group - socket.assigns.group - |> Profile.all(token) - |> send_total_members(socket) - |> Enum.each(fn(%{"id" => id}) -> - {:ok, %{result: member}} = Profile.one(id, token) - push(socket, "member", %{member: member}) - end) + socket.assigns.access_token.access_token + |> Meetup.ChannelClient.Cache.server_process(channel) + |> Meetup.ChannelClient.Server.join(socket) {:noreply, socket} end - - defp send_total_members(members, socket) do - push(socket, "total_members", %{total_members: length(members)}) - members - end end diff --git a/web/static/js/group_channel.js b/web/static/js/group_channel.js index 036ed41..c0c1e66 100644 --- a/web/static/js/group_channel.js +++ b/web/static/js/group_channel.js @@ -104,6 +104,12 @@ function initGroupChannel(socket) { updateProgressBar(members, total_members) updateStatistics(statistic, members, total_members) }) + + channel.on("members", payload => { + members = members.concat(payload.members) + updateProgressBar(members, total_members) + updateStatistics(statistic, members, total_members) + }) } }