diff --git a/.github/workflows/test-at.yaml b/.github/workflows/test-at.yaml new file mode 100644 index 00000000..081d7ed5 --- /dev/null +++ b/.github/workflows/test-at.yaml @@ -0,0 +1,10 @@ +name: Check for at command + +on: [push, pull_request] + +jobs: + check-at-command: + runs-on: ubuntu-latest + steps: + - name: Check for at command + run: dpkg-query -l at diff --git a/worker/cmd/generate.go b/worker/cmd/generate.go index 89a4565e..6ffea7d6 100644 --- a/worker/cmd/generate.go +++ b/worker/cmd/generate.go @@ -16,6 +16,7 @@ import ( "os/signal" "path" "path/filepath" + "strconv" "strings" "sync" "syscall" @@ -52,18 +53,29 @@ var ( TlsInsecure bool MaxSeed int TaxonomyFolders = []string{"compositional_skills", "knowledge"} + CleanupStrategy string ) +type CleanupStrategyType string +type DiskPressureEvent int + const ( - gitMaxRetries = 5 - gitRetryDelay = 2 * time.Second - ilabConfigPath = "config.yaml" - localEndpoint = "http://localhost:8000/v1" - jobSDG = "sdg-svc" - jobGenerateLocal = "generate" - jobPreCheck = "precheck" - sdgModel = "mistralai/mixtral-8x7b-instruct-v0-1" - jsonViewerFilenameSuffix = "-viewer.html" + gitMaxRetries = 5 + gitRetryDelay = 2 * time.Second + ilabConfigPath = "config.yaml" + localEndpoint = "http://localhost:8000/v1" + jobSDG = "sdg-svc" + jobGenerateLocal = "generate" + jobPreCheck = "precheck" + sdgModel = "mistralai/mixtral-8x7b-instruct-v0-1" + jsonViewerFilenameSuffix = "-viewer.html" + lazyStategyHours = 72 + lazy CleanupStrategyType = "lazy" + immediate CleanupStrategyType = "immediate" + eventBased CleanupStrategyType = "event-based" + lowDiskPressureEvent = iota + mediumDiskPressureEvent + highDiskPressureEvent ) // Worker encapsulates dependencies and methods to process jobs @@ -80,9 +92,10 @@ type Worker struct { tlsClientKeyPath string tlsServerCaCertPath string maxSeed int + cleanupStrategy CleanupStrategyType } -func NewJobProcessor(ctx context.Context, pool *redis.Pool, svc *s3.Client, logger *zap.SugaredLogger, job, precheckEndpoint, sdgEndpoint, tlsClientCertPath, tlsClientKeyPath, tlsServerCaCertPath string, maxSeed int) *Worker { +func NewJobProcessor(ctx context.Context, pool *redis.Pool, svc *s3.Client, logger *zap.SugaredLogger, job, precheckEndpoint, sdgEndpoint, tlsClientCertPath, tlsClientKeyPath, tlsServerCaCertPath string, maxSeed int, cleanupStrategy string) *Worker { return &Worker{ ctx: ctx, pool: pool, @@ -96,6 +109,7 @@ func NewJobProcessor(ctx context.Context, pool *redis.Pool, svc *s3.Client, logg tlsClientKeyPath: tlsClientKeyPath, tlsServerCaCertPath: tlsServerCaCertPath, maxSeed: maxSeed, + cleanupStrategy: CleanupStrategyType(cleanupStrategy), } } @@ -122,6 +136,7 @@ func init() { generateCmd.Flags().StringVarP(&TlsServerCaCertPath, "tls-server-ca-cert", "", "server-ca-crt.pem2", "Path to the TLS server CA certificate. Defaults to 'server-ca-crt.pem2'") generateCmd.Flags().BoolVarP(&TlsInsecure, "tls-insecure", "", false, "Whether to skip TLS verification") generateCmd.Flags().IntVarP(&MaxSeed, "max-seed", "m", 40, "Maximum number of seed Q&A pairs to process to SDG.") + generateCmd.Flags().StringVarP(&CleanupStrategy, "cleanup-strategy", "", "lazy", "Which strategy should be used to clean the worker after upload. Options: [\"lazy\", \"immediate\", \"event-based\"]") _ = generateCmd.MarkFlagRequired("github-token") rootCmd.AddCommand(generateCmd) } @@ -155,6 +170,7 @@ var generateCmd = &cobra.Command{ sigChan := make(chan os.Signal, 1) jobChan := make(chan string) stopChan := make(chan struct{}) + dpChan := make(chan DiskPressureEvent) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) @@ -196,15 +212,67 @@ var generateCmd = &cobra.Command{ go func(ch <-chan string) { defer wg.Done() for job := range ch { - jp := NewJobProcessor(ctx, pool, svc, sugar, job, PreCheckEndpointURL, SdgEndpointURL, TlsClientCertPath, TlsClientKeyPath, TlsServerCaCertPath, MaxSeed) + jp := NewJobProcessor(ctx, pool, svc, sugar, job, PreCheckEndpointURL, SdgEndpointURL, TlsClientCertPath, TlsClientKeyPath, TlsServerCaCertPath, MaxSeed, CleanupStrategy) jp.processJob() } }(jobChan) + wg.Add(1) + go func(ch <-chan DiskPressureEvent, stopChan <-chan struct{}) { + defer wg.Done() + timer := time.NewTicker(5 * time.Minute) + for { + select { + case <-stopChan: + sugar.Info("Shutting down disk-pressure listener") + close(jobChan) + return + case <-timer.C: + diskPressure, err := DiskUsage() + var pressureEvent DiskPressureEvent + if err != nil { + sugar.Errorf("Could not get disk usage: %v", err) + continue + } + if diskPressure < 33 { + pressureEvent = lowDiskPressureEvent + } else if diskPressure < 66 { + pressureEvent = mediumDiskPressureEvent + } else { + pressureEvent = highDiskPressureEvent + + } + dpChan <- pressureEvent + } + } + }(dpChan, stopChan) wg.Wait() }, } +// return disk pressure of root directory as an integer of percentage +func DiskUsage() (int, error) { + var cmd *exec.Cmd + var out []byte + var err error + + if _, err := os.Stat("/etc/os-release"); err == nil { + cmd = exec.Command("df", "--output=pcent", "/", "|", "tail", "-n", "1", "|", "sed", "'s/.$//'") + } else { + cmd = exec.Command("df", "-P", "/", "|", "tail", "-n", "1", "|", "awk", "'{print $5}'", "|", "sed", "'s/.$//'") + } + if out, err = cmd.Output(); err != nil { + return 0, err + } + usageStr := strings.TrimSpace(string(out)) + usage, err := strconv.Atoi(usageStr) + if err != nil { + return 0, err + } + + return usage, nil +} + // runPrecheck runs lab chat against git diffed yaml files func (w *Worker) runPrecheck(lab, outputDir, modelName string) error { workDir := "." @@ -372,6 +440,28 @@ func (w *Worker) runPrecheck(lab, outputDir, modelName string) error { time.Sleep(1 * time.Second) } } + // Cleanup actions + files_path := fmt.Sprintf("%s/*", workDir) + var files_path_abs string + files_path_abs, err = filepath.Abs(files_path) + if err != nil { + w.logger.Errorf("Could not get absolute path of workDir path: %v", err) + } + if w.cleanupStrategy == lazy { + cmd = exec.Command("rm", "-rf", files_path_abs, "|", "at", "now", "+", strconv.Itoa(lazyStategyHours), "hours") + if _, err = cmd.Output(); err != nil { + w.logger.Errorf("Could schedule cleanup of files from directory: %s in %s hours. Error: %v", files_path_abs, strconv.Itoa(lazyStategyHours), err) + return err + } + w.logger.Infof("Cleanup has been scheduled on %s directory in %s hours.", files_path_abs, strconv.Itoa(lazyStategyHours)) + } else if w.cleanupStrategy == immediate { + cmd = exec.Command("rm", "-rf", files_path_abs) + if _, err = cmd.Output(); err != nil { + w.logger.Errorf("Could not clean files from directory: %s. Error: %v", files_path_abs, err) + return err + } + } + return nil } @@ -603,6 +693,26 @@ func (w *Worker) processJob() { // Notify the "results" queue that the job is done with the public URL w.postJobResults(indexPublicURL, jobType) sugar.Infof("Job done") + + // Cleanup actions + files_path := fmt.Sprintf("%s/*", outputDir) + var files_path_abs string + files_path_abs, err = filepath.Abs(files_path) + if err != nil { + sugar.Errorf("Could not get absolute path of workDir path: %v", err) + } + if w.cleanupStrategy == lazy { + cmd = exec.Command("rm", "-rf", files_path_abs, "|", "at", "now", "+", strconv.Itoa(lazyStategyHours), "hours") + if _, err = cmd.Output(); err != nil { + sugar.Errorf("Could schedule cleanup of files from directory: %s in %s hours. Error: %v", files_path_abs, strconv.Itoa(lazyStategyHours), err) + } + sugar.Infof("Cleanup has been scheduled on %s directory in %s hours.", files_path_abs, strconv.Itoa(lazyStategyHours)) + } else if w.cleanupStrategy == immediate { + cmd = exec.Command("rm", "-rf", files_path_abs) + if _, err = cmd.Output(); err != nil { + sugar.Errorf("Could not clean files from directory: %s. Error: %v", files_path_abs, err) + } + } } // gitOperations handles the Git-related operations for a job and returns the head hash diff --git a/worker/cmd/generate_test.go b/worker/cmd/generate_test.go index 6102c18e..d98b00d0 100644 --- a/worker/cmd/generate_test.go +++ b/worker/cmd/generate_test.go @@ -158,6 +158,7 @@ func TestFetchModelName(t *testing.T) { "dummy-client-key-path.pem", "dummy-ca-cert-path.pem", 20, + "immediate", ) modelName, err := w.fetchModelName(false) @@ -219,6 +220,7 @@ func TestFetchModelNameWithInvalidObject(t *testing.T) { "dummy-client-key-path.pem", "dummy-ca-cert-path.pem", 20, + "lazy", ) modelName, err := w.fetchModelName(false)