Preserve order (#220)
* fix: handle read preferences without tags

* Fixed minor bug parsing url connection. Updated dependencies libraries. (#209)

* Fixed minor bug parsing url connection. Updated dependencies libraries.

* Incremented max_nesting option in credo refactor nesting

* Replaced ReadPreference.primary() with ReadPreference.merge_defaults() in topology test file

---------

Co-authored-by: Juan Antonio Jiménez <[email protected]>
Co-authored-by: Michael Maier <[email protected]>

* fix: applies the global timeout value to each query (#215)

* Document BSON.Decoder and BSON.PreserveOrderDecoder

But keep the documentation for those modules private for now.

Expand the documentation on Data Representation and how to deal with
preserving key order when encoding and decoding data.

* Test BSON decoders

* Document how to go from annotated maps with order keys to lists

---------

Co-authored-by: zookzook <[email protected]>
Co-authored-by: Juan Antonio <[email protected]>
Co-authored-by: Juan Antonio Jiménez <[email protected]>
4 people authored Nov 1, 2023
1 parent ecd8da0 commit 7be964c
Showing 19 changed files with 344 additions and 89 deletions.
2 changes: 1 addition & 1 deletion .credo.exs
@@ -126,7 +126,7 @@
{Credo.Check.Refactor.MapJoin, []},
{Credo.Check.Refactor.NegatedConditionsInUnless, []},
{Credo.Check.Refactor.NegatedConditionsWithElse, []},
{Credo.Check.Refactor.Nesting, []},
{Credo.Check.Refactor.Nesting, [max_nesting: 4]},
{Credo.Check.Refactor.UnlessWithElse, []},
{Credo.Check.Refactor.WithClauses, []},
{Credo.Check.Refactor.FilterFilter, []},
132 changes: 85 additions & 47 deletions README.md
@@ -135,55 +135,18 @@ Mongo.insert_many(top, "users", [

## Data Representation

This driver chooses to accept both maps and lists of key-value tuples when encoding BSON documents (1), but will only
decode documents into maps. This has the side effect that document field order is lost when decoding.
Maps are convenient to work with, but map keys are not ordered, unlike BSON document fields.
This driver chooses to accept both maps and lists of key-value tuples when encoding BSON documents (1), but will only
decode documents into maps. Maps are convenient to work with, but Elixir map keys are not ordered, unlike BSON document
keys.

Driver users should represent documents using a list of tuples when field order matters, for example when sorting by multiple fields:
That design decision means document key order is lost when encoding Elixir maps to BSON and, conversely, when decoding
BSON documents to Elixir maps. However, see [Preserve Document Key Order](#preserve-document-key-order) to learn how to
preserve key order when it matters.

```elixir
Mongo.find(top, "users", %{}, sort: [last_name: 1, first_name: 1, _id: 1])
```

Additionally, the driver accepts both atoms and strings for document keys, but will only decode them into strings.
Creating atoms from arbitrary input (such as database documents) is [discouraged](https://elixir-lang.org/getting-started/mix-otp/genserver.html#:~:text=However%2C%20naming%20dynamic,our%20system%20memory!) because atoms are not garbage collected.

## Preserve Order
If the order of the keys is important, it is possible to use a different decoder module. The decoder module will
preserve the order of the keys by adding an attribute `:order` which contains the list of keys in the original order.
If you want to change the `:order` key then define a new decoder module:

```elixir
defmodule MyPreserverOrderDecoder do
@moduledoc false

use BSON.DecoderGenerator, preserve_order: :the_key_order
end
```

and configure the driver to use this new decoder:
```elixir
config :mongodb_driver,
decoder: MyPreserverOrderDecoder

```
The decode module is defined at compiler time. The driver provides two types of decoder:

```elixir
defmodule BSON.Decoder do
@moduledoc false

use BSON.DecoderGenerator, preserve_order: false
end

defmodule BSON.OrderPreservingDecoder do
@moduledoc false

use BSON.DecoderGenerator, preserve_order: :order
end
```

The `BSON.Decoder` is the default decoder.
Additionally, the driver accepts both atoms and strings for document keys, but will only decode them into strings.
Creating atoms from arbitrary input (such as database documents) is
[discouraged](https://elixir-lang.org/getting-started/mix-otp/genserver.html#:~:text=However%2C%20naming%20dynamic,our%20system%20memory!)
because atoms are not garbage collected.

[BSON symbols (deprecated)](https://bsonspec.org/spec.html#:~:text=Symbol.%20%E2%80%94%20Deprecated) can only be decoded (2).

@@ -209,6 +172,81 @@ The `BSON.Decoder` is the default decoder.
max key :BSON_max
decimal128 Decimal{}

## Preserve Document Key Order

### Encoding from Elixir to BSON

For some MongoDB operations, the order of the keys in a document affects the result. For example, that is the case when
sorting a query by multiple fields.

In those cases, driver users should represent documents using a list of tuples (or a keyword list) to preserve the
order. Example:

```elixir
Mongo.find(top, "users", %{}, sort: [last_name: 1, first_name: 1, _id: 1])
```

The query above sorts users by last name, then by first name, and finally by ID. If an Elixir map had been used to
specify `:sort`, the sort fields could be applied in a different order and the results would be sorted unexpectedly.
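
A minimal sketch of the difference, using only the standard library: a keyword list keeps the order in which the sort fields were written, while a map built from the same pairs gives no ordering guarantee. The variable names are illustrative and not part of the driver.

```elixir
# Keyword lists are ordered lists of {key, value} tuples, so the sort
# specification keeps the order in which it was written.
sort = [last_name: 1, first_name: 1, _id: 1]
keys_from_list = Keyword.keys(sort)
IO.inspect(keys_from_list, label: "keyword list")

# A map built from the same pairs holds the same keys, but Elixir makes
# no promise about the order in which they are enumerated.
keys_from_map = sort |> Map.new() |> Map.keys()
IO.inspect(keys_from_map, label: "map")
```

Only the keyword list is safe to pass where MongoDB interprets the fields positionally.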

### Decoding from BSON to Elixir

Decoded BSON documents are always represented by Elixir maps because the driver depends on that to implement its
functionality.

If the order of document keys as stored by MongoDB is needed, the driver can be configured to use a BSON decoder module
that puts a list of keys in the original order under the `:__order__` key (and it works recursively).

```elixir
config :mongodb_driver,
  decoder: BSON.PreserveOrderDecoder
```

It is possible to customize the key. For example, to use `:original_order` instead of the default `:__order__`:

```elixir
config :mongodb_driver,
  decoder: {BSON.PreserveOrderDecoder, key: :original_order}
```
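
As a rough illustration of the annotated shape, the sketch below uses a hand-written map standing in for a decoded document and reads its fields back in the recorded order. The map is an assumption (actual decoder output may differ in detail) and the field names are made up.

```elixir
# Hypothetical stand-in for a document decoded with the order-preserving
# decoder: string keys for the fields, plus the default :__order__ key
# holding the field names in their stored order.
doc = %{
  "last_name" => "Doe",
  "first_name" => "Jane",
  :__order__ => ["last_name", "first_name"]
}

# Walk the recorded order to rebuild an ordered list of {key, value} tuples.
ordered_pairs = Enum.map(doc[:__order__], fn key -> {key, Map.fetch!(doc, key)} end)
IO.inspect(ordered_pairs)
```

This flat version ignores nested documents and lists, which need a recursive walk.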

The resulting maps with annotated key order can be recursively transformed into lists of tuples. That allows for
preserving the order again when encoding. Here is an example of how to achieve that:

```elixir
defmodule MapWithOrder do
  def to_list(doc, order_key \\ :__order__) do
    do_to_list(doc, order_key)
  end

  defp do_to_list(%{__struct__: _} = elem, _order_key) do
    elem
  end

  defp do_to_list(doc, order_key) when is_map(doc) do
    doc
    |> Map.get(order_key, Map.keys(doc))
    |> Enum.map(fn key -> {key, do_to_list(Map.get(doc, key), order_key)} end)
  end

  defp do_to_list(xs, order_key) when is_list(xs) do
    Enum.map(xs, fn elem -> do_to_list(elem, order_key) end)
  end

  defp do_to_list(elem, _order_key) do
    elem
  end
end

# doc = ...
MapWithOrder.to_list(doc)
```

Note that structs are kept as-is, to handle special values such as `BSON.ObjectId`.

The decoder module is defined at compile time. The default decoder is `BSON.Decoder`, which does not preserve document
key order. As it needs to execute fewer operations when decoding data received from MongoDB, it offers improved
performance. Therefore, the default decoder is recommended for most use cases of this driver.

## Writing your own encoding info

If you want to write a custom struct to your mongo collection - you can do that
8 changes: 7 additions & 1 deletion lib/bson/decoder.ex
@@ -210,12 +210,18 @@ defmodule BSON.DecoderGenerator do
end

defmodule BSON.Decoder do
# This module provides functions for decoding BSON data into Elixir values.
# The data type conversions are documented at https://hexdocs.pm/mongodb_driver/readme.html#data-representation.

@moduledoc false

use BSON.DecoderGenerator, preserve_order: false
end

defmodule BSON.PreserverOrderDecoder do
defmodule BSON.PreserveOrderDecoder do
# This module is like `BSON.Decoder`, but it retains the original order of
# document keys in a list.

@moduledoc false

use BSON.DecoderGenerator, preserve_order: :__order__
2 changes: 1 addition & 1 deletion lib/mongo.ex
@@ -1502,7 +1502,7 @@ defmodule Mongo do
@spec exec_command_session(GenServer.server(), BSON.document(), Keyword.t()) ::
{:ok, BSON.document() | nil} | {:error, Mongo.Error.t()}
def exec_command_session(session, cmd, opts) do
with {:ok, conn, new_cmd} <- Session.bind_session(session, cmd),
with {:ok, conn, new_cmd, opts} <- Session.bind_session(session, cmd, opts),
{:ok, _cmd, response} <- DBConnection.execute(conn, %Query{action: {:command, new_cmd}}, [], opts),
:ok <- Session.update_session(session, response, opts),
{:ok, {_flags, doc}} <- check_for_error(response, cmd, opts) do
14 changes: 2 additions & 12 deletions lib/mongo/monitor.ex
@@ -186,18 +186,8 @@ defmodule Mongo.Monitor do
##
# Get a new server description from the server and send it to the Topology process.
##
defp update_server_description(%{topology_pid: topology_pid, address: address, mode: :streaming_mode} = state) do
case get_server_description(state) do
%{round_trip_time: round_trip_time} ->
## debug info("Updating round_trip_time: #{inspect round_trip_time}")
Topology.update_rrt(topology_pid, address, round_trip_time)

%{state | round_trip_time: round_trip_time}

error ->
warning("Unable to round trip time because of #{inspect(error)}")
state
end
defp update_server_description(%{mode: :streaming_mode} = state) do
state
end

##
47 changes: 32 additions & 15 deletions lib/mongo/session.ex
@@ -186,14 +186,15 @@ defmodule Mongo.Session do

@doc """
Merge the session / transaction data into the cmd. There is no need to call this function directly. It is called automatically.
The global session timeout is merged to the options as well.
"""
@spec bind_session(Session.t(), BSON.document()) :: {:ok, pid, BSON.document()} | {:error, term()}
def bind_session(nil, _cmd) do
@spec bind_session(Session.t(), BSON.document(), Keyword.t()) :: {:ok, pid, BSON.document(), Keyword.t()} | {:error, term()}
def bind_session(nil, _cmd, _opts) do
{:error, Mongo.Error.exception("No session")}
end

def bind_session(pid, cmd) do
call(pid, {:bind_session, cmd})
def bind_session(pid, cmd, opts) do
call(pid, {:bind_session, cmd, opts})
end

@doc """
@@ -462,13 +463,16 @@ defmodule Mongo.Session do
##
# bind session: only if wire_version >= 6, MongoDB 3.6.x and no transaction is running: only lsid and the transaction-id is added
#
def handle_call_event({:bind_session, cmd}, transaction, %Session{conn: conn, opts: opts, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}} = data)
def handle_call_event({:bind_session, cmd, client_opts}, transaction, %Session{conn: conn, opts: opts, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}} = data)
when wire_version >= 6 and transaction in [:no_transaction, :transaction_aborted, :transaction_committed] do
## only if retryable_writes are enabled!
options =
case opts[:retryable_writes] do
true -> [lsid: %{id: id}, txnNumber: %BSON.LongNumber{value: txn_num}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))]
_ -> [lsid: %{id: id}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))]
true ->
[lsid: %{id: id}, txnNumber: %BSON.LongNumber{value: txn_num}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))]

_ ->
[lsid: %{id: id}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))]
end

cmd =
@@ -477,11 +481,12 @@ defmodule Mongo.Session do
|> ReadPreference.add_read_preference(opts)
|> filter_nils()

{:keep_state_and_data, {:ok, conn, cmd}}
client_opts = merge_timeout(client_opts, opts)
{:keep_state_and_data, {:ok, conn, cmd, client_opts}}
end

def handle_call_event({:bind_session, cmd}, :starting_transaction, %Session{conn: conn, server_session: %ServerSession{session_id: id, txn_num: txn_num}, wire_version: wire_version} = data) when wire_version >= 6 do
result =
def handle_call_event({:bind_session, cmd, client_opts}, :starting_transaction, %Session{conn: conn, opts: opts, server_session: %ServerSession{session_id: id, txn_num: txn_num}, wire_version: wire_version} = data) when wire_version >= 6 do
cmd =
Keyword.merge(cmd,
readConcern: read_concern(data, Keyword.get(cmd, :readConcern)),
lsid: %{id: id},
@@ -492,10 +497,11 @@ defmodule Mongo.Session do
|> filter_nils()
|> Keyword.drop(~w(writeConcern)a)

{:next_state, :transaction_in_progress, {:ok, conn, result}}
client_opts = merge_timeout(client_opts, opts)
{:next_state, :transaction_in_progress, {:ok, conn, cmd, client_opts}}
end

def handle_call_event({:bind_session, cmd}, :transaction_in_progress, %Session{conn: conn, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}}) when wire_version >= 6 do
def handle_call_event({:bind_session, cmd, client_opts}, :transaction_in_progress, %Session{conn: conn, opts: opts, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}}) when wire_version >= 6 do
result =
Keyword.merge(cmd,
lsid: %{id: id},
@@ -504,12 +510,13 @@ defmodule Mongo.Session do
)
|> Keyword.drop(~w(writeConcern readConcern)a)

{:keep_state_and_data, {:ok, conn, result}}
client_opts = merge_timeout(client_opts, opts)
{:keep_state_and_data, {:ok, conn, result, client_opts}}
end

# In case of wire_version < 6 we do nothing
def handle_call_event({:bind_session, cmd}, _transaction, %Session{conn: conn}) do
{:keep_state_and_data, {:ok, conn, cmd}}
def handle_call_event({:bind_session, cmd, client_opts}, _transaction, %Session{conn: conn}) do
{:keep_state_and_data, {:ok, conn, cmd, client_opts}}
end

def handle_call_event({:commit_transaction, _start_time}, :starting_transaction, _data) do
@@ -710,4 +717,14 @@ defmodule Mongo.Session do
def in_session(session, _topology_pid, _read_write_type, fun, opts) do
fun.(session, opts)
end

defp merge_timeout(opts, default_ops) do
case Keyword.get(default_ops, :timeout) do
nil ->
opts

timeout ->
Keyword.put_new(opts, :timeout, timeout)
end
end
end
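
The precedence implemented by `merge_timeout/2` can be checked in isolation. The sketch below copies the helper into a hypothetical `TimeoutSketch` module (not part of the driver) and shows that a timeout passed by the caller wins over the global one, because `Keyword.put_new/3` only sets a key that is absent. The timeout values are arbitrary.

```elixir
defmodule TimeoutSketch do
  # Same logic as the private merge_timeout/2 helper in the diff, made
  # public here so it can be called from outside the module.
  def merge_timeout(opts, default_opts) do
    case Keyword.get(default_opts, :timeout) do
      nil -> opts
      timeout -> Keyword.put_new(opts, :timeout, timeout)
    end
  end
end

global = [timeout: 60_000]

# No per-call timeout: the global value is inherited.
inherited = TimeoutSketch.merge_timeout([], global)

# Per-call timeout present: it is kept and the global value is ignored.
explicit = TimeoutSketch.merge_timeout([timeout: 5_000], global)

# No global timeout configured: the options pass through unchanged.
untouched = TimeoutSketch.merge_timeout([limit: 10], [])

IO.inspect({inherited, explicit, untouched})
```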
12 changes: 12 additions & 0 deletions lib/mongo/topology.ex
@@ -383,6 +383,8 @@ defmodule Mongo.Topology do

## found
{:ok, {address, opts}} ->
opts = merge_timeout(opts, state.opts)

with {:ok, connection} <- get_connection(address, state),
wire_version <- wire_version(address, topology),
{server_session, new_state} <- checkout_server_session(state),
@@ -593,4 +595,14 @@ defmodule Mongo.Topology do
Keyword.put_new(opts, :read_preference, read_preference)
end
end

defp merge_timeout(opts, default_ops) do
case Keyword.get(default_ops, :timeout) do
nil ->
opts

timeout ->
Keyword.put_new(opts, :timeout, timeout)
end
end
end
6 changes: 5 additions & 1 deletion lib/mongo/url_parser.ex
@@ -8,7 +8,7 @@ defmodule Mongo.UrlParser do

require Logger

@mongo_url_regex ~r/^mongodb(?<srv>\+srv)?:\/\/((?<username>[^:]+):(?<password>[^@]+)@)?(?<seeds>[^\/]+)(\/(?<database>[^?]+))?(\?(?<options>.*))?$/
@mongo_url_regex ~r/^mongodb(?<srv>\+srv)?:\/\/(?:(?<username>[^:]+):(?<password>[^@]+)@)?(?<seeds>[^\/\?]+)(?:\/(?<database>[^?]*)?(?:\?(?<options>(?:[^\s=]+=[^\s&]*)+))?)?$/

# https://docs.mongodb.com/manual/reference/connection-string/#connections-connection-options
@mongo_options %{
@@ -236,6 +236,10 @@ defmodule Mongo.UrlParser do
end
end

defp parse_tags([]) do
[]
end

defp parse_tags(tags) do
tags
|> String.split(",")
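
To see what the change to `@mongo_url_regex` does, the sketch below runs both versions of the pattern (copied from the diff) against two sample connection strings. The URLs and variable names are made up for illustration. The old pattern appears to reject a URL with an empty database segment followed by options, while the new one accepts it.

```elixir
# Old and new patterns, copied verbatim from the diff.
old_regex =
  ~r/^mongodb(?<srv>\+srv)?:\/\/((?<username>[^:]+):(?<password>[^@]+)@)?(?<seeds>[^\/]+)(\/(?<database>[^?]+))?(\?(?<options>.*))?$/

new_regex =
  ~r/^mongodb(?<srv>\+srv)?:\/\/(?:(?<username>[^:]+):(?<password>[^@]+)@)?(?<seeds>[^\/\?]+)(?:\/(?<database>[^?]*)?(?:\?(?<options>(?:[^\s=]+=[^\s&]*)+))?)?$/

# Hypothetical sample URLs.
full_url = "mongodb://user:pass@localhost:27017/mydb?replicaSet=rs0"
empty_db_url = "mongodb://localhost:27017/?replicaSet=rs0"

# A fully specified URL is split into its named parts.
full = Regex.named_captures(new_regex, full_url)

# Empty database segment followed by options.
old_match = Regex.named_captures(old_regex, empty_db_url)
new_match = Regex.named_captures(new_regex, empty_db_url)

IO.inspect(full, label: "new regex, full URL")
IO.inspect(old_match, label: "old regex, empty database")
IO.inspect(new_match, label: "new regex, empty database")
```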
2 changes: 1 addition & 1 deletion mix.exs
@@ -36,7 +36,7 @@ defmodule Mongodb.Mixfile do
{:decimal, "~> 2.1.1"},
{:patch, "~> 0.12.0", only: [:dev, :test]},
{:jason, "~> 1.3", only: [:dev, :test]},
{:credo, "~> 1.6.1", only: [:dev, :test], runtime: false},
{:credo, "~> 1.7.0", only: [:dev, :test], runtime: false},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false}
]
end