11import { createLogStream , SerializeStream , parseLog , formatLog } from './log.js'
2- import { Transform } from 'node:stream'
2+ import { Transform , PassThrough } from 'node:stream'
33import { writeClient } from './telemetry.js'
44import { Point } from '@influxdata/influxdb-client'
55import fs from 'node:fs/promises'
@@ -8,38 +8,59 @@ import { paths } from './paths.js'
88import { Tail } from 'tail'
99import { maybeCreateFile } from './util.js'
1010import { platform } from 'node:os'
11+ import assert from 'node:assert'
12+ import { join } from 'node:path'
1113
1214// Metrics stream
1315// - Filters duplicate entries
1416// - Writes `jobs-completed` to InfluxDB
1517// - Serializes JSON
16- export const createMetricsStream = metricsPath => {
18+ // - Persists to
19+ // - module specific metrics file
20+ // - merged metrics file
21+ export const createMetricsStream = async moduleName => {
22+ assert ( moduleName , 'moduleName required' )
1723 const deduplicateStream = new DeduplicateStream ( )
24+ const splitStream = new PassThrough ( { objectMode : true } )
1825 deduplicateStream
19- . pipe ( new TelemetryStream ( ) )
26+ . pipe ( new TelemetryStream ( moduleName ) )
27+ . pipe ( splitStream )
28+ splitStream
2029 . pipe ( new SerializeStream ( ) )
21- . pipe ( createLogStream ( metricsPath ) )
30+ . pipe ( createLogStream ( join ( paths . metrics , `${ moduleName } .log` ) ) )
31+ splitStream
32+ . pipe ( new MergeMetricsStream ( ...await Promise . all ( [
33+ getLatestMetrics ( ) ,
34+ getLatestMetrics ( moduleName )
35+ ] ) ) )
36+ . pipe ( new SerializeStream ( ) )
37+ . pipe ( createLogStream ( paths . allMetrics ) )
2238 return deduplicateStream
2339}
2440
25- export const maybeCreateMetricsFile = async ( ) => {
41+ export const maybeCreateMetricsFile = async ( moduleName ) => {
2642 await maybeCreateFile (
27- paths . metrics ,
43+ getMetricsFilePath ( moduleName ) ,
2844 formatLog (
2945 JSON . stringify ( { totalJobsCompleted : 0 , totalEarnings : '0' } ) + '\n'
3046 )
3147 )
3248}
3349
3450const metricsLogLineToObject = metrics => JSON . parse ( parseLog ( metrics ) . text )
51+ const getMetricsFilePath = moduleName => {
52+ return moduleName
53+ ? join ( paths . metrics , `${ moduleName } .log` )
54+ : paths . allMetrics
55+ }
3556
36- export const getLatestMetrics = async ( ) => {
37- const metrics = await fs . readFile ( paths . metrics , 'utf-8' )
57+ export const getLatestMetrics = async ( moduleName ) => {
58+ const metrics = await fs . readFile ( getMetricsFilePath ( moduleName ) , 'utf-8' )
3859 return metricsLogLineToObject ( metrics . trim ( ) . split ( '\n' ) . pop ( ) )
3960}
4061
41- export const followMetrics = async function * ( { signal } = { } ) {
42- const tail = new Tail ( paths . metrics , {
62+ export const followMetrics = async function * ( { moduleName , signal } = { } ) {
63+ const tail = new Tail ( getMetricsFilePath ( moduleName ) , {
4364 nLines : 1 ,
4465 useWatchFile : platform ( ) === 'win32'
4566 } )
@@ -69,16 +90,18 @@ class DeduplicateStream extends Transform {
6990}
7091
7192class TelemetryStream extends Transform {
72- constructor ( ) {
93+ constructor ( moduleName ) {
94+ assert ( moduleName , 'moduleName required' )
7395 super ( { objectMode : true } )
7496 this . lastMetrics = null
97+ this . moduleName = moduleName
7598 }
7699
77100 _transform ( metrics , _ , callback ) {
78101 if ( typeof this . lastMetrics ?. totalJobsCompleted === 'number' ) {
79102 writeClient . writePoint (
80103 new Point ( 'jobs-completed' )
81- . stringField ( 'module' , 'saturn' )
104+ . stringField ( 'module' , this . moduleName )
82105 . intField (
83106 'value' ,
84107 metrics . totalJobsCompleted - this . lastMetrics . totalJobsCompleted
@@ -89,3 +112,28 @@ class TelemetryStream extends Transform {
89112 callback ( null , metrics )
90113 }
91114}
115+
116+ class MergeMetricsStream extends Transform {
117+ constructor ( allMetrics , moduleMetrics ) {
118+ assert . strictEqual (
119+ typeof allMetrics ?. totalJobsCompleted ,
120+ 'number' ,
121+ 'allMetrics.totalJobsCompleted required'
122+ )
123+ assert . strictEqual (
124+ typeof moduleMetrics ?. totalJobsCompleted ,
125+ 'number' ,
126+ 'moduleMetrics.totalJobsCompleted required'
127+ )
128+ super ( { objectMode : true } )
129+ this . allMetrics = allMetrics
130+ this . moduleMetrics = moduleMetrics
131+ }
132+
133+ _transform ( metrics , _ , callback ) {
134+ const diff = metrics . totalJobsCompleted - this . moduleMetrics . totalJobsCompleted
135+ this . allMetrics . totalJobsCompleted += diff
136+ this . moduleMetrics . totalJobsCompleted = metrics . totalJobsCompleted
137+ callback ( null , this . allMetrics )
138+ }
139+ }
0 commit comments