Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(balance): cache book members balance #356

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions apps/app/lib/app/balance.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,56 @@ defmodule App.Balance do

import Ecto.Query

alias App.Repo

alias App.Balance.BalanceError
alias App.Balance.CacheUpdaterWorker
alias App.Books.Book
alias App.Books.BookMember
alias App.Books.Members
alias App.Repo
alias App.Transfers
alias App.Transfers.Peer

@doc """
Schedule a job to immediately update the balance of the members of a book.
"""
@spec schedule_balance_update(Book.id()) :: Ecto.Multi.t()
def schedule_balance_update(book_id) when is_integer(book_id) do
CacheUpdaterWorker.update_book_balance(book_id)
end

@doc """
Compute the balance of book members and update it.
"""
@spec update_book_members_balance(Book.t()) :: :ok
def update_book_members_balance(%Book{} = book) do
members =
book
|> Members.list_members_of_book()
|> fill_members_balance()

{:ok, _} =
Repo.transaction(fn ->
balance_error_fields = BalanceError.__schema__(:fields)

for member <- members do
# TODO once the BookMember `:balance` field is not virtual anymore,
# create and use a change to update the balance.
{:ok, balance} = Money.Ecto.Composite.Type.dump(member.balance)

"book_members"
|> where(id: ^member.id)
|> Repo.update_all(
set: [
balance: balance,
balance_errors: Enum.map(member.balance_errors, &Map.take(&1, balance_error_fields))
]
)
end
end)

:ok
end

@doc """
Compute the `:balance` field of book members.
"""
Expand Down
19 changes: 14 additions & 5 deletions apps/app/lib/app/balance/balance_configs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule App.Balance.BalanceConfigs do
import Ecto.Query

alias App.Accounts.User
alias App.Balance
alias App.Balance.BalanceConfig
alias App.Books.BookMember
alias App.Repo
Expand Down Expand Up @@ -85,13 +86,21 @@ defmodule App.Balance.BalanceConfigs do
@doc """
Link a balance configuration to a list of peers.
"""
@spec link_balance_config_to_peers(BalanceConfig.t(), [Peer.t()]) :: :ok
def link_balance_config_to_peers(%BalanceConfig{} = balance_config, peers) do
@spec link_balance_config_to_peers(BalanceConfig.t(), [Peer.t()], Book.t()) :: :ok
def link_balance_config_to_peers(%BalanceConfig{} = balance_config, peers, book) do
peer_ids = Enum.map(peers, & &1.id)

{_, nil} =
from([peer: peer] in Peer.base_query(), where: peer.id in ^peer_ids)
|> Repo.update_all(set: [balance_config_id: balance_config.id])
{:ok, _changes} =
Ecto.Multi.new()
|> Ecto.Multi.update_all(
:peers,
from([peer: peer] in Peer.base_query(), where: peer.id in ^peer_ids),
set: [balance_config_id: balance_config.id]
)
|> Ecto.Multi.run(:balance_job, fn _repo, _changes ->
Balance.schedule_balance_update(book.id)
end)
|> Repo.transaction()

:ok
end
Expand Down
38 changes: 38 additions & 0 deletions apps/app/lib/app/balance/cache_updater_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule App.Balance.CacheUpdaterWorker do
@moduledoc """
An Oban job that updates the balance of book members within a book.
"""
use Oban.Worker,
queue: :balance,
max_attempts: 1,
unique: [
fields: [:args],
keys: [:book_id],
states: [:available, :scheduled, :executing, :retryable]
]

alias App.Balance
alias App.Books
alias App.Books.Book

@doc """
Create a new job to update the balance of book members.

If a job is already scheduled for the book, it will be replaced.
"""
@spec update_book_balance(Book.id()) :: {:ok, Oban.Job.t()} | {:error, term()}
def update_book_balance(book_id) when is_integer(book_id) do
%{book_id: book_id}
|> new()
|> Oban.insert()
end

@impl Oban.Worker
def perform(job) do
%{"book_id" => book_id} = job.args

