Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: added support for read preference specified by the URL #207

Merged
merged 1 commit into from
Sep 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -788,15 +788,15 @@ a simple map, supporting the following keys:

* `:mode`, possible values: `:primary`, `:primary_preferred`, `:secondary`, `:secondary_preferred` and `:nearest`
* `:max_staleness_ms`, the maxStaleness value in milliseconds
* `:tag_sets`, the set of tags, for example: `[dc: "west", usage: "production"]`
* `:tags`, the set of tags, for example: `[dc: "west", usage: "production"]`

The driver selects the server using the read preference.

```elixr
prefs = %{
mode: :secondary,
max_staleness_ms: 120_000,
tag_sets: [dc: "west", usage: "production"]
tags: [dc: "west", usage: "production"]
}

Mongo.find_one(top, "dogs", %{name: "Oskar"}, read_preference: prefs)
Expand Down
147 changes: 77 additions & 70 deletions lib/mongo/read_preference.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,134 +4,141 @@ defmodule Mongo.ReadPreference do
@moduledoc ~S"""
Determines which servers are considered suitable for read operations

A read preference consists of a mode and optional `tag_sets`, max_staleness_ms, and `hedge`.
A read preference consists of a mode and optional `tags`, max_staleness_ms, and `hedge`.
The mode prioritizes between primaries and secondaries to produce either a single suitable server or a list of candidate servers.
If tag_sets and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection.
If tags and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection.
If hedge is set, it configures how server hedged reads are used.

The default mode is `:primary`.
The default tag_sets is a list with an empty tag set: [{}].
The default tags is a list with an empty tag set: [{}].
The default max_staleness_ms is unset.
The default hedge is unset.

## mode

* `:primary` Only an available primary is suitable.
* `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tag_sets and maxStalenessSeconds) are suitable.
* `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tags and maxStalenessSeconds) are suitable.
* `:primary_preferred` If a primary is available, only the primary is suitable. Otherwise, all secondaries are candidates,
but only eligible secondaries are suitable.
* `:secondary_preferred` All secondaries are candidates. If there is at least one eligible secondary, only eligible secondaries are suitable.
Otherwise, when there are no eligible secondaries, the primary is suitable.
* `:nearest` The primary and all secondaries are candidates, but only eligible candidates are suitable.

"""
@type t :: %{
mode:
:primary
| :secondary
| :primary_preferred
| :secondary_preferred
| :nearest,
tag_sets: [%{String.t() => String.t()}],
max_staleness_ms: non_neg_integer,
hedge: BSON.document()
}

@primary %{
mode: :primary,
tag_sets: [],
tags: [],
max_staleness_ms: 0
}

def primary(map \\ nil)
@doc """
Merge default values to the read preferences and converts deprecated tag_sets to tags
"""
def merge_defaults(%{tag_sets: tags} = map) do
map =
map
|> Map.delete(:tag_sets)
|> Map.put(:tags, tags)

Map.merge(@primary, map)
end

def primary(map) when is_map(map) do
def merge_defaults(map) when is_map(map) do
Map.merge(@primary, map)
end

def primary(_), do: @primary
def merge_defaults(_other) do
@primary
end

@doc """
Add read preference to the cmd
"""
def add_read_preference(cmd, opts) do
case Keyword.get(opts, :read_preference) do
nil -> cmd
pref -> cmd ++ ["$readPreference": pref]
nil ->
cmd

pref ->
cmd ++ ["$readPreference": pref]
end
end

@doc """
From the specs:

Use of slaveOk

There are two usages of slaveOK:

* A driver query parameter that predated read preference modes and tag set lists.
* A wire protocol flag on OP_QUERY operations

Converts the preference to the mongodb format for replica sets
"""
def slave_ok(%{:mode => :primary}) do
%{:mode => :primary}
def to_replica_set(%{:mode => :primary}) do
%{mode: :primary}
end

def slave_ok(config) do
def to_replica_set(config) do
mode =
case config[:mode] do
:primary_preferred -> :primaryPreferred
:secondary_preferred -> :secondaryPreferred
other -> other
end
:primary_preferred ->
:primaryPreferred

filter_nils(mode: mode, tag_sets: config[:tag_sets])
end
:secondary_preferred ->
:secondaryPreferred

##
# Therefore, when sending queries to a mongos, the following rules apply:
#
# For mode 'primary', drivers MUST NOT set the slaveOK wire protocol flag and MUST NOT use $readPreference
def mongos(%{mode: :primary}) do
nil
end
other ->
other
end

# For mode 'secondary', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
def mongos(%{mode: :secondary} = config) do
transform(config)
end
case config[:tags] do
[] ->
%{mode: mode}

# For mode 'primaryPreferred', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
def mongos(%{mode: :primary_preferred} = config) do
transform(config)
end
nil ->
%{mode: mode}

# For mode 'secondaryPreferred', drivers MUST set the slaveOK wire protocol flag. If the read preference contains a
# non-empty tag_sets parameter, maxStalenessSeconds is a positive integer, or the hedge parameter is non-empty,
# drivers MUST use $readPreference; otherwise, drivers MUST NOT use $readPreference
def mongos(%{mode: :secondary_preferred} = config) do
transform(config)
tags ->
%{mode: mode, tags: [tags]}
end
end

# For mode 'nearest', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
def mongos(%{mode: :nearest} = config) do
transform(config)
@doc """
Converts the preference to the mongodb format for mongos
"""
def to_mongos(%{mode: :primary}) do
nil
end

defp transform(config) do
# for the others we should use the read preferences
def to_mongos(config) do
mode =
case config[:mode] do
:primary_preferred -> :primaryPreferred
:secondary_preferred -> :secondaryPreferred
other -> other
:primary_preferred ->
:primaryPreferred

:secondary_preferred ->
:secondaryPreferred

other ->
other
end

max_staleness_seconds =
case config[:max_staleness_ms] do
i when is_integer(i) -> div(i, 1000)
nil -> nil
i when is_integer(i) ->
div(i, 1000)

nil ->
nil
end

read_preference =
case config[:tags] do
[] ->
%{mode: mode, maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]}

nil ->
%{mode: mode, maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]}

tags ->
%{mode: mode, tags: [tags], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]}
end

[mode: mode, tag_sets: config[:tag_sets], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]]
|> filter_nils()
filter_nils(read_preference)
end
end
14 changes: 14 additions & 0 deletions lib/mongo/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ defmodule Mongo.Topology do
# checkout a new session
#
def handle_call({:checkout_session, read_write_type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do
opts = merge_read_preferences(opts, state.opts)

case TopologyDescription.select_servers(topology, read_write_type, opts) do
:empty ->
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :checkout_session, cmd_type: read_write_type, topology: topology, opts: opts})
Expand All @@ -398,6 +400,8 @@ defmodule Mongo.Topology do
end

def handle_call({:select_server, read_write_type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do
opts = merge_read_preferences(opts, state.opts)

case TopologyDescription.select_servers(topology, read_write_type, opts) do
:empty ->
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :select_server, cmd_type: read_write_type, topology: topology, opts: opts})
Expand Down Expand Up @@ -579,4 +583,14 @@ defmodule Mongo.Topology do
defp fetch_arbiters(state) do
Enum.flat_map(state.topology.servers, fn {_, s} -> s.arbiters end)
end

defp merge_read_preferences(opts, url_opts) do
case Keyword.get(url_opts, :read_preference) do
nil ->
opts

read_preference ->
Keyword.put_new(opts, :read_preference, read_preference)
end
end
end
Loading
Loading