Rewrote parTraverseN and parTraverseN_ for better performance #4451
base: series/3.6.x
Conversation
Pros and cons on performance, though I think it's possible to do better here. It's a little bit slower than the previous implementation in the happy path, but it's several orders of magnitude faster in the error path, so I'll call that a win.

[benchmark results: before vs. after]
So I haven't golfed the failure down yet, but it really looks like we're hitting a bug in Scala.js, probably stemming from the "null safe" test. @durban, you may be amused. I think we could just remove the null safe test now since we're not using an …
Well, "amused" is one word for it :-) So it's not a bug in Scala.js, as in, it behaves as documented: dereferencing `null` is undefined behavior.
Well, that's fun. I actually thought we had some special checking for when the …
That we do check. It's this line: https://github.com/typelevel/cats-effect/blob/series/3.x/core/shared/src/main/scala/cats/effect/IO.scala#L2024 (and …)
Ahhhhhhh, that makes sense. Okay, by that token, I think it's fair to say that a lot of our combinators just aren't null-safe.
It's annoying, because in Scala they are null safe. (The test passed before; it just failed on JS.) We'd have to do something like this (everywhere) to make it work on JS:

```scala
def combinator(fa: F[A], ...) = {
  if (fa eq null) throw new NullPointerException
  ...
}
```

Which is (1) annoying, (2) very redundant, except on Scala.js, and (3) apparently has performance problems in Scala.js (or maybe that's only with the linker setting?). I don't propose we do this. There is a Scala.js linker setting which fixes the problem. On the JVM and in Scala Native it works by default.
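To make the platform difference concrete, a hypothetical illustration (not from the PR; `fa` and `fb` are made up): on the JVM and Scala Native, invoking a method on a `null` receiver throws `NullPointerException` eagerly at the call site, whereas under Scala.js's default linker semantics a `null` dereference is undefined behavior.

```scala
import cats.effect.IO

val fa: IO[Int] = null

// JVM / Scala Native: NullPointerException is thrown right here, eagerly,
// which is why combinators look "null safe" on those platforms.
// Scala.js (default linker semantics): dereferencing null is undefined
// behavior, so no NPE is guaranteed, and a null-safety test can fail.
val fb: IO[Int] = fa.flatMap(IO.pure)
```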
```scala
   */
  def parTraverseN[T[_]: Traverse, A, B](n: Int)(ta: T[A])(f: A => F[B]): F[T[B]] = {
    require(n >= 1, s"Concurrency limit should be at least 1, was: $n")
```
Scaladoc needs an update above.
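For orientation, a hypothetical call site (my sketch, not from the PR; I believe the extension syntax comes from `cats.effect.implicits._` in cats-effect 3, but the exact import path is from memory and may differ):

```scala
import cats.effect.IO
import cats.effect.implicits._ // assumed to provide the parTraverseN syntax

def fetch(url: String): IO[Int] =
  IO(url.length) // stand-in for real work

// run `fetch` over the list with at most 4 effects in flight at once
val sizes: IO[List[Int]] =
  List("a", "bb", "ccc").parTraverseN(4)(fetch)
```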
```scala
   */
  def parTraverseN_[T[_]: Foldable, A, B](n: Int)(ta: T[A])(f: A => F[B]): F[Unit] = {
    require(n >= 1, s"Concurrency limit should be at least 1, was: $n")
```
Scaladoc above.
```scala
case None =>
  F.uncancelable { poll =>
    F.deferred[Outcome[F, E, B]] flatMap { result =>
      val action = poll(sem.acquire) >> f(a)
```
Is this intentionally `>>` and not `*>`? So that evaluating the pure `f` is also restricted by the semaphore? (In my opinion it doesn't need to be, but it's okay that it is.)
It's intentional. I should probably comment it as such. I think most users probably believe that even the pure part of the function is parallelized (and subject to the semaphore).
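To spell out the distinction for readers following along (an illustrative sketch, not code from the PR; `f` and `gate` are made up here): on `IO`, `>>` takes its right-hand side by name, while `*>` is strict in its argument.

```scala
import cats.effect.IO

// Hypothetical `f`: evaluating the *pure* part (everything before the
// returned IO actually runs) has an observable effect, for demonstration.
def f(a: Int): IO[Int] = {
  println(s"pure part of f($a) evaluated")
  IO.pure(a + 1)
}

val gate: IO[Unit] = IO.println("permit acquired") // stand-in for poll(sem.acquire)

val byName = gate >> f(1) // f(1) is evaluated only after `gate` completes
val strict = gate *> f(1) // f(1) is evaluated right now, before any permit is held
```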
```scala
result
  .get
  .flatMap(_.embed(F.canceled *> F.never))
  .onCancel(fiber.cancel)
```
When is this `onCancel` necessary? Wouldn't the `guaranteeCase` below cancel everything in `supervision` anyway?
I think it's not necessary. I've been building this up a bit incrementally so there's some overlapping logic I need to deduplicate due to the number of corner cases this function has.
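For readers unfamiliar with `Outcome#embed` in the snippet above, this is roughly its semantics, paraphrased (my sketch of the kernel's behavior, not the PR's code):

```scala
import cats.effect.Outcome
import cats.effect.kernel.MonadCancel

// Fold the child fiber's outcome back into F: surface its result, rethrow
// its error, or, if it canceled itself, run the fallback. In the diff above
// the fallback is F.canceled *> F.never, i.e. the caller self-cancels too,
// with F.never as the (unreachable) continuation.
def embedLike[F[_], E, B](oc: Outcome[F, E, B], onCancel: F[B])(
    implicit F: MonadCancel[F, E]): F[B] =
  oc match {
    case Outcome.Succeeded(fb) => fb
    case Outcome.Errored(e) => F.raiseError(e)
    case Outcome.Canceled() => onCancel
  }
```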
```scala
// we block until it's all done by acquiring all the permits
F.race(preempt.get *> cancelAll, sem.acquire.replicateA_(n)) *>
  // if we hit an error or self-cancelation in any effect, resurface it here
  // note that we can't lose errors here because of the permits: we know the fibers are done
```
I think there may be a race here:

- The very last task fails with an error, and releases its permit (`sem.release` above in `wrapped`).
- Acquiring all the permits here wins the `F.race` (just above).
- Just below we `preempt.tryGet`, read `None`, and complete with `F.unit`.
- The task completes `preempt` with the error (above in `wrapped`).
- (But no one will see that any more.)
I think this is a good point. I do this in both implementations. My thinking was that it increases parallelism somewhat (releasing the permit asap), but it does generate this race condition. I'll fix it in both.
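A minimal sketch of the reordering fix, with hypothetical stand-ins for the diff's names (`wrapped`, `sem`, `preempt`; this is not the PR's actual code): the permit release must happen strictly after the error is recorded, so that winning the reacquisition race implies every error is already observable.

```scala
import cats.effect.{Deferred, IO, Outcome}
import cats.effect.std.Semaphore

def wrapped(task: IO[Unit], sem: Semaphore[IO], preempt: Deferred[IO, Throwable]): IO[Unit] =
  task.guaranteeCase {
    // complete `preempt` first, release the permit second; a reader that
    // subsequently reacquires all the permits is then guaranteed to see
    // the error via preempt.tryGet
    case Outcome.Errored(e) => preempt.complete(e).void *> sem.release
    case _ => sem.release
  }
```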
(Just some context about the …)
Could this fix hit 3.6.4?
There are a couple of failing tests related to early termination that I'm still trying to track down. I'm trying to find the spare time needed to push on it. Help definitely welcome! Otherwise I'll probably get to it within the next few weeks. Sorry :(
I see that the last CI run is green; do you mean that you want to reintroduce the tests removed in this commit? 599b790
This shifts to a fully bespoke implementation of `parTraverseN` and such. There are a few things left to clean up, such as a few more tests and running some comparative benchmarks, but early results are very promising. In particular, the failure case from #4434 appears to be around two to three orders of magnitude faster with this implementation (which makes sense, since it handles early abort correctly). Kudos to @SystemFw for the core idea which makes this possible.

One of the things I'm doing here is giving up entirely on universal fairness and focusing merely on in-batch fairness. A simpler way of saying this is that we are hardened against head-of-line blocking, both for actions and for cancelation.
Fixes #4434
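For readers trying to follow the design, here is a deliberately simplified sketch of the overall shape as I read the discussion above: permits bound concurrency, the first error completes a `preempt` deferred strictly before its permit is released, and the caller races the error signal against reacquiring all the permits. All names are invented; cancelation safety (`uncancelable`/`poll`), self-cancelation handling, early abort during submission, and fairness are elided. This is not the PR's implementation.

```scala
import cats.effect.{Deferred, IO}
import cats.effect.std.Semaphore
import cats.syntax.all._

def parTraverseNSketch[A, B](n: Int)(as: List[A])(f: A => IO[B]): IO[List[B]] = {
  require(n >= 1, s"Concurrency limit should be at least 1, was: $n")
  for {
    sem <- Semaphore[IO](n.toLong)
    preempt <- Deferred[IO, Throwable] // completed by the first failure
    results <- as.traverse(_ => Deferred[IO, B]) // one result slot per element
    fibers <- as.zip(results).traverse { case (a, res) =>
      // acquire in the submitting fiber, so at most n tasks run at once
      sem.acquire >>
        f(a)
          .flatMap(res.complete(_).void)
          .handleErrorWith(e => preempt.complete(e).void) // record the error first...
          .guarantee(sem.release) // ...and only then release the permit
          .start
    }
    _ <- IO.race(preempt.get, sem.acquireN(n.toLong)).flatMap {
      case Left(e) =>
        // early abort: an error surfaced, cancel everything still running
        fibers.traverse_(_.cancel) *> IO.raiseError(e)
      case Right(_) =>
        // all permits reacquired: every complete-before-release has already
        // happened, so any recorded error is guaranteed to be visible here
        preempt.tryGet.flatMap {
          case Some(e) => IO.raiseError(e)
          case None => IO.unit
        }
    }
    bs <- results.traverse(_.get)
  } yield bs
}
```

The trailing `tryGet` is the fix from the race discussed above: because each task completes `preempt` strictly before releasing its permit, winning the reacquisition arm of the race still cannot lose an error.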