diff --git a/lib/grpc/client/adapters/mint.ex b/lib/grpc/client/adapters/mint.ex index eea1c8a2..38ea074e 100644 --- a/lib/grpc/client/adapters/mint.ex +++ b/lib/grpc/client/adapters/mint.ex @@ -114,12 +114,16 @@ defmodule GRPC.Client.Adapters.Mint do |> Keyword.get(:transport_opts, []) |> Keyword.merge(ssl) - [transport_opts: Keyword.merge(@default_transport_opts, transport_opts)] + keep_alive = Keyword.get(opts, :keep_alive, false) + + [transport_opts: Keyword.merge(@default_transport_opts, transport_opts), keep_alive: keep_alive] end defp connect_opts(_channel, opts) do transport_opts = Keyword.get(opts, :transport_opts, []) - [transport_opts: Keyword.merge(@default_transport_opts, transport_opts)] + keep_alive = Keyword.get(opts, :keep_alive, false) + + [transport_opts: Keyword.merge(@default_transport_opts, transport_opts), keep_alive: keep_alive] end defp mint_scheme(%Channel{scheme: "https"} = _channel), do: :https diff --git a/lib/grpc/client/adapters/mint/connection_process/connection_process.ex b/lib/grpc/client/adapters/mint/connection_process/connection_process.ex index 0fab29d9..2c98c090 100644 --- a/lib/grpc/client/adapters/mint/connection_process/connection_process.ex +++ b/lib/grpc/client/adapters/mint/connection_process/connection_process.ex @@ -11,6 +11,8 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do alias GRPC.Client.Adapters.Mint.ConnectionProcess.State alias GRPC.Client.Adapters.Mint.StreamResponseProcess + @keep_alive_timeout 60_000 + require Logger @connection_closed_error "the connection is closed" @@ -49,7 +51,11 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do """ @spec disconnect(pid :: pid()) :: :ok def disconnect(pid) do - GenServer.call(pid, :disconnect) + if Process.alive?(pid) do + GenServer.call(pid, :disconnect) + else + :ok + end end @doc """ @@ -76,8 +82,14 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do @impl true def init({scheme, host, port, opts}) do + keep_alive = Keyword.get(opts, :keep_alive, false) + case Mint.HTTP.connect(scheme, host, port, opts) do {:ok, conn} -> + if keep_alive do + Process.send_after(self(), :ping, @keep_alive_timeout) + end + {:ok, State.new(conn, opts[:parent])} {:error, reason} -> @@ -94,6 +106,9 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do def handle_call(:disconnect, _from, state) do # TODO add a code to if disconnect is brutal we just stop if is friendly we wait for pending requests {:ok, conn} = Mint.HTTP.close(state.conn) + + finish_all_pending_requests(state) + {:stop, :normal, :ok, State.update_conn(state, conn)} end @@ -172,6 +187,34 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do end @impl true + def handle_info(:ping, %{ping_ref: nil} = state) do + case Mint.HTTP2.ping(state.conn) do + {:ok, conn, ref} -> + Process.send_after(self(), :ping, @keep_alive_timeout) + + state = + state + |> State.update_conn(conn) + |> State.set_ping_ref(ref) + + {:noreply, state} + + {:error, conn, _error} -> + state = State.update_conn(state, conn) + check_connection_status(state) + {:noreply, state} + end + end + + def handle_info(:ping, state) do + Logger.debug("No response received from ping stopping connection") + {:ok, conn} = Mint.HTTP.close(state.conn) + + finish_all_pending_requests(state) + + {:stop, :normal, State.update_conn(state, conn)} + end + def handle_info(message, state) do case Mint.HTTP.stream(state.conn, message) do :unknown -> @@ -251,6 +294,10 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do state end + defp process_response({:pong, request_ref}, %{ping_ref: request_ref} = state) do + State.set_ping_ref(state, nil) + end + defp process_response({:done, request_ref}, state) do :ok = state @@ -261,6 +308,14 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do new_state end + defp process_response({:error, request_ref, error}, state) do + pid = State.stream_response_pid(state, request_ref) + :ok = StreamResponseProcess.consume(pid, :error, error) + :ok = StreamResponseProcess.done(pid) + + state + end + defp chunk_body_and_enqueue_rest({request_ref, body, from}, state) do {head, tail} = chunk_body(body, get_window_size(state.conn, request_ref)) @@ -391,7 +446,7 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do end defp check_connection_status(state) do - if Mint.HTTP.open?(state.conn) do + if Mint.HTTP.open?(state.conn, :read) do check_request_stream_queue(state) else finish_all_pending_requests(state) diff --git a/lib/grpc/client/adapters/mint/connection_process/state.ex b/lib/grpc/client/adapters/mint/connection_process/state.ex index bad73233..61c627e3 100644 --- a/lib/grpc/client/adapters/mint/connection_process/state.ex +++ b/lib/grpc/client/adapters/mint/connection_process/state.ex @@ -1,12 +1,13 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess.State do @moduledoc false - defstruct [:conn, :parent, requests: %{}, request_stream_queue: :queue.new()] + defstruct [:conn, :parent, :ping_ref, requests: %{}, request_stream_queue: :queue.new()] @type t :: %__MODULE__{ conn: Mint.HTTP.t(), requests: map(), - parent: pid() + parent: pid(), + ping_ref: Mint.Types.request_ref() | nil } def new(conn, parent) do @@ -17,6 +18,10 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess.State do %{state | conn: conn} end + def set_ping_ref(state, ref) do + %{state | ping_ref: ref} + end + def update_request_stream_queue(state, queue) do %{state | request_stream_queue: queue} end diff --git a/lib/grpc/client/adapters/mint/stream_response_process.ex b/lib/grpc/client/adapters/mint/stream_response_process.ex index 249bd280..5a22b006 100644 --- a/lib/grpc/client/adapters/mint/stream_response_process.ex +++ b/lib/grpc/client/adapters/mint/stream_response_process.ex @@ -143,7 +143,8 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do _from, %{responses: responses} = state ) do - {:reply, :ok, %{state | responses: [error | responses]}, {:continue, :produce_response}} + + {:reply, :ok, %{state | done: true, responses: [error | responses]}, {:continue, :produce_response}} end def handle_call({:consume_response, :done}, _from, state) do