/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import org.reactivestreams.*;
import io.reactivex.disposables.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.internal.operators.single.*;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.internal.util.*;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
/**
* Represents a deferred computation and emission of a single value or exception.
*
* @param the value type
*/
public class Single {
public interface SingleOnSubscribe extends Consumer> {
}
public interface SingleOperator extends Function, SingleSubscriber> {
}
public interface SingleSubscriber {
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable e);
}
public interface SingleTransformer extends Function, Single> {
}
public static Single amb(Iterable> sources) {
return create(s -> {
AtomicBoolean once = new AtomicBoolean();
CompositeDisposable set = new CompositeDisposable();
s.onSubscribe(set);
int c = 0;
for (Single s1 : sources) {
if (once.get()) {
return;
}
s1.subscribe(new SingleSubscriber() {
@Override
public void onSubscribe(Disposable d) {
set.add(d);
}
@Override
public void onSuccess(T value) {
if (once.compareAndSet(false, true)) {
s.onSuccess(value);
}
}
@Override
public void onError(Throwable e) {
if (once.compareAndSet(false, true)) {
s.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}
});
c++;
}
if (c == 0 && !set.isDisposed()) {
s.onError(new NoSuchElementException());
}
});
}
@SafeVarargs
@SuppressWarnings("unchecked")
public static Single amb(Single... sources) {
if (sources.length == 0) {
return error(() -> new NoSuchElementException());
}
if (sources.length == 1) {
return (Single)sources[0];
}
return create(s -> {
AtomicBoolean once = new AtomicBoolean();
CompositeDisposable set = new CompositeDisposable();
s.onSubscribe(set);
for (Single s1 : sources) {
if (once.get()) {
return;
}
s1.subscribe(new SingleSubscriber() {
@Override
public void onSubscribe(Disposable d) {
set.add(d);
}
@Override
public void onSuccess(T value) {
if (once.compareAndSet(false, true)) {
s.onSuccess(value);
}
}
@Override
public void onError(Throwable e) {
if (once.compareAndSet(false, true)) {
s.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}
});
}
});
}
public static Observable concat(Iterable> sources) {
return concat(Observable.fromIterable(sources));
}
public static Observable concat(Observable> sources) {
return sources.concatMap(Single::toFlowable);
}
public static Observable concat(
Single s1, Single s2
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
return concat(Observable.fromArray(s1, s2));
}
public static Observable concat(
Single s1, Single s2,
Single s3
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
Objects.requireNonNull(s3);
return concat(Observable.fromArray(s1, s2, s3));
}
public static Observable concat(
Single s1, Single s2,
Single s3, Single s4
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
Objects.requireNonNull(s3);
Objects.requireNonNull(s4);
return concat(Observable.fromArray(s1, s2, s3, s4));
}
public static Observable concat(
Single s1, Single s2,
Single s3, Single s4,
Single s5
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
Objects.requireNonNull(s3);
Objects.requireNonNull(s4);
Objects.requireNonNull(s5);
return concat(Observable.fromArray(s1, s2, s3, s4, s5));
}
public static Observable concat(
Single s1, Single s2,
Single s3, Single s4,
Single s5, Single s6
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
Objects.requireNonNull(s3);
Objects.requireNonNull(s4);
Objects.requireNonNull(s5);
Objects.requireNonNull(s6);
return concat(Observable.fromArray(s1, s2, s3, s4, s5, s6));
}
public static Observable concat(
Single s1, Single s2,
Single s3, Single s4,
Single s5, Single s6,
Single s7
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
Objects.requireNonNull(s3);
Objects.requireNonNull(s4);
Objects.requireNonNull(s5);
Objects.requireNonNull(s6);
Objects.requireNonNull(s7);
return concat(Observable.fromArray(s1, s2, s3, s4, s5, s6, s7));
}
public static Observable concat(
Single s1, Single s2,
Single s3, Single s4,
Single s5, Single s6,
Single s7, Single s8
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
Objects.requireNonNull(s3);
Objects.requireNonNull(s4);
Objects.requireNonNull(s5);
Objects.requireNonNull(s6);
Objects.requireNonNull(s7);
Objects.requireNonNull(s8);
return concat(Observable.fromArray(s1, s2, s3, s4, s5, s6, s7, s8));
}
public static Observable concat(
Single s1, Single s2,
Single s3, Single s4,
Single s5, Single s6,
Single s7, Single s8,
Single s9
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
Objects.requireNonNull(s3);
Objects.requireNonNull(s4);
Objects.requireNonNull(s5);
Objects.requireNonNull(s6);
Objects.requireNonNull(s7);
Objects.requireNonNull(s8);
Objects.requireNonNull(s9);
return concat(Observable.fromArray(s1, s2, s3, s4, s5, s6, s7, s8, s9));
}
public static Single create(SingleOnSubscribe onSubscribe) {
Objects.requireNonNull(onSubscribe);
// TODO plugin wrapper
return new Single<>(onSubscribe);
}
public static Single defer(Supplier> singleSupplier) {
return create(s -> {
Single next;
try {
next = singleSupplier.get();
} catch (Throwable e) {
s.onSubscribe(EmptyDisposable.INSTANCE);
s.onError(e);
return;
}
if (next == null) {
s.onSubscribe(EmptyDisposable.INSTANCE);
s.onError(new NullPointerException("The Single supplied was null"));
return;
}
next.subscribe(s);
});
}
public static Single error(Supplier errorSupplier) {
Objects.requireNonNull(errorSupplier);
return create(s -> {
Throwable error;
try {
error = errorSupplier.get();
} catch (Throwable e) {
error = e;
}
if (error == null) {
error = new NullPointerException();
}
s.onSubscribe(EmptyDisposable.INSTANCE);
s.onError(error);
});
}
public static Single error(Throwable error) {
Objects.requireNonNull(error);
return error(() -> error);
}
public static Single fromCallable(Callable callable) {
Objects.requireNonNull(callable);
return create(s -> {
s.onSubscribe(EmptyDisposable.INSTANCE);
try {
T v = callable.call();
if (v != null) {
s.onSuccess(v);
} else {
s.onError(new NullPointerException());
}
} catch (Throwable e) {
s.onError(e);
}
});
}
public static Single fromFuture(CompletableFuture future) {
Objects.requireNonNull(future);
return create(s -> {
BooleanDisposable bd = new BooleanDisposable(() -> future.cancel(true));
s.onSubscribe(bd);
future.whenComplete((v, e) -> {
if (e != null) {
if (!bd.isDisposed()) {
s.onError(e);
}
} else {
if (v != null) {
s.onSuccess(v);
} else {
s.onError(new NullPointerException());
}
}
});
});
}
public static Single fromFuture(Future future) {
return Observable.fromFuture(future).toSingle();
}
public static Single fromFuture(Future future, long timeout, TimeUnit unit) {
return Observable.fromFuture(future, timeout, unit).toSingle();
}
public static Single fromFuture(Future future, long timeout, TimeUnit unit, Scheduler scheduler) {
return Observable.fromFuture(future, timeout, unit, scheduler).toSingle();
}
public static Single fromFuture(Future future, Scheduler scheduler) {
return Observable.fromFuture(future, scheduler).toSingle();
}
public static Single fromPublisher(Publisher publisher) {
Objects.requireNonNull(publisher);
return create(s -> {
publisher.subscribe(new Subscriber() {
T value;
@Override
public void onComplete() {
T v = value;
value = null;
if (v != null) {
s.onSuccess(v);
} else {
s.onError(new NoSuchElementException());
}
}
@Override
public void onError(Throwable t) {
value = null;
s.onError(t);
}
@Override
public void onNext(T t) {
value = t;
}
@Override
public void onSubscribe(Subscription inner) {
s.onSubscribe(inner::cancel);
inner.request(Long.MAX_VALUE);
}
});
});
}
public static Single just(T value) {
Objects.requireNonNull(value);
return create(s -> {
s.onSubscribe(EmptyDisposable.INSTANCE);
s.onSuccess(value);
});
}
public static Observable merge(Iterable> sources) {
return merge(Observable.fromIterable(sources));
}
public static Observable merge(Observable> sources) {
return sources.flatMap(Single::toFlowable);
}
public static Single merge(Single> source) {
return source.flatMap(v -> v);
}
public static Observable merge(
Single s1, Single s2
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
return merge(Observable.fromArray(s1, s2));
}
public static Observable merge(
Single s1, Single s2,
Single s3
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
Objects.requireNonNull(s3);
return merge(Observable.fromArray(s1, s2, s3));
}
public static Observable merge(
Single s1, Single s2,
Single s3, Single s4
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
Objects.requireNonNull(s3);
Objects.requireNonNull(s4);
return merge(Observable.fromArray(s1, s2, s3, s4));
}
public static Observable merge(
Single s1, Single s2,
Single s3, Single s4,
Single s5
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
Objects.requireNonNull(s3);
Objects.requireNonNull(s4);
Objects.requireNonNull(s5);
return merge(Observable.fromArray(s1, s2, s3, s4, s5));
}
public static Observable merge(
Single s1, Single s2,
Single s3, Single s4,
Single s5, Single s6
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
Objects.requireNonNull(s3);
Objects.requireNonNull(s4);
Objects.requireNonNull(s5);
Objects.requireNonNull(s6);
return merge(Observable.fromArray(s1, s2, s3, s4, s5, s6));
}
public static Observable merge(
Single s1, Single s2,
Single s3, Single s4,
Single s5, Single s6,
Single s7
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
Objects.requireNonNull(s3);
Objects.requireNonNull(s4);
Objects.requireNonNull(s5);
Objects.requireNonNull(s6);
Objects.requireNonNull(s7);
return merge(Observable.fromArray(s1, s2, s3, s4, s5, s6, s7));
}
public static Observable merge(
Single s1, Single s2,
Single s3, Single s4,
Single s5, Single s6,
Single s7, Single s8
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
Objects.requireNonNull(s3);
Objects.requireNonNull(s4);
Objects.requireNonNull(s5);
Objects.requireNonNull(s6);
Objects.requireNonNull(s7);
Objects.requireNonNull(s8);
return merge(Observable.fromArray(s1, s2, s3, s4, s5, s6, s7, s8));
}
public static Observable merge(
Single s1, Single s2,
Single s3, Single s4,
Single s5, Single s6,
Single s7, Single s8,
Single s9
) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
Objects.requireNonNull(s3);
Objects.requireNonNull(s4);
Objects.requireNonNull(s5);
Objects.requireNonNull(s6);
Objects.requireNonNull(s7);
Objects.requireNonNull(s8);
Objects.requireNonNull(s9);
return merge(Observable.fromArray(s1, s2, s3, s4, s5, s6, s7, s8, s9));
}
public static Single never() {
return create(s -> {
s.onSubscribe(EmptyDisposable.INSTANCE);
});
}
public static Single timer(long delay, TimeUnit unit) {
return timer(delay, unit, Schedulers.computation());
}
public static Single timer(long delay, TimeUnit unit, Scheduler scheduler) {
Objects.requireNonNull(unit);
Objects.requireNonNull(scheduler);
return create(s -> {
MultipleAssignmentDisposable mad = new MultipleAssignmentDisposable();
s.onSubscribe(mad);
mad.set(scheduler.scheduleDirect(() -> s.onSuccess(0L), delay, unit));
});
}
public static Single equals(Single first, Single second) {
Objects.requireNonNull(first);
Objects.requireNonNull(second);
return create(s -> {
AtomicInteger count = new AtomicInteger();
Object[] values = { null, null };
CompositeDisposable set = new CompositeDisposable();
s.onSubscribe(set);
class InnerSubscriber implements SingleSubscriber {
final int index;
public InnerSubscriber(int index) {
this.index = index;
}
@Override
public void onSubscribe(Disposable d) {
set.add(d);
}
@Override
public void onSuccess(T value) {
values[index] = value;
if (count.incrementAndGet() == 2) {
s.onSuccess(Objects.equals(values[0], values[1]));
}
}
@Override
public void onError(Throwable e) {
for (;;) {
int state = count.get();
if (state >= 2) {
RxJavaPlugins.onError(e);
return;
}
if (count.compareAndSet(state, 2)) {
s.onError(e);
return;
}
}
}
}
first.subscribe(new InnerSubscriber(0));
second.subscribe(new InnerSubscriber(1));
});
}
@SuppressWarnings("unchecked")
private static Function