
Conversation


@GeorgeMac GeorgeMac commented Oct 15, 2025

Summary

This PR does a couple of things:

  1. Addresses a leak of startAutoCloseIdleConnections goroutines
  2. Refactors the idle pool to maintain connections in a circular queue (buffer)

We observed in the CH operator that startAutoCloseIdleConnections goroutines were being leaked.
In one long-running operator, we found 35k instances of this goroutine running unchecked.
These are leaked if conn.Close() is never called. However, the client implementation can also leak them even when conn.Close() is called, if the receiver on ch.exit is not ready at that moment: the signal is sent with a select and a default case, which is best-effort and only succeeds if a receiver happens to be ready.
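To illustrate the failure mode, here is a minimal standalone sketch of that pattern (not the actual clickhouse-go code; names are illustrative): the best-effort send only succeeds if the background goroutine happens to be parked on the receive at that exact instant.

```go
package main

import (
	"fmt"
	"time"
)

// worker stands in for startAutoCloseIdleConnections: it loops,
// periodically sweeping, until it receives on exit.
func worker(exit chan struct{}) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// pretend to sweep idle connections
		case <-exit:
			return
		}
	}
}

// bestEffortSignal mimics the problematic shutdown: the send only
// succeeds if a worker happens to be blocked on <-exit right now.
func bestEffortSignal(exit chan struct{}) bool {
	select {
	case exit <- struct{}{}:
		return true
	default:
		return false // nobody was ready: the worker keeps running
	}
}

func main() {
	exit := make(chan struct{})
	go worker(exit)
	// Closing immediately after opening: the worker is very likely not
	// blocked on <-exit yet, so the signal is silently dropped and the
	// goroutine leaks.
	fmt.Println("signal delivered:", bestEffortSignal(exit))
}
```

Closing the channel (or sending without a default case) instead makes the shutdown signal impossible to miss.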

A quick test with the new testing/synctest package showed that this was very easy to demonstrate by opening a connection and then immediately closing it. I wanted to include this test in this PR; however, the package only stabilized in Go 1.25, and in 1.24 it was available behind a GOEXPERIMENT environment variable with a different API. So I have omitted it for now and included it as an example in this description:

// TestIdleConnectionCleanupLeak demonstrates that startAutoCloseIdleConnections
// goroutines are leaked when connections are opened and closed.
//
// This test uses Go's synctest package (stably available in Go 1.25+) to deterministically
// detect the leaked goroutine without relying on timing or sleep calls.
func TestIdleConnectionCleanupLeak(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		conn, err := Open(&Options{
			Addr: []string{"localhost:9000"},
		})
		if err != nil {
			t.Fatalf("failed to open connection: %v", err)
		}

		// Close the connection - this should stop the goroutine
		if err := conn.Close(); err != nil {
			t.Fatalf("failed to close connection: %v", err)
		}

		// Wait for all goroutines in this synctest bubble to exit
		// This will hang if startAutoCloseIdleConnections is still running
		synctest.Wait()
	})
}

And it fails deterministically like this:

--- FAIL: TestIdleConnectionCleanupLeak (0.00s)
panic: deadlock: main bubble goroutine has exited but blocked goroutines remain [recovered, repanicked]

goroutine 101 [running]:
testing.tRunner.func1.2({0x1024ab740, 0x140001101b0})
        /opt/homebrew/Cellar/go/1.25.2/libexec/src/testing/testing.go:1872 +0x190
testing.tRunner.func1()
        /opt/homebrew/Cellar/go/1.25.2/libexec/src/testing/testing.go:1875 +0x31c
panic({0x1024ab740?, 0x140001101b0?})
        /opt/homebrew/Cellar/go/1.25.2/libexec/src/runtime/panic.go:783 +0x120
internal/synctest.Run(0x140001227a0)
        /opt/homebrew/Cellar/go/1.25.2/libexec/src/runtime/synctest.go:251 +0x2c4
testing/synctest.Test(0x14000132380, 0x1024ebdb0)
        /opt/homebrew/Cellar/go/1.25.2/libexec/src/testing/synctest/synctest.go:282 +0x88
github.com/ClickHouse/clickhouse-go/v2.TestIdleConnectionCleanupLeak(0x14000132380?)
        /Users/georgemacrorie/github/ClickHouse/clickhouse-go/clickhouse_test.go:16 +0x24
testing.tRunner(0x14000132380, 0x1024ebc90)
        /opt/homebrew/Cellar/go/1.25.2/libexec/src/testing/testing.go:1934 +0xc8
created by testing.(*T).Run in goroutine 1
        /opt/homebrew/Cellar/go/1.25.2/libexec/src/testing/testing.go:1997 +0x364

goroutine 104 [select (durable), synctest bubble 1]:
github.com/ClickHouse/clickhouse-go/v2.(*clickhouse).startAutoCloseIdleConnections(0x14000112e40)
        /Users/georgemacrorie/github/ClickHouse/clickhouse-go/clickhouse.go:334 +0x9c
created by github.com/ClickHouse/clickhouse-go/v2.Open in goroutine 103
        /Users/georgemacrorie/github/ClickHouse/clickhouse-go/clickhouse.go:76 +0x158
FAIL    github.com/ClickHouse/clickhouse-go/v2  0.696s
FAIL

After this change, this test passes.

Additional Refactor

I originally attempted this with a min heap ordered by connection age, but that was both overcomplicated and likely not what we need.
I have since changed it to use a circular queue instead.
So if you see our conversation below and it doesn't make sense, this is likely why.

To tackle this, I wanted to revisit the idle pool, which so far has used a channel for holding connections.
I think the channel works OK, however, it has some drawbacks:

  1. You can only synchronize adding and removing entries (i.e. additional locking is still required to synchronize closing on teardown)
  2. It is awkward to clear out expired connections, as you have to empty and refill it every time.

Instead, I have proposed here a new idle pool and circular queue implementation.

Circular Queue

The circular queue is a ring with a max capacity and constant memory usage (it avoids resizing).
Pushing and pulling from the queue wraps items around in the slice under the hood.
When the queue is full, any attempts to push new entries are rejected.
https://github.com/GeorgeMac/clickhouse-go/blob/71a7247f7421d7939fd6b9ead2a3808121763f1f/internal/circular/queue.go#L40-L51
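The core of a fixed-capacity ring can be sketched roughly like this (a simplified standalone illustration, not the actual internal/circular code; names are illustrative):

```go
package main

import "fmt"

// Queue is a fixed-capacity ring buffer. The backing slice is
// allocated once and never grows.
type Queue[T any] struct {
	items []T
	head  int // index of the next element to pop
	size  int // number of stored elements
}

func NewQueue[T any](capacity int) *Queue[T] {
	return &Queue[T]{items: make([]T, capacity)}
}

// Push appends at the tail, wrapping around the slice; it reports
// false when the ring is full, rejecting the entry.
func (q *Queue[T]) Push(v T) bool {
	if q.size == len(q.items) {
		return false
	}
	tail := (q.head + q.size) % len(q.items)
	q.items[tail] = v
	q.size++
	return true
}

// Pop removes from the head in FIFO order, wrapping around.
func (q *Queue[T]) Pop() (T, bool) {
	var zero T
	if q.size == 0 {
		return zero, false
	}
	v := q.items[q.head]
	q.items[q.head] = zero // clear the slot so it can be GC'd
	q.head = (q.head + 1) % len(q.items)
	q.size--
	return v, true
}

func main() {
	q := NewQueue[int](2)
	fmt.Println(q.Push(1), q.Push(2), q.Push(3)) // true true false
	v, _ := q.Pop()
	fmt.Println(v) // 1
}
```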

The queue supports clearing elements based on a filter function (which is used for clearing expired connections).
This returns an iterator over the items which were cleared (we use this later for closing cleared connections).
The queue does not protect against concurrent access, it is expected that the caller will handle that.
https://github.com/GeorgeMac/clickhouse-go/blob/71a7247f7421d7939fd6b9ead2a3808121763f1f/internal/circular/queue.go#L86-L93

There is a set of unit tests and benchmarks focused on validating these semantics and memory guarantees.

Idle Pool

The idle pool focuses on supporting concurrent access to connections in the backing queue, background cleanup of expired connections and safe cleanup of these processes on shutdown (to address the original idle cleanup goroutine problem).

It exposes a Get and a Put method for managing connections in the pool.
Put automatically drops connections which have expired, or which arrive when the pool is at capacity.
Get removes connections from the queue until it finds a non-expired one or the queue is empty, closing each expired connection it encounters along the way.

The cleanup routine periodically attempts to clear connections which have expired.
It blocks until either its ticker ticks, or it is signalled to finish, in which case it returns.

Close handles signalling the cleanup routine to finish, then waiting for it to do so.
Once the cleanup routine is finished, it goes through and clears all connections in the pool.

Changes to type clickhouse struct {}

Primarily, this swaps the channel used for idle connections with the new *idlePool.
This removes the need for the idle cleanup from this part of the code.
Now both acquire and release interface with the idle pool to attempt to fetch existing idle connections and return them.
There is also a little further simplification such as leaning into context timeouts for some dial timeout handling.

Let me know what you think 🙏

Checklist

Delete items not relevant to your PR:

  • Unit and integration tests covering the common scenarios were added
  • A human-readable description of the changes was provided to include in CHANGELOG
  • For significant changes, documentation in https://github.com/ClickHouse/clickhouse-docs was updated with further explanations or tutorials

@GeorgeMac GeorgeMac force-pushed the gm/idle-connection-cleanup-leak branch from 2cf0ff2 to 0154a51 on October 15, 2025 13:12
@kavirajk
Contributor

Thanks @GeorgeMac for the PR. Your test case using synctest to reproduce the issue is super helpful ❤️

Although I wonder if we need the whole idlePool to solve the leaking issue. I understand the point about age-based connections, and that we currently drain and refill the pool constantly because it's just a channel with no ordering maintained. Still, I think this seems a bit too much complexity for fixing the leak.

I quickly tried a simple fix:

  1. Adding a closed flag
type clickhouse struct {
	opt    *Options
	idle   chan nativeTransport
	open   chan struct{}
	exit   chan struct{}
	closed atomic.Bool
	connID int64
}
  2. And blocking on c.exit <- struct{}{} without any default case.
func (ch *clickhouse) Close() error {
	if ch.closed.Load() {
		return nil
	}

	for {
		select {
		case conn := <-ch.idle:
			conn.debugf("[close: closing pool]")
			conn.close()
		default:
			ch.exit <- struct{}{}
			ch.closed.Store(true)
			return nil
		}
	}
}

(2) is to wait until the goroutine (go conn.startAutoCloseIdleConnections()) returns without leaking. And your test case seems to pass just fine. This also prevents multiple calls to Close() from blocking. What do you think?

I'm curious how badly "not having an age-based priority queue" impacts your use cases? Happy to discuss in detail.

@GeorgeMac
Member Author

GeorgeMac commented Oct 15, 2025

@kavirajk we could definitely go after a simpler fix that keeps it more in-line with what is already here if that is preferred 👍

What you proposed works for ensuring that we stop leaking startAutoCloseIdleConnections, which is great 💯

However, one thing to keep in mind here is that it doesn't guarantee the idle pool is drained on shutdown, which leaves the real connections unclosed.

The select there with a default, which receives on the idle channel, is also best-effort: if the channel happens to be empty at that moment, the default clause runs and the send to exit goes ahead. However, as soon as the exit send synchronizes with the receiver in startAutoCloseIdleConnections, a concurrent call to release() could put another connection into the idle pool.

Now we have released connections which will never be closed, because the auto close idle routine is gone.

We could go a step further with your proposal there and attempt to drain the idle channel.
However, we also need to be sure that any concurrent calls which release active connections will not attempt to put those back in the idle pool. We need that guarantee before we do one last drain of the pool.

This is also something I have addressed in my implementation, and I suspect the existing idle connection counting tests in the tests directory will actually show it is necessary, because they count open connections on the ClickHouse server side, which requires that every connection in the idle pool has explicitly had close called on it when drained.

You are right, the ordering by age is not necessary to solve the problem, and I am happy to remove it if it is not favourable. However, I would challenge the decision to use a channel as an idle pool, and would instead suggest we would be better served with a slice and a lock for this. Much like how database/sql.DB manages its connection pool.
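The guarantee described above can be sketched like this (hypothetical names throughout, loosely in the spirit of database/sql.DB): one mutex guards both the closed flag and the idle slice, so Close and a concurrent release are serialized, and any connection returned after Close is closed rather than stranded.

```go
package main

import (
	"fmt"
	"sync"
)

type conn struct{ closed bool }

func (c *conn) close() { c.closed = true }

// pool guards both the idle slice and the closed flag with one
// mutex, so "am I closed?" and "drain everything" are decided
// atomically with respect to putConn.
type pool struct {
	mu     sync.Mutex
	closed bool
	idle   []*conn
}

// putConn either pools the connection or, once Close has run,
// closes it immediately so it is never stranded.
func (p *pool) putConn(c *conn) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		c.close()
		return
	}
	p.idle = append(p.idle, c)
}

// Close flips the flag and drains under the same lock, so no
// putConn can slip a connection in afterwards.
func (p *pool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.closed = true
	for _, c := range p.idle {
		c.close()
	}
	p.idle = nil
}

func main() {
	p := &pool{}
	a := &conn{}
	p.putConn(a)
	p.Close()
	late := &conn{}
	p.putConn(late) // arrives after Close: closed, not pooled
	fmt.Println(a.closed, late.closed) // true true
}
```

With the channel-based pool, this atomicity is exactly what the best-effort select cannot provide.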

@GeorgeMac
Member Author

GeorgeMac commented Oct 16, 2025

Update: Actually, you are right to question ordering connections by age. I am not sure what I was thinking there. I guess the idea was just to prioritise older connections for reuse before releasing them. It also had the nice side effect that when putting a connection into a pool at capacity, we always throw away the connection with the soonest expiration. However, in hindsight (after sleeping on it and your push back) I see that FIFO reuse of connections is probably a fairer use of all of them, rather than potentially never using newer connections, which could happen with the age-based priority queue.

In that case, a channel has good semantics for the behaviour, but I still think it has ergonomic issues, and we still really need a lock to coordinate that, once closed, any returned connections aren't re-added but are simply closed.

Let me shuffle this around, see if I can keep it closer to the original implementation, remove the leak, and ensure all connections are drained.

@GeorgeMac
Member Author

GeorgeMac commented Oct 16, 2025

Sorry @kavirajk in an effort to remove complexity, I made the change bigger 😅

The change I made breaks the logic up into three separate parts:

  1. The existing top-level pool for opening new connections and retrieving existing idle ones
  2. The idle pool for safe concurrent access to idle connections and cleanup of expired ones
  3. A circular queue/buffer implementation with fixed capacity, that rejects inserts when full

I updated the description above to go into details.

While bigger, I hope the implementation is more digestible in some ways (or maybe not, you tell me), and with far more tests than before.
If it is still too much, we can just ignore it all and go ahead and make the channel work.
I appreciate this is me scratching an itch 😅 It's your call 👍

@kavirajk
Contributor

No worries @GeorgeMac. Thanks for trying to fix this issue. I will take a deep look today :)
