|
| 1 | +@file:OptIn(DelicateCoroutinesApi::class) |
| 2 | + |
| 3 | +package org.funfix.tasks.kotlin |
| 4 | + |
| 5 | +import kotlinx.coroutines.CancellableContinuation |
| 6 | +import kotlinx.coroutines.CoroutineDispatcher |
| 7 | +import kotlinx.coroutines.DelicateCoroutinesApi |
| 8 | +import kotlinx.coroutines.Dispatchers |
| 9 | +import kotlinx.coroutines.GlobalScope |
| 10 | +import kotlinx.coroutines.launch |
| 11 | +import kotlinx.coroutines.suspendCancellableCoroutine |
| 12 | +import kotlin.coroutines.CoroutineContext |
| 13 | +import kotlin.coroutines.EmptyCoroutineContext |
| 14 | +import kotlin.coroutines.cancellation.CancellationException |
| 15 | +import kotlin.coroutines.resumeWithException |
| 16 | + |
| 17 | +public actual suspend fun <T> PlatformTask<out T>.runSuspended( |
| 18 | + executor: Executor? |
| 19 | +): T = run { |
| 20 | + val executorOrDefault = executor ?: buildExecutor(currentDispatcher()) |
| 21 | + suspendCancellableCoroutine { cont -> |
| 22 | + val contCallback = cont.asCompletionCallback() |
| 23 | + try { |
| 24 | + val token = this.invoke(executorOrDefault, contCallback) |
| 25 | + cont.invokeOnCancellation { |
| 26 | + token.cancel() |
| 27 | + } |
| 28 | + } catch (e: Throwable) { |
| 29 | + UncaughtExceptionHandler.rethrowIfFatal(e) |
| 30 | + contCallback(Outcome.Failure(e)) |
| 31 | + } |
| 32 | + } |
| 33 | +} |
| 34 | + |
| 35 | +internal fun buildExecutor(dispatcher: CoroutineDispatcher): Executor = |
| 36 | + DispatcherExecutor(dispatcher) |
| 37 | + |
| 38 | +internal fun buildCoroutineDispatcher( |
| 39 | + @Suppress("UNUSED_PARAMETER") executor: Executor |
| 40 | +): CoroutineDispatcher = |
| 41 | + // Building this CoroutineDispatcher from an Executor is problematic, and there's no |
| 42 | + // point in even trying on top of JS engines. |
| 43 | + Dispatchers.Default |
| 44 | + |
| 45 | +private class DispatcherExecutor(val dispatcher: CoroutineDispatcher) : Executor { |
| 46 | + override fun execute(command: Runnable) { |
| 47 | + if (dispatcher.isDispatchNeeded(EmptyCoroutineContext)) { |
| 48 | + dispatcher.dispatch( |
| 49 | + EmptyCoroutineContext, |
| 50 | + kotlinx.coroutines.Runnable { command.run() } |
| 51 | + ) |
| 52 | + } else { |
| 53 | + command.run() |
| 54 | + } |
| 55 | + } |
| 56 | + |
| 57 | + override fun toString(): String = |
| 58 | + dispatcher.toString() |
| 59 | +} |
| 60 | + |
| 61 | +internal fun <T> CancellableContinuation<T>.asCompletionCallback(): Callback<T> { |
| 62 | + var isActive = true |
| 63 | + return { outcome -> |
| 64 | + if (outcome is Outcome.Failure) { |
| 65 | + UncaughtExceptionHandler.rethrowIfFatal(outcome.exception) |
| 66 | + } |
| 67 | + if (isActive) { |
| 68 | + isActive = false |
| 69 | + when (outcome) { |
| 70 | + is Outcome.Success -> |
| 71 | + resume(outcome.value) { _, _, _ -> |
| 72 | + // on cancellation? |
| 73 | + } |
| 74 | + is Outcome.Failure -> |
| 75 | + resumeWithException(outcome.exception) |
| 76 | + is Outcome.Cancellation -> |
| 77 | + resumeWithException(kotlinx.coroutines.CancellationException()) |
| 78 | + } |
| 79 | + } else if (outcome is Outcome.Failure) { |
| 80 | + UncaughtExceptionHandler.logOrRethrow(outcome.exception) |
| 81 | + } |
| 82 | + } |
| 83 | +} |
| 84 | + |
| 85 | +/** |
| 86 | + * Creates a [Task] from a suspended block of code. |
| 87 | + */ |
| 88 | +public actual suspend fun <T> Task.Companion.fromSuspended( |
| 89 | + coroutineContext: CoroutineContext, |
| 90 | + block: suspend () -> T |
| 91 | +): Task<T> = |
| 92 | + Task.fromAsync { executor, callback -> |
| 93 | + val job = GlobalScope.launch( |
| 94 | + buildCoroutineDispatcher(executor) + coroutineContext |
| 95 | + ) { |
| 96 | + try { |
| 97 | + val r = block() |
| 98 | + callback(Outcome.Success(r)) |
| 99 | + } catch (e: Throwable) { |
| 100 | + UncaughtExceptionHandler.rethrowIfFatal(e) |
| 101 | + when (e) { |
| 102 | + is CancellationException, is TaskCancellationException -> |
| 103 | + callback(Outcome.Cancellation) |
| 104 | + else -> |
| 105 | + callback(Outcome.Failure(e)) |
| 106 | + } |
| 107 | + } |
| 108 | + } |
| 109 | + Cancellable { |
| 110 | + job.cancel() |
| 111 | + } |
| 112 | + } |
| 113 | + |
| 114 | +public actual suspend fun <T> Task<T>.runSuspended(executor: Executor?): T = |
| 115 | + asPlatform.runSuspended(executor) |
0 commit comments