File tree Expand file tree Collapse file tree 3 files changed +29
-2
lines changed
main/clojure/clojure/core/async
test/clojure/clojure/core Expand file tree Collapse file tree 3 files changed +29
-2
lines changed Original file line number Diff line number Diff line change 73
73
:args - a map of param->val which will be passed to the process ctor
74
74
:chan-opts - a map of in-or-out-id->{:keys [buf-or-n xform]}, where buf-or-n
75
75
and xform have their meanings per core.async/chan
76
- the default is {:buf-or-n 10}
76
+ the default is {:buf-or-n 10}. For efficiency, Flow creates
77
+ only the in channel per in/out pair and so :chan-opts
78
+ should be associated with the in channel.
77
79
78
80
:conns - a collection of [[from-pid outid] [to-pid inid]] tuples.
79
81
Original file line number Diff line number Diff line change 62
62
conn-map (reduce (fn [ret [out in :as conn]]
63
63
(if (and (contains? outopts out)
64
64
(contains? inopts in))
65
- (update ret out set-conj in)
65
+ (if (seq (vals (get outopts out)))
66
+ (throw (ex-info (str " only one channel created for connection. "
67
+ " :chan-opts must be associated with input side." )
68
+ {:conn conn, :out-opts outopts}))
69
+ (update ret out set-conj in))
66
70
(throw (ex-info " invalid connection" {:conn conn}))))
67
71
{} conns)
68
72
running-chans #(or (deref chans) (throw (Exception. " flow not running" )))
Original file line number Diff line number Diff line change
1
+ (ns clojure.core.flow-test
2
+ (:require [clojure.core.async.flow :as flow]
3
+ [clojure.test :refer :all ]))
4
+
5
+ (defn tap-n-drop [x] (tap> x) nil )
6
+
7
+ (deftest chan-opts-tests
8
+ (testing " :chan-opts only specified on in side of connected pair"
9
+ (is (thrown? clojure.lang.ExceptionInfo
10
+ (flow/create-flow
11
+ {:procs {:source {:proc (-> identity flow/lift1->step flow/process)
12
+ :chan-opts {:out {:buf-or-n 11
13
+ :xform (map (fn [x] (str " Saw " x)))}}}
14
+ :sink {:proc (-> #'tap-n-drop flow/lift1->step flow/process)}}
15
+ :conns [[[:source :out ] [:sink :in ]]]})))
16
+ (is (flow/create-flow
17
+ {:procs {:source {:proc (-> identity flow/lift1->step flow/process)}
18
+ :sink {:proc (-> #'tap-n-drop flow/lift1->step flow/process)
19
+ :chan-opts {:in {:buf-or-n 11
20
+ :xform (map (fn [x] (str " Saw " x)))}}}}
21
+ :conns [[[:source :out ] [:sink :in ]]]}))))
You can’t perform that action at this time.
0 commit comments