Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions packages/toolkit/src/listenerMiddleware/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,10 @@ const cancelActiveListeners = (

const createClearListenerMiddleware = (
listenerMap: Map<string, ListenerEntry>,
executingListeners: Map<ListenerEntry, number>,
) => {
return () => {
listenerMap.forEach(cancelActiveListeners)

executingListeners.keys().forEach(cancelActiveListeners)
listenerMap.clear()
}
}
Expand Down Expand Up @@ -339,6 +339,23 @@ export const createListenerMiddleware = <
middlewareOptions: CreateListenerMiddlewareOptions<ExtraArgument> = {},
) => {
const listenerMap = new Map<string, ListenerEntry>()

// Track listeners whose effect is currently executing so clearListeners can
// abort even listeners that have become unsubscribed while executing.
const executingListeners = new Map<ListenerEntry, number>()
const trackExecutingListener = (entry: ListenerEntry) => {
const count = executingListeners.get(entry) ?? 0
executingListeners.set(entry, count + 1)
}
const untrackExecutingListener = (entry: ListenerEntry) => {
const count = executingListeners.get(entry) ?? 1
if (count === 1) {
executingListeners.delete(entry)
} else {
executingListeners.set(entry, count - 1)
}
}

const { extra, onError = defaultErrorHandler } = middlewareOptions

assertFunction(onError, 'onError')
Expand Down Expand Up @@ -401,6 +418,7 @@ export const createListenerMiddleware = <

try {
entry.pending.add(internalTaskController)
trackExecutingListener(entry)
await Promise.resolve(
entry.effect(
action,
Expand Down Expand Up @@ -452,11 +470,15 @@ export const createListenerMiddleware = <
await Promise.all(autoJoinPromises)

abortControllerWithReason(internalTaskController, listenerCompleted) // Notify that the task has completed
untrackExecutingListener(entry)
entry.pending.delete(internalTaskController)
}
}

const clearListenerMiddleware = createClearListenerMiddleware(listenerMap)
const clearListenerMiddleware = createClearListenerMiddleware(
listenerMap,
executingListeners,
)

const middleware: ListenerMiddleware<
StateType,
Expand Down
131 changes: 131 additions & 0 deletions packages/toolkit/src/listenerMiddleware/tests/effectScenarios.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -364,4 +364,135 @@ describe('Saga-style Effects Scenarios', () => {

expect(canceledCheck).toBe(true)
})

test('long-running listener with immediate unsubscribe is cancelable', async () => {
let runCount = 0
let abortCount = 0

startListening({
actionCreator: increment,
effect: async (action, listenerApi) => {
runCount++

// Stop listening for this action
listenerApi.unsubscribe()

try {
// Wait indefinitely
await listenerApi.condition(() => false)
} catch (err) {
if (err instanceof TaskAbortError) {
abortCount++
}
}
},
})

// First action starts the listener, which unsubscribes
store.dispatch(increment())
expect(runCount).toBe(1)

// Verify that the first action unsubscribed the listener
store.dispatch(increment())
expect(runCount).toBe(1)

// Now call clearListeners, which should abort the running effect, even
// though the listener is no longer subscribed
listenerMiddleware.clearListeners()
await delay(0)

expect(abortCount).toBe(1)
})

test('long-running listener with unsubscribe race is cancelable', async () => {
let runCount = 0
let abortCount = 0

startListening({
actionCreator: increment,
effect: async (action, listenerApi) => {
runCount++

if (runCount === 2) {
// On the second run, stop listening for this action
listenerApi.unsubscribe()
return
}

try {
// Wait indefinitely
await listenerApi.condition(() => false)
} catch (err) {
if (err instanceof TaskAbortError) {
abortCount++
}
}
},
})

// First action starts the hanging effect
store.dispatch(increment())
expect(runCount).toBe(1)

// Second action starts the fast effect, which unsubscribes
store.dispatch(increment())
expect(runCount).toBe(2)

// Third action should be a noop
store.dispatch(increment())
expect(runCount).toBe(2)

// The hanging effect should still be hanging
expect(abortCount).toBe(0)

// Now call clearListeners, which should abort the hanging effect, even
// though the listener is no longer subscribed
listenerMiddleware.clearListeners()
await delay(0)

expect(abortCount).toBe(1)
})

test('long-running listener with immediate unsubscribe and forked child is cancelable', async () => {
let outerAborted = false
let innerAborted = false

startListening({
actionCreator: increment,
effect: async (action, listenerApi) => {
// Stop listening for this action
listenerApi.unsubscribe()

const pollingTask = listenerApi.fork(async (forkApi) => {
try {
// Cancellation-aware indefinite pause
await forkApi.pause(new Promise(() => {}))
} catch (err) {
if (err instanceof TaskAbortError) {
innerAborted = true
}
}
})

try {
// Wait indefinitely
await listenerApi.condition(() => false)
pollingTask.cancel()
} catch (err) {
if (err instanceof TaskAbortError) {
outerAborted = true
}
}
},
})

store.dispatch(increment())
await delay(0)

listenerMiddleware.clearListeners()
await delay(0)

expect(outerAborted).toBe(true)
expect(innerAborted).toBe(true)
})
})