Skip to content

Commit b8cf823

Browse files
committed
Fold PublishIndexOp into PublishOp, add test for OutputDsl,
Signed-off-by: Ben Sherman <[email protected]>
1 parent ef4305d commit b8cf823

File tree

5 files changed

+190
-231
lines changed

5 files changed

+190
-231
lines changed

modules/nextflow/src/main/groovy/nextflow/extension/PublishIndexOp.groovy

Lines changed: 0 additions & 144 deletions
This file was deleted.

modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import groovyx.gpars.dataflow.DataflowReadChannel
2424
import nextflow.Global
2525
import nextflow.Session
2626
import nextflow.processor.PublishDir
27+
import nextflow.util.CsvWriter
2728
/**
2829
* Publish files from a source channel.
2930
*
@@ -37,13 +38,22 @@ class PublishOp {
3738

3839
private PublishDir publisher
3940

41+
private Path targetDir
42+
43+
private IndexOpts indexOpts
44+
45+
private List indexRecords = []
46+
4047
private volatile boolean complete
4148

4249
private Session getSession() { Global.session as Session }
4350

4451
PublishOp(DataflowReadChannel source, Map opts) {
4552
this.source = source
4653
this.publisher = PublishDir.create(opts)
54+
this.targetDir = opts.path as Path
55+
if( opts.index )
56+
this.indexOpts = new IndexOpts(targetDir, opts.index as Map)
4757
}
4858

4959
boolean getComplete() { complete }
@@ -64,13 +74,32 @@ class PublishOp {
6474
final files = entry.value
6575
publisher.apply(files, sourceDir)
6676
}
77+
78+
if( indexOpts ) {
79+
final record = indexOpts.mapper != null ? indexOpts.mapper.call(value) : value
80+
final normalized = normalizePaths(record)
81+
log.trace "Normalized record for index file: ${normalized}"
82+
indexRecords << normalized
83+
}
6784
}
6885

6986
protected void onComplete(nope) {
87+
if( indexOpts && indexRecords.size() > 0 ) {
88+
log.trace "Saving records to index file: ${indexRecords}"
89+
new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexOpts.path)
90+
session.notifyFilePublish(indexOpts.path)
91+
}
92+
7093
log.trace "Publish operator complete"
7194
this.complete = true
7295
}
7396

97+
/**
98+
* Extract files from a received value for publishing.
99+
*
100+
* @param result
101+
* @param value
102+
*/
74103
protected Map<Path,Set<Path>> collectFiles(Map<Path,Set<Path>> result, value) {
75104
if( value instanceof Path ) {
76105
final sourceDir = getTaskDir(value)
@@ -86,9 +115,47 @@ class PublishOp {
86115
}
87116

88117
/**
89-
* Given a path try to infer the task directory to which the path below
90-
* ie. the directory starting with a workflow work dir and having at lest
91-
* two sub-directories eg work-dir/xx/yyyyyy/etc
118+
* Normalize the paths in a record by converting
119+
* work directory paths to publish paths.
120+
*
121+
* @param value
122+
*/
123+
protected Object normalizePaths(value) {
124+
if( value instanceof Path )
125+
return List.of(normalizePath(value))
126+
127+
if( value instanceof Collection ) {
128+
return value.collect { el ->
129+
if( el instanceof Path )
130+
return normalizePath(el)
131+
if( el instanceof Collection<Path> )
132+
return normalizePaths(el)
133+
return el
134+
}
135+
}
136+
137+
if( value instanceof Map ) {
138+
return value.collectEntries { k, v ->
139+
if( v instanceof Path )
140+
return List.of(k, normalizePath(v))
141+
if( v instanceof Collection<Path> )
142+
return List.of(k, normalizePaths(v))
143+
return List.of(k, v)
144+
}
145+
}
146+
147+
throw new IllegalArgumentException("Index file record must be a list, map, or file: ${value} [${value.class.simpleName}]")
148+
}
149+
150+
private Path normalizePath(Path path) {
151+
final sourceDir = getTaskDir(path)
152+
return targetDir.resolve(sourceDir.relativize(path))
153+
}
154+
155+
/**
156+
* Try to infer the parent task directory to which a path belongs. It
157+
* should be a directory starting with a session work dir and having
158+
* at lest two sub-directories, e.g. work/ab/cdef/etc
92159
*
93160
* @param path
94161
*/
@@ -111,4 +178,22 @@ class PublishOp {
111178
return null
112179
}
113180

181+
static class IndexOpts {
182+
Path path
183+
Closure mapper
184+
def /* boolean | List<String> */ header = false
185+
String sep = ','
186+
187+
IndexOpts(Path targetDir, Map opts) {
188+
this.path = targetDir.resolve(opts.path as String)
189+
190+
if( opts.mapper )
191+
this.mapper = opts.mapper as Closure
192+
if( opts.header != null )
193+
this.header = opts.header
194+
if( opts.sep )
195+
this.sep = opts.sep as String
196+
}
197+
}
198+
114199
}

modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import nextflow.exception.ScriptRuntimeException
2626
import nextflow.extension.CH
2727
import nextflow.extension.MixOp
2828
import nextflow.extension.PublishOp
29-
import nextflow.extension.PublishIndexOp
29+
import nextflow.file.FileHelper
3030

3131
/**
3232
* Implements the DSL for publishing workflow outputs
@@ -43,10 +43,12 @@ class OutputDsl {
4343

4444
private Map defaults = [:]
4545

46+
private volatile List<PublishOp> ops = []
47+
4648
void directory(String directory) {
4749
if( this.directory )
4850
throw new ScriptRuntimeException("Publish directory cannot be defined more than once in the workflow publish definition")
49-
this.directory = (directory as Path).complete()
51+
this.directory = FileHelper.toCanonicalPath(directory)
5052
}
5153

5254
void contentType(String value) {
@@ -120,22 +122,13 @@ class OutputDsl {
120122
: sources.first()
121123
final opts = publishOptions(name, publishConfigs[name] ?: [:])
122124

123-
new PublishOp(CH.getReadChannel(mixed), opts).apply()
124-
125-
if( opts.index ) {
126-
final basePath = opts.path as Path
127-
final indexOpts = opts.index as Map
128-
final indexPath = indexOpts.path as String
129-
if( !indexPath )
130-
throw new ScriptRuntimeException("Index file definition for publish target '${name}' is missing `path` option")
131-
new PublishIndexOp(CH.getReadChannel(mixed), basePath, indexPath, indexOpts).apply()
132-
}
125+
ops << new PublishOp(CH.getReadChannel(mixed), opts).apply()
133126
}
134127
}
135128

136129
private Map publishOptions(String name, Map overrides) {
137130
if( !directory )
138-
directory = Paths.get('.').complete()
131+
directory = FileHelper.toCanonicalPath('.')
139132

140133
final opts = defaults + overrides
141134
if( opts.containsKey('ignoreErrors') )
@@ -147,9 +140,20 @@ class OutputDsl {
147140
if( path.startsWith('/') )
148141
throw new ScriptRuntimeException("Invalid publish target path '${path}' -- it should be a relative path")
149142
opts.path = directory.resolve(path)
143+
144+
if( opts.index && !(opts.index as Map).path )
145+
throw new ScriptRuntimeException("Index file definition for publish target '${name}' is missing `path` option")
146+
150147
return opts
151148
}
152149

150+
boolean getComplete() {
151+
for( final op : ops )
152+
if( !op.complete )
153+
return false
154+
return true
155+
}
156+
153157
static class TargetDsl {
154158

155159
private Map opts = [:]

0 commit comments

Comments
 (0)