Skip to content

Implement PollerMetrics for KqueueSystem #4336

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 90 additions & 6 deletions core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ object KqueueSystem extends PollingSystem {

def interrupt(targetThread: Thread, targetPoller: Poller): Unit = ()

def metrics(poller: Poller): PollerMetrics = PollerMetrics.noop
def metrics(poller: Poller): PollerMetrics = poller.metrics()

private final class FileDescriptorPollerImpl private[KqueueSystem] (
ctx: PollingContext[Poller]
Expand Down Expand Up @@ -138,7 +138,7 @@ object KqueueSystem extends PollingSystem {

}

final class Poller private[KqueueSystem] (kqfd: Int) {
final class Poller private[KqueueSystem] (kqfd: Int) extends PollerMetrics {

private[this] val buffer = new Array[Byte](sizeof[kevent64_s].toInt * MaxEvents)
@inline private[this] def eventlist =
Expand All @@ -161,13 +161,94 @@ object KqueueSystem extends PollingSystem {
event.flags = (flags.toInt | EV_ONESHOT).toUShort

callbacks.update(encodeKevent(ident, filter), cb)
incrementOperationCount(filter)

changeCount += 1
}

private[this] var totalReadSubmitted = 0L
private[this] var totalReadSucceeded = 0L
private[this] var totalReadErrored = 0L
private[this] var totalReadCanceled = 0L
private[this] var readOutstanding = 0

private[this] var totalWriteSubmitted = 0L
private[this] var totalWriteSucceeded = 0L
private[this] var totalWriteErrored = 0L
private[this] var totalWriteCanceled = 0L
private[this] var writeOutstanding = 0

override def operationsOutstandingCount(): Int = readOutstanding + writeOutstanding
override def totalOperationsSubmittedCount(): Long =
totalReadSubmitted + totalWriteSubmitted
override def totalOperationsSucceededCount(): Long =
totalReadSucceeded + totalWriteSucceeded
override def totalOperationsErroredCount(): Long = totalReadErrored + totalWriteErrored
override def totalOperationsCanceledCount(): Long = totalReadCanceled + totalWriteCanceled
override def acceptOperationsOutstandingCount(): Int = 0
override def totalAcceptOperationsSubmittedCount(): Long = 0L
override def totalAcceptOperationsSucceededCount(): Long = 0L
override def totalAcceptOperationsErroredCount(): Long = 0L
override def totalAcceptOperationsCanceledCount(): Long = 0L
override def connectOperationsOutstandingCount(): Int = 0
override def totalConnectOperationsSubmittedCount(): Long = 0L
override def totalConnectOperationsSucceededCount(): Long = 0L
override def totalConnectOperationsErroredCount(): Long = 0L
override def totalConnectOperationsCanceledCount(): Long = 0L
override def readOperationsOutstandingCount(): Int = readOutstanding
override def totalReadOperationsSubmittedCount(): Long = totalReadSubmitted
override def totalReadOperationsSucceededCount(): Long = totalReadSucceeded
override def totalReadOperationsErroredCount(): Long = totalReadErrored
override def totalReadOperationsCanceledCount(): Long = totalReadCanceled
override def writeOperationsOutstandingCount(): Int = writeOutstanding
override def totalWriteOperationsSubmittedCount(): Long = totalWriteSubmitted
override def totalWriteOperationsSucceededCount(): Long = totalWriteSucceeded
override def totalWriteOperationsErroredCount(): Long = totalWriteErrored
override def totalWriteOperationsCanceledCount(): Long = totalWriteCanceled

override def toString: String = "Kqueue"

private[KqueueSystem] def metrics(): PollerMetrics = this

private[this] def incrementOperationCount(filter: Short): Unit = {
if (filter == EVFILT_READ) {
totalReadSubmitted += 1
readOutstanding += 1
}

if (filter == EVFILT_WRITE) {
totalWriteSubmitted += 1
writeOutstanding += 1
}
}

private[this] def handleOperationCompletion(filter: Short, succeeded: Boolean): Unit = {
if (filter == EVFILT_READ) {
readOutstanding -= 1
if (succeeded) totalReadSucceeded += 1 else totalReadErrored += 1
}

if (filter == EVFILT_WRITE) {
writeOutstanding -= 1
if (succeeded) totalWriteSucceeded += 1 else totalWriteErrored += 1
}
}

private[this] def handleOperationCanceled(filter: Short): Unit = {
if (filter == EVFILT_READ) {
totalReadCanceled += 1
readOutstanding -= 1
}

if (filter == EVFILT_WRITE) {
totalWriteCanceled += 1
writeOutstanding -= 1
}
}

private[KqueueSystem] def removeCallback(ident: Int, filter: Short): Unit = {
callbacks -= encodeKevent(ident, filter)
()
handleOperationCanceled(filter)
}

private[KqueueSystem] def close(): Unit =
Expand Down Expand Up @@ -218,12 +299,15 @@ object KqueueSystem extends PollingSystem {
val cb = callbacks.getOrNull(kevent)
callbacks -= kevent

if (cb ne null)
if (cb ne null) {
val succeeded = (event.flags.toLong & EV_ERROR) == 0
handleOperationCompletion(event.filter, succeeded)
cb(
if ((event.flags.toLong & EV_ERROR) != 0)
if (succeeded) Either.unit
else
Left(new IOException(fromCString(strerror(event.data.toInt))))
else Either.unit
)
}

i += 1
event += 1
Expand Down
Loading