33const { pipeline } = require ( './pipeline' )
44const Duplex = require ( './duplex' )
55const { destroyer } = require ( './destroy' )
6- const { isNodeStream, isReadable, isWritable } = require ( './utils' )
6+ const {
7+ isNodeStream,
8+ isReadable,
9+ isWritable,
10+ isWebStream,
11+ isTransformStream,
12+ isWritableStream,
13+ isReadableStream
14+ } = require ( './utils' )
715const {
816 AbortError,
917 codes : { ERR_INVALID_ARG_VALUE , ERR_MISSING_ARGS }
1018} = require ( '../../ours/errors' )
19+ const eos = require ( './end-of-stream' )
1120module . exports = function compose ( ...streams ) {
1221 if ( streams . length === 0 ) {
1322 throw new ERR_MISSING_ARGS ( 'streams' )
@@ -24,14 +33,17 @@ module.exports = function compose(...streams) {
2433 streams [ idx ] = Duplex . from ( streams [ idx ] )
2534 }
2635 for ( let n = 0 ; n < streams . length ; ++ n ) {
27- if ( ! isNodeStream ( streams [ n ] ) ) {
36+ if ( ! isNodeStream ( streams [ n ] ) && ! isWebStream ( streams [ n ] ) ) {
2837 // TODO(ronag): Add checks for non streams.
2938 continue
3039 }
31- if ( n < streams . length - 1 && ! isReadable ( streams [ n ] ) ) {
40+ if (
41+ n < streams . length - 1 &&
42+ ! ( isReadable ( streams [ n ] ) || isReadableStream ( streams [ n ] ) || isTransformStream ( streams [ n ] ) )
43+ ) {
3244 throw new ERR_INVALID_ARG_VALUE ( `streams[${ n } ]` , orgStreams [ n ] , 'must be readable' )
3345 }
34- if ( n > 0 && ! isWritable ( streams [ n ] ) ) {
46+ if ( n > 0 && ! ( isWritable ( streams [ n ] ) || isWritableStream ( streams [ n ] ) || isTransformStream ( streams [ n ] ) ) ) {
3547 throw new ERR_INVALID_ARG_VALUE ( `streams[${ n } ]` , orgStreams [ n ] , 'must be writable' )
3648 }
3749 }
@@ -53,8 +65,8 @@ module.exports = function compose(...streams) {
5365 }
5466 const head = streams [ 0 ]
5567 const tail = pipeline ( streams , onfinished )
56- const writable = ! ! isWritable ( head )
57- const readable = ! ! isReadable ( tail )
68+ const writable = ! ! ( isWritable ( head ) || isWritableStream ( head ) || isTransformStream ( head ) )
69+ const readable = ! ! ( isReadable ( tail ) || isReadableStream ( tail ) || isTransformStream ( tail ) )
5870
5971 // TODO(ronag): Avoid double buffering.
6072 // Implement Writable/Readable/Duplex traits.
@@ -67,25 +79,49 @@ module.exports = function compose(...streams) {
6779 readable
6880 } )
6981 if ( writable ) {
70- d . _write = function ( chunk , encoding , callback ) {
71- if ( head . write ( chunk , encoding ) ) {
72- callback ( )
73- } else {
74- ondrain = callback
82+ if ( isNodeStream ( head ) ) {
83+ d . _write = function ( chunk , encoding , callback ) {
84+ if ( head . write ( chunk , encoding ) ) {
85+ callback ( )
86+ } else {
87+ ondrain = callback
88+ }
7589 }
76- }
77- d . _final = function ( callback ) {
78- head . end ( )
79- onfinish = callback
80- }
81- head . on ( 'drain' , function ( ) {
82- if ( ondrain ) {
83- const cb = ondrain
84- ondrain = null
85- cb ( )
90+ d . _final = function ( callback ) {
91+ head . end ( )
92+ onfinish = callback
8693 }
87- } )
88- tail . on ( 'finish' , function ( ) {
94+ head . on ( 'drain' , function ( ) {
95+ if ( ondrain ) {
96+ const cb = ondrain
97+ ondrain = null
98+ cb ( )
99+ }
100+ } )
101+ } else if ( isWebStream ( head ) ) {
102+ const writable = isTransformStream ( head ) ? head . writable : head
103+ const writer = writable . getWriter ( )
104+ d . _write = async function ( chunk , encoding , callback ) {
105+ try {
106+ await writer . ready
107+ writer . write ( chunk ) . catch ( ( ) => { } )
108+ callback ( )
109+ } catch ( err ) {
110+ callback ( err )
111+ }
112+ }
113+ d . _final = async function ( callback ) {
114+ try {
115+ await writer . ready
116+ writer . close ( ) . catch ( ( ) => { } )
117+ onfinish = callback
118+ } catch ( err ) {
119+ callback ( err )
120+ }
121+ }
122+ }
123+ const toRead = isTransformStream ( tail ) ? tail . readable : tail
124+ eos ( toRead , ( ) => {
89125 if ( onfinish ) {
90126 const cb = onfinish
91127 onfinish = null
@@ -94,25 +130,46 @@ module.exports = function compose(...streams) {
94130 } )
95131 }
96132 if ( readable ) {
97- tail . on ( 'readable' , function ( ) {
98- if ( onreadable ) {
99- const cb = onreadable
100- onreadable = null
101- cb ( )
102- }
103- } )
104- tail . on ( 'end' , function ( ) {
105- d . push ( null )
106- } )
107- d . _read = function ( ) {
108- while ( true ) {
109- const buf = tail . read ( )
110- if ( buf === null ) {
111- onreadable = d . _read
112- return
133+ if ( isNodeStream ( tail ) ) {
134+ tail . on ( 'readable' , function ( ) {
135+ if ( onreadable ) {
136+ const cb = onreadable
137+ onreadable = null
138+ cb ( )
139+ }
140+ } )
141+ tail . on ( 'end' , function ( ) {
142+ d . push ( null )
143+ } )
144+ d . _read = function ( ) {
145+ while ( true ) {
146+ const buf = tail . read ( )
147+ if ( buf === null ) {
148+ onreadable = d . _read
149+ return
150+ }
151+ if ( ! d . push ( buf ) ) {
152+ return
153+ }
113154 }
114- if ( ! d . push ( buf ) ) {
115- return
155+ }
156+ } else if ( isWebStream ( tail ) ) {
157+ const readable = isTransformStream ( tail ) ? tail . readable : tail
158+ const reader = readable . getReader ( )
159+ d . _read = async function ( ) {
160+ while ( true ) {
161+ try {
162+ const { value, done } = await reader . read ( )
163+ if ( ! d . push ( value ) ) {
164+ return
165+ }
166+ if ( done ) {
167+ d . push ( null )
168+ return
169+ }
170+ } catch {
171+ return
172+ }
116173 }
117174 }
118175 }
@@ -128,7 +185,9 @@ module.exports = function compose(...streams) {
128185 callback ( err )
129186 } else {
130187 onclose = callback
131- destroyer ( tail , err )
188+ if ( isNodeStream ( tail ) ) {
189+ destroyer ( tail , err )
190+ }
132191 }
133192 }
134193 return d
0 commit comments