X Tutup
Skip to content
This repository was archived by the owner on Nov 18, 2024. It is now read-only.

Commit 761c8f6

Browse files
committed
3.4 Added mergeDelayError examples
1 parent 73cbaf7 commit 761c8f6

File tree

1 file changed

+123
-0
lines changed

1 file changed

+123
-0
lines changed
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package itrx.chapter3.combining;
2+
3+
import static org.hamcrest.CoreMatchers.instanceOf;
4+
import static org.junit.Assert.assertThat;
5+
6+
import java.util.Arrays;
7+
import java.util.concurrent.TimeUnit;
8+
9+
import org.junit.Test;
10+
11+
import rx.Observable;
12+
import rx.exceptions.CompositeException;
13+
import rx.observers.TestSubscriber;
14+
import rx.schedulers.Schedulers;
15+
import rx.schedulers.TestScheduler;
16+
17+
public class MergeDelayErrorTest {
18+
19+
public void example1Error() {
20+
Observable<Long> failAt200 =
21+
Observable.concat(
22+
Observable.interval(100, TimeUnit.MILLISECONDS).take(2),
23+
Observable.error(new Exception("Failed")));
24+
Observable<Long> completeAt400 =
25+
Observable.interval(100, TimeUnit.MILLISECONDS)
26+
.take(4);
27+
28+
Observable.mergeDelayError(failAt200, completeAt400)
29+
.subscribe(
30+
System.out::println,
31+
System.out::println);
32+
33+
// 0
34+
// 0
35+
// 1
36+
// 1
37+
// 2
38+
// 3
39+
// java.lang.Exception: Failed
40+
}
41+
42+
public void example2Errors() {
43+
Observable<Long> failAt200 =
44+
Observable.concat(
45+
Observable.interval(100, TimeUnit.MILLISECONDS).take(2),
46+
Observable.error(new Exception("Failed")));
47+
Observable<Long> failAt300 =
48+
Observable.concat(
49+
Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
50+
Observable.error(new Exception("Failed")));
51+
Observable<Long> completeAt400 =
52+
Observable.interval(100, TimeUnit.MILLISECONDS)
53+
.take(4);
54+
55+
Observable.mergeDelayError(failAt200, failAt300, completeAt400)
56+
.subscribe(
57+
System.out::println,
58+
System.out::println);
59+
60+
// 0
61+
// 0
62+
// 0
63+
// 1
64+
// 1
65+
// 1
66+
// 2
67+
// 2
68+
// 3
69+
// rx.exceptions.CompositeException: 2 exceptions occurred.
70+
}
71+
72+
73+
//
74+
// Tests
75+
//
76+
77+
@Test
78+
public void test1Error() {
79+
TestSubscriber<Long> tester = new TestSubscriber<>();
80+
TestScheduler scheduler = Schedulers.test();
81+
82+
Observable<Long> failAt200 =
83+
Observable.concat(
84+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(2),
85+
Observable.error(new Exception("Failed")));
86+
Observable<Long> completeAt400 =
87+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
88+
.take(4);
89+
90+
Observable.mergeDelayError(failAt200, completeAt400)
91+
.subscribe(tester);
92+
93+
scheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS);
94+
tester.assertReceivedOnNext(Arrays.asList(0L, 0L, 1L, 1L, 2L, 3L));
95+
assertThat(tester.getOnErrorEvents().get(0), instanceOf(Exception.class));
96+
}
97+
98+
@Test
99+
public void test2Errors() {
100+
TestSubscriber<Long> tester = new TestSubscriber<>();
101+
TestScheduler scheduler = Schedulers.test();
102+
103+
Observable<Long> failAt200 =
104+
Observable.concat(
105+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(2),
106+
Observable.error(new Exception("Failed")));
107+
Observable<Long> failAt300 =
108+
Observable.concat(
109+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(3),
110+
Observable.error(new Exception("Failed")));
111+
Observable<Long> completeAt400 =
112+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
113+
.take(4);
114+
115+
Observable.mergeDelayError(failAt200, failAt300, completeAt400)
116+
.subscribe(tester);
117+
118+
scheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS);
119+
tester.assertReceivedOnNext(Arrays.asList(0L, 0L, 0L, 1L, 1L, 1L, 2L, 2L, 3L));
120+
assertThat(tester.getOnErrorEvents().get(0), instanceOf(CompositeException.class));
121+
}
122+
123+
}

0 commit comments

Comments
 (0)
X Tutup