Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
ed46a47
Cast string "array.id" to integer
Oct 9, 2014
419fbb3
makeClusterFunctionsTorque with SSH
Oct 9, 2014
29369d9
Added array.id[] to list of jobs
Oct 9, 2014
a17396e
Queue server over ssh in config
Oct 9, 2014
3dec83d
Line break typo
Oct 9, 2014
e84462f
Removed site specific .login$
Oct 16, 2014
e5c731c
Patched from tudo
Oct 29, 2014
f98ea1c
commits from tudo added
Oct 29, 2014
2d00cd1
added ssh option back to listJobs
Oct 29, 2014
a3b5ba8
broken listJobs for arrayjobs repaired
Oct 29, 2014
74942f2
object name corrected
Oct 29, 2014
95d8dbb
ignores updated
Jan 12, 2015
1328376
count all jobs for max concurrent jobs
Jan 13, 2015
8394987
count all jobs for max concurrent jobs
Jan 13, 2015
dbc3429
add config option max.arrayjobs
Feb 17, 2015
660bd65
Add max.arrayjobs to getConfig()
Feb 24, 2015
6216bc9
Merged with tudo-r/Batchjobs
Mar 5, 2015
a1b540b
Merge remote-tracking branch 'origin/limitjobs' into merge-updates
Mar 5, 2015
c7cfdd6
Merge branch 'merge-updates'
May 13, 2015
b175fed
merge completed
Jun 2, 2015
cf55738
IMBS custom configuration output
Jun 2, 2015
8751f72
Merge branch 'merge-updates'
Jul 7, 2015
267ec17
Updated version from tudo merged
Oct 19, 2015
8ef616b
added max.arrayjobs as allowed config parameter
Oct 19, 2015
89c9a27
adapted to new runOSCommandLinux interface
Oct 19, 2015
db1e83f
Merge branch 'master' of https://github.com/tudo-r/BatchJobs into mer…
Feb 8, 2016
a0f2e50
simplified removal of array id from batch id
Feb 8, 2016
01cca09
output ids in array for debug
Feb 8, 2016
c274b8f
select correct jobs when chunking
Feb 8, 2016
503aeea
removed debugging code lines
Feb 8, 2016
2106b28
return jobs as named list
Feb 8, 2016
f7f93a3
access job by name
Feb 8, 2016
6efeb0c
get list element not list of elements
Feb 8, 2016
a7b07bb
staged.queries is also used if fs.timeout is set
Feb 9, 2016
ef5d5f1
Merge branch 'merge-updates'
Feb 9, 2016
0bcbb81
wait for pbs script to appear on file system. On heavy loaded network…
May 19, 2016
455f152
wait for pbs script to appear on file system. On heavy loaded network…
May 19, 2016
12cf7ad
Merge branch 'master' of https://github.com/dagola/BatchJobs
May 19, 2016
ce69e21
wait for pbs file on network file systems
May 19, 2016
f02b5f4
Merge branch 'master' of https://github.com/tudo-r/BatchJobs into mer…
Jul 25, 2016
9fa26fc
Merge branch 'master' into merge-updates
Jul 25, 2016
256ab95
adds -9.13.2.19 to indicate IMBS version
Jul 25, 2016
906379b
removes deprecated alljobs argument
Aug 17, 2016
1371e57
submit, list and kill jobs via ssh
Aug 29, 2016
5b4e939
Merge branch 'master' of github.com:tudo-r/BatchJobs into slurm
Sep 1, 2016
2d76057
new version
Sep 1, 2016
60c683c
remove max.arrayjobs
Sep 1, 2016
5796015
remove duplicated max.conc.jobs
Sep 1, 2016
9b0346c
Merge pull request #1 from imbs-hl/slurm
mnwright Sep 1, 2016
60717d0
get config for ssh access
Sep 2, 2016
203b834
Merge branch 'master' of https://github.com/dagola/BatchJobs
Sep 2, 2016
de817f5
changes to the directory containing file for sourcing
Nov 6, 2016
64a1c1a
pass more.args down to reducing function
Mar 23, 2017
5c25d36
fixed conflicts
Jun 7, 2018
add47a8
Merge branch 'master' of https://github.com/tudo-r/BatchJobs into tud…
May 20, 2020
17cd843
Merge branch 'tudo-r-master'
May 20, 2020
3b04b93
add evil IMBS version flag
May 20, 2020
bc5491a
fix conflict
Jun 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
tests/testthat/unittests-files

BatchJobs_*.tar.gz
BatchJobs.Rcheck
.Rproj.user
BatchJobs.Rproj
.Rbuildignore
*.Rproj
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ Suggests:
testthat
LazyData: yes
ByteCompile: yes
Version: 1.8
Version: 1.8-666
RoxygenNote: 7.1.0
Encoding: UTF-8
4 changes: 2 additions & 2 deletions R/batchReduceResults.R
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ batchReduceResults = function(reg, reg2, fun, ids, part = NA_character_, init, b
more.args = c(more.args, list(..reg = reg, ..fun = fun, ..part = part)))
}

