Skip to content
This repository was archived by the owner on May 29, 2023. It is now read-only.

Commit b20aad5

Browse files
committed
fix: close all subscriptions, if the worker was terminated
1 parent 1b75cdd commit b20aad5

File tree

3 files changed

+61
-14
lines changed

3 files changed

+61
-14
lines changed

projects/workers/src/worker/classes/web-worker.spec.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,25 @@ describe('WebWorker', () => {
8989
'Uncaught reason',
9090
);
9191
});
92+
93+
it('should close all subscriptions, if the worker was terminated', async () => {
94+
const worker = WebWorker.fromFunction<void, string>(() => 'some data');
95+
96+
const subscriptions = [
97+
worker.subscribe(),
98+
worker.subscribe(),
99+
worker.subscribe(),
100+
];
101+
102+
worker.terminate();
103+
expect(subscriptions.map(s => s.closed)).toEqual([true, true, true]);
104+
});
105+
106+
it("shouldn't throw any errors, if the worker was terminated twice", async () => {
107+
const worker = WebWorker.fromFunction<void, string>(() => 'some data');
108+
109+
worker.terminate();
110+
worker.terminate();
111+
expect(await worker.toPromise()).toBeUndefined();
112+
});
92113
});

projects/workers/src/worker/classes/web-worker.ts

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import {EMPTY, fromEvent, merge, Observable} from 'rxjs';
1+
import {EMPTY, fromEvent, merge, Observable, Observer} from 'rxjs';
22
import {take, tap} from 'rxjs/operators';
33
import {WORKER_BLANK_FN} from '../consts/worker-fn-template';
44
import {TypedMessageEvent} from '../types/typed-message-event';
@@ -7,6 +7,8 @@ import {WorkerFunction} from '../types/worker-function';
77
export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>> {
88
private readonly worker: Worker | undefined;
99
private readonly url: string;
10+
private isStopped: boolean;
11+
private observers: Observer<TypedMessageEvent<R>>[];
1012

1113
constructor(url: string, options?: WorkerOptions) {
1214
let worker: Worker | undefined;
@@ -19,26 +21,33 @@ export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>
1921
}
2022

2123
super(subscriber => {
24+
let eventStream$: Observable<TypedMessageEvent<R> | ErrorEvent> = EMPTY;
25+
2226
if (error) {
2327
subscriber.error(error);
28+
} else if (this.isStopped) {
29+
subscriber.complete();
30+
} else if (worker) {
31+
eventStream$ = merge(
32+
fromEvent<TypedMessageEvent<R>>(worker, 'message').pipe(
33+
tap(event => subscriber.next(event)),
34+
),
35+
fromEvent<ErrorEvent>(worker, 'error').pipe(
36+
tap(event => subscriber.error(event)),
37+
),
38+
);
39+
40+
this.observers.push(subscriber);
2441
}
2542

26-
const eventStream$ = worker
27-
? merge(
28-
fromEvent<TypedMessageEvent<R>>(worker, 'message').pipe(
29-
tap(event => subscriber.next(event)),
30-
),
31-
fromEvent<ErrorEvent>(worker, 'error').pipe(
32-
tap(event => subscriber.error(event)),
33-
),
34-
)
35-
: EMPTY;
36-
3743
return eventStream$.subscribe();
3844
});
3945

4046
this.worker = worker;
4147
this.url = url;
48+
49+
this.isStopped = false;
50+
this.observers = [];
4251
}
4352

4453
static fromFunction<T, R>(
@@ -57,7 +66,11 @@ export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>
5766

5867
worker.postMessage(data);
5968

60-
return promise;
69+
return promise.then(result => {
70+
worker.terminate();
71+
72+
return result;
73+
});
6174
}
6275

6376
private static createFnUrl(fn: WorkerFunction): string {
@@ -69,11 +82,23 @@ export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>
6982
}
7083

7184
terminate() {
85+
if (this.isStopped) {
86+
return;
87+
}
88+
7289
if (this.worker) {
7390
this.worker.terminate();
7491
}
7592

7693
URL.revokeObjectURL(this.url);
94+
95+
this.isStopped = true;
96+
this.observers.forEach(observer => {
97+
if (!observer.closed) {
98+
observer.complete();
99+
}
100+
});
101+
this.observers = [];
77102
}
78103

79104
postMessage(value: T) {

projects/workers/src/worker/pipes/worker.pipe.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ export class WorkerPipe implements PipeTransform {
1212
private observers = new WeakMap<WebWorker, Observable<any>>();
1313

1414
transform<T, R>(value: T, fn: WorkerFunction<T, R>): Observable<R> {
15-
const worker = this.workers.get(fn) || WebWorker.fromFunction(fn);
15+
const worker: WebWorker<T, R> =
16+
this.workers.get(fn) || WebWorker.fromFunction(fn);
1617

1718
this.workers.set(fn, worker);
1819
worker.postMessage(value);

0 commit comments

Comments
 (0)