Skip to content

Commit 7a04b11

Browse files
✨ Add support for finding processes by their name/the command that was started (#659)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Apache-2.0 --> ### Description <!-- Please add any detail or context that would be useful to a reviewer. --> Add support for finding processes by their name or the command that was started. This achieved similar functionality to `ps -aux | grep <name>`. Currently only the linux implementation is provided, the other implementations will be created in other PRs. ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update).
1 parent e0899e0 commit 7a04b11

File tree

5 files changed

+357
-0
lines changed

5 files changed

+357
-0
lines changed

changes/20250729164750.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: Add support for finding processes by their name/the command that was started

utils/parallelisation/parallelisation.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313

1414
"go.uber.org/atomic"
15+
"golang.org/x/sync/errgroup"
1516

1617
"github.com/ARM-software/golang-utils/utils/commonerrors"
1718
)
@@ -267,3 +268,58 @@ func WaitUntil(ctx context.Context, evalCondition func(ctx2 context.Context) (bo
267268
SleepWithContext(ctx, pauseBetweenEvaluations)
268269
}
269270
}
271+
272+
func newWorker[JobType, ResultType any](ctx context.Context, f func(context.Context, JobType) (ResultType, bool, error), jobs chan JobType, results chan ResultType) (err error) {
273+
for job := range jobs {
274+
result, ok, subErr := f(ctx, job)
275+
if subErr != nil {
276+
err = commonerrors.WrapError(commonerrors.ErrUnexpected, subErr, "an error occurred whilst handling a job")
277+
return
278+
}
279+
280+
err = DetermineContextError(ctx)
281+
if err != nil {
282+
return
283+
}
284+
285+
if ok {
286+
results <- result
287+
}
288+
}
289+
290+
return
291+
}
292+
293+
// WorkerPool parallelises an action using a worker pool of the size provided by numWorkers and retrieves all the results when all the actions have completed. It is similar to Parallelise but it uses generics instead of reflection and allows you to control the pool size
294+
func WorkerPool[InputType, ResultType any](ctx context.Context, numWorkers int, jobs []InputType, f func(context.Context, InputType) (ResultType, bool, error)) (results []ResultType, err error) {
295+
if numWorkers < 1 {
296+
err = commonerrors.New(commonerrors.ErrInvalid, "numWorkers must be greater than or equal to 1")
297+
return
298+
}
299+
300+
numJobs := len(jobs)
301+
jobsChan := make(chan InputType, numJobs)
302+
resultsChan := make(chan ResultType, numJobs)
303+
304+
g, gCtx := errgroup.WithContext(ctx)
305+
g.SetLimit(numWorkers)
306+
for range numWorkers {
307+
g.Go(func() error { return newWorker(gCtx, f, jobsChan, resultsChan) })
308+
}
309+
for _, job := range jobs {
310+
jobsChan <- job
311+
}
312+
313+
close(jobsChan)
314+
err = g.Wait()
315+
close(resultsChan)
316+
if err != nil {
317+
return
318+
}
319+
320+
for result := range resultsChan {
321+
results = append(results, result)
322+
}
323+
324+
return
325+
}

