package rx;
import rx.Observable.OnSubscribe;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.Func3;
import rx.internal.operators.bi.OperatorBiToMonoMap;
import rx.internal.operators.bi.OperatorDoOnNext;
import rx.internal.operators.bi.OperatorFlip;
import rx.internal.operators.bi.OperatorGenerate;
import rx.internal.operators.bi.OperatorBiMap;
import rx.internal.operators.bi.OperatorScan;
import rx.internal.operators.bi.OperatorTakeLast;
import rx.operators.BiOperator;
import rx.operators.BiToSingleOperator;
import rx.operators.SingleToBiOperator;
public class BiObservable {
private BiOnSubscribe onSubscribeFunc;
/**
* An action used by a {@link BiObservable} to produce data to be consumed by a downstream
* {@link BiSubscriber}.
*
* @param
* type of first argument
* @param
* type of second argument
*/
public static interface BiOnSubscribe extends Action1> {
@Override
public void call(BiSubscriber subscriber);
}
/**
* @param onSubscribeFunc
*/
private BiObservable(BiOnSubscribe onSubscribeFunc) {
this.onSubscribeFunc = onSubscribeFunc;
}
/**
* @param onSubscribe
* @return a {@link BiObservable} wrapping the given {@link BiOnSubscribe} action.
*/
public static BiObservable create(BiOnSubscribe onSubscribe) {
return new BiObservable(onSubscribe);
}
/**
* @param subscriber
*/
public void subcribe(BiSubscriber subscriber) {
onSubscribeFunc.call(subscriber);
}
/**
* Create a new DyadObservable that defers the subscription of {@code this} with a
* {@link BiSubscriber subscriber} that applies the given operator's effect to values produced
* when subscribed to.
*
* @param biOperator
* a function to adapt the types and semantics of the downstream operator.
* @return a new {@link BiObservable} with a {@link BiOnSubscribe onSubscribeFunc} that
* subscribes to {@code this}.
* @see BiObservable#lift(BiToSingleOperator)
*/
public BiObservable lift(final BiOperator biOperator) {
return new BiObservable(new BiOnSubscribe() {
@Override
public void call(BiSubscriber child) {
onSubscribeFunc.call(biOperator.call(child));
}
});
}
/**
* Create a new {@link Observable} that defers the subscription of {@code this} with a
* {@link BiSubscriber subscriber} that applies the given operator's effect to values produced
* when subscribed to. This overload of {@code lift} converts a DyadObservable to a
* single-valued Observable.
*
* @param operator
* a function to adapt the types and semantics of the downstream operator.
* @return a new {@link BiObservable} with a {@link BiOnSubscribe onSubscribeFunc} that
* subscribes to {@code this}.
*/
public Observable lift(final BiToSingleOperator operator) {
return Observable.create(new OnSubscribe() {
@Override
public void call(Subscriber child) {
onSubscribeFunc.call(operator.call(child));
}
});
}
/**
* Create a new {@link Observable} that defers the subscription of {@code obs} with a
* {@link Subscriber subscriber} that applies the given operator's effect to values produced
* when subscribed to. This overload of {@code lift} converts a single-valued Observable to a
* two-valued {@link BiObservable}.
*
* @param obs
* the producer subscribed to.
* @param op
* a function to adapt the types and semantics of the downstream operator to
* {@code obs}.
* @return
*/
public static BiObservable lift(Observable obs, SingleToBiOperator op) {
return new BiObservable(new BiOnSubscribe() {
@Override
public void call(BiSubscriber subscriber) {
obs.unsafeSubscribe(op.call(subscriber));
}
});
}
/**
* Converts an Observable to a BiObservable by applying a function to generate a second value
* based on the values produced by the {@code observable}.
*
* @param observable
* the producer
* @param generatorFunc
* ran once per call made to onNext to produce the paired BiObservable's second
* value
* @return a BiObservable encapsulating the subscription to the given {@code observable}
*/
public static BiObservable generate(final Observable observable, final Func1 generatorFunc) {
return BiObservable.lift(observable, new OperatorGenerate(generatorFunc));
}
/**
* Creates a BiObservable by zipping two observables. Each value produced by the returned
* BiObservable is the pair of each value emitted by the given observables.
*
* @param ob0
* the first observable
* @param ob1
* the second observable
* @return a BiObservable encapsulating the subscription to both observables
*/
public static BiObservable zip(final Observable ob0, final Observable ob1) {
return create(new BiOnSubscribe() {
@Override
public void call(final BiSubscriber child) {
child.add(Observable.zip(ob0, ob1, new Func2() {
@Override
public Void call(T0 t0, T1 t1) {
child.onNext(t0, t1);
return null;
}
}).subscribe(new Observer() {
@Override
public void onCompleted() {
child.onComplete();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(Void t) {
}
}));
}
});
}
/**
* @param ob0
* @param ob1
* @return
*/
public static final BiObservable combineLatest(final Observable ob0, final Observable ob1) {
return create(new BiOnSubscribe() {
@Override
public void call(final BiSubscriber child) {
child.add(Observable
.combineLatest(ob0, ob1, new Func2() {
@Override
public Void call(T0 t0, T1 t1) {
child.onNext(t0, t1);
return null;
}
}).subscribe(new Observer() {
@Override
public void onCompleted() {
child.onComplete();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(Void t) {
}
}));
}
});
}
/**
* Creates a BiObservable that emits one pair of elements for each combination of the elements
* emitted by the observables passed as parameters. This produces the Cartesian product of all
* emitted elements from two observables.
*
* @param ob0
* @param ob1
* @return a BiObservable that produces the Cartesian product of ob0 and ob1
*/
public static BiObservable product(final Observable ob0, final Observable ob1) {
if (ob1 == Observable.empty() || ob0 == Observable.empty())
return BiObservable.empty();
return create(new BiOnSubscribe() {
@Override
public void call(final BiSubscriber child) {
child.add(ob0.flatMap(new Func1>() {
@Override
public Observable call(final T0 t0) {
return ob1.map(new Func1() {
@Override
public Void call(T1 t1) {
child.onNext(t0, t1);
return null;
}
});
}
}).subscribe(new Observer() {
@Override
public void onCompleted() {
child.onComplete();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(Void t) {
}
}));
}
});
}
public static BiObservable empty() {
return BiObservable.create(new BiOnSubscribe(){
@Override
public void call(BiSubscriber subscriber) {
subscriber.onComplete();
}});
}
/**
* Creates a BiObservable from an observable and a non-deterministic-arity generator function.
* This emits a pair of each generatorFunc's output element with the input it was obtained from.
* Note that if the generatorFunc produces an "empty" observable then no pairs will be emitted
* for that input element.
*
* @param ob0
* @param generatorFunc
* @return a BiObservable that
*/
public static BiObservable sparseProduct(final Observable ob0, final Func1> generatorFunc) {
return create(new BiOnSubscribe() {
@Override
public void call(final BiSubscriber subscriber) {
subscriber.add(ob0.flatMap(new Func1>() {
@Override
public Observable call(final T0 t0) {
return generatorFunc.call(t0)
.doOnNext(new Action1() {
@Override
public void call(T1 t1) {
subscriber.onNext(t0, t1);
}
});
}
}).subscribe(new Observer() {
@Override
public void onCompleted() {
subscriber.onComplete();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(T1 t) {
}
}));
}
});
}
/**
* Attaches an instance as the paired item to each element emitted from the observable.
*
* @param i0
* @param ob1
* @return
*/
public static BiObservable attach(T0 i0, Observable ob1) {
return product(Observable.just(i0), ob1);
}
/**
* Attaches an instance as the paired item to each element emitted from the observable.
*
* @param ob0
* @param i1
* @return
*/
public static BiObservable attach(Observable ob0, T1 i1) {
return product(ob0, Observable.just(i1));
}
/**
* Returns a BiObservable that applies a specified function to each pair of items emitted by
* the source BiObservable and emits the results of these function applications, replacing the
* second item with the results. This overload accepts a Func2 that will receive both items
* emitted by the BiObservable as arguments.
*
* @param func
* the function used to produce the new value.
* @return a BiObservable which transforms the first item emitted using the specified
* function.
*/
public BiObservable map1(final Func2 func) {
return lift(OperatorBiMap.bi1Operator(func));
}
/**
* Returns a DyadObservable that applies a specified function to each pair of items emitted by
* the source DyadObservable and emits the results of these function applications, replacing the
* second item with the results. This overload accepts a Func1 that will receive the first item
* emitted by the DyadObservable as an argument.
*
* @param func
* the function used to produce the new value.
* @return a DyadObservable which transforms the first item emitted using the specified
* function.
*/
public BiObservable map1(final Func1 func) {
return lift(OperatorBiMap.mono1Operator(func));
}
// for TriObservable we'll need many combinations of flatten
// -> ,
// -> ,
// -> ,
// ->
//
// Quad
// ->
// ->
// ->
// ->
// ->
// ->
// ->
// ->
// ->
// ->
// ->
/**
* Returns a BiObservable that applies a specified function to each pair of items emitted by
* the source BiObservable and emits the results of these function applications, replacing the
* emitted values with the result from the specified function.
*
* @param func
* the function used to produce the new value.
* @return an Observable which transforms the pair of items emitted using the specified
* function.
*/
public Observable bimap(final Func2 func) {
return lift(new OperatorBiToMonoMap(func));
}
/**
* @param action
* @return
*/
public BiObservable doOnNext(final Action2 action) {
return lift(OperatorDoOnNext.biOperator(action));
}
/**
* @param action
* @return
*/
public BiObservable doOnNext1(final Action1 action) {
return lift(OperatorDoOnNext.mono1Operator(action));
}
/**
* @param action
* @return
*/
public BiObservable doOnNext2(final Action1 action) {
return lift(OperatorDoOnNext.mono2Operator(action));
}
/**
* @param seed
* @param func
* @return
*/
public BiObservable scan1(R seed, final Func3 func) {
return lift(OperatorScan.bi1Operator(seed, func));
}
/**
* @param func
* @return
*/
public BiObservable scan1(final Func3 func) {
return lift(OperatorScan.bi1Operator(func));
}
/**
* @param seed
* @param func
* @return
*/
public BiObservable scan2(R seed, final Func3 func) {
return lift(OperatorScan.bi2Operator(seed, func));
}
/**
* @param func
* @return
*/
public BiObservable scan2(final Func3 func) {
return lift(OperatorScan.bi2Operator(func));
}
/**
* @return
*/
public BiObservable takeLast() {
return lift(new OperatorTakeLast());
}
/**
* @param seed
* @param func
* @return
*/
public BiObservable reduce1(R seed, final Func3 func) {
return scan1(seed, func).takeLast();
}
/**
* @param func
* @return
*/
public BiObservable reduce1(final Func3 func) {
return scan1(func).takeLast();
}
/**
* @param seed
* @param func
* @return
*/
public BiObservable reduce2(R seed, final Func3 func) {
return scan2(seed, func).takeLast();
}
/**
* @param func
* @return
*/
public BiObservable reduce2(final Func3 func) {
return scan2(func).takeLast();
}
/**
* Returns a BiObservable that applies a specified function to each pair of items emitted by
* the source BiObservable and emits the results of these function applications, replacing the
* second item with the results. This overload accepts a Func2 that will receive both items
* emitted by the BiObservable as arguments.
*
* @param func
* the function used to produce the new value.
* @return a BiObservable which transforms the second item emitted using the specified
* function.
*/
public BiObservable map2(Func2 func) {
return lift(OperatorBiMap.bi2Operator(func));
}
/**
* Returns a BiObservable that applies a specified function to each pair of items emitted by
* the source BiObservable and emits the results of these function applications, replacing the
* second item with the results. This overload accepts a Func1 that will receive the second item
* emitted by the BiObservable as an argument.
*
* @param func
* the function used to produce the new value.
* @return a BiObservable which transforms the second item emitted using the specified
* function.
*/
public BiObservable map2(final Func1 func) {
return lift(OperatorBiMap.mono2Operator(func));
}
/**
* @param func
* @return
*/
public static Func2 flip(final Func2 func) {
return new Func2() {
@Override
public R call(T1 t1, T0 t0) {
return func.call(t0, t1);
}
};
}
/**
* @return
*/
public BiObservable flip() {
return lift(new OperatorFlip());
}
}