Skip to content

Commit 1de27e2

Browse files
committed
Add evalScan1 operator
1 parent 74a9ee0 commit 1de27e2

File tree

2 files changed

+36
-0
lines changed

2 files changed

+36
-0
lines changed

core/shared/src/main/scala/fs2/Stream.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,6 +1042,29 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
10421042
(Pull.output1(z) >> go(z, this)).stream
10431043
}
10441044

1045+
/** Like `[[Stream#scan1]]`, but accepts a function returning an `F[_]`.
1046+
*
1047+
* @example {{{
1048+
* scala> import cats.effect.SyncIO
1049+
* scala> Stream(1,2,3,4).covary[SyncIO].evalScan1((acc,i) => SyncIO(acc + i)).compile.toVector.unsafeRunSync()
1050+
* res0: Vector[Int] = Vector(1, 3, 6, 10)
1051+
* }}}
1052+
*/
1053+
def evalScan1[F2[x] >: F[x], O2 >: O](f: (O2, O2) => F2[O2]): fs2.Stream[F2, O2] = {
1054+
def go(z: O2, s: fs2.Stream[F2, O]): Pull[F2, O2, Unit] =
1055+
s.pull.uncons1.flatMap {
1056+
case Some((hd, tl)) =>
1057+
Pull.eval(f(z, hd)).flatMap(o => Pull.output1(o) >> go(o, tl))
1058+
case None => Pull.done
1059+
}
1060+
this.pull.uncons.flatMap {
1061+
case None => Pull.done
1062+
case Some((hd, tl)) =>
1063+
val (pre, post) = hd.splitAt(1)
1064+
Pull.output(pre) >> go(pre(0), tl.cons(post))
1065+
}.stream
1066+
}
1067+
10451068
/** Like `observe` but observes with a function `O => F[O2]` instead of a pipe.
10461069
* Not as powerful as `observe` since not all pipes can be represented by `O => F[O2]`, but much faster.
10471070
* Alias for `evalMap(o => f(o).as(o))`.

core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,19 @@ class StreamCombinatorsSuite extends Fs2Suite {
480480
}
481481
}
482482

483+
test("evalScan1") {
484+
forAllF { (s: Stream[Pure, Int]) =>
485+
val v = s.toVector
486+
val f = (a: Int, b: Int) => a + b
487+
val g = (a: Int, b: Int) => IO.pure(a + b)
488+
s.covary[IO]
489+
.evalScan1(g)
490+
.assertEmits(
491+
v.headOption.fold(Vector.empty[Int])(h => v.drop(1).scanLeft(h)(f)).toList
492+
)
493+
}
494+
}
495+
483496
test("every".flaky) {
484497
type BD = (Boolean, FiniteDuration)
485498
def durationSinceLastTrue[F[_]]: Pipe[F, BD, BD] = {

0 commit comments

Comments
 (0)