diff --git a/benchmarks/src/main/scala/cats/effect/benchmarks/ScalQueueBenchmark.scala b/benchmarks/src/main/scala/cats/effect/benchmarks/ScalQueueBenchmark.scala index eb26954b27..802275b8bf 100644 --- a/benchmarks/src/main/scala/cats/effect/benchmarks/ScalQueueBenchmark.scala +++ b/benchmarks/src/main/scala/cats/effect/benchmarks/ScalQueueBenchmark.scala @@ -48,11 +48,11 @@ class ScalQueueBenchmark { @Param(Array("4")) // keep this a power of 2 var threads: Int = _ - val thing = new AnyRef + val thing = new Runnable { def run(): Unit = {} } @Benchmark def scalConcurrentEnqueueDequeue(): Unit = { - val q = new ScalQueue[AnyRef](threads) + val q = new ScalQueue(threads) val latch = new CountDownLatch(threads) // every thread will send and receive this number of events diff --git a/build.sbt b/build.sbt index e2aa5b60d0..f2ca16b5e9 100644 --- a/build.sbt +++ b/build.sbt @@ -725,6 +725,10 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) // internal API change, makes CpuStarvationMetrics available on all platforms ProblemFilters.exclude[MissingClassProblem]( "cats.effect.metrics.JvmCpuStarvationMetrics$NoOpCpuStarvationMetrics"), + // introduced by #4292, external queue metrics + // WorkStealingThreadPoolMetrics is a sealed trait, so we control all of its implementations. + ProblemFilters.exclude[ReversedMissingMethodProblem]( + "cats.effect.unsafe.metrics.WorkStealingThreadPoolMetrics.externalQueue"), ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvationMetrics"), // package-private classes moved to the `cats.effect.unsafe.metrics` package ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvation"), @@ -735,7 +739,8 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) "cats.effect.unsafe.WorkerThread.getSuspendedFiberCount"), // protected constructor modified when fixing #4359 ProblemFilters.exclude[DirectMissingMethodProblem]( - "cats.effect.unsafe.IORuntimeBuilder.$default$10") + "cats.effect.unsafe.IORuntimeBuilder.$default$10"), + ProblemFilters.exclude[Problem]("cats.effect.unsafe.ScalQueue*") ) ++ { if (tlIsScala3.value) { // Scala 3 specific exclusions diff --git a/core/jvm-native/src/main/scala/cats/effect/unsafe/LocalQueue.scala b/core/jvm-native/src/main/scala/cats/effect/unsafe/LocalQueue.scala index f8bcf5af1f..9c6384fa90 100644 --- a/core/jvm-native/src/main/scala/cats/effect/unsafe/LocalQueue.scala +++ b/core/jvm-native/src/main/scala/cats/effect/unsafe/LocalQueue.scala @@ -193,7 +193,7 @@ private final class LocalQueue extends LocalQueuePadding { * a reference to an uncontended source of randomness, to be passed along to the striped * concurrent queues when executing their enqueue operations */ - def enqueue(fiber: Runnable, external: ScalQueue[AnyRef], random: ThreadLocalRandom): Unit = { + def enqueue(fiber: Runnable, external: ScalQueue, random: ThreadLocalRandom): Unit = { // A plain, unsynchronized load of the tail of the local queue. val tl = tail @@ -281,7 +281,7 @@ private final class LocalQueue extends LocalQueuePadding { // Enqueue all of the batches of fibers on the batched queue with a bulk // add operation. - external.offerAll(batches, random) + external.offerAllBatches(batches, random) // Loop again for a chance to insert the original fiber to be enqueued // on the local queue. } @@ -657,7 +657,7 @@ private final class LocalQueue extends LocalQueuePadding { * a reference to an uncontended source of randomness, to be passed along to the striped * concurrent queues when executing their enqueue operations */ - def drainBatch(external: ScalQueue[AnyRef], random: ThreadLocalRandom): Unit = { + def drainBatch(external: ScalQueue, random: ThreadLocalRandom): Unit = { // A plain, unsynchronized load of the tail of the local queue. val tl = tail @@ -702,8 +702,7 @@ private final class LocalQueue extends LocalQueuePadding { totalSpilloverCount += SpilloverBatchSize Tail.updater.lazySet(this, tl) } - - external.offer(batch, random) + external.offerBatch(batch, random) return } } diff --git a/core/jvm-native/src/main/scala/cats/effect/unsafe/ScalQueue.scala b/core/jvm-native/src/main/scala/cats/effect/unsafe/ScalQueue.scala index 655d39eaa4..ec47873d22 100644 --- a/core/jvm-native/src/main/scala/cats/effect/unsafe/ScalQueue.scala +++ b/core/jvm-native/src/main/scala/cats/effect/unsafe/ScalQueue.scala @@ -17,6 +17,7 @@ package cats.effect.unsafe import java.util.concurrent.{ConcurrentLinkedQueue, ThreadLocalRandom} +import java.util.concurrent.atomic.AtomicLong /** * A striped queue implementation inspired by the [[https://scal.cs.uni-salzburg.at/dq/ Scal]] @@ -31,7 +32,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue, ThreadLocalRandom} * @param threadCount * the number of threads to load balance between */ -private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { +private[effect] final class ScalQueue(threadCount: Int) { /** * Calculates the next power of 2 using bitwise operations. This value actually represents the @@ -55,12 +56,30 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { */ private[this] val numQueues: Int = mask + 1 + private def createAtomicLongArray(size: Int): Array[AtomicLong] = { + val counts = new Array[AtomicLong](size) + var i = 0 + while (i < size) { + counts(i) = new AtomicLong(0) + i += 1 + } + counts + } + + // Metrics counters for tracking external queue submissions and present counts + private[this] val totalSingletonCounts: Array[AtomicLong] = createAtomicLongArray(numQueues) + private[this] val singletonCounts: Array[AtomicLong] = createAtomicLongArray(numQueues) + private[this] val totalBatchCounts: Array[AtomicLong] = createAtomicLongArray(numQueues) + private[this] val batchCounts: Array[AtomicLong] = createAtomicLongArray(numQueues) + private[this] val totalFiberCounts: Array[AtomicLong] = createAtomicLongArray(numQueues) + private[this] val fiberCounts: Array[AtomicLong] = createAtomicLongArray(numQueues) + /** * The concurrent queues backing this Scal queue. */ - private[this] val queues: Array[ConcurrentLinkedQueue[A]] = { + private[this] val queues: Array[ConcurrentLinkedQueue[AnyRef]] = { val nq = numQueues - val queues = new Array[ConcurrentLinkedQueue[A]](nq) + val queues = new Array[ConcurrentLinkedQueue[AnyRef]](nq) var i = 0 while (i < nq) { queues(i) = new ConcurrentLinkedQueue() @@ -70,16 +89,49 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { } /** - * Enqueues a single element on the Scal queue. + * Enqueues a single element (singleton task) on the Scal queue. * * @param a * the element to be enqueued * @param random * an uncontended source of randomness, used for randomly choosing a destination queue */ - def offer(a: A, random: ThreadLocalRandom): Unit = { + def offer(a: Runnable, random: ThreadLocalRandom): Unit = { val idx = random.nextInt(numQueues) queues(idx).offer(a) + + // Track as singleton task - using the same index for striped counters + totalSingletonCounts(idx).incrementAndGet() + singletonCounts(idx).incrementAndGet() + + // Also increment total fiber counts + totalFiberCounts(idx).incrementAndGet() + fiberCounts(idx).incrementAndGet() + () + } + + /** + * Enqueues a batch element (Array of tasks) on the Scal queue. + * + * @param batch + * the batch to be enqueued + * @param random + * an uncontended source of randomness, used for randomly choosing a destination queue + */ + + def offerBatch(batch: Array[Runnable], random: ThreadLocalRandom): Unit = { + val idx = random.nextInt(numQueues) + queues(idx).offer(batch) + + // Track as batch task + totalBatchCounts(idx).incrementAndGet() + batchCounts(idx).incrementAndGet() + + // Also increment total fiber counts by batch size + val batchSize = batch.length + totalFiberCounts(idx).addAndGet(batchSize.toLong) + fiberCounts(idx).addAndGet(batchSize.toLong) + () } @@ -104,7 +156,7 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { * @param random * an uncontended source of randomness, used for randomly choosing a destination queue */ - def offerAll(as: Array[? <: A], random: ThreadLocalRandom): Unit = { + def offerAll(as: Array[Runnable], random: ThreadLocalRandom): Unit = { val nq = numQueues val len = as.length var i = 0 @@ -112,8 +164,50 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { val fiber = as(i) val idx = random.nextInt(nq) queues(idx).offer(fiber) + + // Track as singleton task + totalSingletonCounts(idx).incrementAndGet() + singletonCounts(idx).incrementAndGet() + + // Also increment total fiber counts + totalFiberCounts(idx).incrementAndGet() + fiberCounts(idx).incrementAndGet() + i += 1 } + () + } + + /** + * Offers multiple batches of tasks to this Scal queue. + * + * @param batches + * an array of runnable batches to be offered to the queue + * @param random + * a reference to an uncontended source of randomness, to be passed along to the striped + * concurrent queues when executing their offer operations + */ + def offerAllBatches(batches: Array[Array[Runnable]], random: ThreadLocalRandom): Unit = { + val nq = numQueues + val len = batches.length + var i = 0 + while (i < len) { + val batch = batches(i) + val idx = random.nextInt(nq) + queues(idx).offer(batch) + + // Track as batch task + totalBatchCounts(idx).incrementAndGet() + batchCounts(idx).incrementAndGet() + + // Also increment total fiber counts by batch size + val batchSize = batch.length + totalFiberCounts(idx).addAndGet(batchSize.toLong) + fiberCounts(idx).addAndGet(batchSize.toLong) + + i += 1 + } + () } /** @@ -125,19 +219,37 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { * @return * an element from this Scal queue or `null` if this queue is empty */ - def poll(random: ThreadLocalRandom): A = { + + def poll(random: ThreadLocalRandom): AnyRef = { val nq = numQueues val from = random.nextInt(nq) var i = 0 - var a = null.asInstanceOf[A] + var element: AnyRef = null - while ((a eq null) && i < nq) { + while ((element eq null) && i < nq) { val idx = (from + i) & mask - a = queues(idx).poll() + element = queues(idx).poll() + + // If we found an element, decrement the appropriate counter + if (element ne null) { + if (element.isInstanceOf[Array[Runnable]]) { + batchCounts(idx).decrementAndGet() + // Decrement fiber present count by batch size + val batchSize = element.asInstanceOf[Array[Runnable]].length + fiberCounts(idx).addAndGet(-batchSize.toLong) + () + } else { + singletonCounts(idx).decrementAndGet() + // Decrement fiber present count by 1 + fiberCounts(idx).decrementAndGet() + () + } + } + i += 1 } - a + element } /** @@ -156,13 +268,29 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { * @param a * the element to be removed */ - def remove(a: A): Unit = { + def remove(a: AnyRef): Unit = { val nq = numQueues var i = 0 var done = false while (!done && i < nq) { done = queues(i).remove(a) + + // If we removed the element, decrement the appropriate counter + if (done) { + if (a.isInstanceOf[Array[Runnable]]) { + batchCounts(i).decrementAndGet() + // Decrement fiber present count by batch size + val batchSize = a.asInstanceOf[Array[Runnable]].length + fiberCounts(i).addAndGet(-batchSize.toLong) + () + } else { + singletonCounts(i).decrementAndGet(); + // Decrement fiber present count by 1 + fiberCounts(i).decrementAndGet(); () + } + } + i += 1 } } @@ -214,11 +342,96 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { * Clears all concurrent queues that make up this Scal queue. */ def clear(): Unit = { - val nq = numQueues var i = 0 - while (i < nq) { + while (i < numQueues) { queues(i).clear() + + // Reset all counters for this stripe + singletonCounts(i).set(0) + batchCounts(i).set(0) + fiberCounts(i).set(0) + + i += 1 + } + } + + /** + * Returns the total number of singleton tasks submitted to this queue. + */ + def getTotalSingletonCount(): Long = { + var total = 0L + var i = 0 + while (i < numQueues) { + total += totalSingletonCounts(i).get() + i += 1 + } + total + } + + /** + * Returns the number of singleton tasks currently in this queue. + */ + def getSingletonCount(): Long = { + var total = 0L + var i = 0 + while (i < numQueues) { + total += singletonCounts(i).get() + i += 1 + } + total + } + + /** + * Returns the total number of batch tasks submitted to this queue. + */ + def getTotalBatchCount(): Long = { + var total = 0L + var i = 0 + while (i < numQueues) { + total += totalBatchCounts(i).get() + i += 1 + } + total + } + + /** + * Returns the number of batch tasks currently in this queue. + */ + def getBatchCount(): Long = { + var total = 0L + var i = 0 + while (i < numQueues) { + total += batchCounts(i).get() + i += 1 + } + total + } + + /** + * Returns the total number of fibers (individual tasks + fibers in batches) submitted to this + * queue. + */ + def getTotalFiberCount(): Long = { + var total = 0L + var i = 0 + while (i < numQueues) { + total += totalFiberCounts(i).get() + i += 1 + } + total + } + + /** + * Returns the number of fibers (individual tasks + fibers in batches) currently in this + * queue. + */ + def getFiberCount(): Long = { + var total = 0L + var i = 0 + while (i < numQueues) { + total += fiberCounts(i).get() i += 1 } + total } } diff --git a/core/jvm-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 558d33bffe..1b58bde484 100644 --- a/core/jvm-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -121,9 +121,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( worker.ownsPoller(poller) } else false } - - private[this] val externalQueue: ScalQueue[AnyRef] = - new ScalQueue(threadCount << 2) + private[unsafe] val externalQueue: ScalQueue = new ScalQueue(threadCount << 2) /** * Represents two unsigned 16 bit integers. The 16 most significant bits track the number of diff --git a/core/jvm-native/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm-native/src/main/scala/cats/effect/unsafe/WorkerThread.scala index d7b7c9af93..8267b866de 100644 --- a/core/jvm-native/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm-native/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -52,7 +52,7 @@ private[effect] final class WorkerThread[P <: AnyRef]( private[unsafe] var parked: AtomicReference[ParkedSignal], // External queue used by the local queue for offloading excess fibers, as well as // for drawing fibers when the local queue is exhausted. - private[this] val external: ScalQueue[AnyRef], + private[this] val external: ScalQueue, // A worker-thread-local weak bag for tracking suspended fibers. private[this] var fiberBag: WeakBag[Runnable], private[this] var sleepers: TimerHeap, diff --git a/core/jvm-native/src/main/scala/cats/effect/unsafe/metrics/WorkStealingThreadPoolMetrics.scala b/core/jvm-native/src/main/scala/cats/effect/unsafe/metrics/WorkStealingThreadPoolMetrics.scala index 3ba78b289a..47602d5ade 100644 --- a/core/jvm-native/src/main/scala/cats/effect/unsafe/metrics/WorkStealingThreadPoolMetrics.scala +++ b/core/jvm-native/src/main/scala/cats/effect/unsafe/metrics/WorkStealingThreadPoolMetrics.scala @@ -85,12 +85,54 @@ sealed trait WorkStealingThreadPoolMetrics { */ def suspendedFiberCount(): Long + /** + * External queue metrics for this work-stealing thread pool. + */ + def externalQueue: ExternalQueueMetrics + /** * The list of worker-specific metrics of this work-stealing thread pool. */ def workerThreads: List[WorkerThreadMetrics] } +/** + * Represents metrics for the external task queue in a work-stealing thread pool. + */ +sealed trait ExternalQueueMetrics { + + /** + * Returns the total number of singleton tasks submitted to the queue. + */ + def totalSingletonCount(): Long + + /** + * Returns the number of singleton tasks currently in the queue. + */ + def singletonCount(): Long + + /** + * Returns the total number of batch tasks submitted to the queue. + */ + def totalBatchCount(): Long + + /** + * Returns the number of batch tasks currently in the queue. + */ + def batchCount(): Long + + /** + * Returns the total number of fibers (individual tasks + fibers in batches) submitted to the + * queue. + */ + def totalFiberCount(): Long + + /** + * Returns the number of fibers (individual tasks + fibers in batches) currently in the queue. + */ + def fiberCount(): Long +} + sealed trait WorkerThreadMetrics { /** @@ -264,10 +306,23 @@ object WorkStealingThreadPoolMetrics { def localQueueFiberCount(): Long = wstp.getLocalQueueFiberCount() def suspendedFiberCount(): Long = wstp.getSuspendedFiberCount() + val externalQueue: ExternalQueueMetrics = externalQueueMetrics(wstp.externalQueue) + val workerThreads: List[WorkerThreadMetrics] = List.range(0, workerThreadCount()).map(workerThreadMetrics(wstp, _)) } + private def externalQueueMetrics(queue: ScalQueue): ExternalQueueMetrics = + new ExternalQueueMetrics { + + def totalSingletonCount(): Long = queue.getTotalSingletonCount() + def singletonCount(): Long = queue.getSingletonCount() + def totalBatchCount(): Long = queue.getTotalBatchCount() + def batchCount(): Long = queue.getBatchCount() + def totalFiberCount(): Long = queue.getTotalFiberCount() + def fiberCount(): Long = queue.getFiberCount() + } + private def workerThreadMetrics[P <: AnyRef]( wstp: WorkStealingThreadPool[P], idx: Int diff --git a/tests/jvm/src/test/scala/cats/effect/unsafe/ScalQueueSuite.scala b/tests/jvm/src/test/scala/cats/effect/unsafe/ScalQueueSuite.scala new file mode 100644 index 0000000000..070cd3e484 --- /dev/null +++ b/tests/jvm/src/test/scala/cats/effect/unsafe/ScalQueueSuite.scala @@ -0,0 +1,217 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.unsafe + +import cats.effect.IOSuite + +import java.util.concurrent.ThreadLocalRandom + +class ScalQueueSuite extends IOSuite { + + /** + * Tests that the ScalQueue metrics correctly track singleton submissions. + */ + test("ScalQueue metrics track singleton submissions") { + // Create a queue with 4 stripes + val queue = new ScalQueue(4) + val random = ThreadLocalRandom.current() + + // Get initial metrics before any operations + val initialSingletonCount = queue.getTotalSingletonCount() + + // Verify initial metrics are all zero + assertEquals(initialSingletonCount, 0L, "Initial singleton count should be zero") + + // Add a singleton task (a simple no-op Runnable) + queue.offer(() => (), random) + + // Verify that the singleton count has increased to exactly 1 + val afterSingletonCount = queue.getTotalSingletonCount() + assertEquals(afterSingletonCount, 1L, "Singleton count should be exactly 1") + + // Test striping by adding several more singleton tasks + var j = 0 + while (j < 4) { + queue.offer(() => (), random) + j += 1 + } + + // Verify the updated singleton count is exactly 5 (1 initial + 4 more) + val afterStripingSingletonCount = queue.getTotalSingletonCount() + assertEquals( + afterStripingSingletonCount, + 5L, + "Singleton count should be exactly 5 after striping") + } + + /** + * Tests that the ScalQueue metrics correctly track batch submissions. + */ + test("ScalQueue metrics track batch submissions") { + // Create a queue with 4 stripes + val queue = new ScalQueue(4) + val random = ThreadLocalRandom.current() + + // Get initial metrics before any operations + + val initialBatchCount = queue.getTotalBatchCount() + val initialFiberCount = queue.getTotalFiberCount() + + // Verify initial metrics are all zero + assertEquals(initialBatchCount, 0L, "Initial batch count should be zero") + assertEquals(initialFiberCount, 0L, "Initial fiber count should be zero") + + // Add a singleton task for later fiber count verification + queue.offer(() => (), random) + + // Create a batch of 10 no-op tasks + val batchSize = 10 + val batch = new Array[Runnable](batchSize) + var i = 0 + while (i < batchSize) { + batch(i) = () => () + i += 1 + } + + // Add the batch to the queue + queue.offerBatch(batch, random) + + // Verify that the batch count has increased to exactly 1 + val afterBatchCount = queue.getTotalBatchCount() + assertEquals(afterBatchCount, 1L, "Batch count should be exactly 1") + + // Verify that the fiber count includes all tasks (singleton + batch) + val afterFiberCount = queue.getTotalFiberCount() + assertEquals( + afterFiberCount, + 1L + batchSize.toLong, + "Fiber count should include singleton and batch tasks") + } + + /** + * Tests that the ScalQueue metrics correctly track polling operations. + */ + test("ScalQueue metrics track polling operations") { + // Create a queue with 4 stripes + val queue = new ScalQueue(4) + val random = ThreadLocalRandom.current() + + // Add a singleton task + queue.offer(() => (), random) + + // Test striping by adding several more singleton tasks + var j = 0 + while (j < 4) { + queue.offer(() => (), random) + j += 1 + } + + // Create a batch of 10 no-op tasks + val batchSize = 10 + val batch = new Array[Runnable](batchSize) + var i = 0 + while (i < batchSize) { + batch(i) = () => () + i += 1 + } + + // Add the batch to the queue + queue.offerBatch(batch, random) + + // Poll some tasks to verify they can be retrieved + var polledCount = 0 + var element: AnyRef = null + + i = 0 + while (i < 10) { + element = queue.poll(random) + if (element ne null) { + polledCount += 1 + } + i += 1 + } + + // Verify we were able to poll at least one task + assertEquals(polledCount, 6, "Should have polled exactly 6 items (5 singletons + 1 batch)") + + // Check current in-queue metrics + val currentSingletonCount = queue.getSingletonCount() + val currentBatchCount = queue.getBatchCount() + val currentFiberCount = queue.getFiberCount() + + // Note: Some tasks may have been polled already + assert( + currentSingletonCount <= 5, + "Current singleton count should not exceed total singleton submissions") + assert( + currentBatchCount <= 1, + "Current batch count should not exceed total batch submissions") + assert( + currentFiberCount <= 15, + "Current fiber count should not exceed total fiber submissions") + } + + /** + * Tests that the ScalQueue metrics correctly track queue draining. + */ + test("ScalQueue metrics track queue draining") { + // Create a queue with 4 stripes + val queue = new ScalQueue(4) + val random = ThreadLocalRandom.current() + + // Add a singleton task + queue.offer(() => (), random) + + // Test striping by adding several more singleton tasks + var j = 0 + while (j < 4) { + queue.offer(() => (), random) + j += 1 + } + + // Create a batch of 10 no-op tasks + val batchSize = 10 + val batch = new Array[Runnable](batchSize) + var i = 0 + while (i < batchSize) { + batch(i) = () => () + i += 1 + } + + // Add the batch to the queue + queue.offerBatch(batch, random) + + // Drain the queue completely + var element: AnyRef = queue.poll(random) + while (element ne null) { + + // Get the next element + element = queue.poll(random) + } + + // Verify the queue is now empty + assert(queue.isEmpty(), "Queue should be empty after draining") + + // Verify present counts are now zero + assertEquals( + queue.getSingletonCount(), + 0L, + "Singleton present count should be zero after draining") + assertEquals(queue.getBatchCount(), 0L, "Batch present count should be zero after draining") + assertEquals(queue.getFiberCount(), 0L, "Fiber present count should be zero after draining") + } +}