book_id
|> Books.get_book!()
|> Balance.update_book_members_balance()
end
end
59 changes: 48 additions & 11 deletions apps/app/lib/app/transfers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule App.Transfers do

import Ecto.Query

alias App.Balance
alias App.Books.Book
alias App.Books.BookMember
alias App.Repo
Expand Down Expand Up @@ -173,6 +174,9 @@ defmodule App.Transfers do
end,
[]
)
|> Ecto.Multi.run(:balance_job, fn _repo, _changes ->
Balance.schedule_balance_update(book.id)
end)
|> Repo.transaction()

case result do
Expand All @@ -187,9 +191,18 @@ defmodule App.Transfers do
@spec update_money_transfer(MoneyTransfer.t(), map()) ::
{:ok, MoneyTransfer.t()} | {:error, Ecto.Changeset.t()}
def update_money_transfer(%MoneyTransfer{} = money_transfer, attrs) do
money_transfer
|> MoneyTransfer.changeset(attrs)
|> Repo.update()
result =
Ecto.Multi.new()
|> Ecto.Multi.update(:money_transfer, MoneyTransfer.changeset(money_transfer, attrs))
|> Ecto.Multi.run(:balance_job, fn _repo, _changes ->
Balance.schedule_balance_update(money_transfer.book_id)
end)
|> Repo.transaction()

case result do
{:ok, %{money_transfer: money_transfer}} -> {:ok, money_transfer}
{:error, :money_transfer, changeset, _changes} -> {:error, changeset}
end
end

@doc """
Expand All @@ -198,7 +211,18 @@ defmodule App.Transfers do
@spec delete_money_transfer(MoneyTransfer.t()) ::
{:ok, MoneyTransfer.t()} | {:error, Ecto.Changeset.t()}
def delete_money_transfer(%MoneyTransfer{} = money_transfer) do
Repo.delete(money_transfer)
result =
Ecto.Multi.new()
|> Ecto.Multi.delete(:money_transfer, money_transfer)
|> Ecto.Multi.run(:balance_job, fn _repo, _changes ->
Balance.schedule_balance_update(money_transfer.book_id)
end)
|> Repo.transaction()

case result do
{:ok, %{money_transfer: money_transfer}} -> {:ok, money_transfer}
{:error, :money_transfer, changeset, _changes} -> {:error, changeset}
end
end

@doc """
Expand All @@ -216,13 +240,26 @@ defmodule App.Transfers do
@spec create_reimbursement(Book.t(), map()) ::
{:ok, MoneyTransfer.t()} | {:error, Ecto.Changeset.t()}
def create_reimbursement(%Book{} = book, attrs) do
%MoneyTransfer{
book_id: book.id,
type: :reimbursement,
balance_means: :divide_equally
}
|> MoneyTransfer.reimbursement_changeset(attrs)
|> Repo.insert()
changeset =
%MoneyTransfer{
book_id: book.id,
type: :reimbursement,
balance_means: :divide_equally
}
|> MoneyTransfer.reimbursement_changeset(attrs)

result =
Ecto.Multi.new()
|> Ecto.Multi.insert(:reimbursement, changeset)
|> Ecto.Multi.run(:abalnce_job, fn _repo, _changes ->
Balance.schedule_balance_update(book.id)
end)
|> Repo.transaction()

case result do
{:ok, %{reimbursement: reimbursement}} -> {:ok, reimbursement}
{:error, :reimbursement, changeset, _changes} -> {:error, changeset}
end
end

