Skip to content

implementing storage cleaningsystem with options #311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/test-at.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
name: Check for at command

on: [push, pull_request]

jobs:
check-at-command:
runs-on: ubuntu-latest
steps:
- name: Check for at command
run: dpkg-query -l at
132 changes: 121 additions & 11 deletions worker/cmd/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"os/signal"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -52,18 +53,29 @@ var (
TlsInsecure bool
MaxSeed int
TaxonomyFolders = []string{"compositional_skills", "knowledge"}
CleanupStrategy string
)

type CleanupStrategyType string
type DiskPressureEvent int

const (
gitMaxRetries = 5
gitRetryDelay = 2 * time.Second
ilabConfigPath = "config.yaml"
localEndpoint = "http://localhost:8000/v1"
jobSDG = "sdg-svc"
jobGenerateLocal = "generate"
jobPreCheck = "precheck"
sdgModel = "mistralai/mixtral-8x7b-instruct-v0-1"
jsonViewerFilenameSuffix = "-viewer.html"
gitMaxRetries = 5
gitRetryDelay = 2 * time.Second
ilabConfigPath = "config.yaml"
localEndpoint = "http://localhost:8000/v1"
jobSDG = "sdg-svc"
jobGenerateLocal = "generate"
jobPreCheck = "precheck"
sdgModel = "mistralai/mixtral-8x7b-instruct-v0-1"
jsonViewerFilenameSuffix = "-viewer.html"
lazyStategyHours = 72
lazy CleanupStrategyType = "lazy"
immediate CleanupStrategyType = "immediate"
eventBased CleanupStrategyType = "event-based"
lowDiskPressureEvent = iota
mediumDiskPressureEvent
highDiskPressureEvent
)

// Worker encapsulates dependencies and methods to process jobs
Expand All @@ -80,9 +92,10 @@ type Worker struct {
tlsClientKeyPath string
tlsServerCaCertPath string
maxSeed int
cleanupStrategy CleanupStrategyType
}

