From 1e9d0f3c1ed3bad7bb9a4a2e0e9188b15591c0d0 Mon Sep 17 00:00:00 2001 From: Nicolas VERINAUD Date: Tue, 29 Nov 2016 14:39:31 +0100 Subject: [PATCH] Adds -[RACSignal amb:]. --- ReactiveObjC/RACSignal+Operations.h | 9 +++ ReactiveObjC/RACSignal+Operations.m | 59 +++++++++++++++++++ ReactiveObjCTests/RACSignalSpec.m | 90 +++++++++++++++++++++++++++++ 3 files changed, 158 insertions(+) diff --git a/ReactiveObjC/RACSignal+Operations.h b/ReactiveObjC/RACSignal+Operations.h index 3cdeedb57..8392c0c3c 100644 --- a/ReactiveObjC/RACSignal+Operations.h +++ b/ReactiveObjC/RACSignal+Operations.h @@ -208,6 +208,15 @@ extern const NSInteger RACSignalErrorNoMatchingCase; /// the returned signal sends `error` immediately. + (RACSignal *)merge:(id)signals; +/// Emits all of the items from only the first of the receiver or the given signal +/// to emit an item or notification. +/// +/// `amb` will pass through the emissions and notifications of exactly one of the receiver +/// or given signal : the first one that sends a notification to `amb`, either by emitting an +/// item or sending an onError or onCompleted notification. `amb` will ignore and discard the +/// emissions and notifications of the other source signals. +- (RACSignal *)amb:(RACSignal *)signal; + /// Merges the signals sent by the receiver into a flattened signal, but only /// subscribes to `maxConcurrent` number of signals at a time. New signals are /// queued and subscribed to as other signals complete. diff --git a/ReactiveObjC/RACSignal+Operations.m b/ReactiveObjC/RACSignal+Operations.m index ec250a490..df6134e4d 100644 --- a/ReactiveObjC/RACSignal+Operations.m +++ b/ReactiveObjC/RACSignal+Operations.m @@ -494,6 +494,65 @@ + (RACSignal *)merge:(id)signals { setNameWithFormat:@"+merge: %@", copiedSignals]; } +- (RACSignal *)amb:(RACSignal *)signal { + NSParameterAssert(signal); + + return [[RACSignal + createSignal:^(id subscriber) { + __block RACDisposable *d = nil; + __block BOOL hasChosen = NO; + NSRecursiveLock *lock = NSRecursiveLock.new; + + RACDisposable * (^decide)(RACSignal *, RACDisposable *) = ^(RACSignal *s, RACDisposable *disposeMe) { + return [s subscribeNext:^(id _Nullable value) { + [lock lock]; + + if (!hasChosen) { + hasChosen = YES; + [disposeMe dispose]; + [subscriber sendNext:value]; + d = [s subscribe:subscriber]; + } + + [lock unlock]; + } error:^(NSError *error) { + [lock lock]; + + if (!hasChosen) { + hasChosen = YES; + [disposeMe dispose]; + [subscriber sendError:error]; + } + + [lock unlock]; + } completed:^{ + [lock lock]; + + if (!hasChosen) { + hasChosen = YES; + [disposeMe dispose]; + [subscriber sendCompleted]; + } + + [lock unlock]; + }]; + }; + + RACSerialDisposable *d1 = [RACSerialDisposable serialDisposableWithDisposable:nil]; + RACSerialDisposable *d2 = [RACSerialDisposable serialDisposableWithDisposable:nil]; + + d1.disposable = decide(self, d2); + d2.disposable = decide(signal, d1); + + return [RACDisposable disposableWithBlock:^{ + [d dispose]; + [d1 dispose]; + [d2 dispose]; + }]; + }] + setNameWithFormat:@"[%@] -amb: %@", self.name, signal]; +} + - (RACSignal *)flatten:(NSUInteger)maxConcurrent { return [[RACSignal createSignal:^(id subscriber) { RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init]; diff --git a/ReactiveObjCTests/RACSignalSpec.m b/ReactiveObjCTests/RACSignalSpec.m index bc65fd547..9f8cba430 100644 --- a/ReactiveObjCTests/RACSignalSpec.m +++ b/ReactiveObjCTests/RACSignalSpec.m @@ -1681,6 +1681,96 @@ + (void)configure:(Configuration *)configuration { }); }); +qck_describe(@"-amb:", ^{ + __block RACSubject *sub1; + __block RACSubject *sub2; + __block RACSignal *ambed; + + qck_beforeEach(^{ + sub1 = [RACSubject subject]; + sub2 = [RACSubject subject]; + ambed = [sub1 amb:sub2]; + }); + + qck_it(@"should send all values from first signal to emit a value or a notification", ^{ + __block NSUInteger nextCount = 0; + [ambed subscribeNext:^(id _Nullable _) { + nextCount++; + }]; + + [sub1 sendNext:nil]; + [sub2 sendNext:nil]; + [sub1 sendNext:nil]; + [sub2 sendNext:nil]; + + expect(@(nextCount)).to(equal(@2)); + }); + + qck_it(@"should complete if the first signal completes", ^{ + __block BOOL hasCompleted = NO; + __block NSUInteger nextCount = 0; + [ambed subscribeNext:^(id _Nullable _) { + nextCount++; + } completed:^{ + hasCompleted = YES; + }]; + + [sub1 sendCompleted]; + [sub2 sendNext:nil]; + + expect(@(hasCompleted)).to(beTruthy()); + expect(@(nextCount)).to(equal(@0)); + }); + + qck_it(@"should error if the first signal errors", ^{ + __block BOOL hasError = NO; + __block NSUInteger nextCount = 0; + [ambed subscribeNext:^(id _Nullable _) { + nextCount++; + } error:^(NSError * _) { + hasError = YES; + }]; + + [sub1 sendError:nil]; + [sub2 sendNext:nil]; + + expect(@(hasError)).to(beTruthy()); + expect(@(nextCount)).to(equal(@0)); + }); + + qck_it(@"should not complete if the second signal completes", ^{ + __block BOOL hasCompleted = NO; + __block NSUInteger nextCount = 0; + [ambed subscribeNext:^(id _Nullable _) { + nextCount++; + } completed:^{ + hasCompleted = YES; + }]; + + [sub1 sendNext:nil]; + [sub2 sendCompleted]; + + expect(@(hasCompleted)).to(beFalsy()); + expect(@(nextCount)).to(equal(@1)); + }); + + qck_it(@"should not error if the second signal errors", ^{ + __block BOOL hasError = NO; + __block NSUInteger nextCount = 0; + [ambed subscribeNext:^(id _Nullable _) { + nextCount++; + } error:^(NSError * _) { + hasError = YES; + }]; + + [sub1 sendNext:nil]; + [sub2 sendError:nil]; + + expect(@(hasError)).to(beFalsy()); + expect(@(nextCount)).to(equal(@1)); + }); +}); + qck_describe(@"-flatten:", ^{ __block BOOL subscribedTo1 = NO; __block BOOL subscribedTo2 = NO;