69 changes: 65 additions & 4 deletions core/shared/src/main/scala/fs2/Pull.scala
@@ -295,6 +295,7 @@ sealed abstract class Pull[+F[_], +O, +R] {
_ => this,
(l, _) => Pull.eval(l.cancel).rethrow
)

}

object Pull extends PullLowPriority {
@@ -464,10 +465,70 @@ object Pull extends PullLowPriority {
def extendScopeTo[F[_], O](
s: Stream[F, O]
)(implicit F: MonadError[F, Throwable]): Pull[F, Nothing, Stream[F, O]] =
for {
scope <- Pull.getScope[F]
lease <- Pull.eval(scope.lease)
} yield s.onFinalize(lease.cancel.redeemWith(F.raiseError(_), _ => F.unit))
Pull.getScope[F].map(scope => Stream.bracket(scope.lease)(_.cancel.rethrow) *> s)

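/** A `Bind` whose step and whose continuations have their resource acquisitions
  * rebound to the given `scope` via `bindAcquireToScope`.
  */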
private class ScopedBind[+F[_], +O, X, +R](
bind: Bind[F, O, X, R],
scope: Scope[F]
)(implicit F: MonadThrow[F])
extends Bind[F, O, X, R](
bindAcquireToScope(bind.step, scope)
) {
def cont(r: Terminal[X]): Pull[F, O, R] =
bindAcquireToScope[F, O, R](bind.cont(r), scope)
}

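/** Rewrites `pull` so that the resource acquisitions it performs are bound to the
  * given `scope` rather than to whichever scope is active when the pull is later
  * interpreted. Structural nodes (`FlatMapOutput`, `Bind`, `InScope`, `StepLeg`,
  * `Uncons`) are traversed recursively; all other node types are returned unchanged.
  */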
private[fs2] def bindAcquireToScope[F[_], O, R](
pull: Pull[F, O, R],
scope: Scope[F]
)(implicit F: MonadThrow[F]): Pull[F, O, R] =
pull match {
case p: Pull.FlatMapOutput[?, ?, ?] =>
bindAcquireToScope(p.stream, scope).flatMapOutput(o => bindAcquireToScope(p.fun(o), scope))
case p: Pull.Acquire[F, r] @unchecked =>
Pull
.eval(
scope.acquireResource(
poll =>
if (p.cancelable) poll(p.resource)
else p.resource,
p.release
)
)
.flatMap {
case Outcome.Succeeded(Left(id)) =>
Pull.raiseError(new RuntimeException(s"what to do with id? $id"))
case Outcome.Succeeded(Right(r)) =>
Pull.pure(r)
case Outcome.Errored(e) => Pull.raiseError(e)
case Outcome.Canceled() => Pull.raiseError(new InterruptedException)
}
case p: Pull.Bind[F, O, x, R] @unchecked =>
new ScopedBind[F, O, x, R](p, scope)
case p: Pull.InScope[?, ?] =>
Pull.InScope(bindAcquireToScope(p.stream, scope), p.useInterruption)
case p: Pull.StepLeg[F, O] @unchecked =>
Pull.StepLeg(
bindAcquireToScope(p.stream, scope),
p.scope
)
case p: Pull.Uncons[F, O] @unchecked =>
Pull.Uncons(
bindAcquireToScope(p.stream, scope)
)
case other => other
// case p: Pull.AlgEffect[F, R] @unchecked =>
// p // workaround for Scala 3 'Pull.CloseScope case is unreachable'
// case p: Pull.Translate[g, F, O] @unchecked => p
//// case p: Pull.InterruptWhen[?] => p
//// case p: Pull.CloseScope => p
//// case p: Pull.GetScope[?] => p
//// case p: Pull.Eval[?, ?] => p
// case p: Pull.Fail => p
// case p: Pull.Succeeded[R] @unchecked => p
// case p: Pull.Interrupted => p
// case p: Pull.Output[O] @unchecked => p
}

/** Repeatedly uses the output of the pull as input for the next step of the
* pull. Halts when a step terminates with `None` or `Pull.raiseError`.
173 changes: 99 additions & 74 deletions core/shared/src/main/scala/fs2/Stream.scala
@@ -238,29 +238,31 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
*/
def broadcastThrough[F2[x] >: F[x]: Concurrent, O2](pipes: Pipe[F2, O, O2]*): Stream[F2, O2] = {
assert(pipes.nonEmpty, s"pipes should not be empty")
Stream.force {
for {
// topic: contains the chunk that the pipes are processing at one point.
// until and unless all pipes are finished with it, won't move to next one
topic <- Topic[F2, Chunk[O]]
// Coordination: neither the producer nor any consumer starts
// until and unless all consumers are subscribed to topic.
allReady <- CountDownLatch[F2](pipes.length)
} yield {
val checkIn = allReady.release >> allReady.await

def dump(pipe: Pipe[F2, O, O2]): Stream[F2, O2] =
Stream.resource(topic.subscribeAwait(1)).flatMap { sub =>
// Wait until all pipes are ready before consuming.
// Crucial: checkin is not passed to the pipe,
// so pipe cannot interrupt it and alter the latch count
Stream.exec(checkIn) ++ pipe(sub.unchunks)
}
extendScopeThrough[F2, O2] { source =>
Stream.force {
for {
// topic: contains the chunk that the pipes are processing at one point.
// until and unless all pipes are finished with it, won't move to next one
topic <- Topic[F2, Chunk[O]]
// Coordination: neither the producer nor any consumer starts
// until and unless all consumers are subscribed to topic.
allReady <- CountDownLatch[F2](pipes.length)
} yield {
val checkIn = allReady.release >> allReady.await

def dump(pipe: Pipe[F2, O, O2]): Stream[F2, O2] =
Stream.resource(topic.subscribeAwait(1)).flatMap { sub =>
// Wait until all pipes are ready before consuming.
// Crucial: checkin is not passed to the pipe,
// so pipe cannot interrupt it and alter the latch count
Stream.exec(checkIn) ++ pipe(sub.unchunks)
}

val dumpAll: Stream[F2, O2] = Stream(pipes: _*).map(dump).parJoinUnbounded
// Wait until all pipes are checked in before pulling
val pump = Stream.exec(allReady.await) ++ topic.publish(chunks)
dumpAll.concurrently(pump)
val dumpAll: Stream[F2, O2] = Stream(pipes: _*).map(dump).parJoinUnbounded
// Wait until all pipes are checked in before pulling
val pump = Stream.exec(allReady.await) ++ topic.publish(source.chunks)
dumpAll.concurrently(pump)
}
}
}
}
@@ -2323,64 +2325,65 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
channel: F2[Channel[F2, F2[Either[Throwable, O2]]]],
isOrdered: Boolean,
f: O => F2[O2]
)(implicit F: Concurrent[F2]): Stream[F2, O2] = {
val action =
(
Semaphore[F2](concurrency),
channel,
Deferred[F2, Unit],
Deferred[F2, Unit]
).mapN { (semaphore, channel, stop, end) =>
def initFork(release: F2[Unit]): F2[Either[Throwable, O2] => F2[Unit]] = {
def ordered: F2[Either[Throwable, O2] => F2[Unit]] = {
def send(v: Deferred[F2, Either[Throwable, O2]]) =
(el: Either[Throwable, O2]) => v.complete(el).void

Deferred[F2, Either[Throwable, O2]]
.flatTap(value => channel.send(release *> value.get))
.map(send)
}
)(implicit F: Concurrent[F2]): Stream[F2, O2] =
extendScopeThrough[F2, O2] { source =>
Stream.force {
(
Semaphore[F2](concurrency),
channel,
Deferred[F2, Unit],
Deferred[F2, Unit]
).mapN { (semaphore, channel, stop, end) =>
def initFork(release: F2[Unit]): F2[Either[Throwable, O2] => F2[Unit]] = {
def ordered: F2[Either[Throwable, O2] => F2[Unit]] = {
def send(v: Deferred[F2, Either[Throwable, O2]]) =
(el: Either[Throwable, O2]) => v.complete(el).void

Deferred[F2, Either[Throwable, O2]]
.flatTap(value => channel.send(release *> value.get))
.map(send)
}

def unordered: Either[Throwable, O2] => F2[Unit] =
(el: Either[Throwable, O2]) => release <* channel.send(F.pure(el))
def unordered: Either[Throwable, O2] => F2[Unit] =
(el: Either[Throwable, O2]) => release <* channel.send(F.pure(el))

if (isOrdered) ordered else F.pure(unordered)
}
if (isOrdered) ordered else F.pure(unordered)
}

val releaseAndCheckCompletion =
semaphore.release *>
semaphore.available.flatMap {
case `concurrency` => channel.close *> end.complete(()).void
case _ => F.unit
}
val releaseAndCheckCompletion =
semaphore.release *>
semaphore.available.flatMap {
case `concurrency` => channel.close *> end.complete(()).void
case _ => F.unit
}

def forkOnElem(el: O): F2[Unit] =
F.uncancelable { poll =>
poll(semaphore.acquire) <*
Deferred[F2, Unit].flatMap { pushed =>
val init = initFork(pushed.complete(()).void)
poll(init).onCancel(releaseAndCheckCompletion).flatMap { send =>
val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get
F.start(stop.get.race(action) *> releaseAndCheckCompletion)
def forkOnElem(el: O): F2[Unit] =
F.uncancelable { poll =>
poll(semaphore.acquire) <*
Deferred[F2, Unit].flatMap { pushed =>
val init = initFork(pushed.complete(()).void)
poll(init).onCancel(releaseAndCheckCompletion).flatMap { send =>
val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get
F.start(stop.get.race(action) *> releaseAndCheckCompletion)
}
}
}
}
}

val background =
Stream.exec(semaphore.acquire) ++
interruptWhen(stop.get.map(_.asRight[Throwable]))
.foreach(forkOnElem)
.onFinalizeCase {
case ExitCase.Succeeded => releaseAndCheckCompletion
case _ => stop.complete(()) *> releaseAndCheckCompletion
}
val background =
Stream.exec(semaphore.acquire) ++
source
.interruptWhen(stop.get.map(_.asRight[Throwable]))
.foreach(forkOnElem)
.onFinalizeCase {
case ExitCase.Succeeded => releaseAndCheckCompletion
case _ => stop.complete(()) *> releaseAndCheckCompletion
}

val foreground = channel.stream.evalMap(_.rethrow)
foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background)
val foreground = channel.stream.evalMap(_.rethrow)
foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background)
}
}

Stream.force(action)
}
}

/** Concurrent zip.
*
@@ -2455,12 +2458,13 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
*/
def prefetchN[F2[x] >: F[x]: Concurrent](
n: Int
): Stream[F2, O] =
): Stream[F2, O] = extendScopeThrough[F2, O] { source =>
Stream.eval(Channel.bounded[F2, Chunk[O]](n)).flatMap { chan =>
chan.stream.unchunks.concurrently {
chunks.through(chan.sendAll)
source.chunks.through(chan.sendAll)
}
}
}

/** Prints each element of this stream to standard out, converting each element to a `String` via `Show`. */
def printlns[F2[x] >: F[x], O2 >: O](implicit
@@ -2921,6 +2925,27 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
)(f: (Stream[F, O], Stream[F2, O2]) => Stream[F2, O3]): Stream[F2, O3] =
f(this, s2)

/** Transforms this stream, explicitly extending the current scope through the given pipe.
*
* Use this when implementing a pipe where the resulting stream is not directly constructed from
* the source stream, e.g. when sending the source stream through a Channel and returning the
* channel's stream.
*/
def extendScopeThrough[F2[x] >: F[x], O2](
f: Stream[F2, O] => Stream[F2, O2]
)(implicit F: MonadError[F2, Throwable]): Stream[F2, O2] =
this.pull.peek.flatMap {
case Some((_, stream)) =>
Pull
.getScope[F2]
.flatMap { scope =>
f(
Pull.bindAcquireToScope(stream.underlying, scope).stream
).underlying
}
case None => Pull.done
}.stream
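
A minimal usage sketch (not part of this diff; `throughChannel` and its Channel wiring are illustrative assumptions): a pipe whose output stream is built from a Channel rather than directly from the source, which is the situation `extendScopeThrough` is meant for.

import cats.effect.Concurrent
import fs2.{Pipe, Stream}
import fs2.concurrent.Channel

// Hypothetical pipe: elements are handed off through a Channel, so the output
// stream is not constructed directly from the source. extendScopeThrough keeps
// the source's bracket-acquired resources from being finalised before the
// channel's stream has consumed the elements.
def throughChannel[F[_]: Concurrent, O]: Pipe[F, O, O] =
  in =>
    in.extendScopeThrough[F, O] { source =>
      Stream.eval(Channel.unbounded[F, O]).flatMap { chan =>
        chan.stream.concurrently(source.through(chan.sendAll))
      }
    }

Without the scope extension, the channel hand-off would let the source's scope close early and run finalisers before downstream sees the elements, which is the behaviour exercised by the issue-3076 tests below.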

/** Fails this stream with a `TimeoutException` if it does not complete within given `timeout`. */
def timeout[F2[x] >: F[x]: Temporal](
timeout: FiniteDuration
30 changes: 30 additions & 0 deletions core/shared/src/test/scala/fs2/ParEvalMapSuite.scala
@@ -293,4 +293,34 @@ class ParEvalMapSuite extends Fs2Suite {
.timeout(2.seconds)
}
}

group("issue-3076, parEvalMap* runs resource finaliser before usage") {
test("parEvalMap") {
Deferred[IO, Unit]
.flatMap { d =>
Stream
.bracket(IO.unit)(_ => d.complete(()).void)
.parEvalMap(2)(_ => IO.sleep(1.second))
.evalMap(_ => IO.sleep(1.second) >> d.complete(()))
.timeout(5.seconds)
.compile
.last
}
.assertEquals(Some(true))
}

test("broadcastThrough") {
Deferred[IO, Unit]
.flatMap { d =>
Stream
.bracket(IO.unit)(_ => d.complete(()).void)
.broadcastThrough(identity[Stream[IO, Unit]])
.evalMap(_ => IO.sleep(1.second) >> d.complete(()))
.timeout(5.seconds)
.compile
.last
}
.assertEquals(Some(true))
}
}
}
30 changes: 30 additions & 0 deletions core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala
@@ -1328,6 +1328,36 @@ class StreamCombinatorsSuite extends Fs2Suite {
)
.assertEquals(4.seconds)
}

test("scope propagation") {
Deferred[IO, Unit]
.flatMap { d =>
Stream
.bracket(IO.unit)(_ => d.complete(()).void)
.prefetch
.evalMap(_ => IO.sleep(1.second) >> d.complete(()))
.timeout(5.seconds)
.compile
.last
}
.assertEquals(Some(true))
}

test("scope propagation, multiple pulls") {
Stream(1, 2, 3, 4, 5, 6)
.flatMap(i => Stream.bracket(Deferred[IO, Int])(_.complete(i).void))
.parEvalMap(2)(d => IO.sleep(1.second).as(d))
.parEvalMap(2)(d => IO.sleep(1.second).as(d))
.parEvalMap(2)(d => IO.sleep(1.second) >> d.complete(0))
.evalMap(completed =>
IO.raiseUnless(completed)(new RuntimeException("resource released prematurely"))
)
.timeout(15.seconds)
.compile
.last
.assertEquals(Some(()))
}

}

test("range") {