@doc """
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
defmodule App.Repo.Migrations.AddBookMembersBalance do
use Ecto.Migration

def change do
alter table("book_members") do
add :balance, :money_with_currency
add :balance_errors, :map, null: false, default: []
end
end
end
27 changes: 27 additions & 0 deletions apps/app/test/app/balance/balance_configs_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule App.Balance.BalanceConfigsTest do
import App.TransfersFixtures

alias App.Balance.BalanceConfigs
alias App.Balance.CacheUpdaterWorker

describe "get_balance_config_of_member/1" do
test "returns the balance config of the member" do
Expand Down Expand Up @@ -112,6 +113,32 @@ defmodule App.Balance.BalanceConfigsTest do
end
end

describe "link_balance_config_to_peers/3" do
test "links a balance config to the given peers" do
balance_config = balance_config_fixture()
book = book_fixture()
member = book_member_fixture(book)
transfer = money_transfer_fixture(book, tenant_id: member.id)
peer = peer_fixture(transfer, member_id: member.id)

:ok = BalanceConfigs.link_balance_config_to_peers(balance_config, [peer], book)

assert Repo.reload!(peer).balance_config_id == balance_config.id
end

test "schedules a job to update the book members balance" do
balance_config = balance_config_fixture()
book = book_fixture()
member = book_member_fixture(book)
transfer = money_transfer_fixture(book, tenant_id: member.id)
peer = peer_fixture(transfer, member_id: member.id)

:ok = BalanceConfigs.link_balance_config_to_peers(balance_config, [peer], book)

assert_enqueued(worker: CacheUpdaterWorker, args: %{book_id: book.id})
end
end

describe "change_balance_config_revenues/2" do
setup do
%{balance_config: balance_config_fixture()}
Expand Down
85 changes: 85 additions & 0 deletions apps/app/test/app/balance/cache_updater_worker_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
defmodule App.Balance.CacheUpdaterWorkerTest do
use App.DataCase, async: true

import App.Books.MembersFixtures
import App.BooksFixtures
import App.TransfersFixtures

alias App.Balance.CacheUpdaterWorker

describe "update_book_balance/1" do
test "creates an Oban job when the book_id" do
book_id = 123

{:ok, job} = CacheUpdaterWorker.update_book_balance(book_id)
assert job.queue == "balance"
assert job.args == %{book_id: book_id}
end

test "overrides previous existing job" do
book_id = 123

{:ok, %{id: id}} = CacheUpdaterWorker.update_book_balance(book_id)
{:ok, %{id: ^id}} = CacheUpdaterWorker.update_book_balance(book_id)

jobs = all_enqueued(worker: CacheUpdaterWorker)
assert length(jobs) == 1
end

test "does not override jobs with different book_id" do
book_id1 = 123
book_id2 = 456

{:ok, %{id: id1}} = CacheUpdaterWorker.update_book_balance(book_id1)
{:ok, %{id: id2}} = CacheUpdaterWorker.update_book_balance(book_id2)

assert id1 != id2

jobs = all_enqueued(worker: CacheUpdaterWorker)
assert length(jobs) == 2
end
end

describe "perform/1" do
test "updates the book members balance" do
book = book_fixture()
member1 = book_member_fixture(book)
member2 = book_member_fixture(book)
member3 = book_member_fixture(book)

transfer = money_transfer_fixture(book, tenant_id: member1.id, amount: Money.new(:EUR, 333))
_peer1 = peer_fixture(transfer, member_id: member1.id)
_peer2 = peer_fixture(transfer, member_id: member2.id)
_peer3 = peer_fixture(transfer, member_id: member3.id)

perform_job(CacheUpdaterWorker, %{book_id: book.id})

member1 = get_member_balance(member1.id)
assert Money.equal?(member1.balance, Money.new(:EUR, 222))
assert member1.balance_errors == []
member2 = get_member_balance(member2.id)
assert Money.equal?(member2.balance, Money.new(:EUR, "-111.00"))
assert member2.balance_errors == []
member3 = get_member_balance(member3.id)
assert Money.equal?(member3.balance, Money.new(:EUR, "-111.00"))
assert member3.balance_errors == []
end

# TODO once the BookMember `:balance` field is not virtual anymore,
# remove this request and use the BookMember fields directly
defp get_member_balance(member_id) do
from(book_member in "book_members",
where: book_member.id == ^member_id,
select: %{
balance: book_member.balance,
balance_errors: book_member.balance_errors
}
)
|> Repo.one!()
|> Map.update!(:balance, fn raw ->
{:ok, balance} = Money.Ecto.Composite.Type.load(raw)
balance
end)
end
end
end
Loading
Loading