Skip to content

Commit 4343e33

Browse files
Use separate script for long stage/unstage commands (#3851)
Signed-off-by: Ben Sherman <[email protected]> Signed-off-by: Paolo Di Tommaso <[email protected]> Co-authored-by: Paolo Di Tommaso <[email protected]>
1 parent 17fcc67 commit 4343e33

File tree

5 files changed

+139
-7
lines changed

5 files changed

+139
-7
lines changed

modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ import nextflow.util.Escape
3838

3939
import static java.nio.file.StandardOpenOption.*
4040

41+
import nextflow.util.MemoryUnit
42+
4143
/**
4244
* Builder to create the Bash script which is used to
4345
* wrap and launch the user task
@@ -48,10 +50,12 @@ import static java.nio.file.StandardOpenOption.*
4850
@CompileStatic
4951
class BashWrapperBuilder {
5052

53+
private static MemoryUnit DEFAULT_STAGE_FILE_THRESHOLD = MemoryUnit.of('1 MB')
5154
private static int DEFAULT_WRITE_BACK_OFF_BASE = 3
5255
private static int DEFAULT_WRITE_BACK_OFF_DELAY = 250
5356
private static int DEFAULT_WRITE_MAX_ATTEMPTS = 5
5457

58+
private MemoryUnit stageFileThreshold = SysEnv.get('NXF_WRAPPER_STAGE_FILE_THRESHOLD') as MemoryUnit ?: DEFAULT_STAGE_FILE_THRESHOLD
5559
private int writeBackOffBase = SysEnv.get('NXF_WRAPPER_BACK_OFF_BASE') as Integer ?: DEFAULT_WRITE_BACK_OFF_BASE
5660
private int writeBackOffDelay = SysEnv.get('NXF_WRAPPER_BACK_OFF_DELAY') as Integer ?: DEFAULT_WRITE_BACK_OFF_DELAY
5761
private int writeMaxAttempts = SysEnv.get('NXF_WRAPPER_MAX_ATTEMPTS') as Integer ?: DEFAULT_WRITE_MAX_ATTEMPTS
@@ -107,6 +111,10 @@ class BashWrapperBuilder {
107111

108112
private Path wrapperFile
109113

114+
private Path stageFile
115+
116+
private String stageScript
117+
110118
private BashTemplateEngine engine = new BashTemplateEngine()
111119

112120
BashWrapperBuilder( TaskRun task ) {
@@ -184,7 +192,20 @@ class BashWrapperBuilder {
184192
}
185193
result.toString()
186194
}
187-
195+
196+
protected String stageCommand(String stagingScript) {
197+
if( !stagingScript )
198+
return null
199+
200+
final header = "# stage input files\n"
201+
if( stagingScript.size() >= stageFileThreshold.bytes ) {
202+
stageScript = stagingScript
203+
return header + "bash ${stageFile}"
204+
}
205+
else
206+
return header + stagingScript
207+
}
208+
188209
protected Map<String,String> makeBinding() {
189210
/*
190211
* initialise command files
@@ -194,6 +215,7 @@ class BashWrapperBuilder {
194215
startedFile = workDir.resolve(TaskRun.CMD_START)
195216
exitedFile = workDir.resolve(TaskRun.CMD_EXIT)
196217
wrapperFile = workDir.resolve(TaskRun.CMD_RUN)
218+
stageFile = workDir.resolve(TaskRun.CMD_STAGE)
197219

198220
// set true when running with through a container engine
199221
runWithContainer = containerEnabled && !containerNative
@@ -275,7 +297,7 @@ class BashWrapperBuilder {
275297
* staging input files when required
276298
*/
277299
final stagingScript = copyStrategy.getStageInputFilesScript(inputFiles)
278-
binding.stage_inputs = stagingScript ? "# stage input files\n${stagingScript}" : null
300+
binding.stage_inputs = stageCommand(stagingScript)
279301

280302
binding.stdout_file = TaskRun.CMD_OUTFILE
281303
binding.stderr_file = TaskRun.CMD_ERRFILE
@@ -340,6 +362,8 @@ class BashWrapperBuilder {
340362
write0(targetScriptFile(), script)
341363
if( input != null )
342364
write0(targetInputFile(), input.toString())
365+
if( stageScript != null )
366+
write0(targetStageFile(), stageScript)
343367
return result
344368
}
345369

@@ -349,11 +373,16 @@ class BashWrapperBuilder {
349373

350374
protected Path targetInputFile() { return inputFile }
351375

376+
protected Path targetStageFile() { return stageFile }
377+
352378
private Path write0(Path path, String data) {
353379
int attempt=0
354380
while( true ) {
355381
try {
356-
return Files.write(path, data.getBytes(), CREATE,WRITE,TRUNCATE_EXISTING)
382+
try (BufferedWriter writer=Files.newBufferedWriter(path, CREATE,WRITE,TRUNCATE_EXISTING)) {
383+
writer.write(data)
384+
}
385+
return path
357386
}
358387
catch (FileSystemException | SocketException | RuntimeException e) {
359388
final isLocalFS = path.getFileSystem()==FileSystems.default

modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,7 @@ class TaskRun implements Cloneable {
547547
static final public String CMD_EXIT = '.exitcode'
548548
static final public String CMD_START = '.command.begin'
549549
static final public String CMD_RUN = '.command.run'
550+
static final public String CMD_STAGE = '.command.stage'
550551
static final public String CMD_TRACE = '.command.trace'
551552
static final public String CMD_ENV = '.command.env'
552553

modules/nextflow/src/test/groovy/nextflow/executor/BashWrapperBuilderTest.groovy

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.nio.file.Path
2121
import java.nio.file.Paths
2222

2323
import nextflow.Session
24+
import nextflow.SysEnv
2425
import nextflow.container.ContainerConfig
2526
import nextflow.container.DockerBuilder
2627
import nextflow.container.SingularityBuilder
@@ -48,6 +49,8 @@ class BashWrapperBuilderTest extends Specification {
4849
bean.workDir = Paths.get('/work/dir')
4950
if( !bean.script )
5051
bean.script = 'echo Hello world!'
52+
if( !bean.containsKey('inputFiles') )
53+
bean.inputFiles = [:]
5154
new BashWrapperBuilder(bean as TaskBean) {
5255
@Override
5356
protected String getSecretsEnv() {
@@ -362,7 +365,17 @@ class BashWrapperBuilderTest extends Specification {
362365

363366
given:
364367
def folder = Paths.get('/work/dir')
365-
def inputs = ['sample_1.fq':Paths.get('/some/data/sample_1.fq'), 'sample_2.fq':Paths.get('/some/data/sample_2.fq'), ]
368+
def inputs = [
369+
'sample_1.fq': Paths.get('/some/data/sample_1.fq'),
370+
'sample_2.fq': Paths.get('/some/data/sample_2.fq'),
371+
]
372+
def stageScript = '''\
373+
# stage input files
374+
rm -f sample_1.fq
375+
rm -f sample_2.fq
376+
ln -s /some/data/sample_1.fq sample_1.fq
377+
ln -s /some/data/sample_2.fq sample_2.fq
378+
'''.stripIndent().rightTrim()
366379

367380
when:
368381
def binding = newBashWrapperBuilder([
@@ -371,15 +384,44 @@ class BashWrapperBuilderTest extends Specification {
371384
inputFiles: inputs ]).makeBinding()
372385

373386
then:
374-
binding.stage_inputs == '''\
375-
# stage input files
387+
binding.stage_inputs == stageScript
388+
}
389+
390+
def 'should stage inputs to external file' () {
391+
given:
392+
SysEnv.push([NXF_WRAPPER_STAGE_FILE_THRESHOLD: '100'])
393+
and:
394+
def folder = Files.createTempDirectory('test')
395+
and:
396+
def inputs = [
397+
'sample_1.fq': Paths.get('/some/data/sample_1.fq'),
398+
'sample_2.fq': Paths.get('/some/data/sample_2.fq'),
399+
]
400+
def stageScript = '''\
376401
rm -f sample_1.fq
377402
rm -f sample_2.fq
378403
ln -s /some/data/sample_1.fq sample_1.fq
379404
ln -s /some/data/sample_2.fq sample_2.fq
380405
'''.stripIndent().rightTrim()
406+
and:
407+
def builder = newBashWrapperBuilder([
408+
workDir: folder,
409+
targetDir: folder,
410+
inputFiles: inputs ])
381411

412+
when:
413+
def binding = builder.makeBinding()
414+
then:
415+
binding.stage_inputs == "# stage input files\nbash ${folder}/.command.stage"
382416

417+
when:
418+
builder.build()
419+
then:
420+
folder.resolve('.command.stage').text == stageScript
421+
422+
cleanup:
423+
SysEnv.pop()
424+
folder?.deleteDir()
383425
}
384426

385427
def 'should unstage outputs' () {
@@ -996,7 +1038,9 @@ class BashWrapperBuilderTest extends Specification {
9961038
def 'should include fix ownership command' () {
9971039

9981040
given:
999-
def bean = Mock(TaskBean)
1041+
def bean = Mock(TaskBean) {
1042+
inputFiles >> [:]
1043+
}
10001044
def copy = Mock(ScriptFileCopyStrategy)
10011045
bean.workDir >> Paths.get('/work/dir')
10021046
and:

tests/checks/stage-file.nf/.checks

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
set -e
2+
3+
#
4+
# run normal mode
5+
#
6+
echo ''
7+
NXF_WRAPPER_STAGE_FILE_THRESHOLD='100' $NXF_RUN | tee stdout
8+
[[ `grep 'INFO' .nextflow.log | grep -c 'Submitted process'` == 2 ]] || false
9+
10+
TASK_DIR=`$NXF_CMD log last -F "process == 'bar'"`
11+
[[ `cat $TASK_DIR/.command.stage | wc -l` -eq 19 ]] || false
12+
13+
14+
#
15+
# RESUME mode
16+
#
17+
echo ''
18+
NXF_WRAPPER_STAGE_FILE_THRESHOLD='100' $NXF_RUN -resume | tee stdout
19+
[[ `grep 'INFO' .nextflow.log | grep -c 'Cached process'` == 2 ]] || false
20+
21+
TASK_DIR=`$NXF_CMD log last -F "process == 'bar'"`
22+
[[ `cat $TASK_DIR/.command.stage | wc -l` -eq 19 ]] || false
23+

tests/stage-file.nf

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
2+
process foo {
3+
input:
4+
val n_files
5+
6+
output:
7+
path '*.fastq'
8+
9+
script:
10+
"""
11+
for i in `seq 1 ${n_files}`; do
12+
touch sample_\${i}.fastq
13+
done
14+
"""
15+
}
16+
17+
process bar {
18+
input:
19+
path '*.fastq'
20+
21+
output:
22+
stdout
23+
24+
script:
25+
"""
26+
for f in `ls *.fastq`; do
27+
echo \$f
28+
cat \$f
29+
done
30+
"""
31+
}
32+
33+
workflow {
34+
foo(10) | bar
35+
}

0 commit comments

Comments
 (0)