X Tutup
/** * Copyright 2015 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */ package io.reactivex; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.function.*; import org.reactivestreams.*; import io.reactivex.disposables.*; import io.reactivex.functions.*; import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.internal.operators.single.*; import io.reactivex.internal.subscriptions.*; import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; /** * Represents a deferred computation and emission of a single value or exception. * * @param the value type */ public class Single { public interface SingleOnSubscribe extends Consumer> { } public interface SingleOperator extends Function, SingleSubscriber> { } public interface SingleSubscriber { void onSubscribe(Disposable d); void onSuccess(T value); void onError(Throwable e); } public interface SingleTransformer extends Function, Single> { } public static Single amb(Iterable> sources) { return create(s -> { AtomicBoolean once = new AtomicBoolean(); CompositeDisposable set = new CompositeDisposable(); s.onSubscribe(set); int c = 0; for (Single s1 : sources) { if (once.get()) { return; } s1.subscribe(new SingleSubscriber() { @Override public void onSubscribe(Disposable d) { set.add(d); } @Override public void onSuccess(T value) { if (once.compareAndSet(false, true)) { s.onSuccess(value); } } @Override public void onError(Throwable e) { if (once.compareAndSet(false, true)) { s.onError(e); } else { RxJavaPlugins.onError(e); } } }); c++; } if (c == 0 && !set.isDisposed()) { s.onError(new NoSuchElementException()); } }); } @SafeVarargs @SuppressWarnings("unchecked") public static Single amb(Single... sources) { if (sources.length == 0) { return error(() -> new NoSuchElementException()); } if (sources.length == 1) { return (Single)sources[0]; } return create(s -> { AtomicBoolean once = new AtomicBoolean(); CompositeDisposable set = new CompositeDisposable(); s.onSubscribe(set); for (Single s1 : sources) { if (once.get()) { return; } s1.subscribe(new SingleSubscriber() { @Override public void onSubscribe(Disposable d) { set.add(d); } @Override public void onSuccess(T value) { if (once.compareAndSet(false, true)) { s.onSuccess(value); } } @Override public void onError(Throwable e) { if (once.compareAndSet(false, true)) { s.onError(e); } else { RxJavaPlugins.onError(e); } } }); } }); } public static Observable concat(Iterable> sources) { return concat(Observable.fromIterable(sources)); } public static Observable concat(Observable> sources) { return sources.concatMap(Single::toFlowable); } public static Observable concat( Single s1, Single s2 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); return concat(Observable.fromArray(s1, s2)); } public static Observable concat( Single s1, Single s2, Single s3 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); return concat(Observable.fromArray(s1, s2, s3)); } public static Observable concat( Single s1, Single s2, Single s3, Single s4 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); return concat(Observable.fromArray(s1, s2, s3, s4)); } public static Observable concat( Single s1, Single s2, Single s3, Single s4, Single s5 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); Objects.requireNonNull(s5); return concat(Observable.fromArray(s1, s2, s3, s4, s5)); } public static Observable concat( Single s1, Single s2, Single s3, Single s4, Single s5, Single s6 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); Objects.requireNonNull(s5); Objects.requireNonNull(s6); return concat(Observable.fromArray(s1, s2, s3, s4, s5, s6)); } public static Observable concat( Single s1, Single s2, Single s3, Single s4, Single s5, Single s6, Single s7 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); Objects.requireNonNull(s5); Objects.requireNonNull(s6); Objects.requireNonNull(s7); return concat(Observable.fromArray(s1, s2, s3, s4, s5, s6, s7)); } public static Observable concat( Single s1, Single s2, Single s3, Single s4, Single s5, Single s6, Single s7, Single s8 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); Objects.requireNonNull(s5); Objects.requireNonNull(s6); Objects.requireNonNull(s7); Objects.requireNonNull(s8); return concat(Observable.fromArray(s1, s2, s3, s4, s5, s6, s7, s8)); } public static Observable concat( Single s1, Single s2, Single s3, Single s4, Single s5, Single s6, Single s7, Single s8, Single s9 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); Objects.requireNonNull(s5); Objects.requireNonNull(s6); Objects.requireNonNull(s7); Objects.requireNonNull(s8); Objects.requireNonNull(s9); return concat(Observable.fromArray(s1, s2, s3, s4, s5, s6, s7, s8, s9)); } public static Single create(SingleOnSubscribe onSubscribe) { Objects.requireNonNull(onSubscribe); // TODO plugin wrapper return new Single<>(onSubscribe); } public static Single defer(Supplier> singleSupplier) { return create(s -> { Single next; try { next = singleSupplier.get(); } catch (Throwable e) { s.onSubscribe(EmptyDisposable.INSTANCE); s.onError(e); return; } if (next == null) { s.onSubscribe(EmptyDisposable.INSTANCE); s.onError(new NullPointerException("The Single supplied was null")); return; } next.subscribe(s); }); } public static Single error(Supplier errorSupplier) { Objects.requireNonNull(errorSupplier); return create(s -> { Throwable error; try { error = errorSupplier.get(); } catch (Throwable e) { error = e; } if (error == null) { error = new NullPointerException(); } s.onSubscribe(EmptyDisposable.INSTANCE); s.onError(error); }); } public static Single error(Throwable error) { Objects.requireNonNull(error); return error(() -> error); } public static Single fromCallable(Callable callable) { Objects.requireNonNull(callable); return create(s -> { s.onSubscribe(EmptyDisposable.INSTANCE); try { T v = callable.call(); if (v != null) { s.onSuccess(v); } else { s.onError(new NullPointerException()); } } catch (Throwable e) { s.onError(e); } }); } public static Single fromFuture(CompletableFuture future) { Objects.requireNonNull(future); return create(s -> { BooleanDisposable bd = new BooleanDisposable(() -> future.cancel(true)); s.onSubscribe(bd); future.whenComplete((v, e) -> { if (e != null) { if (!bd.isDisposed()) { s.onError(e); } } else { if (v != null) { s.onSuccess(v); } else { s.onError(new NullPointerException()); } } }); }); } public static Single fromFuture(Future future) { return Observable.fromFuture(future).toSingle(); } public static Single fromFuture(Future future, long timeout, TimeUnit unit) { return Observable.fromFuture(future, timeout, unit).toSingle(); } public static Single fromFuture(Future future, long timeout, TimeUnit unit, Scheduler scheduler) { return Observable.fromFuture(future, timeout, unit, scheduler).toSingle(); } public static Single fromFuture(Future future, Scheduler scheduler) { return Observable.fromFuture(future, scheduler).toSingle(); } public static Single fromPublisher(Publisher publisher) { Objects.requireNonNull(publisher); return create(s -> { publisher.subscribe(new Subscriber() { T value; @Override public void onComplete() { T v = value; value = null; if (v != null) { s.onSuccess(v); } else { s.onError(new NoSuchElementException()); } } @Override public void onError(Throwable t) { value = null; s.onError(t); } @Override public void onNext(T t) { value = t; } @Override public void onSubscribe(Subscription inner) { s.onSubscribe(inner::cancel); inner.request(Long.MAX_VALUE); } }); }); } public static Single just(T value) { Objects.requireNonNull(value); return create(s -> { s.onSubscribe(EmptyDisposable.INSTANCE); s.onSuccess(value); }); } public static Observable merge(Iterable> sources) { return merge(Observable.fromIterable(sources)); } public static Observable merge(Observable> sources) { return sources.flatMap(Single::toFlowable); } public static Single merge(Single> source) { return source.flatMap(v -> v); } public static Observable merge( Single s1, Single s2 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); return merge(Observable.fromArray(s1, s2)); } public static Observable merge( Single s1, Single s2, Single s3 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); return merge(Observable.fromArray(s1, s2, s3)); } public static Observable merge( Single s1, Single s2, Single s3, Single s4 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); return merge(Observable.fromArray(s1, s2, s3, s4)); } public static Observable merge( Single s1, Single s2, Single s3, Single s4, Single s5 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); Objects.requireNonNull(s5); return merge(Observable.fromArray(s1, s2, s3, s4, s5)); } public static Observable merge( Single s1, Single s2, Single s3, Single s4, Single s5, Single s6 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); Objects.requireNonNull(s5); Objects.requireNonNull(s6); return merge(Observable.fromArray(s1, s2, s3, s4, s5, s6)); } public static Observable merge( Single s1, Single s2, Single s3, Single s4, Single s5, Single s6, Single s7 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); Objects.requireNonNull(s5); Objects.requireNonNull(s6); Objects.requireNonNull(s7); return merge(Observable.fromArray(s1, s2, s3, s4, s5, s6, s7)); } public static Observable merge( Single s1, Single s2, Single s3, Single s4, Single s5, Single s6, Single s7, Single s8 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); Objects.requireNonNull(s5); Objects.requireNonNull(s6); Objects.requireNonNull(s7); Objects.requireNonNull(s8); return merge(Observable.fromArray(s1, s2, s3, s4, s5, s6, s7, s8)); } public static Observable merge( Single s1, Single s2, Single s3, Single s4, Single s5, Single s6, Single s7, Single s8, Single s9 ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); Objects.requireNonNull(s5); Objects.requireNonNull(s6); Objects.requireNonNull(s7); Objects.requireNonNull(s8); Objects.requireNonNull(s9); return merge(Observable.fromArray(s1, s2, s3, s4, s5, s6, s7, s8, s9)); } public static Single never() { return create(s -> { s.onSubscribe(EmptyDisposable.INSTANCE); }); } public static Single timer(long delay, TimeUnit unit) { return timer(delay, unit, Schedulers.computation()); } public static Single timer(long delay, TimeUnit unit, Scheduler scheduler) { Objects.requireNonNull(unit); Objects.requireNonNull(scheduler); return create(s -> { MultipleAssignmentDisposable mad = new MultipleAssignmentDisposable(); s.onSubscribe(mad); mad.set(scheduler.scheduleDirect(() -> s.onSuccess(0L), delay, unit)); }); } public static Single equals(Single first, Single second) { Objects.requireNonNull(first); Objects.requireNonNull(second); return create(s -> { AtomicInteger count = new AtomicInteger(); Object[] values = { null, null }; CompositeDisposable set = new CompositeDisposable(); s.onSubscribe(set); class InnerSubscriber implements SingleSubscriber { final int index; public InnerSubscriber(int index) { this.index = index; } @Override public void onSubscribe(Disposable d) { set.add(d); } @Override public void onSuccess(T value) { values[index] = value; if (count.incrementAndGet() == 2) { s.onSuccess(Objects.equals(values[0], values[1])); } } @Override public void onError(Throwable e) { for (;;) { int state = count.get(); if (state >= 2) { RxJavaPlugins.onError(e); return; } if (count.compareAndSet(state, 2)) { s.onError(e); return; } } } } first.subscribe(new InnerSubscriber(0)); second.subscribe(new InnerSubscriber(1)); }); } @SuppressWarnings("unchecked") private static Function toFunction(BiFunction biFunction) { Objects.requireNonNull(biFunction); return a -> { if (a.length != 2) { throw new IllegalArgumentException("Array of size 2 expected but got " + a.length); } return ((BiFunction)biFunction).apply(a[0], a[1]); }; } public static Single using(Supplier resourceSupplier, Function> singleFunction, Consumer disposer) { return using(resourceSupplier, singleFunction, disposer); } public static Single using(Supplier resourceSupplier, Function> singleFunction, Consumer disposer, boolean eager) { Objects.requireNonNull(resourceSupplier); Objects.requireNonNull(singleFunction); Objects.requireNonNull(disposer); return create(s -> { U resource; try { resource = resourceSupplier.get(); } catch (Throwable ex) { s.onSubscribe(EmptyDisposable.INSTANCE); s.onError(ex); return; } Single s1; try { s1 = singleFunction.apply(resource); } catch (Throwable ex) { s.onSubscribe(EmptyDisposable.INSTANCE); s.onError(ex); return; } if (s1 == null) { s.onSubscribe(EmptyDisposable.INSTANCE); s.onError(new NullPointerException("The Single supplied by the function was null")); return; } s1.subscribe(new SingleSubscriber() { @Override public void onSubscribe(Disposable d) { if (eager) { CompositeDisposable set = new CompositeDisposable(); set.add(d); set.add(() -> { try { disposer.accept(resource); } catch (Throwable e) { RxJavaPlugins.onError(e); } }); } else { s.onSubscribe(d); } } @Override public void onSuccess(T value) { if (eager) { try { disposer.accept(resource); } catch (Throwable e) { s.onError(e); return; } } s.onSuccess(value); if (!eager) { try { disposer.accept(resource); } catch (Throwable e) { RxJavaPlugins.onError(e); } } } @Override public void onError(Throwable e) { if (eager) { try { disposer.accept(resource); } catch (Throwable ex) { e.addSuppressed(ex); } } s.onError(e); if (!eager) { try { disposer.accept(resource); } catch (Throwable ex) { e.addSuppressed(ex); RxJavaPlugins.onError(e); } } } }); }); } public static Observable zip(Iterable> sources, Function zipper) { Iterable> it = () -> { Iterator> sit = sources.iterator(); return new Iterator>() { @Override public boolean hasNext() { return sit.hasNext(); } @SuppressWarnings("unchecked") @Override public Observable next() { return ((Observable)sit.next().toFlowable()); } }; }; return Observable.zipIterable(zipper, false, 1, it); } public static Observable zip( Single s1, Single s2, BiFunction zipper ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); return zipArray(toFunction(zipper), s1, s2); } public static Observable zip( Single s1, Single s2, Single s3, Function3 zipper ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); return zipArray(zipper, s1, s2, s3); } public static Observable zip( Single s1, Single s2, Single s3, Single s4, Function4 zipper ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); return zipArray(zipper, s1, s2, s3, s4); } public static Observable zip( Single s1, Single s2, Single s3, Single s4, Single s5, Function5 zipper ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); Objects.requireNonNull(s5); return zipArray(zipper, s1, s2, s3, s4, s5); } public static Observable zip( Single s1, Single s2, Single s3, Single s4, Single s5, Single s6, Function6 zipper ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); Objects.requireNonNull(s5); Objects.requireNonNull(s6); return zipArray(zipper, s1, s2, s3, s4, s5, s6); } public static Observable zip( Single s1, Single s2, Single s3, Single s4, Single s5, Single s6, Single s7, Function7 zipper ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); Objects.requireNonNull(s5); Objects.requireNonNull(s6); Objects.requireNonNull(s7); return zipArray(zipper, s1, s2, s3, s4, s5, s6, s7); } public static Observable zip( Single s1, Single s2, Single s3, Single s4, Single s5, Single s6, Single s7, Single s8, Function8 zipper ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); Objects.requireNonNull(s5); Objects.requireNonNull(s6); Objects.requireNonNull(s7); Objects.requireNonNull(s8); return zipArray(zipper, s1, s2, s3, s4, s5, s6, s7, s8); } public static Observable zip( Single s1, Single s2, Single s3, Single s4, Single s5, Single s6, Single s7, Single s8, Single s9, Function9 zipper ) { Objects.requireNonNull(s1); Objects.requireNonNull(s2); Objects.requireNonNull(s3); Objects.requireNonNull(s4); Objects.requireNonNull(s5); Objects.requireNonNull(s6); Objects.requireNonNull(s7); Objects.requireNonNull(s8); Objects.requireNonNull(s9); return zipArray(zipper, s1, s2, s3, s4, s5, s6, s7, s8, s9); } @SafeVarargs @SuppressWarnings({"rawtypes", "unchecked"}) public static Observable zipArray(Function zipper, Single... sources) { Publisher[] sourcePublishers = new Publisher[sources.length]; int i = 0; for (Single s : sources) { sourcePublishers[i] = s.toFlowable(); i++; } return Observable.zipArray(zipper, false, 1, sourcePublishers); } protected final SingleOnSubscribe onSubscribe; protected Single(SingleOnSubscribe onSubscribe) { this.onSubscribe = onSubscribe; } public final Single ambWith(Single other) { return amb(this, other); } public final Single asSingle() { return create(s -> subscribe(s)); } public final Single compose(Function, ? extends Single> convert) { return to(convert); } public final Single cache() { AtomicInteger wip = new AtomicInteger(); AtomicReference notification = new AtomicReference<>(); List> subscribers = new ArrayList<>(); return create(s -> { Object o = notification.get(); if (o != null) { s.onSubscribe(EmptyDisposable.INSTANCE); if (NotificationLite.isError(o)) { s.onError(NotificationLite.getError(o)); } else { s.onSuccess(NotificationLite.getValue(o)); } return; } synchronized (subscribers) { o = notification.get(); if (o == null) { subscribers.add(s); } } if (o != null) { s.onSubscribe(EmptyDisposable.INSTANCE); if (NotificationLite.isError(o)) { s.onError(NotificationLite.getError(o)); } else { s.onSuccess(NotificationLite.getValue(o)); } return; } if (wip.getAndIncrement() != 0) { return; } subscribe(new SingleSubscriber() { @Override public void onSubscribe(Disposable d) { } @Override public void onSuccess(T value) { notification.set(NotificationLite.next(value)); List> list; synchronized (subscribers) { list = new ArrayList<>(subscribers); subscribers.clear(); } for (SingleSubscriber s1 : list) { s1.onSuccess(value); } } @Override public void onError(Throwable e) { notification.set(NotificationLite.error(e)); List> list; synchronized (subscribers) { list = new ArrayList<>(subscribers); subscribers.clear(); } for (SingleSubscriber s1 : list) { s1.onError(e); } } }); }); } public final Single cast(Class clazz) { return create(s -> { subscribe(new SingleSubscriber() { @Override public void onSubscribe(Disposable d) { s.onSubscribe(d); } @Override public void onSuccess(T value) { U v; try { v = clazz.cast(value); } catch (ClassCastException ex) { s.onError(ex); return; } s.onSuccess(v); } @Override public void onError(Throwable e) { s.onError(e); } }); }); } public final Observable concatWith(Single other) { return concat(this, other); } public final Single delay(long time, TimeUnit unit) { return delay(time, unit, Schedulers.computation()); } public final Single delay(long time, TimeUnit unit, Scheduler scheduler) { Objects.requireNonNull(unit); Objects.requireNonNull(scheduler); return create(s -> { MultipleAssignmentDisposable mad = new MultipleAssignmentDisposable(); s.onSubscribe(mad); subscribe(new SingleSubscriber() { @Override public void onSubscribe(Disposable d) { mad.set(d); } @Override public void onSuccess(T value) { mad.set(scheduler.scheduleDirect(() -> { s.onSuccess(value); }, time, unit)); } @Override public void onError(Throwable e) { s.onError(e); } }); }); } public final Single doOnSubscribe(Consumer onSubscribe) { return create(s -> { subscribe(new SingleSubscriber() { boolean done; @Override public void onSubscribe(Disposable d) { try { onSubscribe.accept(d); } catch (Throwable ex) { done = true; d.dispose(); s.onSubscribe(EmptyDisposable.INSTANCE); s.onError(ex); return; } s.onSubscribe(d); } @Override public void onSuccess(T value) { if (done) { return; } s.onSuccess(value); } @Override public void onError(Throwable e) { if (done) { RxJavaPlugins.onError(e); return; } s.onError(e); } }); }); } public final Single doOnSuccess(Consumer onSuccess) { return create(s -> { subscribe(new SingleSubscriber() { @Override public void onSubscribe(Disposable d) { s.onSubscribe(d); } @Override public void onSuccess(T value) { try { onSuccess.accept(value); } catch (Throwable ex) { s.onError(ex); return; } s.onSuccess(value); } @Override public void onError(Throwable e) { s.onError(e); } }); }); } public final Single doOnError(Consumer onError) { return create(s -> { subscribe(new SingleSubscriber() { @Override public void onSubscribe(Disposable d) { s.onSubscribe(d); } @Override public void onSuccess(T value) { s.onSuccess(value); } @Override public void onError(Throwable e) { try { onError.accept(e); } catch (Throwable ex) { e.addSuppressed(ex); } s.onError(e); } }); }); } public final Single doOnCancel(Runnable onCancel) { return create(s -> { subscribe(new SingleSubscriber() { @Override public void onSubscribe(Disposable d) { CompositeDisposable set = new CompositeDisposable(); set.add(onCancel::run); set.add(d); s.onSubscribe(set); } @Override public void onSuccess(T value) { s.onSuccess(value); } @Override public void onError(Throwable e) { s.onError(e); } }); }); } public final Single flatMap(Function> mapper) { return lift(new SingleOperatorFlatMap<>(mapper)); } public final Observable flatMapPublisher(Function> mapper) { return toFlowable().flatMap(mapper); } public final T get() { AtomicReference valueRef = new AtomicReference<>(); AtomicReference errorRef = new AtomicReference<>(); CountDownLatch cdl = new CountDownLatch(1); subscribe(new SingleSubscriber() { @Override public void onError(Throwable e) { errorRef.lazySet(e); cdl.countDown(); } @Override public void onSubscribe(Disposable d) { } @Override public void onSuccess(T value) { valueRef.lazySet(value); cdl.countDown(); } }); if (cdl.getCount() != 0L) { try { cdl.await(); } catch (InterruptedException ex) { throw new RuntimeException(ex); } } Throwable e = errorRef.get(); if (e != null) { throw Exceptions.propagate(e); } return valueRef.get(); } public final Single lift(SingleOperator onLift) { Objects.requireNonNull(onLift); return create(s -> { SingleSubscriber sr = onLift.apply(s); // TODO plugin wrapper onSubscribe.accept(sr); }); } public final Single map(Function mapper) { return lift(new SingleOperatorMap<>(mapper)); } public final Single contains(Object value) { return contains(value, Objects::equals); } public final Single contains(Object value, BiPredicate comparer) { return create(s -> { subscribe(new SingleSubscriber() { @Override public void onSubscribe(Disposable d) { s.onSubscribe(d); } @Override public void onSuccess(T v) { s.onSuccess(comparer.test(v, value)); } @Override public void onError(Throwable e) { s.onError(e); } }); }); } public final Observable mergeWith(Single other) { return merge(this, other); } public final Single> nest() { return just(this); } public final Single observeOn(Scheduler scheduler) { return create(s -> { CompositeDisposable mad = new CompositeDisposable(); s.onSubscribe(mad); subscribe(new SingleSubscriber() { @Override public void onError(Throwable e) { mad.add(scheduler.scheduleDirect(() -> s.onError(e))); } @Override public void onSubscribe(Disposable d) { mad.add(d); } @Override public void onSuccess(T value) { mad.add(scheduler.scheduleDirect(() -> s.onSuccess(value))); } }); }); } public final Single onErrorReturn(Supplier valueSupplier) { return create(s -> { subscribe(new SingleSubscriber() { @Override public void onError(Throwable e) { T v; try { v = valueSupplier.get(); } catch (Throwable ex) { e.addSuppressed(ex); s.onError(e); return; } if (v == null) { e.addSuppressed(new NullPointerException("Value supplied was null")); s.onError(e); return; } s.onSuccess(v); } @Override public void onSubscribe(Disposable d) { s.onSubscribe(d); } @Override public void onSuccess(T value) { s.onSuccess(value); } }); }); } public final Single onErrorReturn(T value) { Objects.requireNonNull(value); return onErrorReturn(() -> value); } public final Single onErrorResumeNext(Function> nextFunction) { return create(s -> { MultipleAssignmentDisposable mad = new MultipleAssignmentDisposable(); s.onSubscribe(mad); subscribe(new SingleSubscriber() { @Override public void onSubscribe(Disposable d) { mad.set(d); } @Override public void onSuccess(T value) { s.onSuccess(value); } @Override public void onError(Throwable e) { Single next; try { next = nextFunction.apply(e); } catch (Throwable ex) { e.addSuppressed(ex); s.onError(e); return; } if (next == null) { s.onError(new NullPointerException("The next Single supplied was null")); return; } next.subscribe(new SingleSubscriber() { @Override public void onSubscribe(Disposable d) { mad.set(d); } @Override public void onSuccess(T value) { s.onSuccess(value); } @Override public void onError(Throwable e) { s.onError(e); } }); } }); }); } public final Observable repeat() { return toFlowable().repeat(); } public final Observable repeat(long times) { return toFlowable().repeat(times); } public final Observable repeatWhen(Function, ? extends Publisher> handler) { return toFlowable().repeatWhen(handler); } public final Observable repeatUntil(BooleanSupplier stop) { return toFlowable().repeatUntil(stop); } public final Single retry() { return toFlowable().retry().toSingle(); } public final Single retry(long times) { return toFlowable().retry(times).toSingle(); } public final Single retry(BiPredicate predicate) { return toFlowable().retry(predicate).toSingle(); } public final Single retry(Predicate predicate) { return toFlowable().retry(predicate).toSingle(); } public final Single retryWhen(Function, ? extends Publisher> handler) { return toFlowable().retryWhen(handler).toSingle(); } public final void safeSubscribe(Subscriber s) { toFlowable().safeSubscribe(s); } public final Disposable subscribe() { return subscribe(v -> { }, RxJavaPlugins::onError); } public final Disposable subscribe(BiConsumer onCallback) { Objects.requireNonNull(onCallback); MultipleAssignmentDisposable mad = new MultipleAssignmentDisposable(); subscribe(new SingleSubscriber() { @Override public void onError(Throwable e) { onCallback.accept(null, e); } @Override public void onSubscribe(Disposable d) { mad.set(d); } @Override public void onSuccess(T value) { onCallback.accept(value, null); } }); return mad; } public final Disposable subscribe(Consumer onSuccess) { return subscribe(onSuccess, RxJavaPlugins::onError); } public final Disposable subscribe(Consumer onSuccess, Consumer onError) { Objects.requireNonNull(onSuccess); Objects.requireNonNull(onError); MultipleAssignmentDisposable mad = new MultipleAssignmentDisposable(); subscribe(new SingleSubscriber() { @Override public void onError(Throwable e) { onError.accept(e); } @Override public void onSubscribe(Disposable d) { mad.set(d); } @Override public void onSuccess(T value) { onSuccess.accept(value); } }); return mad; } public final void subscribe(SingleSubscriber subscriber) { Objects.requireNonNull(subscriber); onSubscribe.accept(subscriber); } public final void subscribe(Subscriber s) { toFlowable().subscribe(s); } public final Single subscribeOn(Scheduler scheduler) { Objects.requireNonNull(scheduler); return create(s -> { scheduler.scheduleDirect(() -> { subscribe(s); }); }); } public final Single timeout(long timeout, TimeUnit unit) { Objects.requireNonNull(unit); return timeout0(timeout, unit, Schedulers.computation(), null); } public final Single timeout(long timeout, TimeUnit unit, Scheduler scheduler) { Objects.requireNonNull(unit); Objects.requireNonNull(scheduler); return timeout0(timeout, unit, scheduler, null); } public final Single timeout(long timeout, TimeUnit unit, Scheduler scheduler, Single other) { Objects.requireNonNull(unit); Objects.requireNonNull(scheduler); Objects.requireNonNull(other); return timeout0(timeout, unit, scheduler, other); } public final Single timeout(long timeout, TimeUnit unit, Single other) { Objects.requireNonNull(unit); Objects.requireNonNull(other); return timeout0(timeout, unit, Schedulers.computation(), other); } private Single timeout0(long timeout, TimeUnit unit, Scheduler scheduler, Single other) { return create(s -> { CompositeDisposable set = new CompositeDisposable(); s.onSubscribe(set); AtomicBoolean once = new AtomicBoolean(); Disposable timer = scheduler.scheduleDirect(() -> { if (once.compareAndSet(false, true)) { if (other != null) { set.clear(); other.subscribe(new SingleSubscriber() { @Override public void onError(Throwable e) { set.dispose(); s.onError(e); } @Override public void onSubscribe(Disposable d) { set.add(d); } @Override public void onSuccess(T value) { set.dispose(); s.onSuccess(value); } }); } else { set.dispose(); s.onError(new TimeoutException()); } } }, timeout, unit); set.add(timer); subscribe(new SingleSubscriber() { @Override public void onError(Throwable e) { if (once.compareAndSet(false, true)) { set.dispose(); s.onError(e); } } @Override public void onSubscribe(Disposable d) { set.add(d); } @Override public void onSuccess(T value) { if (once.compareAndSet(false, true)) { set.dispose(); s.onSuccess(value); } } }); }); } public final R to(Function, R> convert) { return convert.apply(this); } public final Observable toFlowable() { return Observable.create(s -> { ScalarAsyncSubscription sas = new ScalarAsyncSubscription<>(s); AsyncSubscription as = new AsyncSubscription(); as.setSubscription(sas); s.onSubscribe(as); subscribe(new SingleSubscriber() { @Override public void onError(Throwable e) { s.onError(e); } @Override public void onSubscribe(Disposable d) { as.setResource(d); } @Override public void onSuccess(T value) { sas.setValue(value); } }); }); } public final void unsafeSubscribe(Subscriber s) { toFlowable().unsafeSubscribe(s); } public final Observable zipWith(Single other, BiFunction zipper) { return zip(this, other, zipper); } }
X Tutup