
High memory allocations when recreating consumers #339

Description

@CIRCULARKA

Hello!

I have noticed that memory usage keeps growing when I repeatedly dispose and recreate consumers. Here is a code snippet that reproduces the issue:

using Pulsar.Client.Api;
using Pulsar.Client.Common;

var client = await new PulsarClientBuilder()
    .ServiceUrl("<host>")
    .BuildAsync();

var tasks = new List<Task>();
for (int i = 0; i < 100; i++)
{
    var index = i;
    tasks.Add(
        Task.Run(async () =>
        {
            // Repeatedly create, use, and dispose a consumer to mimic a process restart
            while (true)
            {
                try
                {
                    await using var consumer = await client
                        .NewConsumer(Schema.STRING())
                        .Topic(
                            "<topic-name>"
                        )
                        .SubscriptionName("test" + index)
                        .SubscriptionType(SubscriptionType.KeyShared)
                        .BatchReceivePolicy(new BatchReceivePolicy(100))
                        .SubscribeAsync();

                    var msgs = await consumer.BatchReceiveAsync();
                    await consumer.AcknowledgeAsync(msgs);

                    await consumer.DisposeAsync();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error in thread " + index + ": " + e);
                }
            }
        })
    );
}

await Task.WhenAll(tasks);

The code snippet mirrors how my service works: it has many threads, a.k.a. "processes", each owning one consumer. When an error occurs in a process, the process must be restarted, i.e. its consumer is fully disposed and created again. The problem is that memory usage keeps growing after such restarts, and my service ends up running with high memory usage or even gets killed by the OOM killer within hours.

The code snippet above starts at 80-90 MB and keeps growing to ~600 MB, where memory consumption then plateaus. That seems like very high consumption considering I'm not retaining any data here and I expect the GC to collect unreferenced messages. Moreover, the rate of incoming messages in my case is very low: a couple of messages per minute.
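
To tell live managed data apart from pooled or native memory, one quick check is to force a full GC and compare the managed heap with the process working set. This is a minimal diagnostic sketch using only standard BCL APIs (GC.GetTotalMemory, Environment.WorkingSet); it is not part of the repro:

// Diagnostic sketch: if the working set stays high while the managed heap
// drops after a forced full collection, the memory is held by pools or
// native buffers rather than by live managed objects.
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();

var managedMb = GC.GetTotalMemory(forceFullCollection: true) / (1024 * 1024);
var workingSetMb = Environment.WorkingSet / (1024 * 1024);
Console.WriteLine($"Managed heap: {managedMb} MB, working set: {workingSetMb} MB");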

If I modify the code snippet above and move the consumer creation above the while (true) loop, i.e. remove the constant consumer reinitialization, memory consumption stays at around ~70 MB, which is what I would like to see when recreating consumers as well.
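
For clarity, the modified variant looks roughly like this (same setup as above, only the consumer is created once per task instead of on every iteration):

// Same repro, but the consumer is created once per task, outside the loop:
for (int i = 0; i < 100; i++)
{
    var index = i;
    tasks.Add(
        Task.Run(async () =>
        {
            await using var consumer = await client
                .NewConsumer(Schema.STRING())
                .Topic("<topic-name>")
                .SubscriptionName("test" + index)
                .SubscriptionType(SubscriptionType.KeyShared)
                .BatchReceivePolicy(new BatchReceivePolicy(100))
                .SubscribeAsync();

            while (true)
            {
                var msgs = await consumer.BatchReceiveAsync();
                await consumer.AcknowledgeAsync(msgs);
            }
        })
    );
}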

I also collected a memory dump (with dotnet-dump) of the program above after it reached ~600 MB and found that most of the space is occupied by System.Byte[] arrays.

Here is the dumpheap -stat -live output of the dump I collected:

7fad03957680  9,738   2,259,216 Pulsar.Client.Internal.ClientCnx
7fad045b64a8 26,733   2,566,368 System.IO.Pipelines.BufferSegment
7fad04784ca0  9,738   2,727,664 Pulsar.Client.Internal.RequestsOperation[]
7fad047885c0  9,738   2,728,944 Pulsar.Client.Internal.CnxOperation[]
7fad0478afa8  9,738   2,941,680 Pulsar.Client.Internal.SocketMessage[]
7fad05464040 41,730   3,004,560 System.Threading.Tasks.Task<Pulsar.Client.Common.PulsarResponseType>
7fad0395f3c0 29,710   3,089,840 System.Threading.Channels.AsyncOperation<System.Boolean>
7fad02de8d38 19,567   3,124,084 System.Int32[]
7fad03990c78 49,182   3,541,104 System.Threading.Tasks.ContinuationTaskFromTask
7fad04784bf8  9,738   4,128,912 System.Collections.Concurrent.SingleProducerSingleConsumerQueue<Pulsar.Client.Internal.RequestsOperation>+Segment
7fad04788518  9,738   4,128,912 System.Collections.Concurrent.SingleProducerSingleConsumerQueue<Pulsar.Client.Internal.CnxOperation>+Segment
7fad0478af00  9,738   4,128,912 System.Collections.Concurrent.SingleProducerSingleConsumerQueue<Pulsar.Client.Internal.SocketMessage>+Segment
7fad041de6d8 19,476   4,985,856 Pipelines.Sockets.Unofficial.SocketAwaitableEventArgs
7fad045ce638  9,737   5,275,512 Pulsar.Client.Internal.RequestTime[]
7fad045b3a10 19,476   5,609,088 System.IO.Pipelines.Pipe
7fad02debe30 46,034   5,907,280 System.String
7fad05465128  9,736   7,660,080 System.Collections.Generic.Dictionary<System.UInt64, System.Threading.Tasks.TaskCompletionSource<Pulsar.Client.Common.PulsarResponseType>>+Entry[]
7fad05551088  9,735   7,661,280 System.Collections.Generic.Dictionary<System.UInt64, Pulsar.Client.Internal.ConsumerOperations>+Entry[]
7fad03658a48 24,596 565,715,484 System.Byte[] <- The most allocated object is System.Byte[]
Total 1,628,600 objects, 707,623,488 bytes

Most of the allocated byte arrays have the same size of 4,120 bytes (dumpheap -mt 7fad03658a48 -live):

    7f6d28443818     7fad03658a48          4,120
    7f6d284461f0     7fad03658a48          4,120
    7f6d28450070     7fad03658a48          4,120
    7f6d28467720     7fad03658a48          4,120
    7f6d2846cc00     7fad03658a48          4,120
    7f6d284a5d80     7fad03658a48          4,120
    7f6d284b2c70     7fad03658a48          4,120
    7f6d284b4e10     7fad03658a48          4,120
    7f6d284c9058     7fad03658a48          4,120
    7f6d2853ee80     7fad03658a48          4,120
    7f6d285a50f0     7fad03658a48          4,120
    7f6d285cf048     7fad03658a48          4,120
    7f6d28605f78     7fad03658a48          4,120
    7f6d28607368     7fad03658a48          4,120
    7f6d2862b4f8     7fad03658a48          4,120
    7f6d2862e2e8     7fad03658a48          4,120
    7f6d28685638     7fad03658a48          4,120
    7f6d286aeae0     7fad03658a48          4,120
    7f6d2873be90     7fad03658a48          4,120
    7f6d2873d5a0     7fad03658a48          4,120
    7f6d287416d0     7fad03658a48          4,120
    7f6d28742e58     7fad03658a48          4,120
    7f6d287bd308     7fad03658a48          4,120
    7f6d287df270     7fad03658a48          4,120
    7f6d287e4a20     7fad03658a48          4,120
    7f6d287f12c8     7fad03658a48          4,120
    7f6d287fbfc8     7fad03658a48          4,120
    7f6d28879958     7fad03658a48          4,120
    7f6d28887600     7fad03658a48          4,120
    7f6d288a6ac8     7fad03658a48          4,120
    7f6d288d1af8     7fad03658a48          4,120
    7f6d288e4de0     7fad03658a48          4,120
    7f6d28918f88     7fad03658a48          4,120
    7f6d2891f708     7fad03658a48          4,120
    7f6d289684d8     7fad03658a48          4,120
    7f6d2897b2f8     7fad03658a48             40
    7f6d28a38070     7fad03658a48          4,120
    7fad001148d0     7fad03658a48             24

I then analyzed several of these System.Byte[] instances; all of them have roots like the following (gcroot 7f6d28a38070, for example):

...
          -> 7f6d1bc4f220     System.Threading.TimerQueueTimer
          -> 7f6d1bc4de58     System.Threading.TimerCallback
          -> 7f6d1bc4de00     System.Timers.Timer
          -> 7f6d1bc4f1b0     System.Timers.ElapsedEventHandler
          -> 7f6d1bc4f198     <StartupCode$Pulsar-Client>.$ClientCnx+clo@150-14
          -> 7f6d1bc4f180     Microsoft.FSharp.Core.CompilerServices.RuntimeHelpers+h@462<System.Timers.ElapsedEventArgs>
          -> 7f6d1bc4f168     Microsoft.FSharp.Control.CommonExtensions+SubscribeToObservable@2274<System.Timers.ElapsedEventArgs>
          -> 7f6d1bc4f150     <StartupCode$Pulsar-Client>.$ClientCnx+clo@150-15
          -> 7f6d0e2edf90     Pulsar.Client.Internal.ClientCnx
          -> 7f6d0e2edef0     Pulsar.Client.Common.Connection
          -> 7f6d1bc44130     Pipelines.Sockets.Unofficial.SocketConnection+WrappedReader
          -> 7f6d1bc43d48     Pipelines.Sockets.Unofficial.SocketConnection
          -> 7f6d0e2e4338     Pipelines.Sockets.Unofficial.SocketAwaitableEventArgs
          -> 7f6d1bc58d38     System.Byte[]

I see exactly the same picture when analyzing dumps of my production service (but with much higher memory usage): the most allocated type is System.Byte[], and most of the objects are rooted somewhere inside the Pulsar.Client library.

I don't think this is a memory leak, otherwise memory consumption would keep growing indefinitely, but can I avoid such large memory allocations? In the production environment with higher load, properly disposing consumers and creating them again does not free much memory, and allocations keep growing far out of proportion to the load my service handles.

Could it be related to message or connection pooling? I've seen that you implemented Pulsar's PIP-83, but I couldn't find a way to disable pooling as proposed by the Pulsar team (and I'm actually not sure whether it would help). Could there be another reason for this?

.NET version: 9.0 (reproducible on 8.0 as well)
Pulsar.Client version: 3.13.1

Thanks!

UPDATE
I just found an existing issue about the same thing. Is accessing the RecyclableMemoryStreamManager via reflection the only way to fix the problem when having many restartable consumers? Are there any plans to address this?
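
To illustrate what I mean by the reflection workaround, here is a minimal sketch. It assumes the library holds a static Microsoft.IO.RecyclableMemoryStreamManager somewhere in its internals (I haven't verified the exact type or member name); it scans the Pulsar.Client assembly for such a field and tries to lower the free-pool limits if writable properties with those names exist:

using System;
using System.Linq;
using System.Reflection;
using Pulsar.Client.Api; // only used to locate the Pulsar.Client assembly

// Hypothetical workaround sketch: locate a static RecyclableMemoryStreamManager
// inside Pulsar.Client via reflection and shrink its free-pool limits.
// The member names searched for here are assumptions, not a documented API.
var assembly = typeof(PulsarClientBuilder).Assembly;

var managers =
    from type in assembly.GetTypes()
    from field in type.GetFields(BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic)
    where field.FieldType.FullName == "Microsoft.IO.RecyclableMemoryStreamManager"
    select field.GetValue(null);

foreach (var manager in managers.Where(m => m != null))
{
    // Try to cap how many bytes the pools keep around; these property names exist
    // in some versions of Microsoft.IO.RecyclableMemoryStream, but not all.
    foreach (var name in new[] { "MaximumFreeSmallPoolBytes", "MaximumFreeLargePoolBytes" })
    {
        var prop = manager.GetType().GetProperty(name);
        if (prop != null && prop.CanWrite)
            prop.SetValue(manager, 16L * 1024 * 1024); // e.g. cap each pool at 16 MB (arbitrary)
    }
}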
