From c3c05372ad8de5f5e6bb1daacb4471550c570d6a Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sun, 1 May 2022 19:26:54 +0200 Subject: [PATCH 01/16] Topic channel WIP Signed-off-by: Paolo Di Tommaso --- .../src/main/groovy/nextflow/Channel.groovy | 4 ++ .../main/groovy/nextflow/extension/CH.groovy | 53 ++++++++++++++++++- .../groovy/nextflow/extension/MixOp.groovy | 17 +++++- .../groovy/nextflow/script/ProcessDef.groovy | 11 ++-- .../script/params/BaseOutParam.groovy | 11 ++++ .../script/params/ParamsOutTest.groovy | 3 +- 6 files changed, 92 insertions(+), 7 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy index 822ce03537..31579e594a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy @@ -121,6 +121,10 @@ class Channel { return CH.queue() } + static DataflowWriteChannel topic(String name) { + return CH.topic(name) + } + /** * Create a empty channel i.e. only emits a STOP signal * diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy index 2ba44edde4..4b028a8ee7 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy @@ -1,5 +1,7 @@ package nextflow.extension +import static nextflow.Channel.empty + import groovy.transform.CompileStatic import groovy.transform.PackageScope import groovy.util.logging.Slf4j @@ -30,7 +32,15 @@ class CH { return (Session) Global.session } - static private Map bridges = new HashMap<>(10) + static class Topic { + String name + DataflowBroadcast broadcaster = new DataflowBroadcast() + List writers = new ArrayList<>(10) + } + + static final private List allTopics = new ArrayList<>(10) + + static final private Map bridges = new HashMap<>(10) static DataflowReadChannel getReadChannel(channel) { if (channel instanceof DataflowQueue) @@ -74,6 +84,18 @@ class CH { def broadcast = bridges.get(queue) queue.into(broadcast) } + + // connect all topics + for( Topic topic : allTopics ) { + if( topic.writers ) { + def ch = new ArrayList(topic.writers) + if( ch.size()==1 ) ch.add(empty()) + new MixOp(ch.collect(it -> getReadChannel(it))).withTarget(topic.broadcaster).apply() + } + else { + topic.broadcaster.bind(Channel.STOP) + } + } } static void init() { bridges.clear() } @@ -104,6 +126,35 @@ class CH { return new DataflowQueue() } + static DataflowBroadcast topic(String name) { + if( !NF.isDsl2() ) + throw new IllegalStateException("Channel 'topic' is only available with DSL2") + synchronized (allTopics) { + def topic = allTopics.find(it -> it.name == name) + if( topic!=null ) + return topic.broadcaster + // create a new topic + topic = new Topic(name:name) + allTopics.add(topic) + return topic.broadcaster + } + } + + static DataflowWriteChannel topicWriter(String name) { + if( !NF.isDsl2() ) + throw new IllegalStateException("Channel 'topic' is only available with DSL2") + synchronized (allTopics) { + def topic = allTopics.find(it -> it.name == name) + if( topic==null ) { + topic = new Topic(name:name) + allTopics.add(topic) + } + def result = CH.create() + topic.writers.add(result) + return result + } + } + static boolean isChannel(obj) { obj instanceof DataflowReadChannel || obj instanceof DataflowWriteChannel } diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy index f8e0aed304..4c3f70d192 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy @@ -25,7 +25,6 @@ import groovy.transform.CompileStatic import groovyx.gpars.dataflow.DataflowReadChannel import groovyx.gpars.dataflow.DataflowWriteChannel import nextflow.Channel - /** * Implements Nextflow Mix operator * @@ -36,6 +35,7 @@ class MixOp { private DataflowReadChannel source private List others + private DataflowWriteChannel target MixOp(DataflowReadChannel source, DataflowReadChannel other) { this.source = source @@ -47,8 +47,21 @@ class MixOp { this.others = others.toList() } + MixOp(List channels) { + if( channels.size()<2 ) + throw new IllegalArgumentException("Mix operator requires at least 2 source channels") + this.source = channels.get(0) + this.others = channels.subList(1, channels.size()) + } + + MixOp withTarget(DataflowWriteChannel target) { + this.target = target + return this + } + DataflowWriteChannel apply() { - def target = CH.create() + if( target == null ) + target = CH.create() def count = new AtomicInteger( others.size()+1 ) def handlers = [ onNext: { target << it }, diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy index 7cfc6309cc..6e5032b14f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy @@ -28,7 +28,6 @@ import nextflow.script.params.BaseOutParam import nextflow.script.params.EachInParam import nextflow.script.params.InputsList import nextflow.script.params.OutputsList - /** * Models a nextflow process definition * @@ -194,8 +193,14 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef { throw new ScriptRuntimeException("Process `$processName` inputs and outputs do not have the same cardinality - Feedback loop is not supported" ) for(int i=0; i Date: Mon, 30 Oct 2023 18:51:01 +0100 Subject: [PATCH 02/16] Fix typo Signed-off-by: Paolo Di Tommaso --- .../nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy index 6e5032b14f..0e5c0a4fa5 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy @@ -196,7 +196,7 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef { final param = (declaredOutputs[i] as BaseOutParam) final topicName = param.channelTopicName if( topicName && feedbackChannels ) - throw new IllegalArgumentException("Output topic conflict with recursion feature - check process '$name' should not declared any outout 'topic'" ) + throw new IllegalArgumentException("Output topic conflict with recursion feature - check process '$name' should not declared any output 'topic'" ) final ch = feedbackChannels ? feedbackChannels[i] : ( topicName ? CH.topicWriter(topicName) : CH.create(singleton) ) From 93b449c632c9169b304994e3cb73b15d90ac0f1d Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 30 Oct 2023 18:51:49 +0100 Subject: [PATCH 03/16] Remove deprecated code [ci fast] Signed-off-by: Paolo Di Tommaso --- modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy | 4 ---- 1 file changed, 4 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy index 4b028a8ee7..2426c1695b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy @@ -127,8 +127,6 @@ class CH { } static DataflowBroadcast topic(String name) { - if( !NF.isDsl2() ) - throw new IllegalStateException("Channel 'topic' is only available with DSL2") synchronized (allTopics) { def topic = allTopics.find(it -> it.name == name) if( topic!=null ) @@ -141,8 +139,6 @@ class CH { } static DataflowWriteChannel topicWriter(String name) { - if( !NF.isDsl2() ) - throw new IllegalStateException("Channel 'topic' is only available with DSL2") synchronized (allTopics) { def topic = allTopics.find(it -> it.name == name) if( topic==null ) { From c73a7aa2e2479c5ac91d8797fd28ea8f173d99e3 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Wed, 1 Nov 2023 14:38:14 +0100 Subject: [PATCH 04/16] Improve support for topic channels Signed-off-by: Paolo Di Tommaso --- .../src/main/groovy/nextflow/Channel.groovy | 1 + .../src/main/groovy/nextflow/NF.groovy | 4 +++ .../main/groovy/nextflow/NextflowMeta.groovy | 7 +++++ .../nextflow/ast/NextflowDSLImpl.groovy | 10 ++++-- .../main/groovy/nextflow/extension/CH.groovy | 31 +++++++++++++------ .../script/params/BaseOutParam.groovy | 5 +++ 6 files changed, 46 insertions(+), 12 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy index 31579e594a..84c0d2bf0e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy @@ -122,6 +122,7 @@ class Channel { } static DataflowWriteChannel topic(String name) { + if( !NF.topicChannelEnabled ) throw new MissingMethodException('topic', Channel.class, InvokerHelper.EMPTY_ARGS) return CH.topic(name) } diff --git a/modules/nextflow/src/main/groovy/nextflow/NF.groovy b/modules/nextflow/src/main/groovy/nextflow/NF.groovy index 33836c6c62..3e02805646 100644 --- a/modules/nextflow/src/main/groovy/nextflow/NF.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/NF.groovy @@ -72,4 +72,8 @@ class NF { static boolean isRecurseEnabled() { NextflowMeta.instance.preview.recursion } + + static boolean isTopicChannelEnabled() { + NextflowMeta.instance.preview.topic + } } diff --git a/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy b/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy index 73783af860..b642697709 100644 --- a/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy @@ -43,6 +43,7 @@ class NextflowMeta { volatile float dsl boolean strict boolean recursion + boolean topic void setDsl( float num ) { if( num == 1 ) @@ -59,6 +60,12 @@ class NextflowMeta { log.warn "NEXTFLOW RECURSION IS A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES" this.recursion = recurse } + + void setTopic(Boolean value) { + if( topic ) + log.warn "TOPIC CHANNELS ARE A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES" + this.topic = value + } } static class Features implements Flags { diff --git a/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy index ecb11da515..73dbbc0aa2 100644 --- a/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy @@ -981,15 +981,16 @@ class NextflowDSLImpl implements ASTTransformation { /** * Transform a map entry `emit: something` into `emit: 'something' + * and `topic: something` into `topic: 'something' * (ie. as a constant) in a map expression passed as argument to * a method call. This allow the syntax * * output: - * path 'foo', emit: bar + * path 'foo', emit: bar, topic: baz * * @param call */ - protected void fixOutEmitOption(MethodCallExpression call) { + protected void fixOutEmitAndTopicOptions(MethodCallExpression call) { List args = isTupleX(call.arguments)?.expressions if( !args ) return if( args.size()<2 && (args.size()!=1 || call.methodAsString!='_out_stdout')) return @@ -1002,6 +1003,9 @@ class NextflowDSLImpl implements ASTTransformation { if( key?.text == 'emit' && val ) { map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text)) } + else if( key?.text == 'topic' && val ) { + map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text)) + } } } @@ -1021,7 +1025,7 @@ class NextflowDSLImpl implements ASTTransformation { // prefix the method name with the string '_out_' methodCall.setMethod( new ConstantExpression('_out_' + methodName) ) fixMethodCall(methodCall) - fixOutEmitOption(methodCall) + fixOutEmitAndTopicOptions(methodCall) } else if( methodName in ['into','mode'] ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy index 2426c1695b..390064d34c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy @@ -1,6 +1,6 @@ package nextflow.extension -import static nextflow.Channel.empty +import static nextflow.Channel.* import groovy.transform.CompileStatic import groovy.transform.PackageScope @@ -17,8 +17,6 @@ import nextflow.Channel import nextflow.Global import nextflow.NF import nextflow.Session -import static nextflow.Channel.STOP - /** * Helper class to handle channel internal api ops * @@ -78,22 +76,37 @@ class CH { } static void broadcast() { - // connect all dataflow queue variables to associated broadcast channel + // connect all broadcast topics, note this must be before the following + // "bridging" step because it can modify the final network topology + connectTopics() + // bridge together all broadcast channels + bridgeChannels() + } + + static private void bridgeChannels() { + // connect all dataflow queue variables to associated broadcast channel for( DataflowQueue queue : bridges.keySet() ) { log.trace "Bridging dataflow queue=$queue" def broadcast = bridges.get(queue) queue.into(broadcast) } + } - // connect all topics + static private void connectTopics() { for( Topic topic : allTopics ) { if( topic.writers ) { - def ch = new ArrayList(topic.writers) - if( ch.size()==1 ) ch.add(empty()) - new MixOp(ch.collect(it -> getReadChannel(it))).withTarget(topic.broadcaster).apply() + // the list of all writing dataflow queues for this topic + final ch = new ArrayList(topic.writers) + // the mix operator requires at least two sources, add an empty channel if needed + if( ch.size()==1 ) + ch.add(empty()) + // get a list of sources for the mix operator + final sources = ch.collect(it -> getReadChannel(it)) + // mix all of them + new MixOp(sources).withTarget(topic.broadcaster).apply() } else { - topic.broadcaster.bind(Channel.STOP) + topic.broadcaster.bind(STOP) } } } diff --git a/modules/nextflow/src/main/groovy/nextflow/script/params/BaseOutParam.groovy b/modules/nextflow/src/main/groovy/nextflow/script/params/BaseOutParam.groovy index d781da5262..bbbd9cd785 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/params/BaseOutParam.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/params/BaseOutParam.groovy @@ -219,6 +219,11 @@ abstract class BaseOutParam extends BaseParam implements OutParam { throw new IllegalArgumentException("Output `topic` option it not allowed in tuple components") if( !name ) throw new IllegalArgumentException("Missing output `topic` name") + if( !ConfigHelper.isValidIdentifier(name) ) { + final msg = "Output topic '$name' is not a valid name -- Make sure it starts with an alphabetic or underscore character and it does not contain any blank, dot or other special characters" + throw new IllegalArgumentException(msg) + } + this.channelTopicName = name return this } From 358224fc7be7a9245e5dd29b318061b4e304fbb3 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Wed, 1 Nov 2023 15:00:22 +0100 Subject: [PATCH 05/16] Add tests [ci fast] Signed-off-by: Paolo Di Tommaso --- .../nextflow/script/params/OutParam.groovy | 2 + .../script/params/ParamsOutTest.groovy | 80 +++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/modules/nextflow/src/main/groovy/nextflow/script/params/OutParam.groovy b/modules/nextflow/src/main/groovy/nextflow/script/params/OutParam.groovy index 7f17a5ea3b..2a1570a2f3 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/params/OutParam.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/params/OutParam.groovy @@ -51,4 +51,6 @@ interface OutParam extends Cloneable { String getChannelEmitName() + String getChannelTopicName() + } diff --git a/modules/nextflow/src/test/groovy/nextflow/script/params/ParamsOutTest.groovy b/modules/nextflow/src/test/groovy/nextflow/script/params/ParamsOutTest.groovy index ce75d7508e..f907d61e11 100644 --- a/modules/nextflow/src/test/groovy/nextflow/script/params/ParamsOutTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/script/params/ParamsOutTest.groovy @@ -1197,4 +1197,84 @@ class ParamsOutTest extends Dsl2Spec { outs[2].inner[1].name == 'bar' } + + + def 'should define out with topic' () { + setup: + def text = ''' + process hola { + output: + val x, topic: ch0 + env FOO, topic: ch1 + path '-', topic: ch2 + stdout topic: ch3 + /return/ + } + + workflow { hola() } + ''' + + def binding = [:] + def process = parseAndReturnProcess(text, binding) + + when: + def outs = process.config.getOutputs() as List + then: + outs[0].name == 'x' + outs[0].channelTopicName == 'ch0' + and: + outs[1].name == 'FOO' + outs[1].channelTopicName == 'ch1' + and: + outs[2] instanceof StdOutParam // <-- note: declared as `path`, turned into a `stdout` + outs[2].name == '-' + outs[2].channelTopicName == 'ch2' + and: + outs[3] instanceof StdOutParam + outs[3].name == '-' + outs[3].channelTopicName == 'ch3' + } + + def 'should define out tuple with topic'() { + + setup: + def text = ''' + process hola { + output: + tuple val(x), val(y), topic: ch1 + tuple path('foo'), topic: ch2 + tuple stdout,env(bar), topic: ch3 + + /return/ + } + + workflow { hola() } + ''' + + def binding = [:] + def process = parseAndReturnProcess(text, binding) + + when: + def outs = process.config.getOutputs() as List + + then: + outs[0].name == 'tupleoutparam<0>' + outs[0].channelTopicName == 'ch1' + outs[0].inner[0] instanceof ValueOutParam + outs[0].inner[0].name == 'x' + outs[0].inner[1] instanceof ValueOutParam + outs[0].inner[1].name == 'y' + and: + outs[1].name == 'tupleoutparam<1>' + outs[1].channelTopicName == 'ch2' + outs[1].inner[0] instanceof FileOutParam + and: + outs[2].name == 'tupleoutparam<2>' + outs[2].channelTopicName == 'ch3' + outs[2].inner[0] instanceof StdOutParam + outs[2].inner[0].name == '-' + outs[2].inner[1] instanceof EnvOutParam + outs[2].inner[1].name == 'bar' + + } } From ec156bc68232e48ba2462ebbfcf2b15af975c8e2 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Thu, 9 Nov 2023 05:37:55 -0600 Subject: [PATCH 06/16] Add integration test, documentation Signed-off-by: Ben Sherman --- docs/channel.md | 41 +++++++++++++ docs/config.md | 9 +++ docs/process.md | 81 ++++++++++++------------- docs/workflow.md | 4 +- tests/checks/topic-channel.nf/.checks | 15 +++++ tests/checks/topic-channel.nf/.expected | 2 + tests/topic-channel.nf | 37 +++++++++++ 7 files changed, 145 insertions(+), 44 deletions(-) create mode 100644 tests/checks/topic-channel.nf/.checks create mode 100644 tests/checks/topic-channel.nf/.expected create mode 100644 tests/topic-channel.nf diff --git a/docs/channel.md b/docs/channel.md index 754d48c4ad..7c57cec49e 100644 --- a/docs/channel.md +++ b/docs/channel.md @@ -424,6 +424,47 @@ Y See also: [fromList](#fromlist) factory method. +(channel-topic)= + +### topic + +:::{versionadded} 23.11.0-edge +::: + +:::{note} +This feature requires the `nextflow.preview.topic` feature flag to be enabled. +::: + +The `topic` method is used to create a "topic" channel, which is a queue channel that can receive items from multiple sources. + +Any process can send items to a topic by using the `topic` option on an output: + +```groovy +process foo { + output: + val('foo'), topic: 'my-topic' +} + +process bar { + output: + val('bar'), topic: 'my-topic' +} +``` + +Then, the `topic` method can be used to consume all items in the topic: + +```groovy +Channel.topic('my-topic').view() +``` + +This approach is a convenient way to collect outputs from many sources without having to write the necessary channel logic. You can name topics however you want, and you can use different names to collect items for different "topics", as long as your process outputs and channel logic are consistent with each other. + +:::{warning} +Any process that consumes a topic channel should not send any outputs to that topic, or else the pipeline will hang forever. +::: + +See also: {ref}`process-additional-options` for process outputs. + (channel-value)= ### value diff --git a/docs/config.md b/docs/config.md index 954bb43a28..e41ff2442c 100644 --- a/docs/config.md +++ b/docs/config.md @@ -1806,3 +1806,12 @@ Some features can be enabled using the `nextflow.enable` and `nextflow.preview` : *Experimental: may change in a future release.* : When `true`, enables process and workflow recursion. See [this GitHub discussion](https://github.com/nextflow-io/nextflow/discussions/2521) for more information. + +`nextflow.preview.topic` + +: :::{versionadded} 23.11.0-edge + ::: + +: *Experimental: may change in a future release.* + +: When `true`, enables {ref}`topic channels `. diff --git a/docs/process.md b/docs/process.md index 73d6dec191..5848997886 100644 --- a/docs/process.md +++ b/docs/process.md @@ -1166,62 +1166,59 @@ process foo { ``` ::: -### Optional outputs +(process-additional-options)= -In most cases, a process is expected to produce an output for each output definition. However, there are situations where it is valid for a process to not generate output. In these cases, `optional: true` may be added to the output definition, which tells Nextflow not to fail the process if the declared output is not produced: +### Additional options -```groovy -output: - path("output.txt"), optional: true -``` +The following options are available for all process outputs: -In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is legitimately missing, the process does not fail. The output channel will only contain values for those processes that produce `output.txt`. +`emit: ` -(process-multiple-outputs)= +: Defines the name of the output channel, which can be used to access the channel by name from the process output: -### Multiple outputs + ```groovy + process FOO { + output: + path 'hello.txt', emit: hello + path 'bye.txt', emit: bye + + """ + echo "hello" > hello.txt + echo "bye" > bye.txt + """ + } -When a process declares multiple outputs, each output can be accessed by index. The following example prints the second process output (indexes start at zero): + workflow { + FOO() + FOO.out.hello.view() + } + ``` -```groovy -process FOO { - output: - path 'bye_file.txt' - path 'hi_file.txt' + See {ref}`workflow-process-invocation` for more details. - """ - echo "bye" > bye_file.txt - echo "hi" > hi_file.txt - """ -} +`optional: true | false` -workflow { - FOO() - FOO.out[1].view() -} -``` +: Normally, if a specified output is not produced by the task, the task will fail. Setting `optional: true` will cause the task to not fail, and instead emit nothing to the given output channel. -You can also use the `emit` option to assign a name to each output and access them by name: + ```groovy + output: + path("output.txt"), optional: true + ``` -```groovy -process FOO { - output: - path 'bye_file.txt', emit: bye_file - path 'hi_file.txt', emit: hi_file + In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is missing, the task will not fail. The output channel will only contain values for those tasks that produced `output.txt`. - """ - echo "bye" > bye_file.txt - echo "hi" > hi_file.txt - """ -} +: :::{note} + While this option can be used with any process output, it cannot be applied to individual elements of a [tuple](#output-type-tuple) output. The entire tuple must be optional or not optional. + ::: -workflow { - FOO() - FOO.out.hi_file.view() -} -``` +`topic: ` + +: :::{versionadded} 23.11.0-edge + ::: + +: *Experimental: may change in a future release.* -See {ref}`workflow-process-invocation` for more details. +: Defines the {ref}`topic channel ` to which the output will be sent. Cannot be used with the `emit` option on the same output. ## When diff --git a/docs/workflow.md b/docs/workflow.md index a81dcc1a83..38b311689d 100644 --- a/docs/workflow.md +++ b/docs/workflow.md @@ -104,7 +104,7 @@ workflow { } ``` -When a process defines multiple output channels, each output can be accessed using the array element operator (`out[0]`, `out[1]`, etc.) or using *named outputs* (see below). +When a process defines multiple output channels, each output can be accessed by index (`out[0]`, `out[1]`, etc.) or by name (see below). The process output(s) can also be accessed like the return value of a function: @@ -144,7 +144,7 @@ workflow { } ``` -See {ref}`process-multiple-outputs` for more details. +See {ref}`process outputs ` for more details. ### Process named stdout diff --git a/tests/checks/topic-channel.nf/.checks b/tests/checks/topic-channel.nf/.checks new file mode 100644 index 0000000000..425cb15ae2 --- /dev/null +++ b/tests/checks/topic-channel.nf/.checks @@ -0,0 +1,15 @@ +# +# initial run +# +echo Initial run +$NXF_RUN + +cmp versions.txt .expected || false + +# +# Resumed run +# +echo Resumed run +$NXF_RUN -resume + +cmp versions.txt .expected || false diff --git a/tests/checks/topic-channel.nf/.expected b/tests/checks/topic-channel.nf/.expected new file mode 100644 index 0000000000..1bb4d6b958 --- /dev/null +++ b/tests/checks/topic-channel.nf/.expected @@ -0,0 +1,2 @@ +bar: 0.9.0 +foo: 0.1.0 diff --git a/tests/topic-channel.nf b/tests/topic-channel.nf new file mode 100644 index 0000000000..08f11a3ddb --- /dev/null +++ b/tests/topic-channel.nf @@ -0,0 +1,37 @@ + +nextflow.preview.topic = true + +process foo { + input: + val(index) + + output: + stdout topic: versions + + script: + """ + echo 'foo: 0.1.0' + """ +} + +process bar { + input: + val(index) + + output: + stdout topic: versions + + script: + """ + echo 'bar: 0.9.0' + """ +} + +workflow { + Channel.of( 1..3 ) | foo + Channel.of( 1..3 ) | bar + + Channel.topic('versions') + | unique + | collectFile(name: 'versions.txt', sort: true, storeDir: '.') +} From e367d8229cddfac0c7e5d3d7f1cbe16fcadfa7c6 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Thu, 16 Nov 2023 06:30:47 -0600 Subject: [PATCH 07/16] Update channel.md Signed-off-by: Ben Sherman --- docs/channel.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/channel.md b/docs/channel.md index 7c57cec49e..2c5b30c648 100644 --- a/docs/channel.md +++ b/docs/channel.md @@ -435,7 +435,7 @@ See also: [fromList](#fromlist) factory method. This feature requires the `nextflow.preview.topic` feature flag to be enabled. ::: -The `topic` method is used to create a "topic" channel, which is a queue channel that can receive items from multiple sources. +The `topic` method is used to create a "topic" channel, which is a queue channel that can receive items from many sources implicitly based on a "topic" identifier. Any process can send items to a topic by using the `topic` option on an output: @@ -457,7 +457,7 @@ Then, the `topic` method can be used to consume all items in the topic: Channel.topic('my-topic').view() ``` -This approach is a convenient way to collect outputs from many sources without having to write the necessary channel logic. You can name topics however you want, and you can use different names to collect items for different "topics", as long as your process outputs and channel logic are consistent with each other. +This approach is a convenient way to collect related items from many different sources without explicitly defining the channel logic (the `topic` method is essentially an implicit {ref}`operator-mix` operation). You can name topics however you want, and you can use different names to collect items for different "topics", as long as your process outputs and channel logic are consistent with each other. :::{warning} Any process that consumes a topic channel should not send any outputs to that topic, or else the pipeline will hang forever. From 490a598281a47afd780d2c09216be755ade39e4b Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Thu, 16 Nov 2023 06:58:44 -0600 Subject: [PATCH 08/16] Refactor topic list to map, minor edits Signed-off-by: Ben Sherman --- .../main/groovy/nextflow/extension/CH.groovy | 41 +++++++++---------- .../groovy/nextflow/script/ProcessDef.groovy | 2 +- 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy index a4d9762cc6..daa78d378c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy @@ -31,12 +31,11 @@ class CH { } static class Topic { - String name - DataflowBroadcast broadcaster = new DataflowBroadcast() - List writers = new ArrayList<>(10) + List sources = new ArrayList<>(10) + DataflowBroadcast target = new DataflowBroadcast() } - static final private List allTopics = new ArrayList<>(10) + static final private Map allTopics = new HashMap<>(10) static final private Map bridges = new HashMap<>(10) @@ -91,20 +90,20 @@ class CH { } static private void connectTopics() { - for( Topic topic : allTopics ) { - if( topic.writers ) { - // the list of all writing dataflow queues for this topic - final ch = new ArrayList(topic.writers) + for( Topic topic : allTopics.values() ) { + if( topic.sources ) { + // get the list of source channels for this topic + final ch = new ArrayList(topic.sources) // the mix operator requires at least two sources, add an empty channel if needed if( ch.size()==1 ) ch.add(empty()) - // get a list of sources for the mix operator + // map write channels to read channels final sources = ch.collect(it -> getReadChannel(it)) // mix all of them - new MixOp(sources).withTarget(topic.broadcaster).apply() + new MixOp(sources).withTarget(topic.target).apply() } else { - topic.broadcaster.bind(STOP) + topic.target.bind(STOP) } } } @@ -139,25 +138,23 @@ class CH { static DataflowBroadcast topic(String name) { synchronized (allTopics) { - def topic = allTopics.find(it -> it.name == name) + def topic = allTopics[name] if( topic!=null ) - return topic.broadcaster + return topic.target // create a new topic - topic = new Topic(name:name) - allTopics.add(topic) - return topic.broadcaster + topic = new Topic() + allTopics[name] = topic + return topic.target } } static DataflowWriteChannel topicWriter(String name) { synchronized (allTopics) { - def topic = allTopics.find(it -> it.name == name) - if( topic==null ) { - topic = new Topic(name:name) - allTopics.add(topic) - } + if( name !in allTopics ) + allTopics[name] = new Topic() + def topic = allTopics[name] def result = CH.create() - topic.writers.add(result) + topic.sources.add(result) return result } } diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy index 0e5c0a4fa5..f2ff8c0d6e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy @@ -196,7 +196,7 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef { final param = (declaredOutputs[i] as BaseOutParam) final topicName = param.channelTopicName if( topicName && feedbackChannels ) - throw new IllegalArgumentException("Output topic conflict with recursion feature - check process '$name' should not declared any output 'topic'" ) + throw new IllegalArgumentException("Output topic conflicts with recursion feature - process `$processName` should not declare any output topic" ) final ch = feedbackChannels ? feedbackChannels[i] : ( topicName ? CH.topicWriter(topicName) : CH.create(singleton) ) From c48656b888187946140ece9636f412792f824c18 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 17 Nov 2023 05:04:00 -0600 Subject: [PATCH 09/16] Update docs Signed-off-by: Ben Sherman --- docs/channel.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/channel.md b/docs/channel.md index 2c5b30c648..2b4fe6f049 100644 --- a/docs/channel.md +++ b/docs/channel.md @@ -435,9 +435,9 @@ See also: [fromList](#fromlist) factory method. This feature requires the `nextflow.preview.topic` feature flag to be enabled. ::: -The `topic` method is used to create a "topic" channel, which is a queue channel that can receive items from many sources implicitly based on a "topic" identifier. +The `topic` method is used to create a queue channel that receives the items from all process outputs in a particular "topic". -Any process can send items to a topic by using the `topic` option on an output: +A process output can be assigned to a topic using the `topic` option on an output: ```groovy process foo { From 11ee5ed9890791a790db7f81046ea3c15bc486dd Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Sun, 19 Nov 2023 22:04:21 -0600 Subject: [PATCH 10/16] Fix docs Signed-off-by: Ben Sherman --- docs/process.md | 2 +- tests/topic-channel.nf | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/process.md b/docs/process.md index 5848997886..19c92adc72 100644 --- a/docs/process.md +++ b/docs/process.md @@ -1218,7 +1218,7 @@ The following options are available for all process outputs: : *Experimental: may change in a future release.* -: Defines the {ref}`topic channel ` to which the output will be sent. Cannot be used with the `emit` option on the same output. +: Defines the {ref}`topic channel ` to which the output will be sent. ## When diff --git a/tests/topic-channel.nf b/tests/topic-channel.nf index 08f11a3ddb..c5eb17bc97 100644 --- a/tests/topic-channel.nf +++ b/tests/topic-channel.nf @@ -6,7 +6,7 @@ process foo { val(index) output: - stdout topic: versions + stdout emit: versions, topic: versions script: """ @@ -19,7 +19,7 @@ process bar { val(index) output: - stdout topic: versions + stdout emit: versions, topic: versions script: """ From 27a4a0f9734589d71116a38a70ceb39fff217440 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Sun, 19 Nov 2023 22:06:26 -0600 Subject: [PATCH 11/16] Refactor "topic channel" -> "channel topic" Signed-off-by: Ben Sherman --- docs/channel.md | 2 +- docs/config.md | 2 +- docs/process.md | 2 +- modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/channel.md b/docs/channel.md index 2b4fe6f049..bc351e4b66 100644 --- a/docs/channel.md +++ b/docs/channel.md @@ -460,7 +460,7 @@ Channel.topic('my-topic').view() This approach is a convenient way to collect related items from many different sources without explicitly defining the channel logic (the `topic` method is essentially an implicit {ref}`operator-mix` operation). You can name topics however you want, and you can use different names to collect items for different "topics", as long as your process outputs and channel logic are consistent with each other. :::{warning} -Any process that consumes a topic channel should not send any outputs to that topic, or else the pipeline will hang forever. +Any process that consumes a channel topic should not send any outputs to that topic, or else the pipeline will hang forever. ::: See also: {ref}`process-additional-options` for process outputs. diff --git a/docs/config.md b/docs/config.md index 6dec511162..84bea8a9b3 100644 --- a/docs/config.md +++ b/docs/config.md @@ -1817,4 +1817,4 @@ Some features can be enabled using the `nextflow.enable` and `nextflow.preview` : *Experimental: may change in a future release.* -: When `true`, enables {ref}`topic channels `. +: When `true`, enables {ref}`channel topics `. diff --git a/docs/process.md b/docs/process.md index 19c92adc72..e9fbfa7900 100644 --- a/docs/process.md +++ b/docs/process.md @@ -1218,7 +1218,7 @@ The following options are available for all process outputs: : *Experimental: may change in a future release.* -: Defines the {ref}`topic channel ` to which the output will be sent. +: Defines the {ref}`channel topic ` to which the output will be sent. ## When diff --git a/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy b/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy index 8074dd968a..bd350b945d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy @@ -63,7 +63,7 @@ class NextflowMeta { void setTopic(Boolean value) { if( topic ) - log.warn "TOPIC CHANNELS ARE A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES" + log.warn "CHANNEL TOPICS ARE A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES" this.topic = value } } From 70a480e40f91b5fc4f08d7f3026244a5cbf9f94a Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 20 Nov 2023 14:36:28 +0100 Subject: [PATCH 12/16] Minor refactor [ci fast] Signed-off-by: Paolo Di Tommaso --- .../main/groovy/nextflow/extension/CH.groovy | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy index daa78d378c..c0d4c6cb9c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy @@ -31,8 +31,8 @@ class CH { } static class Topic { + DataflowBroadcast broadcaster = new DataflowBroadcast() List sources = new ArrayList<>(10) - DataflowBroadcast target = new DataflowBroadcast() } static final private Map allTopics = new HashMap<>(10) @@ -100,10 +100,10 @@ class CH { // map write channels to read channels final sources = ch.collect(it -> getReadChannel(it)) // mix all of them - new MixOp(sources).withTarget(topic.target).apply() + new MixOp(sources).withTarget(topic.broadcaster).apply() } else { - topic.target.bind(STOP) + topic.broadcaster.bind(STOP) } } } @@ -140,20 +140,22 @@ class CH { synchronized (allTopics) { def topic = allTopics[name] if( topic!=null ) - return topic.target + return topic.broadcaster // create a new topic topic = new Topic() allTopics[name] = topic - return topic.target + return topic.broadcaster } } static DataflowWriteChannel topicWriter(String name) { synchronized (allTopics) { - if( name !in allTopics ) - allTopics[name] = new Topic() - def topic = allTopics[name] - def result = CH.create() + def topic = allTopics.get(name) + if( topic==null ) { + topic = new Topic() + allTopics.put(name, topic) + } + final result = CH.create() topic.sources.add(result) return result } From b490e33137daa647a71cece099370ff4087a1e4d Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Wed, 22 Nov 2023 16:01:57 +0100 Subject: [PATCH 13/16] Remove unused test file Signed-off-by: Paolo Di Tommaso --- validation/google_credentials.gpg | Bin 1726 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 validation/google_credentials.gpg diff --git a/validation/google_credentials.gpg b/validation/google_credentials.gpg deleted file mode 100644 index b0dfd31a26150fda8afee59ba09389dc0f353239..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1726 zcmV;v20{6Z4Fm}T0g0{CpW8S<7j3FZ$gYUPUUD zMLs+pltI<3&pX+HsP8iE<=Vaygaf!oQ0f6JqxcmjpQA3gfh@Ly%oHtghHeq81b9Su z*k`B-<{6&*%N_(?UK$q-p({z<&Wnw)RM+ifaZSv&aa;C(v2)DiNsk58ijs=9Sr$2AHu^XM~E(I_av{&JPoUm~*3^(FmujYp8|9 z5b_suQUsz&rgb}oG{Pt4Hh|C9ZlYtSLZAl7VzIYS>xQ)tH&g-&*1Lnpa^IbcYAm03 z2lt*J7%Q=x4}`2qC%~NZUo&7^T`uRLDq-C7W1-VI#!BsJ%#-0140TR|qcwP6%0>BwZ0RvVApB~iHf3DAuM2gB^Qw`tzEh!pdp zYIkSCMq7}Qqqyi`GqHmu9d3=EvsQs44|;WXCk?e&OOEip0I2y((S-&h?4#Y2bA@c8 z2<-tNIFK31#uxX4S+t429+^YFH;t;ZeR%hT zA-H<@PEkmkM*cS|PZqOk{MA4F3XxUz{cBq9I)`OYk$f~ik>w|@u(%QVKgrk3`naM3 z+JNKM+g@~;?~QE1Y%=TPMGh%HkIkwNPmsg~KVhK7QBird&;_&9vh9#LW9T~mo@C+w zQ2qhMwECqSkZ`G6VRV954A`7Fosjm`ld_zm0_dTuA^V%HDBCnu~e8rjwmY!wOsqSf&>quHv2uMhPxBa5l( zpCP(^Z$wDnboXJ_EJM>NTJN0k@?~bsO=qtt<&KQs(~C+GEu@CNBg0PPC#AvnWUr^N zccRvVDc6(^Z6x#u%l2NV-_7$M!3!)?s11eMhk+^W#4QH90Zt6b{zJDwAeqv3p&?x` z49L@!rc0`K>A;NM>U7OT2=K-EdTPMLWlaL&YpkU7wd4pz{)?rdd8h2MAAt+gPE9)` zW75A}c60#0V*It7zb|=i)*gPoU9YHDsTwJ`1*>YdFW!uqTLjb~JvD)_H942GKyV8% zt2>e<6;>~t3ka}6?Ub@m#0N(~|Gc#h%;BeWWn)mco-Puw{@1^iKL#YZ$~7AE8Yo`J z)Lk_T2HLF62&ax){X8n1kioK7>gHF~tOMJ8$$eXQzw?ni>rc!?u--5;D%lr!UM$hX z#r`6^+(S1&jtxJTFeZ+I9Jp{o<>wSjEI4?YnR!ZKM+`I^O56hdiQyi5`QQzROs>tW!d-(>WQC*fQ9A z^Wu3_?b&@EB?CErz`UK(yHjF(`-FXVd|;^M(4|fAOMH5=*e+mj2r_j+US7J+HzK6{ zrVKYUN560u<0f}kUL)3NPRS4|bV;);fWWUCyfbtP;q7mR_1PXxeX0q15&?6#CHS4* zXqQYs$6e$L^c-G0EqNJxko%}}UxXJGvSjja{rDQZqvbNbQ{M04(Ne9;p0Y2&1AeD|$ma*YU!P+W=jj&E*w*UYD From 6d13c9d5674c26023efddde6a8f5d5db181d2f85 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Thu, 23 Nov 2023 11:40:14 +0100 Subject: [PATCH 14/16] wip Signed-off-by: Paolo Di Tommaso --- docs/channel.md | 25 +++++++++++++------ docs/config.md | 2 +- .../groovy/nextflow/extension/MixOp.groovy | 1 + .../groovy/nextflow/script/ProcessDef.groovy | 1 + 4 files changed, 21 insertions(+), 8 deletions(-) diff --git a/docs/channel.md b/docs/channel.md index 2b4fe6f049..0148e5dc9f 100644 --- a/docs/channel.md +++ b/docs/channel.md @@ -13,23 +13,34 @@ A channel has two major properties: ## Channel types -In Nextflow there are two kinds of channels: *queue channels* and *value channels*. +In Nextflow there are three kinds of channels: *value channels*, *queue channels* and *topic channels*. + +(channel-type-value)= + +### Value channel + +A *value channel* contains a single value and can be consumed any number of times by a process or an operator. + +A value channel can be created with the [value](#value) factory method or by any operator that produces a single value +({ref}`operator-first`, {ref}`operator-collect`, {ref}`operator-reduce`, etc). Additionally, a process will emit value +channels if it is invoked with all value channels, including simple values which are implicitly wrapped in a value channel. (channel-type-queue)= ### Queue channel -A *queue channel* is a non-blocking unidirectional FIFO queue which connects two processes, channel factories, or operators. +A *queue channel* is a non-blocking unidirectional FIFO queue connecting a *producer* process (i.e. outputting a value) +to a consumer , channel factories, or operators. -A queue channel can be created by factory methods ([of](#of), [fromPath](#frompath), etc), operators ({ref}`operator-map`, {ref}`operator-flatmap`, etc), and processes (see {ref}`Process outputs `). +A queue channel can be created by factory methods ([of](#of), [fromPath](#frompath), etc), operators ({ref}`operator-map`, +{ref}`operator-flatmap`, etc), and processes (see {ref}`Process outputs `). -(channel-type-value)= +(channel-type-topic)= -### Value channel +### Topic channel -A *value channel* contains a single value and can be consumed any number of times by a process or operator. +A *topic channel*, similarly to a *queue channel* is non-blocking unidirectional FIFO queue, however it connects multiple -A value channel can be created with the [value](#value) factory method or by any operator that produces a single value ({ref}`operator-first`, {ref}`operator-collect`, {ref}`operator-reduce`, etc). Additionally, a process will emit value channels if it is invoked with all value channels, including simple values which are implicitly wrapped in a value channel. For example: diff --git a/docs/config.md b/docs/config.md index 192f9d1df2..2848954d5f 100644 --- a/docs/config.md +++ b/docs/config.md @@ -1864,4 +1864,4 @@ Some features can be enabled using the `nextflow.enable` and `nextflow.preview` : *Experimental: may change in a future release.* -: When `true`, enables {ref}`topic channels `. +: When `true`, enables {ref}`topic channels ` feature. diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy index 4c3f70d192..3fe59ffe1e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy @@ -25,6 +25,7 @@ import groovy.transform.CompileStatic import groovyx.gpars.dataflow.DataflowReadChannel import groovyx.gpars.dataflow.DataflowWriteChannel import nextflow.Channel + /** * Implements Nextflow Mix operator * diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy index f2ff8c0d6e..d6f5b2c2a3 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy @@ -28,6 +28,7 @@ import nextflow.script.params.BaseOutParam import nextflow.script.params.EachInParam import nextflow.script.params.InputsList import nextflow.script.params.OutputsList + /** * Models a nextflow process definition * From d58c752c7713aac9e1176c900a0110ae6092a525 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Fri, 24 Nov 2023 18:46:48 +0100 Subject: [PATCH 15/16] Minor changes + improve docs [ci fast] Signed-off-by: Paolo Di Tommaso --- docs/channel.md | 55 ++++++++++--------- .../main/groovy/nextflow/extension/CH.groovy | 2 +- .../groovy/nextflow/extension/MixOp.groovy | 2 +- .../groovy/nextflow/script/ProcessDef.groovy | 2 +- 4 files changed, 33 insertions(+), 28 deletions(-) diff --git a/docs/channel.md b/docs/channel.md index 7e2e0d691b..162eb599f0 100644 --- a/docs/channel.md +++ b/docs/channel.md @@ -13,34 +13,27 @@ A channel has two major properties: ## Channel types -In Nextflow there are three kinds of channels: *value channels*, *queue channels* and *topic channels*. - -(channel-type-value)= - -### Value channel - -A *value channel* contains a single value and can be consumed any number of times by a process or an operator. - -A value channel can be created with the [value](#value) factory method or by any operator that produces a single value -({ref}`operator-first`, {ref}`operator-collect`, {ref}`operator-reduce`, etc). Additionally, a process will emit value -channels if it is invoked with all value channels, including simple values which are implicitly wrapped in a value channel. +In Nextflow there are two kinds of channels: *queue channels* and *value channels*. (channel-type-queue)= ### Queue channel A *queue channel* is a non-blocking unidirectional FIFO queue connecting a *producer* process (i.e. outputting a value) -to a consumer , channel factories, or operators. +to a consumer process, or an operators. -A queue channel can be created by factory methods ([of](#of), [fromPath](#frompath), etc), operators ({ref}`operator-map`, -{ref}`operator-flatmap`, etc), and processes (see {ref}`Process outputs `). +A queue channel can be created by factory methods ([of](#of), [fromPath](#frompath), etc), operators ({ref}`operator-map`, {ref}`operator-flatmap`, etc), and processes (see {ref}`Process outputs `). -(channel-type-topic)= +(channel-type-value)= -### Topic channel +### Value channel -A *topic channel*, similarly to a *queue channel* is non-blocking unidirectional FIFO queue, however it connects multiple +A *value channel* can be bound (i.e. assigned) with one and only one value, and can be consumed any number of times by +a process or an operator. +A value channel can be created with the [value](#value) factory method or by any operator that produces a single value +({ref}`operator-first`, {ref}`operator-collect`, {ref}`operator-reduce`, etc). Additionally, a process will emit value +channels if it is invoked with all value channels, including simple values which are implicitly wrapped in a value channel. For example: @@ -63,7 +56,8 @@ workflow { } ``` -In the above example, since the `foo` process is invoked with a simple value instead of a channel, the input is implicitly wrapped in a value channel, and the output is also emitted as a value channel. +In the above example, since the `foo` process is invoked with a simple value instead of a channel, the input is implicitly +wrapped in a value channel, and the output is also emitted as a value channel. See also: {ref}`process-multiple-input-channels`. @@ -74,7 +68,8 @@ See also: {ref}`process-multiple-input-channels`. Channels may be created explicitly using the following channel factory methods. :::{versionadded} 20.07.0 -`channel` was introduced as an alias of `Channel`, allowing factory methods to be specified as `channel.of()` or `Channel.of()`, and so on. +`channel` was introduced as an alias of `Channel`, allowing factory methods to be specified as `channel.of()` or +`Channel.of()`, and so on. ::: (channel-empty)= @@ -446,29 +441,39 @@ See also: [fromList](#fromlist) factory method. This feature requires the `nextflow.preview.topic` feature flag to be enabled. ::: -The `topic` method is used to create a queue channel that receives the items from all process outputs in a particular "topic". +A *topic* is a channel type introduced as of Nextflow 23.11.0-edge along with {ref}`channel-type-value` and +{ref}`channel-type-queue`. + +A *topic channel*, similarly to a *queue channel*, is non-blocking unidirectional FIFO queue, however it connects +multiple *producer* processes with multiple *consumer* processes or operators. + +:::{tip} +You can think about it as a channel that is shared across many different process using the same *topic name*. +::: -A process output can be assigned to a topic using the `topic` option on an output: +A process output can be assigned to a topic using the `topic` option on an output, for example: ```groovy process foo { output: - val('foo'), topic: 'my-topic' + val('foo'), topic: my_topic } process bar { output: - val('bar'), topic: 'my-topic' + val('bar'), topic: my_topic } ``` -Then, the `topic` method can be used to consume all items in the topic: +The `channel.topic` method allows referencing the topic channel with the specified name, which can be used as a process +input or operator composition as any other Nextflow channel: ```groovy Channel.topic('my-topic').view() ``` -This approach is a convenient way to collect related items from many different sources without explicitly defining the channel logic (the `topic` method is essentially an implicit {ref}`operator-mix` operation). You can name topics however you want, and you can use different names to collect items for different "topics", as long as your process outputs and channel logic are consistent with each other. +This approach is a convenient way to collect related items from many different sources without explicitly defining +the logic connecting many different queue channels altogether, commonly using the `mix` operator. :::{warning} Any process that consumes a channel topic should not send any outputs to that topic, or else the pipeline will hang forever. diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy index c0d4c6cb9c..6069317f86 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy @@ -148,7 +148,7 @@ class CH { } } - static DataflowWriteChannel topicWriter(String name) { + static DataflowWriteChannel createTopicSource(String name) { synchronized (allTopics) { def topic = allTopics.get(name) if( topic==null ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy index 3fe59ffe1e..62ba37d5f2 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy @@ -62,7 +62,7 @@ class MixOp { DataflowWriteChannel apply() { if( target == null ) - target = CH.create() + target = CH.create() def count = new AtomicInteger( others.size()+1 ) def handlers = [ onNext: { target << it }, diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy index d6f5b2c2a3..caac95d11d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy @@ -200,7 +200,7 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef { throw new IllegalArgumentException("Output topic conflicts with recursion feature - process `$processName` should not declare any output topic" ) final ch = feedbackChannels ? feedbackChannels[i] - : ( topicName ? CH.topicWriter(topicName) : CH.create(singleton) ) + : ( topicName ? CH.createTopicSource(topicName) : CH.create(singleton) ) param.setInto(ch) } } From e02dda2d45457270ab2b467c08088f754b23670e Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Fri, 24 Nov 2023 19:02:25 +0100 Subject: [PATCH 16/16] More docs [ci skip] Signed-off-by: Paolo Di Tommaso --- docs/channel.md | 50 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/docs/channel.md b/docs/channel.md index 162eb599f0..539d5d2898 100644 --- a/docs/channel.md +++ b/docs/channel.md @@ -429,6 +429,56 @@ Y ``` See also: [fromList](#fromlist) factory method. +(channel-topic)= + +### topic + +:::{versionadded} 23.11.0-edge +::: + +:::{note} +This feature requires the `nextflow.preview.topic` feature flag to be enabled. +::: + +A *topic* is a channel type introduced as of Nextflow 23.11.0-edge along with {ref}`channel-type-value` and +{ref}`channel-type-queue`. + +A *topic channel*, similarly to a *queue channel*, is non-blocking unidirectional FIFO queue, however it connects +multiple *producer* processes with multiple *consumer* processes or operators. + +:::{tip} +You can think about it as a channel that is shared across many different process using the same *topic name*. +::: + +A process output can be assigned to a topic using the `topic` option on an output, for example: + +```groovy +process foo { + output: + val('foo'), topic: my_topic +} + +process bar { + output: + val('bar'), topic: my_topic +} +``` + +The `channel.topic` method allows referencing the topic channel with the specified name, which can be used as a process +input or operator composition as any other Nextflow channel: + +```groovy +channel.topic('my-topic').view() +``` + +This approach is a convenient way to collect related items from many different sources without explicitly defining +the logic connecting many different queue channels altogether, commonly using the `mix` operator. + +:::{warning} +Any process that consumes a channel topic should not send any outputs to that topic, or else the pipeline will hang forever. +::: + +See also: {ref}`process-additional-options` for process outputs. (channel-topic)=