utils/parallelisation/parallelisation_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,3 +463,102 @@ func TestWaitUntil(t *testing.T) {
463463
errortest.AssertError(t, err, commonerrors.ErrUnexpected)
464464
})
465465
}
466+
467+
func TestWorkerPool(t *testing.T) {
468+
for _, test := range []struct {
469+
name string
470+
numWorkers int
471+
jobs []int
472+
results []int
473+
workerFunc func(context.Context, int) (int, bool, error)
474+
err error
475+
}{
476+
{
477+
name: "Success",
478+
numWorkers: 3,
479+
jobs: []int{1, 2, 3, 4, 5},
480+
results: []int{2, 4, 6, 8, 10},
481+
workerFunc: func(ctx context.Context, job int) (int, bool, error) {
482+
return job * 2, true, nil
483+
},
484+
err: nil,
485+
},
486+
{
487+
name: "Invalid Num Workers",
488+
numWorkers: 0,
489+
jobs: []int{1, 2, 3},
490+
results: nil,
491+
workerFunc: func(ctx context.Context, job int) (int, bool, error) {
492+
return 0, true, nil
493+
},
494+
err: commonerrors.ErrInvalid,
495+
},
496+
{
497+
name: "Worker Returns Error",
498+
numWorkers: 2,
499+
jobs: []int{1, 2, 3},
500+
results: nil,
501+
workerFunc: func(ctx context.Context, job int) (int, bool, error) {
502+
if job == 2 {
503+
return 0, false, errors.New("fail")
504+
}
505+
return job, true, nil
506+
},
507+
err: commonerrors.ErrUnexpected,
508+
},
509+
{
510+
name: "Some ok False",
511+
numWorkers: 1,
512+
jobs: []int{1, 2, 3},
513+
results: []int{1, 3},
514+
workerFunc: func(ctx context.Context, job int) (int, bool, error) {
515+
return job, job != 2, nil
516+
},
517+
err: nil,
518+
},
519+
{
520+
name: "All ok False",
521+
numWorkers: 1,
522+
jobs: []int{1, 2, 3},
523+
results: []int{},
524+
workerFunc: func(ctx context.Context, job int) (int, bool, error) {
525+
return job, false, nil
526+
},
527+
err: nil,
528+
},
529+
{
530+
name: "Empty Jobs",
531+
numWorkers: 2,
532+
jobs: []int{},
533+
results: []int{},
534+
workerFunc: func(ctx context.Context, job int) (int, bool, error) {
535+
return job, true, nil
536+
},
537+
err: nil,
538+
},
539+
} {
540+
t.Run(test.name, func(t *testing.T) {
541+
ctx := context.Background()
542+
543+
results, err := WorkerPool(ctx, test.numWorkers, test.jobs, test.workerFunc)
544+
545+
if test.err != nil {
546+
errortest.AssertError(t, err, test.err)
547+
} else {
548+
require.NoError(t, err)
549+
assert.ElementsMatch(t, test.results, results)
550+
}
551+
})
552+
}
553+
554+
t.Run("Context cancelled", func(t *testing.T) {
555+
ctx, cancel := context.WithCancel(context.Background())
556+
cancel()
557+
558+
_, err := WorkerPool(ctx, 100, []int{1, 2, 3}, func(ctx context.Context, job int) (int, bool, error) {
559+
return job, true, nil
560+
})
561+
562+
errortest.AssertError(t, err, commonerrors.ErrCancelled)
563+
})
564+
}

utils/proc/find/find_linux.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
//go:build linux
2+
3+
package find
4+
5+
import (
6+
"bytes"
7+
"context"
8+
"fmt"
9+
"regexp"
10+
"strconv"
11+
"strings"
12+
13+
"github.com/ARM-software/golang-utils/utils/commonerrors"
14+
"github.com/ARM-software/golang-utils/utils/filesystem"
15+
"github.com/ARM-software/golang-utils/utils/parallelisation"
16+
"github.com/ARM-software/golang-utils/utils/proc"
17+
)
18+
19+
const (
20+
procFS = "/proc"
21+
procDataFile = "cmdline"
22+
)
23+
24+
func checkProcessMatch(ctx context.Context, fs filesystem.FS, re *regexp.Regexp, procEntry string) (ok bool, err error) {
25+
err = parallelisation.DetermineContextError(ctx)
26+
if err != nil {
27+
return
28+
}
29+
30+
data, err := fs.ReadFile(procEntry)
31+
if err != nil {
32+
if commonerrors.CorrespondTo(err, "no bytes were read") {
33+
err = nil
34+
return // ignore special descriptors since our cmdline will have content (we still have to check since all files in proc have size zero)
35+
}
36+
err = commonerrors.WrapErrorf(commonerrors.ErrUnexpected, err, "could not read proc entry '%v'", procEntry)
37+
return
38+
}
39+
40+
data = bytes.ReplaceAll(data, []byte{0}, []byte{' '}) // https://man7.org/linux/man-pages/man5/proc_pid_cmdline.5.html
41+
42+
ok = re.Match(data)
43+
return
44+
}
45+
46+
func parseProcess(ctx context.Context, entry string) (p proc.IProcess, err error) {
47+
err = parallelisation.DetermineContextError(ctx)
48+
if err != nil {
49+
return
50+
}
51+
52+
pid, err := strconv.Atoi(strings.Trim(strings.TrimSuffix(strings.TrimPrefix(entry, procFS), fmt.Sprintf("%v", procDataFile)), "/"))
53+
if err != nil {
54+
err = commonerrors.WrapErrorf(commonerrors.ErrUnexpected, err, "could not parse PID from proc path '%v'", entry)
55+
return
56+
}
57+
58+
p, err = proc.FindProcess(ctx, pid)
59+
if err != nil {
60+
err = commonerrors.WrapErrorf(commonerrors.ErrUnexpected, err, "could not find process '%v'", pid)
61+
return
62+
}
63+
64+
return
65+
}
66+
67+
// FindProcessByRegexForFS will search a given filesystem for the processes that match a specific regex
68+
func FindProcessByRegexForFS(ctx context.Context, fs filesystem.FS, re *regexp.Regexp) (processes []proc.IProcess, err error) {
69+
if !filesystem.Exists(procFS) {
70+
err = commonerrors.Newf(commonerrors.ErrNotFound, "the proc filesystem was not found at '%v'", procFS)
71+
return
72+
}
73+
err = parallelisation.DetermineContextError(ctx)
74+
if err != nil {
75+
return
76+
}
77+
78+
searchGlobTerm := fmt.Sprintf("%v/*/%v", procFS, procDataFile)
79+
procEntries, err := fs.Glob(searchGlobTerm)
80+
if err != nil {
81+
err = commonerrors.WrapErrorf(commonerrors.ErrUnexpected, err, "an error occurred when searching for processes using the following glob '%v'", searchGlobTerm)
82+
return
83+
}
84+
85+
processes, err = parallelisation.WorkerPool(ctx, 10, procEntries, func(ctx context.Context, entry string) (p proc.IProcess, matches bool, err error) {
86+
matches, err = checkProcessMatch(ctx, fs, re, entry)
87+
if err != nil || !matches {
88+
return
89+
}
90+
91+
p, err = parseProcess(ctx, entry)
92+
if err != nil {
93+
return
94+
}
95+
96+
matches = true
97+
return
98+
})
99+
100+
return
101+
}
102+
103+
// FindProcessByRegex will search for the processes that match a specific regex
104+
func FindProcessByRegex(ctx context.Context, re *regexp.Regexp) (processes []proc.IProcess, err error) {
105+
return FindProcessByRegexForFS(ctx, filesystem.GetGlobalFileSystem(), re)
106+
}
107+
108+
// FindProcessByName will search for the processes that match a specific name
109+
func FindProcessByName(ctx context.Context, name string) (processes []proc.IProcess, err error) {
110+
return FindProcessByRegex(ctx, regexp.MustCompile(fmt.Sprintf(".*%v.*", regexp.QuoteMeta(name))))
111+
}

