Skip to content

Commit 7020e37

Browse files
committed
Add setters for the failure and starvation reporters in IOApp
1 parent dddaae1 commit 7020e37

File tree

3 files changed

+193
-14
lines changed

3 files changed

+193
-14
lines changed

core/js/src/main/scala/cats/effect/IOApp.scala

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package cats.effect
1818

1919
import cats.effect.metrics.{CpuStarvationWarningMetrics, JsCpuStarvationMetrics}
20-
import cats.effect.std.Console
2120
import cats.effect.tracing.TracingConstants._
2221

2322
import scala.concurrent.CancellationException
@@ -147,6 +146,14 @@ trait IOApp {
147146

148147
private[this] var _runtime: unsafe.IORuntime = null
149148

149+
@volatile
150+
private[this] var _failureReporter: Throwable => IO[Unit] =
151+
err => reportFailure(err)
152+
153+
@volatile
154+
private[this] var _onCpuStarvationReporter: CpuStarvationWarningMetrics => IO[Unit] =
155+
metrics => onCpuStarvationWarn(metrics)
156+
150157
/**
151158
* The runtime which will be used by `IOApp` to evaluate the [[IO]] produced by the `run`
152159
* method. This may be overridden by `IOApp` implementations which have extremely specialized
@@ -175,16 +182,58 @@ trait IOApp {
175182
* runtime. With that said, some care should be taken to avoid raising unhandled errors as a
176183
* result of handling unhandled errors, since that will result in the obvious chaos.
177184
*/
185+
@deprecatedOverriding("Use setFailureReporter instead", "3.6.0")
178186
protected def reportFailure(err: Throwable): IO[Unit] =
179-
Console[IO].printStackTrace(err)
187+
IO.consoleForIO.printStackTrace(err)
188+
189+
/**
190+
* Configures the action to perform when unhandled errors are caught by the runtime.
191+
*
192+
* An unhandled error is an error that is raised (and not handled) on a Fiber that nobody is
193+
* joining.
194+
*
195+
* For example:
196+
*
197+
* {{{
198+
* import scala.concurrent.duration._
199+
* override def run: IO[Unit] = IO(throw new Exception("")).start *> IO.sleep(1.second)
200+
* }}}
201+
*
202+
* In this case, the exception is raised on a Fiber with no listeners. Nobody would be
203+
* notified about that error. Therefore it is unhandled, and it goes through the reportFailure
204+
* mechanism.
205+
*
206+
* By default, the runtime simply delegates to [[cats.effect.std.Console!.printStackTrace]].
207+
* It is safe to perform any `IO` action within this handler; it will not block the progress
208+
* of the runtime. With that said, some care should be taken to avoid raising unhandled errors
209+
* as a result of handling unhandled errors, since that will result in the obvious chaos.
210+
*/
211+
protected final def setFailureReporter(
212+
reporter: Throwable => IO[Unit]
213+
): IO[Unit] =
214+
IO {
215+
_failureReporter = reporter
216+
}
180217

181218
/**
182219
* Defines what to do when CpuStarvationCheck is triggered. Defaults to log a warning to
183220
* System.err.
184221
*/
222+
@deprecatedOverriding("Use setOnCpuStarvationReporter instead", "3.6.0")
185223
protected def onCpuStarvationWarn(metrics: CpuStarvationWarningMetrics): IO[Unit] =
186224
CpuStarvationCheck.logWarning(metrics)
187225

226+
/**
227+
* Configures what to do when CpuStarvationCheck is triggered. By default the runtime logs a
228+
* warning to System.err.
229+
*/
230+
protected final def setOnCpuStarvationReporter(
231+
reporter: CpuStarvationWarningMetrics => IO[Unit]
232+
): IO[Unit] =
233+
IO {
234+
_onCpuStarvationReporter = reporter
235+
}
236+
188237
/**
189238
* The entry point for your application. Will be called by the runtime when the process is
190239
* started. If the underlying runtime supports it, any arguments passed to the process will be
@@ -206,8 +255,12 @@ trait IOApp {
206255
import unsafe.IORuntime
207256

208257
val installed = IORuntime installGlobal {
209-
val compute = IORuntime.createBatchingMacrotaskExecutor(reportFailure = t =>
210-
reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime))
258+
val compute = IORuntime.createBatchingMacrotaskExecutor(
259+
reportFailure = t =>
260+
IO(_failureReporter)
261+
.flatMap(_.apply(t))
262+
.unsafeRunAndForgetWithoutCallback()(runtime)
263+
)
211264

212265
IORuntime(
213266
compute,
@@ -260,10 +313,16 @@ trait IOApp {
260313
val fiber = Spawn[IO]
261314
.raceOutcome[ExitCode, Nothing](
262315
CpuStarvationCheck
263-
.run(runtimeConfig, JsCpuStarvationMetrics(), onCpuStarvationWarn)
316+
.run(
317+
runtimeConfig,
318+
JsCpuStarvationMetrics(),
319+
onCpuStarvationWarn =
320+
metrics => IO(_onCpuStarvationReporter).flatMap(_.apply(metrics))
321+
)
264322
.background
265323
.surround(run(argList)),
266-
keepAlive)
324+
keepAlive
325+
)
267326
.flatMap {
268327
case Left(Outcome.Canceled()) =>
269328
IO.raiseError(new CancellationException("IOApp main fiber was canceled"))

core/jvm/src/main/scala/cats/effect/IOApp.scala

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package cats.effect
1818

1919
import cats.effect.metrics.{CpuStarvationWarningMetrics, JvmCpuStarvationMetrics}
20-
import cats.effect.std.Console
2120
import cats.effect.tracing.TracingConstants._
2221
import cats.syntax.all._
2322

@@ -144,6 +143,14 @@ trait IOApp {
144143

145144
private[this] var _runtime: unsafe.IORuntime = null
146145

146+
@volatile
147+
private[this] var _failureReporter: Throwable => IO[Unit] =
148+
err => reportFailure(err)
149+
150+
@volatile
151+
private[this] var _onCpuStarvationReporter: CpuStarvationWarningMetrics => IO[Unit] =
152+
metrics => onCpuStarvationWarn(metrics)
153+
147154
/**
148155
* The runtime which will be used by `IOApp` to evaluate the [[IO]] produced by the `run`
149156
* method. This may be overridden by `IOApp` implementations which have extremely specialized
@@ -214,7 +221,9 @@ trait IOApp {
214221
def reportFailure(t: Throwable): Unit =
215222
t match {
216223
case t if NonFatal(t) =>
217-
IOApp.this.reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime)
224+
IO(_failureReporter)
225+
.flatMap(_.apply(t))
226+
.unsafeRunAndForgetWithoutCallback()(runtime)
218227

219228
case t =>
220229
runtime.shutdown()
@@ -254,8 +263,38 @@ trait IOApp {
254263
* should be taken to avoid raising unhandled errors as a result of handling unhandled errors,
255264
* since that will result in the obvious chaos.
256265
*/
266+
@deprecatedOverriding("Use setFailureReporter instead", "3.6.0")
257267
protected def reportFailure(err: Throwable): IO[Unit] =
258-
Console[IO].printStackTrace(err)
268+
IO.consoleForIO.printStackTrace(err)
269+
270+
/**
271+
* Configures the action to perform when unhandled errors are caught by the runtime.
272+
*
273+
* An unhandled error is an error that is raised (and not handled) on a Fiber that nobody is
274+
* joining.
275+
*
276+
* For example:
277+
*
278+
* {{{
279+
* import scala.concurrent.duration._
280+
* override def run: IO[Unit] = IO(throw new Exception("")).start *> IO.sleep(1.second)
281+
* }}}
282+
*
283+
* In this case, the exception is raised on a Fiber with no listeners. Nobody would be
284+
* notified about that error. Therefore it is unhandled, and it goes through the reportFailure
285+
* mechanism.
286+
*
287+
* By default, the runtime simply delegates to [[cats.effect.std.Console!.printStackTrace]].
288+
* It is safe to perform any `IO` action within this handler; it will not block the progress
289+
* of the runtime. With that said, some care should be taken to avoid raising unhandled errors
290+
* as a result of handling unhandled errors, since that will result in the obvious chaos.
291+
*/
292+
protected final def setFailureReporter(
293+
reporter: Throwable => IO[Unit]
294+
): IO[Unit] =
295+
IO {
296+
_failureReporter = reporter
297+
}
259298

260299
/**
261300
* Configures whether to enable blocked thread detection. This is relatively expensive so is
@@ -329,9 +368,21 @@ trait IOApp {
329368
* Defines what to do when CpuStarvationCheck is triggered. Defaults to log a warning to
330369
* System.err.
331370
*/
371+
@deprecatedOverriding("Use setOnCpuStarvationReporter instead", "3.6.0")
332372
protected def onCpuStarvationWarn(metrics: CpuStarvationWarningMetrics): IO[Unit] =
333373
CpuStarvationCheck.logWarning(metrics)
334374

375+
/**
376+
* Configures what to do when CpuStarvationCheck is triggered. By default the runtime logs a
377+
* warning to System.err.
378+
*/
379+
protected final def setOnCpuStarvationReporter(
380+
reporter: CpuStarvationWarningMetrics => IO[Unit]
381+
): IO[Unit] =
382+
IO {
383+
_onCpuStarvationReporter = reporter
384+
}
385+
335386
/**
336387
* Defines what to do when IOApp detects that `main` is being invoked on a `Thread` which
337388
* isn't the main process thread. This condition can happen when we are running inside of an
@@ -387,7 +438,10 @@ trait IOApp {
387438
val (compute, poller, compDown) =
388439
IORuntime.createWorkStealingComputeThreadPool(
389440
threads = computeWorkerThreadCount,
390-
reportFailure = t => reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime),
441+
reportFailure = t =>
442+
IO(_failureReporter)
443+
.flatMap(_.apply(t))
444+
.unsafeRunAndForgetWithoutCallback()(runtime),
391445
blockedThreadDetectionEnabled = blockedThreadDetectionEnabled,
392446
pollingSystem = pollingSystem
393447
)
@@ -450,7 +504,12 @@ trait IOApp {
450504
JvmCpuStarvationMetrics()
451505
.flatMap { cpuStarvationMetrics =>
452506
CpuStarvationCheck
453-
.run(runtimeConfig, cpuStarvationMetrics, onCpuStarvationWarn)
507+
.run(
508+
runtimeConfig,
509+
cpuStarvationMetrics,
510+
onCpuStarvationWarn =
511+
metrics => IO(_onCpuStarvationReporter).flatMap(_.apply(metrics))
512+
)
454513
.background
455514
}
456515
.surround(ioa)

core/native/src/main/scala/cats/effect/IOApp.scala

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,14 @@ trait IOApp {
146146

147147
private[this] var _runtime: unsafe.IORuntime = null
148148

149+
@volatile
150+
private[this] var _failureReporter: Throwable => IO[Unit] =
151+
err => IO.consoleForIO.printStackTrace(err)
152+
153+
@volatile
154+
private[this] var _onCpuStarvationReporter: CpuStarvationWarningMetrics => IO[Unit] =
155+
metrics => onCpuStarvationWarn(metrics)
156+
149157
/**
150158
* The runtime which will be used by `IOApp` to evaluate the [[IO]] produced by the `run`
151159
* method. This may be overridden by `IOApp` implementations which have extremely specialized
@@ -167,13 +175,54 @@ trait IOApp {
167175
*/
168176
protected def runtimeConfig: unsafe.IORuntimeConfig = unsafe.IORuntimeConfig()
169177

178+
/**
179+
* Configures the action to perform when unhandled errors are caught by the runtime.
180+
*
181+
* An unhandled error is an error that is raised (and not handled) on a Fiber that nobody is
182+
* joining.
183+
*
184+
* For example:
185+
*
186+
* {{{
187+
* import scala.concurrent.duration._
188+
* override def run: IO[Unit] = IO(throw new Exception("")).start *> IO.sleep(1.second)
189+
* }}}
190+
*
191+
* In this case, the exception is raised on a Fiber with no listeners. Nobody would be
192+
* notified about that error. Therefore it is unhandled, and it goes through the reportFailure
193+
* mechanism.
194+
*
195+
* By default, the runtime simply delegates to [[cats.effect.std.Console!.printStackTrace]].
196+
* It is safe to perform any `IO` action within this handler; it will not block the progress
197+
* of the runtime. With that said, some care should be taken to avoid raising unhandled errors
198+
* as a result of handling unhandled errors, since that will result in the obvious chaos.
199+
*/
200+
protected final def setFailureReporter(
201+
reporter: Throwable => IO[Unit]
202+
): IO[Unit] =
203+
IO {
204+
_failureReporter = reporter
205+
}
206+
170207
/**
171208
* Defines what to do when CpuStarvationCheck is triggered. Defaults to log a warning to
172209
* System.err.
173210
*/
211+
@deprecatedOverriding("Use setOnCpuStarvationReporter instead", "3.6.0")
174212
protected def onCpuStarvationWarn(metrics: CpuStarvationWarningMetrics): IO[Unit] =
175213
CpuStarvationCheck.logWarning(metrics)
176214

215+
/**
216+
* Configures what to do when CpuStarvationCheck is triggered. By default the runtime logs a
217+
* warning to System.err.
218+
*/
219+
protected final def setOnCpuStarvationReporter(
220+
reporter: CpuStarvationWarningMetrics => IO[Unit]
221+
): IO[Unit] =
222+
IO {
223+
_onCpuStarvationReporter = reporter
224+
}
225+
177226
/**
178227
* The [[unsafe.PollingSystem]] used by the [[runtime]] which will evaluate the [[IO]]
179228
* produced by `run`. It is very unlikely that users will need to override this method.
@@ -204,6 +253,12 @@ trait IOApp {
204253
val installed = if (runtime == null) {
205254
import unsafe.IORuntime
206255

256+
// Eventually this will be needed:
257+
// reportFailure = t =>
258+
// IO(_failureReporter)
259+
// .flatMap(_.apply(t))
260+
// .unsafeRunAndForgetWithoutCallback()(runtime)
261+
207262
val installed = IORuntime installGlobal {
208263
val (loop, poller, loopDown) = IORuntime.createEventLoop(pollingSystem)
209264
IORuntime(
@@ -270,9 +325,15 @@ trait IOApp {
270325
}
271326
else Resource.unit[IO]
272327

273-
val starvationChecker = CpuStarvationCheck
274-
.run(runtimeConfig, NativeCpuStarvationMetrics(), onCpuStarvationWarn)
275-
.background
328+
val starvationChecker =
329+
CpuStarvationCheck
330+
.run(
331+
runtimeConfig,
332+
NativeCpuStarvationMetrics(),
333+
onCpuStarvationWarn =
334+
metrics => IO(_onCpuStarvationReporter).flatMap(_.apply(metrics))
335+
)
336+
.background
276337

277338
Spawn[IO]
278339
.raceOutcome[ExitCode, ExitCode](

0 commit comments

Comments
 (0)