Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 128 additions & 120 deletions src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf
Original file line number Diff line number Diff line change
@@ -1,67 +1,136 @@
def publishFiles(Map args) {
def key_ = args.get("key")
def _publishingProcessFactory(String wfKey) {
// autodetect process key
def procKeyPrefix = "PublishFiles_${wfKey}"
def scriptMeta = nextflow.script.ScriptMeta.current()
def existing = scriptMeta.getProcessNames().findAll{it.startsWith(procKeyPrefix)}
def numbers = existing.collect{it.replace(procKeyPrefix, "0").toInteger()}
def newNumber = (numbers + [-1]).max() + 1

assert key_ != null : "publishFiles: key must be specified"

workflow publishFilesWf {
take: input_ch
main:
input_ch
| map { tup ->
def id_ = tup[0]
def state_ = tup[1]
def procKey = newNumber == 0 ? procKeyPrefix : "$procKeyPrefix$newNumber"

// the input files and the target output filenames
def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
def inputFiles_ = inputoutputFilenames_[0]
def outputFilenames_ = inputoutputFilenames_[1]

[id_, inputFiles_, outputFilenames_]
}
| publishFilesProc
emit: input_ch
if (newNumber > 0) {
  // Warn that this workflow key has already been used for a publishing process.
  // The message is built as ONE concatenated string: with a comma after the
  // first literal, the remaining lines were passed as a separate logger
  // argument and (there being no '{}' placeholder in the message) would
  // presumably never be printed.
  log.warn "Key for module '${wfKey}' is duplicated.\n" +
    "If you run a component multiple times in the same workflow,\n" +
    "it's recommended you set a unique key for every call,\n" +
    "for example: ${wfKey}.run(key: \"foo\")."
}
return publishFilesWf

def publishDir = getPublishDir()

// generate process string
def procStr =
"""nextflow.enable.dsl=2
|
|process $procKey {
|publishDir path: "$publishDir", mode: "copy"
|tag "\$id"
|input:
| tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
|output:
| tuple val(id), path{outputFiles}
|script:
|def copyCommands = [
| inputFiles instanceof List ? inputFiles : [inputFiles],
| outputFiles instanceof List ? outputFiles : [outputFiles]
|]
| .transpose()
| .collectMany{infile, outfile ->
| if (infile.toString() != outfile.toString()) {
| [
| "[ -d \\"\\\$(dirname '\${outfile.toString()}')\\" ] || mkdir -p \\"\\\$(dirname '\${outfile.toString()}')\\"",
| "cp -r '\${infile.toString()}' '\${outfile.toString()}'"
| ]
| } else {
| // no need to copy if infile is the same as outfile
| []
| }
| }
|
|\"\"\"
|echo "Copying output files to destination folder"
|\${copyCommands.join("\\n ")}
|\"\"\"
|}
|""".stripMargin()

// write process to temp file
def tempFile = java.nio.file.Files.createTempFile("viash-process-${procKey}-", ".nf")
// addShutdownHook { java.nio.file.Files.deleteIfExists(tempFile) }
tempFile.text = procStr

// create process from temp file
def binding = new nextflow.script.ScriptBinding([:])
def session = nextflow.Nextflow.getSession()
def parser = new nextflow.script.ScriptParser(session)
.setModule(true)
.setBinding(binding)
def moduleScript = parser.runScript(tempFile)
.getScript()

// register module in meta
def module = new nextflow.script.IncludeDef.Module(name: procKey)
scriptMeta.addModule(moduleScript, module.name, module.alias)

// retrieve and return process from meta
return scriptMeta.getProcess(procKey)
}

process publishFilesProc {
// todo: check publishpath?
publishDir path: "${getPublishDir()}/", mode: "copy"
tag "$id"
input:
tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
output:
tuple val(id), path{outputFiles}
script:
def copyCommands = [
inputFiles instanceof List ? inputFiles : [inputFiles],
outputFiles instanceof List ? outputFiles : [outputFiles]
]
.transpose()
.collectMany{infile, outfile ->
if (infile.toString() != outfile.toString()) {
[
"[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"",
"cp -r '${infile.toString()}' '${outfile.toString()}'"
]
} else {
// no need to copy if infile is the same as outfile
[]
}
def processStateForPar(par, id_, key_, state_, origState_) {
def plainName_ = par.plainName
// if the state does not contain the key, either it's an
// optional argument for which the component did not
// generate any output, or multiple channels were emitted
// and this output was not added via the channel
// that is now being parsed
if (!state_.containsKey(plainName_)) {
return []
}
def value = state_[plainName_]
// if the orig state does not contain this filename,
// it's an optional argument for which the user specified
// that it should not be returned as a state
if (!origState_.containsKey(plainName_)) {
return []
}
def filenameTemplate = origState_[plainName_]
// if the parameter is multiple: true, fetch the template
if (par.multiple && filenameTemplate instanceof List) {
filenameTemplate = filenameTemplate[0]
}
// instantiate the template
def filename = filenameTemplate
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)

if (par.multiple) {
// if the parameter is multiple: true, the filename
// should contain a wildcard '*' that is replaced with
// the index of the file
assert filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}"
def outputPerFile = value.withIndex().collect{ val, ix ->
def filename_ix = filename.replace("*", ix.toString())
def inputPath = val instanceof File ? val.toPath() : val
[inputPath: inputPath, outputFilename: filename_ix]
}
"""
echo "Copying output files to destination folder"
${copyCommands.join("\n ")}
"""
}
def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key ->
[key, outputPerFile.collect{dic -> dic[key]}]
}
return [[key: plainName_] + transposedOutputs]
}

def value_ = java.nio.file.Paths.get(filename)
def inputPath = value instanceof File ? value.toPath() : value
return [[inputPath: [inputPath], outputFilename: [filename]]]

}

