Commit 921313d

Add "topic" channel (#4459)
This commit adds experimental support for topic channels. A topic channel allows multiple processes to write to the same channel, identified by a shared topic name.

Signed-off-by: Paolo Di Tommaso <[email protected]>
Signed-off-by: Ben Sherman <[email protected]>
Co-authored-by: Ben Sherman <[email protected]>
1 parent b641d67 commit 921313d

File tree

17 files changed

+426
-60
lines changed

docs/channel.md

Lines changed: 113 additions & 5 deletions
@@ -19,17 +19,21 @@ In Nextflow there are two kinds of channels: *queue channels* and *value channel
1919

2020
### Queue channel
2121

22-
A *queue channel* is a non-blocking unidirectional FIFO queue which connects two processes, channel factories, or operators.
22+
A *queue channel* is a non-blocking unidirectional FIFO queue connecting a *producer* process (i.e. one outputting a value)
23+
to a *consumer* process or an operator.
2324

2425
A queue channel can be created by factory methods ([of](#of), [fromPath](#frompath), etc), operators ({ref}`operator-map`, {ref}`operator-flatmap`, etc), and processes (see {ref}`Process outputs <process-output>`).
2526

2627
(channel-type-value)=
2728

2829
### Value channel
2930

30-
A *value channel* contains a single value and can be consumed any number of times by a process or operator.
31+
A *value channel* can be bound (i.e. assigned) to one and only one value, and can be consumed any number of times by
32+
a process or an operator.
3133

32-
A value channel can be created with the [value](#value) factory method or by any operator that produces a single value ({ref}`operator-first`, {ref}`operator-collect`, {ref}`operator-reduce`, etc). Additionally, a process will emit value channels if it is invoked with all value channels, including simple values which are implicitly wrapped in a value channel.
34+
A value channel can be created with the [value](#value) factory method or by any operator that produces a single value
35+
({ref}`operator-first`, {ref}`operator-collect`, {ref}`operator-reduce`, etc). Additionally, a process will emit value
36+
channels if it is invoked with all value channels, including simple values which are implicitly wrapped in a value channel.
3337

3438
For example:
3539

@@ -52,7 +56,8 @@ workflow {
5256
}
5357
```
5458

55-
In the above example, since the `foo` process is invoked with a simple value instead of a channel, the input is implicitly wrapped in a value channel, and the output is also emitted as a value channel.
59+
In the above example, since the `foo` process is invoked with a simple value instead of a channel, the input is implicitly
60+
wrapped in a value channel, and the output is also emitted as a value channel.
5661

5762
See also: {ref}`process-multiple-input-channels`.
5863

@@ -63,7 +68,8 @@ See also: {ref}`process-multiple-input-channels`.
6368
Channels may be created explicitly using the following channel factory methods.
6469

6570
:::{versionadded} 20.07.0
66-
`channel` was introduced as an alias of `Channel`, allowing factory methods to be specified as `channel.of()` or `Channel.of()`, and so on.
71+
`channel` was introduced as an alias of `Channel`, allowing factory methods to be specified as `channel.of()` or
72+
`Channel.of()`, and so on.
6773
:::
6874

6975
(channel-empty)=
@@ -429,6 +435,108 @@ Y
429435

430436
See also: [channel.fromList](#fromlist) factory method.
431437

438+
(channel-topic)=
439+
440+
### topic
441+
442+
:::{versionadded} 23.11.0-edge
443+
:::
444+
445+
:::{note}
446+
This feature requires the `nextflow.preview.topic` feature flag to be enabled.
447+
:::
448+
449+
A *topic* is a channel type introduced as of Nextflow 23.11.0-edge along with {ref}`channel-type-value` and
450+
{ref}`channel-type-queue`.
451+
452+
A *topic channel*, similarly to a *queue channel*, is a non-blocking unidirectional FIFO queue; however, it connects
453+
multiple *producer* processes with multiple *consumer* processes or operators.
454+
455+
:::{tip}
456+
You can think of it as a channel that is shared across many different processes using the same *topic name*.
457+
:::
458+
459+
A process output can be assigned to a topic using the `topic` option on an output, for example:
460+
461+
```groovy
462+
process foo {
463+
output:
464+
val('foo'), topic: my_topic
465+
}
466+
467+
process bar {
468+
output:
469+
val('bar'), topic: my_topic
470+
}
471+
```
472+
473+
The `channel.topic` method returns a reference to the topic channel with the specified name, which can be used as a process
474+
input or in operator composition like any other Nextflow channel:
475+
476+
```groovy
477+
channel.topic('my_topic').view()
478+
```
479+
480+
This approach is a convenient way to collect related items from many different sources without explicitly wiring
481+
many different queue channels together, as is commonly done with the `mix` operator.
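As a rough end-to-end sketch, the fragments in this section can be combined into a single script. It assumes the `nextflow.preview.topic` flag is set at the top of the script, and it adds trivial `script:` bodies that are not part of the original snippets. The commented-out `mix` line shows the explicit wiring that the topic replaces.

```groovy
// Enable the preview feature flag required by topic channels
nextflow.preview.topic = true

process foo {
    output:
    val('foo'), topic: my_topic

    script:
    """
    echo foo
    """
}

process bar {
    output:
    val('bar'), topic: my_topic

    script:
    """
    echo bar
    """
}

workflow {
    foo()
    bar()

    // Both outputs arrive on the same topic; arrival order depends on task completion
    channel.topic('my_topic').view()

    // Explicit wiring with queue channels that the topic replaces:
    // foo.out.mix(bar.out).view()
}
```

Note that neither `foo` nor `bar` reads from `my_topic`, in line with the restriction on processes that consume a topic.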
482+
483+
:::{warning}
484+
Any process that consumes a channel topic should not send any outputs to that topic, or else the pipeline will hang forever.
485+
:::
486+
487+
See also: {ref}`process-additional-options` for process outputs.
488+
432540
(channel-value)=
433541

434542
### value

docs/config.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1856,3 +1856,12 @@ Some features can be enabled using the `nextflow.enable` and `nextflow.preview`
18561856
: *Experimental: may change in a future release.*
18571857

18581858
: When `true`, enables process and workflow recursion. See [this GitHub discussion](https://github.com/nextflow-io/nextflow/discussions/2521) for more information.
1859+
1860+
`nextflow.preview.topic`
1861+
1862+
: :::{versionadded} 23.11.0-edge
1863+
:::
1864+
1865+
: *Experimental: may change in a future release.*
1866+
1867+
: When `true`, enables the {ref}`topic channels <channel-topic>` feature.

docs/process.md

Lines changed: 39 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1166,62 +1166,59 @@ process foo {
11661166
```
11671167
:::
11681168

1169-
### Optional outputs
1169+
(process-additional-options)=
11701170

1171-
In most cases, a process is expected to produce an output for each output definition. However, there are situations where it is valid for a process to not generate output. In these cases, `optional: true` may be added to the output definition, which tells Nextflow not to fail the process if the declared output is not produced:
1171+
### Additional options
11721172

1173-
```groovy
1174-
output:
1175-
path("output.txt"), optional: true
1176-
```
1173+
The following options are available for all process outputs:
11771174

1178-
In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is legitimately missing, the process does not fail. The output channel will only contain values for those processes that produce `output.txt`.
1175+
`emit: <name>`
11791176

1180-
(process-multiple-outputs)=
1177+
: Defines the name of the output channel, which can be used to access the channel by name from the process output:
11811178

1182-
### Multiple outputs
1179+
```groovy
1180+
process FOO {
1181+
output:
1182+
path 'hello.txt', emit: hello
1183+
path 'bye.txt', emit: bye
1184+
1185+
"""
1186+
echo "hello" > hello.txt
1187+
echo "bye" > bye.txt
1188+
"""
1189+
}
11831190
1184-
When a process declares multiple outputs, each output can be accessed by index. The following example prints the second process output (indexes start at zero):
1191+
workflow {
1192+
FOO()
1193+
FOO.out.hello.view()
1194+
}
1195+
```
11851196

1186-
```groovy
1187-
process FOO {
1188-
output:
1189-
path 'bye_file.txt'
1190-
path 'hi_file.txt'
1197+
See {ref}`workflow-process-invocation` for more details.
11911198

1192-
"""
1193-
echo "bye" > bye_file.txt
1194-
echo "hi" > hi_file.txt
1195-
"""
1196-
}
1199+
`optional: true | false`
11971200

1198-
workflow {
1199-
FOO()
1200-
FOO.out[1].view()
1201-
}
1202-
```
1201+
: Normally, if a specified output is not produced by the task, the task will fail. Setting `optional: true` will cause the task to not fail, and instead emit nothing to the given output channel.
12031202

1204-
You can also use the `emit` option to assign a name to each output and access them by name:
1203+
```groovy
1204+
output:
1205+
path("output.txt"), optional: true
1206+
```
12051207

1206-
```groovy
1207-
process FOO {
1208-
output:
1209-
path 'bye_file.txt', emit: bye_file
1210-
path 'hi_file.txt', emit: hi_file
1208+
In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is missing, the task will not fail. The output channel will only contain values for those tasks that produced `output.txt`.
12111209

1212-
"""
1213-
echo "bye" > bye_file.txt
1214-
echo "hi" > hi_file.txt
1215-
"""
1216-
}
1210+
: :::{note}
1211+
While this option can be used with any process output, it cannot be applied to individual elements of a [tuple](#output-type-tuple) output. The entire tuple must be optional or not optional.
1212+
:::
12171213

1218-
workflow {
1219-
FOO()
1220-
FOO.out.hi_file.view()
1221-
}
1222-
```
1214+
`topic: <name>`
1215+
1216+
: :::{versionadded} 23.11.0-edge
1217+
:::
1218+
1219+
: *Experimental: may change in a future release.*
12231220

1224-
See {ref}`workflow-process-invocation` for more details.
1221+
: Defines the {ref}`channel topic <channel-topic>` to which the output will be sent.
12251222

12261223
## When
12271224

docs/workflow.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ workflow {
104104
}
105105
```
106106

107-
When a process defines multiple output channels, each output can be accessed using the array element operator (`out[0]`, `out[1]`, etc.) or using *named outputs* (see below).
107+
When a process defines multiple output channels, each output can be accessed by index (`out[0]`, `out[1]`, etc.) or by name (see below).
108108

109109
The process output(s) can also be accessed like the return value of a function:
110110

@@ -144,7 +144,7 @@ workflow {
144144
}
145145
```
146146

147-
See {ref}`process-multiple-outputs` for more details.
147+
See {ref}`process outputs <process-additional-options>` for more details.
148148

149149
### Process named stdout
150150

modules/nextflow/src/main/groovy/nextflow/Channel.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,11 @@ class Channel {
121121
return CH.queue()
122122
}
123123

124+
static DataflowWriteChannel topic(String name) {
125+
if( !NF.topicChannelEnabled ) throw new MissingMethodException('topic', Channel.class, InvokerHelper.EMPTY_ARGS)
126+
return CH.topic(name)
127+
}
128+
124129
/**
125130
* Create a empty channel i.e. only emits a STOP signal
126131
*
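The guard in `Channel.topic` makes the method behave as if it did not exist while the preview flag is off. A minimal plain-Groovy sketch of that pattern follows. `FeatureFlags` and `MiniChannel` are hypothetical stand-ins for `NF` and `Channel`, and the string return value is only a placeholder for the real channel object.

```groovy
import org.codehaus.groovy.runtime.InvokerHelper

// Hypothetical stand-in for NF.topicChannelEnabled
class FeatureFlags {
    static boolean topicEnabled = false
}

// Hypothetical stand-in for nextflow.Channel
class MiniChannel {
    static String topic(String name) {
        // While the flag is off, fail exactly like a call to an undefined method
        if( !FeatureFlags.topicEnabled )
            throw new MissingMethodException('topic', MiniChannel, InvokerHelper.EMPTY_ARGS)
        return "topic:${name}".toString()
    }
}

// Usage: once the flag is enabled the call succeeds
FeatureFlags.topicEnabled = true
println MiniChannel.topic('my_topic')   // prints: topic:my_topic
```

Throwing `MissingMethodException` rather than a custom error means that a script using the method without the flag sees the same failure it would see on a Nextflow version that lacks the method.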

modules/nextflow/src/main/groovy/nextflow/NF.groovy

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,8 @@ class NF {
6767
static boolean isRecurseEnabled() {
6868
NextflowMeta.instance.preview.recursion
6969
}
70+
71+
static boolean isTopicChannelEnabled() {
72+
NextflowMeta.instance.preview.topic
73+
}
7074
}

modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class NextflowMeta {
4343
volatile float dsl
4444
boolean strict
4545
boolean recursion
46+
boolean topic
4647

4748
void setDsl( float num ) {
4849
if( num == 1 )
@@ -59,6 +60,12 @@ class NextflowMeta {
5960
log.warn "NEXTFLOW RECURSION IS A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES"
6061
this.recursion = recurse
6162
}
63+
64+
void setTopic(Boolean value) {
65+
if( value )
66+
log.warn "CHANNEL TOPICS ARE A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES"
67+
this.topic = value
68+
}
6269
}
6370

6471
static class Features implements Flags {

modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -915,15 +915,16 @@ class NextflowDSLImpl implements ASTTransformation {
915915

916916
/**
917917
* Transform a map entry `emit: something` into `emit: 'something'
918+
* and `topic: something` into `topic: 'something'
918919
* (ie. as a constant) in a map expression passed as argument to
919920
* a method call. This allow the syntax
920921
*
921922
* output:
922-
* path 'foo', emit: bar
923+
* path 'foo', emit: bar, topic: baz
923924
*
924925
* @param call
925926
*/
926-
protected void fixOutEmitOption(MethodCallExpression call) {
927+
protected void fixOutEmitAndTopicOptions(MethodCallExpression call) {
927928
List<Expression> args = isTupleX(call.arguments)?.expressions
928929
if( !args ) return
929930
if( args.size()<2 && (args.size()!=1 || call.methodAsString!='_out_stdout')) return
@@ -936,6 +937,9 @@ class NextflowDSLImpl implements ASTTransformation {
936937
if( key?.text == 'emit' && val ) {
937938
map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text))
938939
}
940+
else if( key?.text == 'topic' && val ) {
941+
map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text))
942+
}
939943
}
940944
}
941945

@@ -955,7 +959,7 @@ class NextflowDSLImpl implements ASTTransformation {
955959
// prefix the method name with the string '_out_'
956960
methodCall.setMethod( new ConstantExpression('_out_' + methodName) )
957961
fixMethodCall(methodCall)
958-
fixOutEmitOption(methodCall)
962+
fixOutEmitAndTopicOptions(methodCall)
959963
}
960964

961965
else if( methodName in ['into','mode'] ) {
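The effect of `fixOutEmitAndTopicOptions` can be illustrated outside Nextflow with the standard Groovy AST classes. This is a simplified sketch, not the actual implementation: it builds the option map by hand instead of receiving it from a `MethodCallExpression`, and the variable names are illustrative only.

```groovy
import org.codehaus.groovy.ast.expr.ConstantExpression
import org.codehaus.groovy.ast.expr.MapEntryExpression
import org.codehaus.groovy.ast.expr.MapExpression
import org.codehaus.groovy.ast.expr.VariableExpression
import static org.codehaus.groovy.ast.tools.GeneralUtils.constX

// Options as they would be parsed from `path 'foo', emit: bar, topic: baz`:
// the bare identifiers `bar` and `baz` arrive as variable expressions
def map = new MapExpression([
    new MapEntryExpression(constX('emit'), new VariableExpression('bar')),
    new MapEntryExpression(constX('topic'), new VariableExpression('baz'))
])

// Replace each identifier with a string constant holding its name
for( int i = 0; i < map.mapEntryExpressions.size(); i++ ) {
    def entry = map.mapEntryExpressions[i]
    def key = entry.keyExpression
    def val = entry.valueExpression
    if( key.text in ['emit', 'topic'] && val instanceof VariableExpression )
        map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text))
}

// Each option value is now a constant expression rather than a variable reference
println map.mapEntryExpressions.collect { it.valueExpression.class.simpleName }
```

Without this rewrite, `my_topic` in `topic: my_topic` would be resolved as a variable at run time instead of being treated as the topic name.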
