Skip to content

Commit b48d701

Browse files
implementing storage cleaningsystem with options
Signed-off-by: greg pereira <[email protected]>
1 parent 753273d commit b48d701

File tree

3 files changed

+133
-11
lines changed

3 files changed

+133
-11
lines changed

.github/workflows/test-at.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
name: Check for at command
2+
3+
on: [push, pull_request]
4+
5+
jobs:
6+
check-at-command:
7+
runs-on: ubuntu-latest
8+
steps:
9+
- name: Check for at command
10+
run: dpkg-query -l at

worker/cmd/generate.go

Lines changed: 121 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"os/signal"
1717
"path"
1818
"path/filepath"
19+
"strconv"
1920
"strings"
2021
"sync"
2122
"syscall"
@@ -52,18 +53,29 @@ var (
5253
TlsInsecure bool
5354
MaxSeed int
5455
TaxonomyFolders = []string{"compositional_skills", "knowledge"}
56+
CleanupStrategy string
5557
)
5658

59+
type CleanupStrategyType string
60+
type DiskPressureEvent int
61+
5762
const (
58-
gitMaxRetries = 5
59-
gitRetryDelay = 2 * time.Second
60-
ilabConfigPath = "config.yaml"
61-
localEndpoint = "http://localhost:8000/v1"
62-
jobSDG = "sdg-svc"
63-
jobGenerateLocal = "generate"
64-
jobPreCheck = "precheck"
65-
sdgModel = "mistralai/mixtral-8x7b-instruct-v0-1"
66-
jsonViewerFilenameSuffix = "-viewer.html"
63+
gitMaxRetries = 5
64+
gitRetryDelay = 2 * time.Second
65+
ilabConfigPath = "config.yaml"
66+
localEndpoint = "http://localhost:8000/v1"
67+
jobSDG = "sdg-svc"
68+
jobGenerateLocal = "generate"
69+
jobPreCheck = "precheck"
70+
sdgModel = "mistralai/mixtral-8x7b-instruct-v0-1"
71+
jsonViewerFilenameSuffix = "-viewer.html"
72+
lazyStategyHours = 72
73+
lazy CleanupStrategyType = "lazy"
74+
immediate CleanupStrategyType = "immediate"
75+
eventBased CleanupStrategyType = "event-based"
76+
lowDiskPressureEvent = iota
77+
mediumDiskPressureEvent
78+
highDiskPressureEvent
6779
)
6880

6981
// Worker encapsulates dependencies and methods to process jobs
@@ -80,9 +92,10 @@ type Worker struct {
8092
tlsClientKeyPath string
8193
tlsServerCaCertPath string
8294
maxSeed int
95+
cleanupStrategy CleanupStrategyType
8396
}
8497

