-
Notifications
You must be signed in to change notification settings - Fork 140
Create separate worker usage data collection and move hardware emit there #1293
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 5 commits
f02ecae
e57ce8d
e8abca3
51cb988
4d28be1
ffd2d75
21fe267
51f7207
e304034
24b4a84
a89107f
a9c526f
fa1e190
3628eb9
96e4267
5092606
1a52b18
7f8a165
d0dac1c
64cddbc
cde3ba4
822564b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
package internal | ||
|
||
import ( | ||
"context" | ||
"github.com/shirou/gopsutil/cpu" | ||
"github.com/uber-go/tally" | ||
"go.uber.org/cadence/internal/common/metrics" | ||
"go.uber.org/zap" | ||
"runtime" | ||
"sync" | ||
"time" | ||
) | ||
|
||
type ( | ||
workerUsageCollector struct { | ||
workerType string | ||
cooldownTime time.Duration | ||
logger *zap.Logger | ||
ctx context.Context | ||
wg *sync.WaitGroup // graceful stop | ||
cancel context.CancelFunc | ||
metricsScope tally.Scope | ||
} | ||
|
||
workerUsageCollectorOptions struct { | ||
Enabled bool | ||
Cooldown time.Duration | ||
MetricsScope tally.Scope | ||
WorkerType string | ||
} | ||
|
||
hardwareUsage struct { | ||
NumCPUCores int | ||
CPUPercent float64 | ||
NumGoRoutines int | ||
TotalMemory float64 | ||
MemoryUsedHeap float64 | ||
MemoryUsedStack float64 | ||
} | ||
) | ||
|
||
func newWorkerUsageCollector( | ||
options workerUsageCollectorOptions, | ||
logger *zap.Logger, | ||
) *workerUsageCollector { | ||
if !options.Enabled { | ||
return nil | ||
} | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
return &workerUsageCollector{ | ||
workerType: options.WorkerType, | ||
cooldownTime: options.Cooldown, | ||
metricsScope: options.MetricsScope, | ||
logger: logger, | ||
ctx: ctx, | ||
cancel: cancel, | ||
wg: &sync.WaitGroup{}, | ||
} | ||
} | ||
|
||
func (w *workerUsageCollector) Start() { | ||
w.wg.Add(1) | ||
go func() { | ||
|
||
defer func() { | ||
if p := recover(); p != nil { | ||
w.logger.Error("Unhandled panic in workerUsageCollector.") | ||
w.logger.Error(p.(error).Error()) | ||
timl3136 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
} | ||
}() | ||
defer w.wg.Done() | ||
|
||
ticker := time.NewTicker(w.cooldownTime) | ||
timl3136 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
for { | ||
select { | ||
case <-w.ctx.Done(): | ||
return | ||
case <-ticker.C: | ||
// Given that decision worker and activity worker are running in the same host, we only need to collect | ||
// hardware usage from one of them. | ||
if w.workerType == "DecisionWorker" { | ||
|
||
hardwareUsageData := w.collectHardwareUsage() | ||
if w.metricsScope != nil { | ||
w.emitHardwareUsage(hardwareUsageData) | ||
} | ||
} | ||
} | ||
} | ||
}() | ||
return | ||
} | ||
|
||
func (w *workerUsageCollector) Stop() { | ||
w.cancel() | ||
w.wg.Wait() | ||
} | ||
|
||
func (w *workerUsageCollector) collectHardwareUsage() hardwareUsage { | ||
cpuPercent, err := cpu.Percent(0, false) | ||
if err != nil { | ||
w.logger.Warn("Failed to get cpu percent", zap.Error(err)) | ||
} | ||
cpuCores, err := cpu.Counts(false) | ||
if err != nil { | ||
w.logger.Warn("Failed to get number of cpu cores", zap.Error(err)) | ||
} | ||
|
||
var memStats runtime.MemStats | ||
runtime.ReadMemStats(&memStats) | ||
return hardwareUsage{ | ||
NumCPUCores: cpuCores, | ||
CPUPercent: cpuPercent[0], | ||
NumGoRoutines: runtime.NumGoroutine(), | ||
TotalMemory: float64(memStats.Sys), | ||
MemoryUsedHeap: float64(memStats.HeapAlloc), | ||
MemoryUsedStack: float64(memStats.StackInuse), | ||
} | ||
} | ||
|
||
// emitHardwareUsage emits collected hardware usage metrics to metrics scope | ||
func (w *workerUsageCollector) emitHardwareUsage(usage hardwareUsage) { | ||
w.metricsScope.Gauge(metrics.NumCPUCores).Update(float64(usage.NumCPUCores)) | ||
w.metricsScope.Gauge(metrics.CPUPercentage).Update(usage.CPUPercent) | ||
w.metricsScope.Gauge(metrics.NumGoRoutines).Update(float64(usage.NumGoRoutines)) | ||
w.metricsScope.Gauge(metrics.TotalMemory).Update(float64(usage.TotalMemory)) | ||
w.metricsScope.Gauge(metrics.MemoryUsedHeap).Update(float64(usage.MemoryUsedHeap)) | ||
w.metricsScope.Gauge(metrics.MemoryUsedStack).Update(float64(usage.MemoryUsedStack)) | ||
} |
Uh oh!
There was an error while loading. Please reload this page.