Skip to content
This repository was archived by the owner on Sep 9, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .dialyzer-ignore.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[
{":0:unknown_type Unknown type: Mongo.WriteError.t/0." }
]
44 changes: 27 additions & 17 deletions lib/mongo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ defmodule Mongo do
* `:max_time` - Specifies a time limit in milliseconds
* `:use_cursor` - Use a cursor for a batched response (Default: true)
"""
@spec aggregate(GenServer.server(), collection, [BSON.document()], Keyword.t()) :: cursor
@spec aggregate(GenServer.server(), collection, [BSON.document()], Keyword.t()) ::
{:ok, cursor} | {:error, Mongo.Error.t()}
def aggregate(topology_pid, coll, pipeline, opts \\ []) do
query =
[
Expand All @@ -190,10 +191,10 @@ defmodule Mongo do

if cursor? do
query = query ++ [cursor: filter_nils(%{batchSize: opts[:batch_size]})]
aggregation_cursor(conn, "$cmd", query, nil, opts)
{:ok, aggregation_cursor(conn, "$cmd", query, nil, opts)}
else
query = query ++ [cursor: %{}]
aggregation_cursor(conn, "$cmd", query, nil, opts)
{:ok, aggregation_cursor(conn, "$cmd", query, nil, opts)}
end
end
end
Expand Down Expand Up @@ -482,7 +483,8 @@ defmodule Mongo do
* `:projection` - Limits the fields to return for all matching document
* `:skip` - The number of documents to skip before returning (Default: 0)
"""
@spec find(GenServer.server(), collection, BSON.document(), Keyword.t()) :: cursor
@spec find(GenServer.server(), collection, BSON.document(), Keyword.t()) ::
cursor | {:error, Mongo.Error.t()}
def find(topology_pid, coll, filter, opts \\ []) do
query =
[
Expand Down Expand Up @@ -538,7 +540,7 @@ defmodule Mongo do
* `:skip` - The number of documents to skip before returning (Default: 0)
"""
@spec find_one(GenServer.server(), collection, BSON.document(), Keyword.t()) ::
BSON.document() | nil
BSON.document() | nil | {:error, Mongo.Error.t()}
def find_one(conn, coll, filter, opts \\ []) do
opts =
opts
Expand Down Expand Up @@ -597,10 +599,10 @@ defmodule Mongo do
rp = ReadPreference.defaults(%{mode: :primary})
rp_opts = [read_preference: Keyword.get(opts, :read_preference, rp)]

with {:ok, conn, slave_ok, _} <- select_server(topology_pid, :read, rp_opts) do
opts = Keyword.put(opts, :slave_ok, slave_ok)
direct_command(conn, query, opts)
end
with {:ok, conn, slave_ok, _} <- select_server(topology_pid, :read, rp_opts) do
opts = Keyword.put(opts, :slave_ok, slave_ok)
direct_command(conn, query, opts)
end
end

@doc false
Expand Down Expand Up @@ -1001,17 +1003,19 @@ defmodule Mongo do
@doc """
Returns a cursor to enumerate all indexes
"""
@spec list_indexes(GenServer.server(), String.t(), Keyword.t()) :: cursor
@spec list_indexes(GenServer.server(), String.t(), Keyword.t()) ::
{:ok, cursor} | {:error, Mongo.Error.t()}
def list_indexes(topology_pid, coll, opts \\ []) do
with {:ok, conn, _, _} <- select_server(topology_pid, :read, opts) do
aggregation_cursor(conn, "$cmd", [listIndexes: coll], nil, opts)
{:ok, aggregation_cursor(conn, "$cmd", [listIndexes: coll], nil, opts)}
end
end

@doc """
Convenient function that returns a cursor with the names of the indexes.
"""
@spec list_index_names(GenServer.server(), String.t(), Keyword.t()) :: %Stream{}
@spec list_index_names(GenServer.server(), String.t(), Keyword.t()) ::
{:ok, %Stream{}} | {:error, Mongo.Error.t()}
def list_index_names(topology_pid, coll, opts \\ []) do
list_indexes(topology_pid, coll, opts)
|> Stream.map(fn %{"name" => name} -> name end)
Expand All @@ -1020,18 +1024,23 @@ defmodule Mongo do
@doc """
Getting Collection Names
"""
@spec show_collections(GenServer.server(), Keyword.t()) :: cursor
@spec show_collections(GenServer.server(), Keyword.t()) ::
{:ok, cursor} | {:error, Mongo.Error.t()}
def show_collections(topology_pid, opts \\ []) do
##
# from the specs
# https://github.com/mongodb/specifications/blob/f4bb783627e7ed5c4095c5554d35287956ef8970/source/enumerate-collections.rst#post-mongodb-280-rc3-versions
#
# In versions 2.8.0-rc3 and later, the listCollections command returns a cursor!
#

with {:ok, conn, _, _} <- select_server(topology_pid, :read, opts) do
aggregation_cursor(conn, "$cmd", [listCollections: 1], nil, opts)
|> Stream.filter(fn coll -> coll["type"] == "collection" end)
|> Stream.map(fn coll -> coll["name"] end)
cursor =
aggregation_cursor(conn, "$cmd", [listCollections: 1], nil, opts)
|> Stream.filter(fn coll -> coll["type"] == "collection" end)
|> Stream.map(fn coll -> coll["name"] end)

{:ok, cursor}
end
end

Expand Down Expand Up @@ -1095,7 +1104,8 @@ defmodule Mongo do
select_servers(topology_pid, type, opts, start_time)

{:error, :selection_timeout} ->
{:error, %Mongo.Error{type: :network, message: "Topology selection timeout", code: 89}}
{:error,
%Mongo.Error{type: :network, message: "Topology selection timeout", code: 89}}
end
else
{:ok, servers, slave_ok, mongos?}
Expand Down
7 changes: 4 additions & 3 deletions lib/mongo/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,10 @@ defmodule Mongo.Session do
@impl :gen_statem
# Abort all pending transactions if there any and end session itself.
def terminate(_reason, state, %{pid: pid} = data) do
if state == :in_transaction do
_ = try_run_txn_command(data, :abortTransaction)
end
_ =
if state == :in_transaction do
try_run_txn_command(data, :abortTransaction)
end

query = %{
endSessions: [data.id]
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule Mongodb.Mixfile do
description: description(),
package: package(),
dialyzer: [
ignore_warnings: ".dialyzer-ignore.exs",
flags: [:underspecs, :unknown, :unmatched_returns],
plt_add_apps: [:logger, :connection, :db_connection, :mix, :elixir, :ssl, :public_key],
plt_add_deps: :transitive
Expand Down
65 changes: 43 additions & 22 deletions test/mongo_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ defmodule Mongo.Test do
cmd = [createIndexes: coll_2, indexes: [[key: [foo: 1, bar: 1], name: "not-a-collection"]]]
assert {:ok, _} = Mongo.command(c.pid, cmd)

assert {:ok, colls0} = c.pid |> Mongo.show_collections()

colls =
c.pid
|> Mongo.show_collections()
colls0
|> Enum.to_list()

assert Enum.member?(colls, coll_1)
Expand All @@ -68,9 +69,12 @@ defmodule Mongo.Test do
cmd = [createIndexes: coll_1, indexes: [[key: [foo: 1, bar: 1], name: "foo-bar"]]]
assert {:ok, _} = Mongo.command(c.pid, cmd)

{:ok, colls00}=
c.pid
Mongo.list_index_names(coll_1)

indexes =
c.pid
|> Mongo.list_index_names(coll_1)
colls00
|> Enum.to_list()

assert Enum.count(indexes) == 3
Expand All @@ -87,8 +91,8 @@ defmodule Mongo.Test do
assert {:ok, _} = Mongo.insert_one(c.pid, coll, %{foo: 44})
assert {:ok, _} = Mongo.insert_one(c.pid, coll, %{foo: 45})

assert [%{"foo" => 42}, %{"foo" => 43}, %{"foo" => 44}, %{"foo" => 45}] =
c.pid |> Mongo.aggregate(coll, []) |> Enum.to_list()
assert {:ok, colls0} =c.pid |> Mongo.aggregate(coll, [])
assert [%{"foo" => 42}, %{"foo" => 43}, %{"foo" => 44}, %{"foo" => 45}] = colls0 |> Enum.to_list()

query = [
%{
Expand All @@ -104,27 +108,44 @@ defmodule Mongo.Test do
}
]

assert [%{"_id" => "foo", "total" => 89}] =
c.pid |> Mongo.aggregate(coll, query) |> Enum.to_list()
assert {:ok, colls1} = c.pid |> Mongo.aggregate(coll, query)
assert [%{"_id" => "foo", "total" => 89}] = colls1 |> Enum.to_list()

assert {:ok, colls2} = c.pid |> Mongo.aggregate(coll, [])
assert [] = colls2 |> Enum.take(0)

assert {:ok, colls3} = c.pid |> Mongo.aggregate(coll, [])
assert [] = colls3 |> Enum.drop(4)

assert {:ok, colls4} = c.pid |> Mongo.aggregate(coll, [])
assert [%{"foo" => 42}] = colls4 |> Enum.take(1)

assert {:ok, colls5} = c.pid |> Mongo.aggregate(coll, [])
assert [%{"foo" => 45}] = colls5 |> Enum.drop(3)

assert {:ok, colls6} = Mongo.aggregate(c.pid, coll, [], use_cursor: false)
assert [] = colls6 |> Enum.take(0)

assert {:ok, colls7} = Mongo.aggregate(c.pid, coll, [], use_cursor: false)
assert [] = colls7 |> Enum.drop(4)

assert {:ok, colls8} = c.pid |> Mongo.aggregate(coll, [], use_cursor: false)
assert [%{"foo" => 42}] = colls8 |> Enum.take(1)

assert [] = c.pid |> Mongo.aggregate(coll, []) |> Enum.take(0)
assert [] = c.pid |> Mongo.aggregate(coll, []) |> Enum.drop(4)
assert [%{"foo" => 42}] = c.pid |> Mongo.aggregate(coll, []) |> Enum.take(1)
assert [%{"foo" => 45}] = c.pid |> Mongo.aggregate(coll, []) |> Enum.drop(3)
assert {:ok, colls9} = c.pid |> Mongo.aggregate(coll, [], use_cursor: false)
assert [%{"foo" => 45}] = colls9 |> Enum.drop(3)

assert [] = Mongo.aggregate(c.pid, coll, [], use_cursor: false) |> Enum.take(0)
assert [] = Mongo.aggregate(c.pid, coll, [], use_cursor: false) |> Enum.drop(4)
assert {:ok, colls10} = Mongo.aggregate(c.pid, coll, [], batch_size: 1)
assert [] = colls10 |> Enum.take(0)

assert [%{"foo" => 42}] =
c.pid |> Mongo.aggregate(coll, [], use_cursor: false) |> Enum.take(1)
assert {:ok, colls11} = Mongo.aggregate(c.pid, coll, [], batch_size: 1)
assert [] = colls11 |> Enum.drop(4)

assert [%{"foo" => 45}] =
c.pid |> Mongo.aggregate(coll, [], use_cursor: false) |> Enum.drop(3)
assert {:ok, colls12} = c.pid |> Mongo.aggregate(coll, [], batch_size: 1)
assert [%{"foo" => 42}] = colls12 |> Enum.take(1)

assert [] = Mongo.aggregate(c.pid, coll, [], batch_size: 1) |> Enum.take(0)
assert [] = Mongo.aggregate(c.pid, coll, [], batch_size: 1) |> Enum.drop(4)
assert [%{"foo" => 42}] = c.pid |> Mongo.aggregate(coll, [], batch_size: 1) |> Enum.take(1)
assert [%{"foo" => 45}] = c.pid |> Mongo.aggregate(coll, [], batch_size: 1) |> Enum.drop(3)
assert {:ok, colls13} = c.pid |> Mongo.aggregate(coll, [], batch_size: 1)
assert [%{"foo" => 45}] = colls13 |> Enum.drop(3)
end

test "count", c do
Expand Down
3 changes: 2 additions & 1 deletion test/support/specifications/crud/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ defmodule Mongo.Specification.CRUD.Helpers do
|> Map.drop(["pipeline"])
|> atomize_keys()

pid |> Mongo.aggregate(collection, pipeline, opts) |> Enum.to_list()
{:ok, cursor} = pid |> Mongo.aggregate(collection, pipeline, opts) |> Enum.to_list()
cursor
end

def match_operation_result?(expected, actual) do
Expand Down