Skip to content

Commit

Permalink
Create gen server to intermediate channel and api
Browse files Browse the repository at this point in the history
Create a GenServer to communicate with the client socket and the meetup
api.
With this solution we can handle socket reconnection and page
refreshes.
The server will start the data collection from the api when it is
created, and the client can join any time and receive the previous
collected members, and then it will receive the new members when they
arrive to server. This will also make sure we start only one data
collection by user access token and group slug.
  • Loading branch information
ggpasqualino committed Aug 9, 2017
1 parent 2328e92 commit 52f2ea9
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 15 deletions.
1 change: 1 addition & 0 deletions lib/meetup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule Meetup do
# Start the endpoint when the application starts
supervisor(Meetup.Endpoint, []),
supervisor(MeetupApi.Supervisor, []),
supervisor(Meetup.ChannelClient.Supervisor, []),
worker(MeetupApi.V3.RequestCache, [])
]

Expand Down
36 changes: 36 additions & 0 deletions lib/meetup/channel_client/cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defmodule Meetup.ChannelClient.Cache do
@moduledoc """
This module is used to lookup for running Meetup.ChannelClient.Server for a given client
and group channel or start one
"""
use GenServer
require Logger

def start_link do
Logger.info("#{__MODULE__} started")
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end

def server_process(client, channel) do
case Meetup.ChannelClient.Server.whereis(client, channel) do
:undefined ->
GenServer.call(__MODULE__, {:server_process, client, channel})
pid -> pid
end
end

def init(_) do
{:ok, nil}
end

def handle_call({:server_process, client, channel}, _, state) do
server_pid =
case Meetup.ChannelClient.Server.whereis(client, channel) do
:undefined ->
{:ok, pid} = Meetup.ChannelClient.ServerSupervisor.start_child(client, channel)
pid
pid -> pid
end
{:reply, server_pid, state}
end
end
122 changes: 122 additions & 0 deletions lib/meetup/channel_client/server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
defmodule Meetup.ChannelClient.Server do
@moduledoc false
use GenServer
require Logger

alias MeetupApi.V3.Profile

defmodule State do
@moduledoc false
defstruct [:socket, :member_ids, :members, :token, :channel]

def put_socket(state, socket), do: %State{state | socket: socket}
def put_member_ids(state, ids), do: %State{state | member_ids: ids}
def add_member(state, m), do: %State{state | members: [m | state.members]}
end

@user_expiration_time Application.get_env(:meetup, :user_expiration_time) * 1000

# Client API

def start_link(client, channel) do
Logger.info("#{__MODULE__} for client='#{client}' and channel='#{channel}' started")
initial_state = %State{socket: nil, member_ids: [], members: [], token: client, channel: channel}
GenServer.start_link(__MODULE__, initial_state, name: via_tuple(client, channel))
end

defp via_tuple(client, channel) do
{:via, :gproc, {:n, :l, {:meetup_channel_client_server, client, channel}}}
end

def whereis(client, channel) do
:gproc.whereis_name({:n, :l, {:meetup_channel_client_server, client, channel}})
end

def join(pid, socket) do
GenServer.cast(pid, {:join, socket})
end

def member_result(pid, result) do
GenServer.cast(pid, {:member_result, result})
end

# Genserver callbacks

def init(state) do
send(self(), :after_init)
Process.send_after(self(), :clean_up, @user_expiration_time)
{:ok, state}
end

def handle_cast({:join, socket}, state) do
Logger.info(":join received in #{__MODULE__} with token=#{state.token} and members=#{inspect state.member_ids}")
state =
state
|> State.put_socket(socket)
|> send_total_members()
|> send_all_members()
{:noreply, state}
end
def handle_cast({:member_result, member}, state) do
Logger.info(":member_result received in #{__MODULE__} with token=#{state.token} and members=#{inspect state.member_ids}")
state =
state
|> State.add_member(member)
|> send_member(member)
{:noreply, state}
end

defp send_total_members(%State{socket: s, member_ids: m} = state) do
if s, do: Phoenix.Channel.push(s, "total_members", %{total_members: length(m)})
state
end

def send_all_members(%State{socket: s, members: members} = state) do
if s do
for m <- Enum.chunk_every(members, 10) do
Phoenix.Channel.push(s, "members", %{members: m})
end
end
state
end

def send_member(%State{socket: s} = state, m) do
if s, do: Phoenix.Channel.push(s, "member", %{member: m})
state
end

def handle_info(:after_init, state) do
{:noreply, start_requests(state)}
end
def handle_info(:clean_up, state) do
{:stop, :normal, state}
end
def handle_info(_, state) do
{:noreply, state}
end

def start_requests(state) do
state
|> all_member_ids()
|> send_total_members()
|> all_members()
end

def all_member_ids(%State{channel: c, token: t} = state) do
member_ids = Profile.all(c, t)
State.put_member_ids(state, member_ids)
end

def all_members(%State{member_ids: ids, token: token} = state) do
pid = self()

Task.start_link(fn ->
Enum.each(ids, fn(%{"id" => id}) ->
{:ok, %{result: member}} = Profile.one(id, token)
member_result(pid, member)
end)
end)

state
end
end
18 changes: 18 additions & 0 deletions lib/meetup/channel_client/server_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Meetup.ChannelClient.ServerSupervisor do
@moduledoc false
use Supervisor
require Logger

def start_link do
Logger.info("#{__MODULE__} started")
Supervisor.start_link(__MODULE__, nil, name: __MODULE__)
end

def start_child(client, channel) do
Supervisor.start_child(__MODULE__, [client, channel])
end

def init(_) do
supervise([worker(Meetup.ChannelClient.Server, [])], strategy: :simple_one_for_one)
end
end
18 changes: 18 additions & 0 deletions lib/meetup/channel_client/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Meetup.ChannelClient.Supervisor do
@moduledoc false
use Supervisor
require Logger

def start_link do
Logger.info("#{__MODULE__} started")
Supervisor.start_link(__MODULE__, nil)
end

def init(_) do
processes = [
supervisor(Meetup.ChannelClient.ServerSupervisor, []),
worker(Meetup.ChannelClient.Cache, [])
]
supervise(processes, strategy: :one_for_one)
end
end
19 changes: 4 additions & 15 deletions web/channels/group_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,18 @@ defmodule Meetup.GroupChannel do
@moduledoc false
use Meetup.Web, :channel

alias MeetupApi.V3.Profile

def join("group:" <> group, params, socket) do
send(self(), {:after_join, params})
{:ok, assign(socket, :group, group)}
end

def handle_info({:after_join, _params}, socket) do
token = socket.assigns.access_token.access_token
channel = socket.assigns.group

socket.assigns.group
|> Profile.all(token)
|> send_total_members(socket)
|> Enum.each(fn(%{"id" => id}) ->
{:ok, %{result: member}} = Profile.one(id, token)
push(socket, "member", %{member: member})
end)
socket.assigns.access_token.access_token
|> Meetup.ChannelClient.Cache.server_process(channel)
|> Meetup.ChannelClient.Server.join(socket)

{:noreply, socket}
end

defp send_total_members(members, socket) do
push(socket, "total_members", %{total_members: length(members)})
members
end
end
6 changes: 6 additions & 0 deletions web/static/js/group_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ function initGroupChannel(socket) {
updateProgressBar(members, total_members)
updateStatistics(statistic, members, total_members)
})

channel.on("members", payload => {
members = members.concat(payload.members)
updateProgressBar(members, total_members)
updateStatistics(statistic, members, total_members)
})
}
}

Expand Down

0 comments on commit 52f2ea9

Please sign in to comment.