// this assumes that the state contains no other values other than those specified in the config
def publishFilesByConfig(Map args) {
def config = args.get("config")
assert config != null : "publishFilesByConfig: config must be specified"

def key_ = args.get("key", config.name)
def parameter_info = args.get("par")
assert parameter_info != null : "publishFilesByConfig: par must be specified"
def key_ = args.get("key")
assert key_ != null : "publishFilesByConfig: key must be specified"

workflow publishFilesSimpleWf {
Expand All @@ -72,79 +141,18 @@ def publishFilesByConfig(Map args) {
def id_ = tup[0]
def state_ = tup[1] // e.g. [output: new File("myoutput.h5ad"), k: 10]
def origState_ = tup[2] // e.g. [output: '$id.$key.foo.h5ad']


// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where

// the processed state is a list of [key, inputPath, outputFilename] tuples, where
// - key is a String
// - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
// - inputPath is a List[Path]
// - outputFilename is a List[String]
// - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml)
def processedState =
config.allArguments
.findAll { it.direction == "output" }
.collectMany { par ->
def plainName_ = par.plainName
// if the state does not contain the key, either it's an
// optional argument for which the component did not
// generate any output, or multiple channels were emitted
// and this output was not added via the channel
// that is now being parsed
if (!state_.containsKey(plainName_)) {
return []
}
def value = state_[plainName_]
// if the parameter is not a file, it should be stored
// in the state as-is, but is not something that needs
// to be copied from the source path to the dest path
if (par.type != "file") {
return [[inputPath: [], outputFilename: []]]
}
// if the orig state does not contain this filename,
// it's an optional argument for which the user specified
// that it should not be returned as a state
if (!origState_.containsKey(plainName_)) {
return []
}
def filenameTemplate = origState_[plainName_]
// if the parameter is multiple: true, fetch the template
if (par.multiple && filenameTemplate instanceof List) {
filenameTemplate = filenameTemplate[0]
}
// instantiate the template
def filename = filenameTemplate
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
if (par.multiple) {
// if the parameter is multiple: true, the filename
// should contain a wildcard '*' that is replaced with
// the index of the file
assert filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}"
def outputPerFile = value.withIndex().collect{ val, ix ->
def filename_ix = filename.replace("*", ix.toString())
def inputPath = val instanceof File ? val.toPath() : val
[inputPath: inputPath, outputFilename: filename_ix]
}
def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key ->
[key, outputPerFile.collect{dic -> dic[key]}]
}
return [[key: plainName_] + transposedOutputs]
} else {
def value_ = java.nio.file.Paths.get(filename)
def inputPath = value instanceof File ? value.toPath() : value
return [[inputPath: [inputPath], outputFilename: [filename]]]
}
}

// - (inputPath, outputFilename) are the files that will be copied from src to dest
def processedState = processStateForPar(parameter_info, id_, key_, state_, origState_)
def inputPaths = processedState.collectMany{it.inputPath}
def outputFilenames = processedState.collectMany{it.outputFilename}


[id_, inputPaths, outputFilenames]
}
| publishFilesProc
| _publishingProcessFactory(parameter_info.plainName)
emit: input_ch
}
return publishFilesSimpleWf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,21 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
| map{ tup ->
tup.take(4)
}

safeJoin(chPublishFiles, chArgsWithDefaults, key_)
def chPublishFilesWithDefaults = safeJoin(chPublishFiles, chArgsWithDefaults, key_)
// input tuple format: [join_id, channel_id, id, new_state, orig_state, ...]
// output tuple format: [id, new_state, orig_state]
| map { tup ->
tup.drop(2).take(3)
}
| publishFilesByConfig(key: key_, config: meta.config)

meta.config.allArguments.findAll {
it.type == "file" && it.direction == "output"
}.each{par ->
chPublishFilesWithDefaults
| publishFilesByConfig(key: key_, par: par)
}

}
// Join the state from the events that were emitted from different channels
def chJoined = chInitialOutputProcessed
Expand Down
Loading