diff --git a/.gitignore b/.gitignore index 36a13ee..d1f53d9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ tests/testthat/unittests-files - BatchJobs_*.tar.gz BatchJobs.Rcheck .Rproj.user +BatchJobs.Rproj +.Rbuildignore *.Rproj diff --git a/DESCRIPTION b/DESCRIPTION index 67df3b9..e699e91 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -34,6 +34,6 @@ Suggests: testthat LazyData: yes ByteCompile: yes -Version: 1.8 +Version: 1.8-666 RoxygenNote: 7.1.0 Encoding: UTF-8 diff --git a/R/batchReduceResults.R b/R/batchReduceResults.R index 6b7d605..1cc8acd 100644 --- a/R/batchReduceResults.R +++ b/R/batchReduceResults.R @@ -71,9 +71,9 @@ batchReduceResults = function(reg, reg2, fun, ids, part = NA_character_, init, b more.args = c(more.args, list(..reg = reg, ..fun = fun, ..part = part))) } -batchReduceResultsWrapper = function(aggr, x, ..reg, ..fun, ..part) { +batchReduceResultsWrapper = function(aggr, x, ..reg, ..fun, ..part, ...) { # x is id # use lazy evaluation, if fun doesn't access job or res (unlikely) ..fun(aggr = aggr, job = getJob(..reg, x, check.id = FALSE), - res = getResult(..reg, x, ..part)) + res = getResult(..reg, x, ..part), ...) } diff --git a/R/clusterFunctionsHelpers.R b/R/clusterFunctionsHelpers.R index 4c92621..403c9b8 100644 --- a/R/clusterFunctionsHelpers.R +++ b/R/clusterFunctionsHelpers.R @@ -45,8 +45,8 @@ cfBrewTemplate = function(conf, template, rscript, extension) { assertString(template) assertString(rscript) assertString(extension) - if (conf$debug) { - # if debug, place in jobs dir + if (conf$debug | conf$ssh) { + # if debug or job node over ssh, place in jobs dir outfile = sub("\\.R$", sprintf(".%s", extension), rscript) } else { outfile = tempfile("template") @@ -113,8 +113,14 @@ cfKillBatchJob = function(cmd, batch.job.id, max.tries = 3L) { max.tries = asCount(max.tries) assertCount(max.tries) + conf = getBatchJobsConf() for (tmp in seq_len(max.tries)) { - res = runOSCommandLinux(cmd, batch.job.id, stop.on.exit.code = FALSE) + res = runOSCommandLinux(cmd, batch.job.id, + stop.on.exit.code = FALSE, + ssh = conf$ssh, + nodename = conf$node, + ssh.cmd = "ssh", + ssh.args = "") if (res$exit.code == 0L) return() Sys.sleep(1) diff --git a/R/clusterFunctionsSLURM.R b/R/clusterFunctionsSLURM.R index d3432f0..0debaac 100644 --- a/R/clusterFunctionsSLURM.R +++ b/R/clusterFunctionsSLURM.R @@ -27,7 +27,12 @@ makeClusterFunctionsSLURM = function(template.file, list.jobs.cmd = c("squeue", submitJob = function(conf, reg, job.name, rscript, log.file, job.dir, resources, arrayjobs) { outfile = cfBrewTemplate(conf, template, rscript, "sb") - res = runOSCommandLinux("sbatch", outfile, stop.on.exit.code = FALSE) + res = runOSCommandLinux("sbatch", outfile, + stop.on.exit.code = FALSE, + ssh = conf$ssh, + nodename = conf$node, + ssh.cmd = "ssh", + ssh.args = "") max.jobs.msg = "sbatch: error: Batch job submission failed: Job violates accounting policy (job submit limit, user's size and/or time limits)" temp.error = "Socket timed out on send/recv operation" @@ -50,7 +55,11 @@ makeClusterFunctionsSLURM = function(template.file, list.jobs.cmd = c("squeue", listJobs = function(conf, reg) { # Result is lines of fully quantified batch.job.ids - jids = runOSCommandLinux(list.jobs.cmd[1L], list.jobs.cmd[-1L])$output + jids = runOSCommandLinux(list.jobs.cmd[1L], list.jobs.cmd[-1L], + ssh = conf$ssh, + nodename = conf$node, + ssh.cmd = "ssh", + ssh.args = "")$output stri_extract_first_regex(jids, "[0-9]+") } diff --git a/R/conf.R b/R/conf.R index 18f0514..aa73c9a 100644 --- a/R/conf.R +++ b/R/conf.R @@ -89,7 +89,7 @@ getConfNames = function() { c("cluster.functions", "mail.start", "mail.done", "mail.error", "mail.from", "mail.to", "mail.control", "db.driver", "db.options", "default.resources", "debug", "raise.warnings", "staged.queries", - "max.concurrent.jobs", "fs.timeout", "measure.mem") + "max.concurrent.jobs", "fs.timeout", "measure.mem", "ssh", "node") } checkConf = function(conf) { @@ -102,7 +102,7 @@ checkConf = function(conf) { checkConfElements = function(cluster.functions, mail.to, mail.from, mail.start, mail.done, mail.error, mail.control, db.driver, db.options, default.resources, debug, - raise.warnings, staged.queries, max.concurrent.jobs, fs.timeout, measure.mem) { + raise.warnings, staged.queries, max.concurrent.jobs, fs.timeout, ssh, node, measure.mem) { mail.choices = c("none", "first", "last", "first+last", "all") @@ -136,6 +136,10 @@ checkConfElements = function(cluster.functions, mail.to, mail.from, assertCount(max.concurrent.jobs) if (!missing(fs.timeout)) assertNumber(fs.timeout) + if (!missing(ssh)) + assertFlag(ssh) + if (!missing(node)) + assertString(node) if (!missing(measure.mem)) assertFlag(measure.mem) } @@ -164,11 +168,13 @@ printableConf = function(conf) { " staged.queries: %s", " max.concurrent.jobs: %s", " fs.timeout: %s", - " measure.mem: %s\n", + " measure.mem: %s", + " ssh: %s", + " node: %s\n", sep = "\n") sprintf(fmt, x$cluster.functions$name, x$mail.from, x$mail.to, x$mail.start, x$mail.done, - x$mail.error, convertToShortString(x$default.resources), x$debug, x$raise.warnings, - x$staged.queries, x$max.concurrent.jobs, x$fs.timeout, x$measure.mem) + x$mail.error, convertToShortString(x$default.resources), x$debug, x$raise.warnings, + x$staged.queries, x$max.concurrent.jobs, x$fs.timeout, x$measure.mem, x$ssh, x$node) } diff --git a/R/doJob.R b/R/doJob.R index 8826f4d..b9a80f7 100644 --- a/R/doJob.R +++ b/R/doJob.R @@ -52,7 +52,7 @@ doJob = function(reg, ids, multiple.result.files, staged, disable.mail, first, l } for (i in seq_len(n)) { - job = jobs[[i]] + job = jobs[[as.character(ids[i])]] messagef("########## Executing jid=%s ##########", job$id) started = Sys.time() msg.buf$push(dbMakeMessageStarted(reg, ids[i], time = as.integer(started))) @@ -86,7 +86,7 @@ doJob = function(reg, ids, multiple.result.files, staged, disable.mail, first, l results[i] = "multiple.result.files is TRUE, but your algorithm did not return a list!" error[i] = TRUE } else if (!isProperlyNamed(result)) { - results[i] = "multiple.result.files is TRUE, but some the returned lists is not fully, distinctly named!" + results[i] = "multiple.result.files is TRUE, but some the returned lists is not fully, distinctly named!" error[i] = TRUE } } @@ -118,9 +118,9 @@ doJob = function(reg, ids, multiple.result.files, staged, disable.mail, first, l # check if there are still remaining messages if (!msg.buf$empty()) { mail.extra.msg = paste("Some DB messages could not be flushed.", - "This indicates some DB problem or too much communication with the DB.", - "Everything should still be ok, you only might have to resubmit some jobs as they are not recorded as 'done'.", - sep = "\n") + "This indicates some DB problem or too much communication with the DB.", + "Everything should still be ok, you only might have to resubmit some jobs as they are not recorded as 'done'.", + sep = "\n") warningf(mail.extra.msg) } diff --git a/R/getJob.R b/R/getJob.R index 4219367..aace161 100644 --- a/R/getJob.R +++ b/R/getJob.R @@ -35,5 +35,7 @@ getJobs = function(reg, ids, check.ids = TRUE) { getJobs.Registry = function(reg, ids, check.ids = TRUE) { if (!missing(ids) && check.ids) ids = checkIds(reg, ids) - dbGetJobs(reg, ids) + jobs <- dbGetJobs(reg, ids) + names(jobs) <- sapply(jobs, function(x) x$id) + jobs } diff --git a/R/sourceRegistryFiles.R b/R/sourceRegistryFiles.R index 6870b68..077188a 100644 --- a/R/sourceRegistryFiles.R +++ b/R/sourceRegistryFiles.R @@ -28,7 +28,7 @@ sourceRegistryFilesInternal = function(work.dir, dirs, files, envir = .GlobalEnv if (length(w)) stopf("Directories to source not found, e.g. %s", dirs[w]) - lapply(c(getRScripts(dirs), files), sys.source, envir = envir) + lapply(c(getRScripts(dirs), files), sys.source, envir = envir, chdir = TRUE) invisible(TRUE) } diff --git a/R/submitJobs.R b/R/submitJobs.R index d744943..969b984 100644 --- a/R/submitJobs.R +++ b/R/submitJobs.R @@ -219,6 +219,7 @@ submitJobs = function(reg, ids, resources = list(), wait, max.retries = 10L, chu resources = resources, arrayjobs = if(chunks.as.arrayjobs) length(id) else 1L ) + Sys.sleep(1) } ### validate status returned from cluster functions