
fix: Reconnect on a closed connection when checked out#304

Open
jrogov wants to merge 1 commit into plausible:master from adjust:fix/reconnect-on-closed-checkout-conn

Conversation

@jrogov jrogov commented Apr 10, 2026

(In collaboration with @trollfred)

Hiya 👋

We've been experimenting with checkout a bit more, and stumbled upon a rather inconvenient behaviour of Ch when a connection's been checked out of an ecto_ch repo.

TL;DR: due to ClickHouse's keep_alive_timeout (which defaults to 10 seconds), when a connection is checked out and there's a pause between DB operations, the connection gets closed by the server, and instead of reconnecting, Ch just fails.

This PR provides a simple single reconnect in such cases.
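The retry-once idea can be sketched in isolation. Everything below is illustrative and not the actual Ch implementation: `RetrySketch`, the map-shaped error, and the callback shapes are all made up for the example.

```elixir
# Illustrative sketch only: retry a request exactly once when it fails
# because the server already closed the (idle) connection.
defmodule RetrySketch do
  # `fun` performs the request against `conn`; `reconnect` builds a fresh
  # connection. On a closed-socket error, reconnect and retry once.
  def with_retry_if_stale(conn, reconnect, fun) do
    case fun.(conn) do
      {:error, _dead_conn, %{reason: :closed}} ->
        {:ok, fresh} = reconnect.()
        fun.(fresh)

      other ->
        other
    end
  end
end
```

The key property is that the retry happens at most once: if the fresh connection also fails, the error propagates as before.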

Steps to reproduce:

Mix.install([
  {:ecto_sql, "~> 3.13.0"},
  {:ch, path: Path.expand("~/code/ch"), override: true},
  {:ecto_ch, "~> 0.8.6"}
])

defmodule ChRepo do
  use Ecto.Repo,
    otp_app: :demo,
    adapter: Ecto.Adapters.ClickHouse
end

{:ok, _} =
  ChRepo.start_link(
    hostname: "localhost",
    port: 8123,
    database: "default",
    # Ensure that nothing else is interfering
    timeout: 60_000
  )

ChRepo.checkout(
  fn ->
    ChRepo.query!("SELECT 'before'")

    # Anything >10s
    :timer.sleep(15_000)

    ChRepo.query!("SELECT 'after'")
  end,
  timeout: :infinity
)

Result:

17:42:51.521 [debug] QUERY OK db=4.0ms
SELECT 'before' []

17:44:51.523 [debug] QUERY ERROR db=0.2ms
SELECT 'after' []

17:44:51.528 [error] Ch.Connection (#PID<0.283.0> ("db_conn_8")) disconnected: ** (Mint.TransportError) socket closed
** (Mint.TransportError) socket closed
    (ecto_sql 3.13.5) lib/ecto/adapters/sql.ex:1113: Ecto.Adapters.SQL.raise_sql_call_error/1
    iex:17: (file)
    iex:9: (file)

Comment thread lib/ch/connection.ex
Comment on lines 386 to +391
  def request_chunked(conn, method, path, headers, stream, opts) do
-   with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream),
-        {:ok, conn} <- stream_body(conn, ref, stream),
-        do: receive_full_response(conn, timeout(conn, opts))
+   with_retry_if_stale_connection(conn, fn conn ->
+     with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream),
+          {:ok, conn} <- stream_body(conn, ref, stream),
+          do: receive_full_response(conn, timeout(conn, opts))
+   end)
Author

I'm pretty sure we should not do this on request_chunked when stream-writing the data, as it would mean that we could write duplicated data. Just wanted to check if that's correct.

If that's so, let me know, and I'll remove this part

Collaborator

@ruslandoga ruslandoga Apr 10, 2026

> as it would mean that we could write duplicated data

I think since it's an ongoing HTTP request, if the connection gets closed for it, ClickHouse would drop all received data, at least for non-async inserts. But I haven't actually checked that. So it might be that we don't gain anything from reconnecting since we need to start sending the stream anew anyway.

Collaborator

@ruslandoga ruslandoga Apr 10, 2026

Also request_chunked should probably be removed (I'll open a PR). I added it because I didn't know at the time about the ability to do Stream.into/2 with DBConnection.

DBConnection.run(ch, fn conn ->
  File.stream!("demo.csv", 512_000)
  |> Stream.into(Ch.stream(conn, "insert into demo format CSV\n"))
  |> Stream.run()
end)

But the issue would remain, do we reconnect in those scenarios or not.

Author

> I think since it's an ongoing HTTP request, if the connection gets closed for it, ClickHouse would drop all received data, at least for non-async inserts.

Good catch!

But thinking about it, the problem we're trying to solve here will never manifest in the middle of the insert anyway: the connection gets closed because of the keep-alive timeout, in particular on the ClickHouse side.

Even if it's because of a proxy in between, it'll still happen at the very beginning, before we actually manage to insert any data at all.

So the flow will be: initiate chunked request -> try to send data -> try to receive ack -> get `socket: closed`, with no data inserted at all -> reconnect (this PR) -> start actually inserting the data.

The only side effect would be (partially) running the Stream twice?

But that gets covered by your notion that...

> Also request_chunked should probably be removed (I'll open a PR), I added it because I didn't know at the time about the ability to do Stream.into/2 with DBConnection.

What would be the flow here, then? I see that lib/ch/stream.ex gets a portion of data, and then simply sends it, IIUC. Then it wouldn't rerun the Stream if it fails on sending that data?

But IIUC we should also add retry mechanism here as well: https://github.com/adjust/ch/blob/fcfe1e3fbdb9999a84719425896b919f9a84f049/lib/ch/connection.ex#L193-L223

WDYT?

@ruslandoga
Collaborator

👋 @jrogov

I wonder, what's your use-case? If something is pausing for that long, why not do two separate checkouts?

@ruslandoga ruslandoga mentioned this pull request Apr 11, 2026
@jrogov
Author

jrogov commented Apr 13, 2026

> I wonder, what's your use-case? If something is pausing for that long, why not do two separate checkouts?

We run queries on a considerably large dataset. Even with ClickHouse these queries can run for up to 10 minutes. The queries need so much memory that they frequently OOM unless we split them into smaller queries and chain them together (think Stream.concat).

Obviously, we don't want to hand over connections between these "sub-queries" – we'd rather optimize for single-stream efficiency than for request parallelization.

So we need checkout to ensure that the connection is reserved for this single query, and it can complete all "sub-queries" without any interruption.
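The chained "sub-queries" pattern can be sketched with plain enumerables. Everything here is illustrative: `run_subquery` is a stand-in for a real `ChRepo.query!` call, and the ranges merely simulate result pages.

```elixir
# Sketch: chain "sub-queries" lazily so each runs only when the stream
# reaches it. `run_subquery` stands in for a real DB call.
run_subquery = fn start -> Enum.to_list(start..(start + 2)) end

lazy_parts =
  Enum.map([1, 4, 7], fn start ->
    # Wrap each sub-query so it does not execute until consumed.
    Stream.resource(
      fn -> start end,
      fn
        nil -> {:halt, nil}
        s -> {run_subquery.(s), nil}
      end,
      fn _ -> :ok end
    )
  end)

result = lazy_parts |> Stream.concat() |> Enum.to_list()
```

In the real use case the whole pipeline would run inside a single `ChRepo.checkout/2`, so all sub-queries share one reserved connection.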

@ruslandoga
Collaborator

ruslandoga commented Apr 13, 2026

> Obviously, we don't want to hand over connections between these "sub-queries"

Why not? Compared to 10 minute queries, checkout costs are probably negligible. And these HTTP connections should probably be considered interchangeable. I think I am missing something ...

> So we need checkout to ensure that the connection is reserved for this single query, and it can complete all "sub-queries" without any interruption.

You might also want to consider not using a shared pool for it at all but a "dedicated" one, with a single connection. It's not easy to do right now because of the way Ch uses db_connection but it might get easier after #303 (I'm giving it an honest attempt this time). In your use case you'd do a checkout per sub-query, but the underlying connection would get reused.


One problem with reconnects/retries like this, I think, is that we can only learn about a closed connection from Mint.HTTP1.recv/3, and that might be too late for INSERTs and other "non-retriable" requests: by the time we get the "closed socket" error from Mint.HTTP1.recv/3, we've already written the data with Mint.HTTP1.request/5, and it might have already been, or would soon be, delivered – some proxy along the way may have decided to close our mint-to-proxy connection. I've just realized this problem with retrying too much is also present in #292, I'll need to open a quickfix PR.

Maybe we can have something like retry: true|false|:safe where :safe would only retry on {:error, %Mint.TransportError{reason: :closed, ...}} from Mint.HTTP1.request/5 or SELECT and other "read" queries.

I don't know if "closed socket" errors ever happen on Mint.HTTP1.request/5, it always returns ok to me even when trying it against :gen_tcp.listen -> :gen_tcp.accept -> :gen_tcp.close locally.
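A hypothetical retry: :safe policy could be expressed as a pure classifier. None of this exists in Ch: `RetryPolicySketch`, the map-shaped error (standing in for a %Mint.TransportError{}, with a made-up :phase field marking where the close was detected), and the :read/:write query tags are all assumptions for the sketch.

```elixir
# Hypothetical sketch of a retry: :safe policy.
defmodule RetryPolicySketch do
  # :safe retries when the close was detected while *sending* the request
  # (no body delivered yet), or when the query is a read and thus safe to
  # re-run; anything else is not retried.
  def retriable?(:safe, %{reason: :closed, phase: :request}, _kind), do: true
  def retriable?(:safe, _error, :read), do: true
  def retriable?(:safe, _error, _kind), do: false
  # retry: true / retry: false behave unconditionally.
  def retriable?(retry, _error, _kind) when is_boolean(retry), do: retry
end
```

The point of the sketch is only that the decision can be made from the error shape plus the query kind, without touching the connection itself.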


Maybe in the future we could use mode: :active while sockets are in the pool, but not sure if it'd help. Also Gemini suggests trying Mint.HTTP1.recv(conn, 0, 0) to simulate peeking at the TCP queue. We can do it before sending the request to determine if the socket is open, maybe. I haven't tried it yet. I'll try asking Claude later as well.

@ruslandoga ruslandoga mentioned this pull request Apr 13, 2026