85-
func NewJobProcessor(ctx context.Context, pool *redis.Pool, svc *s3.Client, logger *zap.SugaredLogger, job, precheckEndpoint, sdgEndpoint, tlsClientCertPath, tlsClientKeyPath, tlsServerCaCertPath string, maxSeed int) *Worker {
98+
func NewJobProcessor(ctx context.Context, pool *redis.Pool, svc *s3.Client, logger *zap.SugaredLogger, job, precheckEndpoint, sdgEndpoint, tlsClientCertPath, tlsClientKeyPath, tlsServerCaCertPath string, maxSeed int, cleanupStrategy string) *Worker {
8699
return &Worker{
87100
ctx: ctx,
88101
pool: pool,
@@ -96,6 +109,7 @@ func NewJobProcessor(ctx context.Context, pool *redis.Pool, svc *s3.Client, logg
96109
tlsClientKeyPath: tlsClientKeyPath,
97110
tlsServerCaCertPath: tlsServerCaCertPath,
98111
maxSeed: maxSeed,
112+
cleanupStrategy: CleanupStrategyType(cleanupStrategy),
99113
}
100114
}
101115

@@ -122,6 +136,7 @@ func init() {
122136
generateCmd.Flags().StringVarP(&TlsServerCaCertPath, "tls-server-ca-cert", "", "server-ca-crt.pem2", "Path to the TLS server CA certificate. Defaults to 'server-ca-crt.pem2'")
123137
generateCmd.Flags().BoolVarP(&TlsInsecure, "tls-insecure", "", false, "Whether to skip TLS verification")
124138
generateCmd.Flags().IntVarP(&MaxSeed, "max-seed", "m", 40, "Maximum number of seed Q&A pairs to process to SDG.")
139+
generateCmd.Flags().StringVarP(&CleanupStrategy, "cleanup-strategy", "", "lazy", "Which strategy should be used to clean the worker after upload. Options: [\"lazy\", \"immediate\", \"event-based\"]")
125140
_ = generateCmd.MarkFlagRequired("github-token")
126141
rootCmd.AddCommand(generateCmd)
127142
}
@@ -155,6 +170,7 @@ var generateCmd = &cobra.Command{
155170
sigChan := make(chan os.Signal, 1)
156171
jobChan := make(chan string)
157172
stopChan := make(chan struct{})
173+
dpChan := make(chan DiskPressureEvent)
158174

159175
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
160176

@@ -196,15 +212,67 @@ var generateCmd = &cobra.Command{
196212
go func(ch <-chan string) {
197213
defer wg.Done()
198214
for job := range ch {
199-
jp := NewJobProcessor(ctx, pool, svc, sugar, job, PreCheckEndpointURL, SdgEndpointURL, TlsClientCertPath, TlsClientKeyPath, TlsServerCaCertPath, MaxSeed)
215+
jp := NewJobProcessor(ctx, pool, svc, sugar, job, PreCheckEndpointURL, SdgEndpointURL, TlsClientCertPath, TlsClientKeyPath, TlsServerCaCertPath, MaxSeed, CleanupStrategy)
200216
jp.processJob()
201217
}
202218
}(jobChan)
203219

220+
wg.Add(1)
221+
go func(ch <-chan DiskPressureEvent, stopChan <-chan struct{}) {
222+
defer wg.Done()
223+
timer := time.NewTicker(5 * time.Minute)
224+
for {
225+
select {
226+
case <-stopChan:
227+
sugar.Info("Shutting down disk-pressure listener")
228+
close(jobChan)
229+
return
230+
case <-timer.C:
231+
diskPressure, err := DiskUsage()
232+
var pressureEvent DiskPressureEvent
233+
if err != nil {
234+
sugar.Errorf("Could not get disk usage: %v", err)
235+
continue
236+
}
237+
if diskPressure < 33 {
238+
pressureEvent = lowDiskPressureEvent
239+
} else if diskPressure < 66 {
240+
pressureEvent = mediumDiskPressureEvent
241+
} else {
242+
pressureEvent = highDiskPressureEvent
243+
244+
}
245+
dpChan <- pressureEvent
246+
}
247+
}
248+
}(dpChan, stopChan)
204249
wg.Wait()
205250
},
206251
}
207252

253+
// return disk pressure of root directory as an integer of percentage
254+
func DiskUsage() (int, error) {
255+
var cmd *exec.Cmd
256+
var out []byte
257+
var err error
258+
259+
if _, err := os.Stat("/etc/os-release"); err == nil {
260+
cmd = exec.Command("df", "--output=pcent", "/", "|", "tail", "-n", "1", "|", "sed", "'s/.$//'")
261+
} else {
262+
cmd = exec.Command("df", "-P", "/", "|", "tail", "-n", "1", "|", "awk", "'{print $5}'", "|", "sed", "'s/.$//'")
263+
}
264+
if out, err = cmd.Output(); err != nil {
265+
return 0, err
266+
}
267+
usageStr := strings.TrimSpace(string(out))
268+
usage, err := strconv.Atoi(usageStr)
269+
if err != nil {
270+
return 0, err
271+
}
272+
273+
return usage, nil
274+
}
275+
208276
// runPrecheck runs lab chat against git diffed yaml files
209277
func (w *Worker) runPrecheck(lab, outputDir, modelName string) error {
210278
workDir := "."
@@ -372,6 +440,28 @@ func (w *Worker) runPrecheck(lab, outputDir, modelName string) error {
372440
time.Sleep(1 * time.Second)
373441
}
374442
}
443+
// Cleanup actions
444+
files_path := fmt.Sprintf("%s/*", workDir)
445+
var files_path_abs string
446+
files_path_abs, err = filepath.Abs(files_path)
447+
if err != nil {
448+
w.logger.Errorf("Could not get absolute path of workDir path: %v", err)
449+
}
450+
if w.cleanupStrategy == lazy {
451+
cmd = exec.Command("rm", "-rf", files_path_abs, "|", "at", "now", "+", strconv.Itoa(lazyStategyHours), "hours")
452+
if _, err = cmd.Output(); err != nil {
453+
w.logger.Errorf("Could schedule cleanup of files from directory: %s in %s hours. Error: %v", files_path_abs, strconv.Itoa(lazyStategyHours), err)
454+
return err
455+
}
456+
w.logger.Infof("Cleanup has been scheduled on %s directory in %s hours.", files_path_abs, strconv.Itoa(lazyStategyHours))
457+
} else if w.cleanupStrategy == immediate {
458+
cmd = exec.Command("rm", "-rf", files_path_abs)
459+
if _, err = cmd.Output(); err != nil {
460+
w.logger.Errorf("Could not clean files from directory: %s. Error: %v", files_path_abs, err)
461+
return err
462+
}
463+
}
464+
375465
return nil
376466
}
377467

@@ -603,6 +693,26 @@ func (w *Worker) processJob() {
603693
// Notify the "results" queue that the job is done with the public URL
604694
w.postJobResults(indexPublicURL, jobType)
605695
sugar.Infof("Job done")
696+
697+
// Cleanup actions
698+
files_path := fmt.Sprintf("%s/*", outputDir)
699+
var files_path_abs string
700+
files_path_abs, err = filepath.Abs(files_path)
701+
if err != nil {
702+
sugar.Errorf("Could not get absolute path of workDir path: %v", err)
703+
}
704+
if w.cleanupStrategy == lazy {
705+
cmd = exec.Command("rm", "-rf", files_path_abs, "|", "at", "now", "+", strconv.Itoa(lazyStategyHours), "hours")
706+
if _, err = cmd.Output(); err != nil {
707+
sugar.Errorf("Could schedule cleanup of files from directory: %s in %s hours. Error: %v", files_path_abs, strconv.Itoa(lazyStategyHours), err)
708+
}
709+
sugar.Infof("Cleanup has been scheduled on %s directory in %s hours.", files_path_abs, strconv.Itoa(lazyStategyHours))
710+
} else if w.cleanupStrategy == immediate {
711+
cmd = exec.Command("rm", "-rf", files_path_abs)
712+
if _, err = cmd.Output(); err != nil {
713+
sugar.Errorf("Could not clean files from directory: %s. Error: %v", files_path_abs, err)
714+
}
715+
}
606716
}
607717

608718
// gitOperations handles the Git-related operations for a job and returns the head hash

worker/cmd/generate_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ func TestFetchModelName(t *testing.T) {
158158
"dummy-client-key-path.pem",
159159
"dummy-ca-cert-path.pem",
160160
20,
161+
"immediate",
161162
)
162163

163164
modelName, err := w.fetchModelName(false)
@@ -219,6 +220,7 @@ func TestFetchModelNameWithInvalidObject(t *testing.T) {
219220
"dummy-client-key-path.pem",
220221
"dummy-ca-cert-path.pem",
221222
20,
223+
"lazy",
222224
)
223225
modelName, err := w.fetchModelName(false)
224226

0 commit comments

Comments
 (0)