utils/proc/find/find_linux_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
//go:build linux
2+
3+
package find
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"testing"
9+
10+
"github.com/go-faker/faker/v4"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
14+
"github.com/ARM-software/golang-utils/utils/commonerrors"
15+
"github.com/ARM-software/golang-utils/utils/commonerrors/errortest"
16+
"github.com/ARM-software/golang-utils/utils/logs"
17+
"github.com/ARM-software/golang-utils/utils/logs/logstest"
18+
"github.com/ARM-software/golang-utils/utils/subprocess"
19+
)
20+
21+
func TestFind(t *testing.T) {
22+
for _, test := range []struct {
23+
name string
24+
processes int
25+
}{
26+
{
27+
name: "One process",
28+
processes: 1,
29+
},
30+
{
31+
name: "Many processes",
32+
processes: 10,
33+
},
34+
{
35+
name: "No process",
36+
processes: 0,
37+
},
38+
} {
39+
t.Run(test.name, func(t *testing.T) {
40+
ctx, cancel := context.WithCancel(context.Background())
41+
defer cancel()
42+
43+
processString := faker.Sentence()
44+
for range test.processes {
45+
l, err := logs.NewLogrLogger(logstest.NewStdTestLogger(), test.name)
46+
require.NoError(t, err)
47+
48+
cmd, err := subprocess.New(ctx, l, "start", "success", "failed", "sh", "-c", fmt.Sprintf("sleep 10 ; echo '%v'", processString))
49+
require.NoError(t, err)
50+
51+
err = cmd.Start()
52+
require.NoError(t, err)
53+
}
54+
55+
processes, err := FindProcessByName(ctx, processString)
56+
assert.NoError(t, err)
57+
assert.Len(t, processes, test.processes)
58+
59+
// stopping processes shows they were parsed correctly
60+
for _, process := range processes {
61+
err = process.Terminate(ctx)
62+
require.NoError(t, err)
63+
}
64+
processes, err = FindProcessByName(ctx, processString)
65+
require.NoError(t, err)
66+
assert.Empty(t, processes)
67+
})
68+
}
69+
70+
t.Run("Cancel context", func(t *testing.T) {
71+
ctx, cancel := context.WithCancel(context.Background())
72+
73+
processString := faker.Sentence()
74+
75+
l, err := logs.NewLogrLogger(logstest.NewStdTestLogger(), "context cancelled")
76+
require.NoError(t, err)
77+
78+
cmd, err := subprocess.New(ctx, l, "start", "success", "failed", "sh", "-c", fmt.Sprintf("sleep 10 ; echo '%v'", processString))
79+
require.NoError(t, err)
80+
81+
err = cmd.Start()
82+
require.NoError(t, err)
83+
cancel()
84+
85+
processes, err := FindProcessByName(ctx, processString)
86+
errortest.AssertError(t, err, commonerrors.ErrCancelled)
87+
assert.Empty(t, processes)
88+
})
89+
90+
}

0 commit comments

Comments
 (0)