Skip to content

Commit b576131

Browse files
authored
Add Publisher.onCompleteError (#2723)
Motivation: There are some scenarios where a stream terminating with onComplete is not expected, and translating to an error simplifies recovery.
1 parent a726670 commit b576131

File tree

4 files changed

+194
-0
lines changed

4 files changed

+194
-0
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.concurrent.api;
17+
18+
import java.util.function.Supplier;
19+
import javax.annotation.Nullable;
20+
21+
import static java.util.Objects.requireNonNull;
22+
23+
final class OnCompleteErrorPublisher<T> extends AbstractSynchronousPublisherOperator<T, T> {
24+
private final Supplier<? extends Throwable> errorSupplier;
25+
26+
OnCompleteErrorPublisher(final Publisher<T> original, final Supplier<? extends Throwable> errorSupplier) {
27+
super(original);
28+
this.errorSupplier = requireNonNull(errorSupplier);
29+
}
30+
31+
@Override
32+
public Subscriber<? super T> apply(final Subscriber<? super T> subscriber) {
33+
return new OnCompleteErrorSubscriber<>(subscriber, errorSupplier);
34+
}
35+
36+
private static final class OnCompleteErrorSubscriber<T> implements Subscriber<T> {
37+
private final Subscriber<? super T> subscriber;
38+
private final Supplier<? extends Throwable> errorSupplier;
39+
40+
private OnCompleteErrorSubscriber(final Subscriber<? super T> subscriber,
41+
final Supplier<? extends Throwable> errorSupplier) {
42+
this.subscriber = subscriber;
43+
this.errorSupplier = errorSupplier;
44+
}
45+
46+
@Override
47+
public void onSubscribe(final Subscription subscription) {
48+
subscriber.onSubscribe(subscription);
49+
}
50+
51+
@Override
52+
public void onNext(@Nullable final T t) {
53+
subscriber.onNext(t);
54+
}
55+
56+
@Override
57+
public void onError(final Throwable t) {
58+
subscriber.onError(t);
59+
}
60+
61+
@Override
62+
public void onComplete() {
63+
final Throwable cause;
64+
try {
65+
cause = errorSupplier.get();
66+
} catch (Throwable cause2) {
67+
subscriber.onError(cause2);
68+
return;
69+
}
70+
if (cause == null) {
71+
subscriber.onComplete();
72+
} else {
73+
subscriber.onError(cause);
74+
}
75+
}
76+
}
77+
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,28 @@ public final <E extends Throwable> Publisher<T> onErrorReturn(
561561
return onErrorReturn(type::isInstance, rawSupplier);
562562
}
563563

564+
/**
565+
* Transform this {@link Publisher}s {@link Subscriber#onComplete()} signal into
566+
* {@link Subscriber#onError(Throwable)} signal (unless {@code null} error returned from {@code errorSupplier}).
567+
* <p>
568+
* This method provides a data transformation in sequential programming similar to:
569+
* <pre>{@code
570+
* List<T> results = resultOfThisPublisher();
571+
* terminalOfThisPublisher();
572+
* Throwable cause = errorSupplier.get()
573+
* if (cause != null) {
574+
* throw cause;
575+
* }
576+
* }</pre>
577+
* @param errorSupplier returns the error to emit to {@link Subscriber#onError(Throwable)}. if the return value
578+
* is {@code null} then complete with {@link Subscriber#onComplete()}.
579+
* @return A {@link Publisher} which transform this {@link Publisher}s {@link Subscriber#onComplete()} signal into
580+
* {@link Subscriber#onError(Throwable)} signal (unless {@code null} error returned from {@code errorSupplier}).
581+
*/
582+
public final Publisher<T> onCompleteError(final Supplier<? extends Throwable> errorSupplier) {
583+
return new OnCompleteErrorPublisher<>(this, errorSupplier);
584+
}
585+
564586
/**
565587
* Transform errors emitted on this {@link Publisher} which match {@code predicate} into
566588
* {@link Subscriber#onNext(Object)} then {@link Subscriber#onComplete()} signals (e.g. swallows the error).
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.concurrent.api;
17+
18+
import io.servicetalk.concurrent.internal.DeliberateException;
19+
import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber;
20+
21+
import org.junit.jupiter.api.Test;
22+
import org.junit.jupiter.params.ParameterizedTest;
23+
import org.junit.jupiter.params.provider.ValueSource;
24+
25+
import static io.servicetalk.concurrent.api.Publisher.from;
26+
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
27+
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
28+
import static org.hamcrest.MatcherAssert.assertThat;
29+
import static org.hamcrest.Matchers.equalTo;
30+
import static org.hamcrest.Matchers.sameInstance;
31+
32+
final class OnCompleteErrorPublisherTest {
33+
private final TestPublisherSubscriber<Integer> subscriber = new TestPublisherSubscriber<>();
34+
35+
@Test
36+
void errorPassThrough() {
37+
toSource(Publisher.<Integer>failed(DELIBERATE_EXCEPTION)
38+
.onCompleteError(() -> new IllegalStateException("shouldn't get here"))
39+
).subscribe(subscriber);
40+
assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION));
41+
}
42+
43+
@Test
44+
void nullCompletes() {
45+
toSource(Publisher.<Integer>empty()
46+
.onCompleteError(() -> null)
47+
).subscribe(subscriber);
48+
subscriber.awaitOnComplete();
49+
}
50+
51+
@ParameterizedTest(name = "{displayName} [{index}] shouldThrow={0}")
52+
@ValueSource(booleans = {false, true})
53+
void completeToError(boolean shouldThrow) {
54+
final DeliberateException thrown = new DeliberateException();
55+
toSource(from(1)
56+
.onCompleteError(() -> {
57+
if (shouldThrow) {
58+
throw thrown;
59+
}
60+
return DELIBERATE_EXCEPTION;
61+
})
62+
).subscribe(subscriber);
63+
subscriber.awaitSubscription().request(1);
64+
assertThat(subscriber.takeOnNext(), equalTo(1));
65+
assertThat(subscriber.awaitOnError(), shouldThrow ? sameInstance(thrown) : sameInstance(DELIBERATE_EXCEPTION));
66+
}
67+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.concurrent.reactivestreams.tck;
17+
18+
import io.servicetalk.concurrent.api.Publisher;
19+
20+
import org.testng.annotations.Test;
21+
22+
@Test
23+
public class PublisherOnCompleteErrorTckTest extends AbstractPublisherOperatorTckTest<Integer> {
24+
@Override
25+
protected Publisher<Integer> composePublisher(final Publisher<Integer> publisher, final int elements) {
26+
return publisher.onCompleteError(() -> null);
27+
}
28+
}

0 commit comments

Comments
 (0)