func NewJobProcessor(ctx context.Context, pool *redis.Pool, svc *s3.Client, logger *zap.SugaredLogger, job, precheckEndpoint, sdgEndpoint, tlsClientCertPath, tlsClientKeyPath, tlsServerCaCertPath string, maxSeed int) *Worker {
func NewJobProcessor(ctx context.Context, pool *redis.Pool, svc *s3.Client, logger *zap.SugaredLogger, job, precheckEndpoint, sdgEndpoint, tlsClientCertPath, tlsClientKeyPath, tlsServerCaCertPath string, maxSeed int, cleanupStrategy string) *Worker {
return &Worker{
ctx: ctx,
pool: pool,
Expand All @@ -96,6 +109,7 @@ func NewJobProcessor(ctx context.Context, pool *redis.Pool, svc *s3.Client, logg
tlsClientKeyPath: tlsClientKeyPath,
tlsServerCaCertPath: tlsServerCaCertPath,
maxSeed: maxSeed,
cleanupStrategy: CleanupStrategyType(cleanupStrategy),
}
}

Expand All @@ -122,6 +136,7 @@ func init() {
generateCmd.Flags().StringVarP(&TlsServerCaCertPath, "tls-server-ca-cert", "", "server-ca-crt.pem2", "Path to the TLS server CA certificate. Defaults to 'server-ca-crt.pem2'")
generateCmd.Flags().BoolVarP(&TlsInsecure, "tls-insecure", "", false, "Whether to skip TLS verification")
generateCmd.Flags().IntVarP(&MaxSeed, "max-seed", "m", 40, "Maximum number of seed Q&A pairs to process to SDG.")
generateCmd.Flags().StringVarP(&CleanupStrategy, "cleanup-strategy", "", "lazy", "Which strategy should be used to clean the worker after upload. Options: [\"lazy\", \"immediate\", \"event-based\"]")
_ = generateCmd.MarkFlagRequired("github-token")
rootCmd.AddCommand(generateCmd)
}
Expand Down Expand Up @@ -155,6 +170,7 @@ var generateCmd = &cobra.Command{
sigChan := make(chan os.Signal, 1)
jobChan := make(chan string)
stopChan := make(chan struct{})
dpChan := make(chan DiskPressureEvent)

signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

Expand Down Expand Up @@ -196,15 +212,67 @@ var generateCmd = &cobra.Command{
go func(ch <-chan string) {
defer wg.Done()
for job := range ch {
jp := NewJobProcessor(ctx, pool, svc, sugar, job, PreCheckEndpointURL, SdgEndpointURL, TlsClientCertPath, TlsClientKeyPath, TlsServerCaCertPath, MaxSeed)
jp := NewJobProcessor(ctx, pool, svc, sugar, job, PreCheckEndpointURL, SdgEndpointURL, TlsClientCertPath, TlsClientKeyPath, TlsServerCaCertPath, MaxSeed, CleanupStrategy)
jp.processJob()
}
}(jobChan)

wg.Add(1)
go func(ch <-chan DiskPressureEvent, stopChan <-chan struct{}) {
defer wg.Done()
timer := time.NewTicker(5 * time.Minute)
for {
select {
case <-stopChan:
sugar.Info("Shutting down disk-pressure listener")
close(jobChan)
return
case <-timer.C:
diskPressure, err := DiskUsage()
var pressureEvent DiskPressureEvent
if err != nil {
sugar.Errorf("Could not get disk usage: %v", err)
continue
}
if diskPressure < 33 {
pressureEvent = lowDiskPressureEvent
} else if diskPressure < 66 {
pressureEvent = mediumDiskPressureEvent
} else {
pressureEvent = highDiskPressureEvent

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I could set the strategy to immediate if it hits highDiskPressureEvent...?

}
dpChan <- pressureEvent
}
}
}(dpChan, stopChan)
wg.Wait()
},
}

// return disk pressure of root directory as an integer of percentage
func DiskUsage() (int, error) {
var cmd *exec.Cmd
var out []byte
var err error

if _, err := os.Stat("/etc/os-release"); err == nil {
cmd = exec.Command("df", "--output=pcent", "/", "|", "tail", "-n", "1", "|", "sed", "'s/.$//'")
} else {
cmd = exec.Command("df", "-P", "/", "|", "tail", "-n", "1", "|", "awk", "'{print $5}'", "|", "sed", "'s/.$//'")
}
if out, err = cmd.Output(); err != nil {
return 0, err
}
usageStr := strings.TrimSpace(string(out))
usage, err := strconv.Atoi(usageStr)
if err != nil {
return 0, err
}

return usage, nil
}

// runPrecheck runs lab chat against git diffed yaml files
func (w *Worker) runPrecheck(lab, outputDir, modelName string) error {
workDir := "."
Expand Down Expand Up @@ -372,6 +440,28 @@ func (w *Worker) runPrecheck(lab, outputDir, modelName string) error {
time.Sleep(1 * time.Second)
}
}
// Cleanup actions
files_path := fmt.Sprintf("%s/*", workDir)
var files_path_abs string
files_path_abs, err = filepath.Abs(files_path)
if err != nil {
w.logger.Errorf("Could not get absolute path of workDir path: %v", err)
}
if w.cleanupStrategy == lazy {
cmd = exec.Command("rm", "-rf", files_path_abs, "|", "at", "now", "+", strconv.Itoa(lazyStategyHours), "hours")
if _, err = cmd.Output(); err != nil {
w.logger.Errorf("Could schedule cleanup of files from directory: %s in %s hours. Error: %v", files_path_abs, strconv.Itoa(lazyStategyHours), err)
return err
}
w.logger.Infof("Cleanup has been scheduled on %s directory in %s hours.", files_path_abs, strconv.Itoa(lazyStategyHours))
} else if w.cleanupStrategy == immediate {
cmd = exec.Command("rm", "-rf", files_path_abs)
if _, err = cmd.Output(); err != nil {
w.logger.Errorf("Could not clean files from directory: %s. Error: %v", files_path_abs, err)
return err
}
}

return nil
}

Expand Down Expand Up @@ -603,6 +693,26 @@ func (w *Worker) processJob() {
// Notify the "results" queue that the job is done with the public URL
w.postJobResults(indexPublicURL, jobType)
sugar.Infof("Job done")

// Cleanup actions
files_path := fmt.Sprintf("%s/*", outputDir)
var files_path_abs string
files_path_abs, err = filepath.Abs(files_path)
if err != nil {
sugar.Errorf("Could not get absolute path of workDir path: %v", err)
}
if w.cleanupStrategy == lazy {
cmd = exec.Command("rm", "-rf", files_path_abs, "|", "at", "now", "+", strconv.Itoa(lazyStategyHours), "hours")
if _, err = cmd.Output(); err != nil {
sugar.Errorf("Could schedule cleanup of files from directory: %s in %s hours. Error: %v", files_path_abs, strconv.Itoa(lazyStategyHours), err)
}
sugar.Infof("Cleanup has been scheduled on %s directory in %s hours.", files_path_abs, strconv.Itoa(lazyStategyHours))
} else if w.cleanupStrategy == immediate {
cmd = exec.Command("rm", "-rf", files_path_abs)
if _, err = cmd.Output(); err != nil {
sugar.Errorf("Could not clean files from directory: %s. Error: %v", files_path_abs, err)
}
}
}

// gitOperations handles the Git-related operations for a job and returns the head hash
Expand Down
2 changes: 2 additions & 0 deletions worker/cmd/generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func TestFetchModelName(t *testing.T) {
"dummy-client-key-path.pem",
"dummy-ca-cert-path.pem",
20,
"immediate",
)

modelName, err := w.fetchModelName(false)
Expand Down Expand Up @@ -219,6 +220,7 @@ func TestFetchModelNameWithInvalidObject(t *testing.T) {
"dummy-client-key-path.pem",
"dummy-ca-cert-path.pem",
20,
"lazy",
)
modelName, err := w.fetchModelName(false)

Expand Down