Skip to content

Commit 00fb259

Browse files
authored
Merge pull request #3342 from BalmungSan/unsafe-to-publisher
Add flow.unsafeToPublisher
2 parents 53ba75c + b5e91af commit 00fb259

File tree

5 files changed

+120
-38
lines changed

5 files changed

+120
-38
lines changed

core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@ package fs2
2323
package interop
2424
package flow
2525

26+
import cats.effect.IO
2627
import cats.effect.kernel.{Async, Resource}
2728
import cats.effect.std.Dispatcher
29+
import cats.effect.unsafe.IORuntime
2830

2931
import java.util.Objects.requireNonNull
3032
import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription}
33+
import java.util.concurrent.RejectedExecutionException
3134
import scala.util.control.NoStackTrace
3235

3336
/** Implementation of a [[Publisher]].
@@ -39,22 +42,24 @@ import scala.util.control.NoStackTrace
3942
*
4043
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#1-publisher-code]]
4144
*/
42-
private[flow] final class StreamPublisher[F[_], A] private (
43-
stream: Stream[F, A],
44-
startDispatcher: Dispatcher[F]
45-
)(implicit F: Async[F])
46-
extends Publisher[A] {
47-
override def subscribe(subscriber: Subscriber[_ >: A]): Unit = {
45+
private[flow] sealed abstract class StreamPublisher[F[_], A] private (
46+
stream: Stream[F, A]
47+
)(implicit
48+
F: Async[F]
49+
) extends Publisher[A] {
50+
protected def runSubscription(subscribe: F[Unit]): Unit
51+
52+
override final def subscribe(subscriber: Subscriber[_ >: A]): Unit = {
4853
requireNonNull(
4954
subscriber,
5055
"The subscriber provided to subscribe must not be null"
5156
)
5257
try
53-
startDispatcher.unsafeRunAndForget(
58+
runSubscription(
5459
StreamSubscription.subscribe(stream, subscriber)
5560
)
5661
catch {
57-
case _: IllegalStateException =>
62+
case _: IllegalStateException | _: RejectedExecutionException =>
5863
subscriber.onSubscribe(new Subscription {
5964
override def cancel(): Unit = ()
6065
override def request(x$1: Long): Unit = ()
@@ -65,13 +70,41 @@ private[flow] final class StreamPublisher[F[_], A] private (
6570
}
6671

6772
private[flow] object StreamPublisher {
73+
private final class DispatcherStreamPublisher[F[_], A](
74+
stream: Stream[F, A],
75+
startDispatcher: Dispatcher[F]
76+
)(implicit
77+
F: Async[F]
78+
) extends StreamPublisher[F, A](stream) {
79+
override protected final def runSubscription(subscribe: F[Unit]): Unit =
80+
startDispatcher.unsafeRunAndForget(subscribe)
81+
}
82+
83+
private final class IORuntimeStreamPublisher[A](
84+
stream: Stream[IO, A]
85+
)(implicit
86+
runtime: IORuntime
87+
) extends StreamPublisher[IO, A](stream) {
88+
override protected final def runSubscription(subscribe: IO[Unit]): Unit =
89+
subscribe.unsafeRunAndForget()
90+
}
91+
6892
def apply[F[_], A](
6993
stream: Stream[F, A]
70-
)(implicit F: Async[F]): Resource[F, StreamPublisher[F, A]] =
71-
Dispatcher.parallel[F](await = false).map { startDispatcher =>
72-
new StreamPublisher(stream, startDispatcher)
94+
)(implicit
95+
F: Async[F]
96+
): Resource[F, StreamPublisher[F, A]] =
97+
Dispatcher.parallel[F](await = true).map { startDispatcher =>
98+
new DispatcherStreamPublisher(stream, startDispatcher)
7399
}
74100

101+
def unsafe[A](
102+
stream: Stream[IO, A]
103+
)(implicit
104+
runtime: IORuntime
105+
): StreamPublisher[IO, A] =
106+
new IORuntimeStreamPublisher(stream)
107+
75108
private object CanceledStreamPublisherException
76109
extends IllegalStateException(
77110
"This StreamPublisher is not longer accepting subscribers"

core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,49 +50,49 @@ private[flow] final class StreamSubscriber[F[_], A] private (
5050
// Subscriber API.
5151

5252
/** Receives a subscription from the upstream reactive-streams system. */
53-
override def onSubscribe(subscription: Subscription): Unit = {
53+
override final def onSubscribe(subscription: Subscription): Unit = {
5454
requireNonNull(
5555
subscription,
5656
"The subscription provided to onSubscribe must not be null"
5757
)
58-
nextState(Subscribe(subscription))
58+
nextState(input = Subscribe(subscription))
5959
}
6060

6161
/** Receives the next record from the upstream reactive-streams system. */
62-
override def onNext(a: A): Unit = {
62+
override final def onNext(a: A): Unit = {
6363
requireNonNull(
6464
a,
6565
"The element provided to onNext must not be null"
6666
)
67-
nextState(Next(a))
67+
nextState(input = Next(a))
6868
}
6969

7070
/** Called by the upstream reactive-streams system when it fails. */
71-
override def onError(ex: Throwable): Unit = {
71+
override final def onError(ex: Throwable): Unit = {
7272
requireNonNull(
7373
ex,
7474
"The throwable provided to onError must not be null"
7575
)
76-
nextState(Error(ex))
76+
nextState(input = Error(ex))
7777
}
7878

7979
/** Called by the upstream reactive-streams system when it has finished sending records. */
80-
override def onComplete(): Unit =
81-
nextState(Complete(canceled = false))
80+
override final def onComplete(): Unit =
81+
nextState(input = Complete(canceled = false))
8282

8383
// Interop API.
8484

8585
/** Creates a downstream [[Stream]] from this [[Subscriber]]. */
8686
private[flow] def stream(subscribe: F[Unit]): Stream[F, A] = {
8787
// Called when downstream has finished consuming records.
8888
val finalize =
89-
F.delay(nextState(Complete(canceled = true)))
89+
F.delay(nextState(input = Complete(canceled = true)))
9090

9191
// Producer for downstream.
9292
val dequeue1 =
9393
F.async[Option[Chunk[Any]]] { cb =>
9494
F.delay {
95-
nextState(Dequeue(cb))
95+
nextState(input = Dequeue(cb))
9696

9797
Some(finalize)
9898
}
@@ -112,8 +112,8 @@ private[flow] final class StreamSubscriber[F[_], A] private (
112112
private def run(block: => Unit): () => Unit = () => block
113113

114114
/** Runs a single step of the state machine. */
115-
private def step(in: Input): State => (State, () => Unit) =
116-
in match {
115+
private def step(input: Input): State => (State, () => Unit) =
116+
input match {
117117
case Subscribe(s) => {
118118
case Uninitialized(None) =>
119119
Idle(s) -> noop
@@ -263,9 +263,9 @@ private[flow] final class StreamSubscriber[F[_], A] private (
263263
* + `Error` & `Dequeue`: No matter the order in which they are processed, we will complete the callback with the error.
264264
* + cancellation & any other thing: Worst case, we will lose some data that we not longer care about; and eventually reach `Terminal`.
265265
*/
266-
private def nextState(in: Input): Unit = {
266+
private def nextState(input: Input): Unit = {
267267
val (_, effect) = currentState.updateAndGet { case (state, _) =>
268-
step(in)(state)
268+
step(input)(state)
269269
}
270270
// Only run the effect after the state update took place.
271271
effect()

core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
3333

3434
/** Implementation of a [[Subscription]].
3535
*
36-
* This is used by the [[StreamUnicastPublisher]] to send elements from a [[Stream]] to a downstream reactive-streams system.
36+
* This is used by the [[StreamPublisher]] to send elements from a [[Stream]] to a downstream reactive-streams system.
3737
*
3838
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#3-subscription-code]]
3939
*/
@@ -58,7 +58,9 @@ private[flow] final class StreamSubscription[F[_], A] private (
5858
sub.onComplete()
5959
}
6060

61-
private[flow] def run: F[Unit] = {
61+
// This is a def rather than a val, because it is only used once.
62+
// And having fields increase the instantiation cost and delay garbage collection.
63+
def run: F[Unit] = {
6264
val subscriptionPipe: Pipe[F, A, A] = in => {
6365
def go(s: Stream[F, A]): Pull[F, A, Unit] =
6466
Pull.eval(F.delay(requests.get())).flatMap { n =>
@@ -133,14 +135,14 @@ private[flow] final class StreamSubscription[F[_], A] private (
133135
// then the request must be a NOOP.
134136
// See https://github.com/zainab-ali/fs2-reactive-streams/issues/29
135137
// and https://github.com/zainab-ali/fs2-reactive-streams/issues/46
136-
override def cancel(): Unit = {
138+
override final def cancel(): Unit = {
137139
val cancelCB = canceled.getAndSet(null)
138140
if (cancelCB ne null) {
139141
cancelCB.apply()
140142
}
141143
}
142144

143-
override def request(n: Long): Unit =
145+
override final def request(n: Long): Unit =
144146
// First, confirm we are not yet cancelled.
145147
if (canceled.get() ne null) {
146148
// Second, ensure we were requested a positive number of elements.

core/shared/src/main/scala/fs2/interop/flow/package.scala

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
package fs2
2323
package interop
2424

25+
import cats.effect.IO
2526
import cats.effect.kernel.{Async, Resource}
27+
import cats.effect.unsafe.IORuntime
2628

2729
import java.util.concurrent.Flow.{Publisher, Subscriber, defaultBufferSize}
2830

@@ -100,7 +102,7 @@ package object flow {
100102
subscriber.stream(subscribe(subscriber))
101103
}
102104

103-
/** Creates a [[Stream]] from an [[Publisher]].
105+
/** Creates a [[Stream]] from a [[Publisher]].
104106
*
105107
* @example {{{
106108
* scala> import cats.effect.IO
@@ -116,7 +118,7 @@ package object flow {
116118
* res0: Stream[IO, Int] = Stream(..)
117119
* }}}
118120
*
119-
* @note The publisher will not receive a subscriber until the stream is run.
121+
* @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run.
120122
*
121123
* @see the `toStream` extension method added to `Publisher`
122124
*
@@ -134,12 +136,15 @@ package object flow {
134136
/** Creates a [[Publisher]] from a [[Stream]].
135137
*
136138
* The stream is only ran when elements are requested.
137-
* Closing the [[Resource]] means gracefully shutting down all active subscriptions.
139+
* Closing the [[Resource]] means not accepting new subscriptions,
140+
* but waiting for all active ones to finish consuming.
141+
* Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions.
138142
* Thus, no more elements will be published.
139143
*
140-
* @note This Publisher can be reused for multiple Subscribers,
141-
* each subscription will re-run the [[Stream]] from the beginning.
144+
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
145+
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
142146
*
147+
* @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]].
143148
* @see [[subscribeStream]] for a simpler version that only requires a [[Subscriber]].
144149
*
145150
* @param stream The [[Stream]] to transform.
@@ -151,6 +156,24 @@ package object flow {
151156
): Resource[F, Publisher[A]] =
152157
StreamPublisher(stream)
153158

159+
/** Creates a [[Publisher]] from a [[Stream]].
160+
*
161+
* The stream is only ran when elements are requested.
162+
*
163+
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
164+
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
165+
*
166+
* @see [[toPublisher]] for a safe version that returns a [[Resource]].
167+
*
168+
* @param stream The [[Stream]] to transform.
169+
*/
170+
def unsafeToPublisher[A](
171+
stream: Stream[IO, A]
172+
)(implicit
173+
runtime: IORuntime
174+
): Publisher[A] =
175+
StreamPublisher.unsafe(stream)
176+
154177
/** Allows subscribing a [[Subscriber]] to a [[Stream]].
155178
*
156179
* The returned program will run until

core/shared/src/main/scala/fs2/interop/flow/syntax.scala

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,16 @@ package fs2
2323
package interop
2424
package flow
2525

26+
import cats.effect.IO
2627
import cats.effect.kernel.{Async, Resource}
28+
import cats.effect.unsafe.IORuntime
2729

2830
import java.util.concurrent.Flow.{Publisher, Subscriber}
2931

3032
object syntax {
3133
implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal {
3234

33-
/** Creates a [[Stream]] from an [[Publisher]].
35+
/** Creates a [[Stream]] from a [[Publisher]].
3436
*
3537
* @example {{{
3638
* scala> import cats.effect.IO
@@ -47,6 +49,8 @@ object syntax {
4749
* res0: Stream[IO, Int] = Stream(..)
4850
* }}}
4951
*
52+
* @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run.
53+
*
5054
* @param chunkSize setup the number of elements asked each time from the [[Publisher]].
5155
* A high number may be useful if the publisher is triggering from IO,
5256
* like requesting elements from a database.
@@ -63,12 +67,15 @@ object syntax {
6367
/** Creates a [[Publisher]] from a [[Stream]].
6468
*
6569
* The stream is only ran when elements are requested.
66-
* Closing the [[Resource]] means gracefully shutting down all active subscriptions.
70+
* Closing the [[Resource]] means not accepting new subscriptions,
71+
* but waiting for all active ones to finish consuming.
72+
* Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions.
6773
* Thus, no more elements will be published.
6874
*
69-
* @note This Publisher can be reused for multiple Subscribers,
70-
* each subscription will re-run the [[Stream]] from the beginning.
75+
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
76+
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
7177
*
78+
* @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]].
7279
* @see [[subscribe]] for a simpler version that only requires a [[Subscriber]].
7380
*/
7481
def toPublisher(implicit F: Async[F]): Resource[F, Publisher[A]] =
@@ -86,6 +93,23 @@ object syntax {
8693
flow.subscribeStream(stream, subscriber)
8794
}
8895

96+
implicit final class StreamIOOps[A](private val stream: Stream[IO, A]) extends AnyVal {
97+
98+
/** Creates a [[Publisher]] from a [[Stream]].
99+
*
100+
* The stream is only ran when elements are requested.
101+
*
102+
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
103+
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
104+
*
105+
* @see [[toPublisher]] for a safe version that returns a [[Resource]].
106+
*/
107+
def unsafeToPublisher()(implicit
108+
runtime: IORuntime
109+
): Publisher[A] =
110+
flow.unsafeToPublisher(stream)
111+
}
112+
89113
final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal {
90114
def apply[A](
91115
publisher: Publisher[A],

0 commit comments

Comments
 (0)