Skip to content

Commit b82b334

Browse files
committed
fix(broker): issue with wrongly intercepting Subject next and publish
1 parent 822314d commit b82b334

File tree

2 files changed

+50
-21
lines changed

2 files changed

+50
-21
lines changed

packages/broker/src/broker.ts

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -219,31 +219,33 @@ export class BrokerQueueChannel<T> {
219219

220220
export class RefCountedSubject<T> extends Subject<T> {
221221
private refCount = 0;
222-
private readonly onFirst: () => void;
223-
private readonly onLast: () => void;
224-
public skipPublish = 0;
225222

226-
constructor(onFirst: () => void, onLast: () => void) {
223+
constructor(protected onFirst: () => void, protected onLast: () => void, protected onPublish: (value: T) => void) {
227224
super();
228-
this.onFirst = onFirst;
229-
this.onLast = onLast;
230225
}
231226

232227
// @ts-ignore
233228
override subscribe(...args: Parameters<Subject<T>['subscribe']>): Subscription {
234-
if (this.refCount++ === 1) {
235-
// We skip 1
229+
if (this.refCount++ === 0) {
236230
this.onFirst();
237231
}
238232

239233
const sub = super.subscribe(...args);
240234
sub.add(() => {
241-
if (--this.refCount === 1) {
235+
if (--this.refCount === 0) {
242236
this.onLast();
243237
}
244238
});
245239
return sub;
246240
}
241+
242+
override next(value: T, publish = true): void {
243+
if (publish) {
244+
this.onPublish(value);
245+
} else {
246+
super.next(value);
247+
}
248+
}
247249
}
248250

249251
const subjectFinalizer = new FinalizationRegistry<{
@@ -275,10 +277,7 @@ class BrokerBusSubjectHandle {
275277
this.releaseChannel = this.channel.subscribe(value => {
276278
for (const subjectRef of this.subjects) {
277279
const subject = subjectRef.deref();
278-
if (subject) {
279-
subject.skipPublish++;
280-
subject.next(value);
281-
}
280+
if (subject) subject.next(value, false);
282281
}
283282
}).catch((e) => {
284283
this.errorHandler.subscribeFailed(this.channel.name, ensureError(e));
@@ -295,19 +294,15 @@ class BrokerBusSubjectHandle {
295294
() => {
296295
this.releaseSubject(subjectRef!);
297296
},
297+
(value) => {
298+
this.publish(value);
299+
},
298300
);
299301
subjectRef = new WeakRef(subject);
300302
subjectFinalizer.register(subject, {
301303
handle: this,
302304
subjectRef,
303305
});
304-
subject.subscribe(value => {
305-
if (subject.skipPublish) {
306-
subject.skipPublish--;
307-
return;
308-
}
309-
this.publish(value);
310-
});
311306
return subject;
312307
}
313308

packages/broker/tests/broker.spec.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,32 +181,66 @@ test('bus subject gc', async () => {
181181
});
182182

183183
test('bus subject 1', async () => {
184-
185184
const bus = new BrokerBus(await adapterFactory());
186185
const handles: BrokerBus['subjectHandles'] = (bus as any).subjectHandles;
187186

188187
type Events = { type: 'user-created', id: number } | { type: 'user-deleted', id: number };
189188

190189
const caughtEvents: Events[] = [];
190+
const caughtEvents2: Events[] = [];
191191

192192
const subject1 = bus.subject<Events>('/events');
193193
const subject2 = bus.subject<Events>('/events');
194194
expect(handles.get('/events')!.isSubscribed).toBe(false);
195195
const sub = subject2.subscribe((event) => {
196196
caughtEvents.push(event);
197197
});
198+
const sub2 = subject1.subscribe((event) => {
199+
caughtEvents2.push(event);
200+
});
198201
expect(handles.get('/events')!.isSubscribed).toBe(true);
199202

200203
subject1.next({ type: 'user-created', id: 2 });
201204
await sleep(0.1);
202205
expect(handles.size).toBe(1);
203206
expect(caughtEvents.length).toBe(1);
207+
expect(caughtEvents2.length).toBe(1);
204208
sub.unsubscribe();
209+
sub2.unsubscribe();
205210
await sleep(0.1);
206211
expect(handles.size).toBe(0);
207212
});
208213

209214
test('bus subject 2', async () => {
215+
const adapter = await adapterFactory();
216+
const bus1 = new BrokerBus(adapter);
217+
const bus2 = new BrokerBus(adapter);
218+
219+
type Events = { type: 'user-created', id: number } | { type: 'user-deleted', id: number };
220+
221+
const caughtEvents: Events[] = [];
222+
const caughtEvents2: Events[] = [];
223+
224+
const subject1 = bus1.subject<Events>('/events');
225+
const subject2 = bus2.subject<Events>('/events');
226+
const sub = subject2.subscribe((event) => {
227+
caughtEvents.push(event);
228+
});
229+
const sub2 = subject1.subscribe((event) => {
230+
caughtEvents2.push(event);
231+
});
232+
233+
await sleep(0.1);
234+
subject1.next({ type: 'user-created', id: 2 });
235+
await sleep(0.1);
236+
expect(caughtEvents.length).toBe(1);
237+
expect(caughtEvents2.length).toBe(1);
238+
sub.unsubscribe();
239+
sub2.unsubscribe();
240+
await sleep(0.1);
241+
});
242+
243+
test('bus subject 3', async () => {
210244

211245
const bus = new BrokerBus(await adapterFactory());
212246
const handles: BrokerBus['subjectHandles'] = (bus as any).subjectHandles;

0 commit comments

Comments
 (0)