diff --git a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala index 5b835f590c..d87227211c 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala @@ -69,7 +69,7 @@ object KqueueSystem extends PollingSystem { def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () - def metrics(poller: Poller): PollerMetrics = PollerMetrics.noop + def metrics(poller: Poller): PollerMetrics = poller.metrics() private final class FileDescriptorPollerImpl private[KqueueSystem] ( ctx: PollingContext[Poller] @@ -138,7 +138,7 @@ object KqueueSystem extends PollingSystem { } - final class Poller private[KqueueSystem] (kqfd: Int) { + final class Poller private[KqueueSystem] (kqfd: Int) extends PollerMetrics { private[this] val buffer = new Array[Byte](sizeof[kevent64_s].toInt * MaxEvents) @inline private[this] def eventlist = @@ -161,13 +161,94 @@ object KqueueSystem extends PollingSystem { event.flags = (flags.toInt | EV_ONESHOT).toUShort callbacks.update(encodeKevent(ident, filter), cb) + incrementOperationCount(filter) changeCount += 1 } + private[this] var totalReadSubmitted = 0L + private[this] var totalReadSucceeded = 0L + private[this] var totalReadErrored = 0L + private[this] var totalReadCanceled = 0L + private[this] var readOutstanding = 0 + + private[this] var totalWriteSubmitted = 0L + private[this] var totalWriteSucceeded = 0L + private[this] var totalWriteErrored = 0L + private[this] var totalWriteCanceled = 0L + private[this] var writeOutstanding = 0 + + override def operationsOutstandingCount(): Int = readOutstanding + writeOutstanding + override def totalOperationsSubmittedCount(): Long = + totalReadSubmitted + totalWriteSubmitted + override def totalOperationsSucceededCount(): Long = + totalReadSucceeded + totalWriteSucceeded + override def totalOperationsErroredCount(): Long = totalReadErrored + totalWriteErrored + override def totalOperationsCanceledCount(): Long = totalReadCanceled + totalWriteCanceled + override def acceptOperationsOutstandingCount(): Int = 0 + override def totalAcceptOperationsSubmittedCount(): Long = 0L + override def totalAcceptOperationsSucceededCount(): Long = 0L + override def totalAcceptOperationsErroredCount(): Long = 0L + override def totalAcceptOperationsCanceledCount(): Long = 0L + override def connectOperationsOutstandingCount(): Int = 0 + override def totalConnectOperationsSubmittedCount(): Long = 0L + override def totalConnectOperationsSucceededCount(): Long = 0L + override def totalConnectOperationsErroredCount(): Long = 0L + override def totalConnectOperationsCanceledCount(): Long = 0L + override def readOperationsOutstandingCount(): Int = readOutstanding + override def totalReadOperationsSubmittedCount(): Long = totalReadSubmitted + override def totalReadOperationsSucceededCount(): Long = totalReadSucceeded + override def totalReadOperationsErroredCount(): Long = totalReadErrored + override def totalReadOperationsCanceledCount(): Long = totalReadCanceled + override def writeOperationsOutstandingCount(): Int = writeOutstanding + override def totalWriteOperationsSubmittedCount(): Long = totalWriteSubmitted + override def totalWriteOperationsSucceededCount(): Long = totalWriteSucceeded + override def totalWriteOperationsErroredCount(): Long = totalWriteErrored + override def totalWriteOperationsCanceledCount(): Long = totalWriteCanceled + + override def toString: String = "Kqueue" + + private[KqueueSystem] def metrics(): PollerMetrics = this + + private[this] def incrementOperationCount(filter: Short): Unit = { + if (filter == EVFILT_READ) { + totalReadSubmitted += 1 + readOutstanding += 1 + } + + if (filter == EVFILT_WRITE) { + totalWriteSubmitted += 1 + writeOutstanding += 1 + } + } + + private[this] def handleOperationCompletion(filter: Short, succeeded: Boolean): Unit = { + if (filter == EVFILT_READ) { + readOutstanding -= 1 + if (succeeded) totalReadSucceeded += 1 else totalReadErrored += 1 + } + + if (filter == EVFILT_WRITE) { + writeOutstanding -= 1 + if (succeeded) totalWriteSucceeded += 1 else totalWriteErrored += 1 + } + } + + private[this] def handleOperationCanceled(filter: Short): Unit = { + if (filter == EVFILT_READ) { + totalReadCanceled += 1 + readOutstanding -= 1 + } + + if (filter == EVFILT_WRITE) { + totalWriteCanceled += 1 + writeOutstanding -= 1 + } + } + private[KqueueSystem] def removeCallback(ident: Int, filter: Short): Unit = { callbacks -= encodeKevent(ident, filter) - () + handleOperationCanceled(filter) } private[KqueueSystem] def close(): Unit = @@ -218,12 +299,15 @@ object KqueueSystem extends PollingSystem { val cb = callbacks.getOrNull(kevent) callbacks -= kevent - if (cb ne null) + if (cb ne null) { + val succeeded = (event.flags.toLong & EV_ERROR) == 0 + handleOperationCompletion(event.filter, succeeded) cb( - if ((event.flags.toLong & EV_ERROR) != 0) + if (succeeded) Either.unit + else Left(new IOException(fromCString(strerror(event.data.toInt)))) - else Either.unit ) + } i += 1 event += 1