# Reduction step for a single job: calls ..fun with the running aggregate
# `aggr`, the job object and the job's result (both looked up in ..reg by the
# id x) and returns whatever ..fun returns.
# NOTE(review): two signature lines and two `res =` lines appear below; they
# look like the before/after sides of a diff hunk (the second of each adds
# `...`, which is forwarded to ..fun) -- confirm against the real file.
batchReduceResultsWrapper = function(aggr, x, ..reg, ..fun, ..part) {
batchReduceResultsWrapper = function(aggr, x, ..reg, ..fun, ..part, ...) {
# x is id
# use lazy evaluation, if fun doesn't access job or res (unlikely):
# getJob() / getResult() are passed as arguments and are therefore only
# evaluated if ..fun actually touches `job` / `res`
..fun(aggr = aggr, job = getJob(..reg, x, check.id = FALSE),
res = getResult(..reg, x, ..part))
res = getResult(..reg, x, ..part), ...)
}
12 changes: 9 additions & 3 deletions R/clusterFunctionsHelpers.R
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ cfBrewTemplate = function(conf, template, rscript, extension) {
assertString(template)
assertString(rscript)
assertString(extension)
if (conf$debug) {
# if debug, place in jobs dir
if (conf$debug | conf$ssh) {
# if debug or job node over ssh, place in jobs dir
outfile = sub("\\.R$", sprintf(".%s", extension), rscript)
} else {
outfile = tempfile("template")
Expand Down Expand Up @@ -113,8 +113,14 @@ cfKillBatchJob = function(cmd, batch.job.id, max.tries = 3L) {
max.tries = asCount(max.tries)
assertCount(max.tries)

conf = getBatchJobsConf()
for (tmp in seq_len(max.tries)) {
res = runOSCommandLinux(cmd, batch.job.id, stop.on.exit.code = FALSE)
res = runOSCommandLinux(cmd, batch.job.id,
stop.on.exit.code = FALSE,
ssh = conf$ssh,
nodename = conf$node,
ssh.cmd = "ssh",
ssh.args = "")
if (res$exit.code == 0L)
return()
Sys.sleep(1)
Expand Down
13 changes: 11 additions & 2 deletions R/clusterFunctionsSLURM.R
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ makeClusterFunctionsSLURM = function(template.file, list.jobs.cmd = c("squeue",

submitJob = function(conf, reg, job.name, rscript, log.file, job.dir, resources, arrayjobs) {
outfile = cfBrewTemplate(conf, template, rscript, "sb")
res = runOSCommandLinux("sbatch", outfile, stop.on.exit.code = FALSE)
res = runOSCommandLinux("sbatch", outfile,
stop.on.exit.code = FALSE,
ssh = conf$ssh,
nodename = conf$node,
ssh.cmd = "ssh",
ssh.args = "")

max.jobs.msg = "sbatch: error: Batch job submission failed: Job violates accounting policy (job submit limit, user's size and/or time limits)"
temp.error = "Socket timed out on send/recv operation"
Expand All @@ -50,7 +55,11 @@ makeClusterFunctionsSLURM = function(template.file, list.jobs.cmd = c("squeue",

# Lists batch job ids: runs list.jobs.cmd (first element is the command, the
# remaining elements are its arguments) and returns, for each output line, the
# first run of digits found in it. `reg` is not used in the visible body.
listJobs = function(conf, reg) {
# Result is lines of fully qualified batch.job.ids
# NOTE(review): `jids` is assigned twice below; this looks like the
# before/after sides of a diff hunk (the second call adds the ssh arguments
# taken from conf) -- confirm against the real file.
# presumably runOSCommandLinux runs the command on conf$node via ssh when
# conf$ssh is TRUE -- verify against its definition
jids = runOSCommandLinux(list.jobs.cmd[1L], list.jobs.cmd[-1L])$output
jids = runOSCommandLinux(list.jobs.cmd[1L], list.jobs.cmd[-1L],
ssh = conf$ssh,
nodename = conf$node,
ssh.cmd = "ssh",
ssh.args = "")$output
# keep only the leading numeric part of each line (first match of [0-9]+)
stri_extract_first_regex(jids, "[0-9]+")
}

Expand Down
16 changes: 11 additions & 5 deletions R/conf.R
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ getConfNames = function() {
c("cluster.functions", "mail.start", "mail.done", "mail.error",
"mail.from", "mail.to", "mail.control", "db.driver", "db.options",
"default.resources", "debug", "raise.warnings", "staged.queries",
"max.concurrent.jobs", "fs.timeout", "measure.mem")
"max.concurrent.jobs", "fs.timeout", "measure.mem", "ssh", "node")
}

checkConf = function(conf) {
Expand All @@ -102,7 +102,7 @@ checkConf = function(conf) {

checkConfElements = function(cluster.functions, mail.to, mail.from,
mail.start, mail.done, mail.error, mail.control, db.driver, db.options, default.resources, debug,
raise.warnings, staged.queries, max.concurrent.jobs, fs.timeout, measure.mem) {
raise.warnings, staged.queries, max.concurrent.jobs, fs.timeout, ssh, node, measure.mem) {

mail.choices = c("none", "first", "last", "first+last", "all")

Expand Down Expand Up @@ -136,6 +136,10 @@ checkConfElements = function(cluster.functions, mail.to, mail.from,
assertCount(max.concurrent.jobs)
if (!missing(fs.timeout))
assertNumber(fs.timeout)
if (!missing(ssh))
assertFlag(ssh)
if (!missing(node))
assertString(node)
if (!missing(measure.mem))
assertFlag(measure.mem)
}
Expand Down Expand Up @@ -164,11 +168,13 @@ printableConf = function(conf) {
" staged.queries: %s",
" max.concurrent.jobs: %s",
" fs.timeout: %s",
" measure.mem: %s\n",
" measure.mem: %s",
" ssh: %s",
" node: %s\n",
sep = "\n")
sprintf(fmt, x$cluster.functions$name, x$mail.from, x$mail.to, x$mail.start, x$mail.done,
x$mail.error, convertToShortString(x$default.resources), x$debug, x$raise.warnings,
x$staged.queries, x$max.concurrent.jobs, x$fs.timeout, x$measure.mem)
x$mail.error, convertToShortString(x$default.resources), x$debug, x$raise.warnings,
x$staged.queries, x$max.concurrent.jobs, x$fs.timeout, x$measure.mem, x$ssh, x$node)
}


Expand Down
10 changes: 5 additions & 5 deletions R/doJob.R
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ doJob = function(reg, ids, multiple.result.files, staged, disable.mail, first, l
}

for (i in seq_len(n)) {
job = jobs[[i]]
job = jobs[[as.character(ids[i])]]
messagef("########## Executing jid=%s ##########", job$id)
started = Sys.time()
msg.buf$push(dbMakeMessageStarted(reg, ids[i], time = as.integer(started)))
Expand Down Expand Up @@ -86,7 +86,7 @@ doJob = function(reg, ids, multiple.result.files, staged, disable.mail, first, l
results[i] = "multiple.result.files is TRUE, but your algorithm did not return a list!"
error[i] = TRUE
} else if (!isProperlyNamed(result)) {
results[i] = "multiple.result.files is TRUE, but some the returned lists is not fully, distinctly named!"
results[i] = "multiple.result.files is TRUE, but some the returned lists is not fully, distinctly named!"
error[i] = TRUE
}
}
Expand Down Expand Up @@ -118,9 +118,9 @@ doJob = function(reg, ids, multiple.result.files, staged, disable.mail, first, l
# check if there are still remaining messages
if (!msg.buf$empty()) {
mail.extra.msg = paste("Some DB messages could not be flushed.",
"This indicates some DB problem or too much communication with the DB.",
"Everything should still be ok, you only might have to resubmit some jobs as they are not recorded as 'done'.",
sep = "\n")
"This indicates some DB problem or too much communication with the DB.",
"Everything should still be ok, you only might have to resubmit some jobs as they are not recorded as 'done'.",
sep = "\n")
warningf(mail.extra.msg)
}

Expand Down
4 changes: 3 additions & 1 deletion R/getJob.R
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,7 @@ getJobs = function(reg, ids, check.ids = TRUE) {
# getJobs method for Registry objects: fetches the jobs for `ids` via
# dbGetJobs and returns them as a list whose names are taken from each job's
# `id` field.
getJobs.Registry = function(reg, ids, check.ids = TRUE) {
# ids go through checkIds only when they were supplied and check.ids is TRUE
if (!missing(ids) && check.ids)
ids = checkIds(reg, ids)
# NOTE(review): dbGetJobs appears here and again on the next line; this looks
# like the before/after sides of a diff hunk rather than an intentional double
# query -- confirm against the real file.
dbGetJobs(reg, ids)
jobs <- dbGetJobs(reg, ids)
# name each element after its job's id so a job can be looked up by name
names(jobs) <- sapply(jobs, function(x) x$id)
jobs
}
2 changes: 1 addition & 1 deletion R/sourceRegistryFiles.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ sourceRegistryFilesInternal = function(work.dir, dirs, files, envir = .GlobalEnv
if (length(w))
stopf("Directories to source not found, e.g. %s", dirs[w])

lapply(c(getRScripts(dirs), files), sys.source, envir = envir)
lapply(c(getRScripts(dirs), files), sys.source, envir = envir, chdir = TRUE)
invisible(TRUE)
}

Expand Down
1 change: 1 addition & 0 deletions R/submitJobs.R
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ submitJobs = function(reg, ids, resources = list(), wait, max.retries = 10L, chu
resources = resources,
arrayjobs = if(chunks.as.arrayjobs) length(id) else 1L
)
Sys.sleep(1)
}

### validate status returned from cluster functions
Expand Down