X Tutup
Skip to content

Commit abf86ec

Browse files
committed
Example 3.3 Advanced error handling
1 parent 3aee7f3 commit abf86ec

File tree

3 files changed

+349
-0
lines changed

3 files changed

+349
-0
lines changed
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
package itrx.chapter3.error;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertThat;
5+
6+
import java.util.Arrays;
7+
8+
import org.junit.Test;
9+
10+
import rx.Observable;
11+
import rx.Subscriber;
12+
import rx.observers.TestSubscriber;
13+
14+
public class ResumeTest {
15+
16+
private static class PrintSubscriber extends Subscriber<Object>{
17+
private final String name;
18+
public PrintSubscriber(String name) {
19+
this.name = name;
20+
}
21+
@Override
22+
public void onCompleted() {
23+
System.out.println(name + ": Completed");
24+
}
25+
@Override
26+
public void onError(Throwable e) {
27+
System.out.println(name + ": Error: " + e);
28+
}
29+
@Override
30+
public void onNext(Object v) {
31+
System.out.println(name + ": " + v);
32+
}
33+
}
34+
35+
public void exampleOnErrorReturn() {
36+
Observable<String> values = Observable.create(o -> {
37+
o.onNext("Rx");
38+
o.onNext("is");
39+
o.onError(new Exception("adjective unknown"));
40+
});
41+
42+
values
43+
.onErrorReturn(e -> "Error: " + e.getMessage())
44+
.subscribe(v -> System.out.println(v));
45+
46+
// Rx
47+
// is
48+
// Error: adjective unknown
49+
}
50+
51+
public void exampleOnErrorResumeNext() {
52+
Observable<Integer> values = Observable.create(o -> {
53+
o.onNext(1);
54+
o.onNext(2);
55+
o.onError(new Exception("Oops"));
56+
});
57+
58+
values
59+
.onErrorResumeNext(Observable.just(Integer.MAX_VALUE))
60+
.subscribe(new PrintSubscriber("with onError: "));
61+
62+
// with onError: 1
63+
// with onError: 2
64+
// with onError: 2147483647
65+
// with onError: Completed
66+
}
67+
68+
public void exampleOnErrorResumeNextRethrow() {
69+
Observable<Integer> values = Observable.create(o -> {
70+
o.onNext(1);
71+
o.onNext(2);
72+
o.onError(new Exception("Oops"));
73+
});
74+
75+
values
76+
.onErrorResumeNext(e -> Observable.error(new UnsupportedOperationException(e)))
77+
.subscribe(new PrintSubscriber("with onError: "));
78+
}
79+
80+
public void exampleOnExceptionResumeNext() {
81+
Observable<String> values = Observable.create(o -> {
82+
o.onNext("Rx");
83+
o.onNext("is");
84+
o.onError(new Exception()); // this will be caught
85+
});
86+
87+
values
88+
.onExceptionResumeNext(Observable.just("hard"))
89+
.subscribe(v -> System.out.println(v));
90+
}
91+
92+
@SuppressWarnings("serial")
93+
public void exampleOnExceptionResumeNextNoException() {
94+
Observable<String> values = Observable.create(o -> {
95+
o.onNext("Rx");
96+
o.onNext("is");
97+
o.onError(new Throwable() {}); // this won't be caught
98+
});
99+
100+
values
101+
.onExceptionResumeNext(Observable.just("hard"))
102+
.subscribe(v -> System.out.println(v));
103+
}
104+
105+
106+
//
107+
// Tests
108+
//
109+
110+
@Test
111+
public void testOnErrorReturn() {
112+
TestSubscriber<String> tester = new TestSubscriber<>();
113+
114+
Observable<String> values = Observable.create(o -> {
115+
o.onNext("Rx");
116+
o.onNext("is");
117+
o.onError(new Exception("adjective unknown"));
118+
});
119+
120+
values
121+
.onErrorReturn(e -> "Error: " + e.getMessage())
122+
.subscribe(tester);
123+
124+
tester.assertReceivedOnNext(Arrays.asList(
125+
"Rx",
126+
"is",
127+
"Error: adjective unknown"));
128+
tester.assertTerminalEvent();
129+
tester.assertNoErrors();
130+
}
131+
132+
@Test
133+
public void testerOnErrorResumeNext() {
134+
TestSubscriber<Integer> tester = new TestSubscriber<>();
135+
136+
Observable<Integer> values = Observable.create(o -> {
137+
o.onNext(1);
138+
o.onNext(2);
139+
o.onError(new Exception("Oops"));
140+
});
141+
142+
values
143+
.onErrorResumeNext(Observable.just(Integer.MAX_VALUE))
144+
.subscribe(tester);
145+
146+
tester.assertReceivedOnNext(Arrays.asList(1,2,Integer.MAX_VALUE));
147+
tester.assertTerminalEvent();
148+
tester.assertNoErrors();
149+
}
150+
151+
@Test
152+
public void testOnErrorResumeNextRethrow() {
153+
TestSubscriber<Integer> tester = new TestSubscriber<>();
154+
155+
Observable<Integer> values = Observable.create(o -> {
156+
o.onNext(1);
157+
o.onNext(2);
158+
o.onError(new Exception("Oops"));
159+
});
160+
161+
values
162+
.onErrorResumeNext(e -> Observable.error(new UnsupportedOperationException(e)))
163+
.subscribe(tester);
164+
165+
tester.assertReceivedOnNext(Arrays.asList(1,2));
166+
tester.assertTerminalEvent();
167+
assertThat(tester.getOnErrorEvents().get(0),
168+
org.hamcrest.CoreMatchers.instanceOf(UnsupportedOperationException.class));
169+
}
170+
171+
@Test
172+
public void testOnExceptionResumeNext() {
173+
TestSubscriber<String> tester = new TestSubscriber<>();
174+
175+
Observable<String> values = Observable.create(o -> {
176+
o.onNext("Rx");
177+
o.onNext("is");
178+
o.onError(new Exception()); // this will be caught
179+
});
180+
181+
values
182+
.onExceptionResumeNext(Observable.just("hard"))
183+
.subscribe(tester);
184+
185+
tester.assertReceivedOnNext(Arrays.asList("Rx","is","hard"));
186+
tester.assertTerminalEvent();
187+
tester.assertNoErrors();
188+
}
189+
190+
@SuppressWarnings("serial")
191+
@Test
192+
public void testOnExceptionResumeNextNoException() {
193+
TestSubscriber<String> tester = new TestSubscriber<>();
194+
195+
Observable<String> values = Observable.create(o -> {
196+
o.onNext("Rx");
197+
o.onNext("is");
198+
o.onError(new Throwable() {}); // this won't be caught
199+
});
200+
201+
values
202+
.onExceptionResumeNext(Observable.just("hard"))
203+
.subscribe(tester);
204+
205+
tester.assertTerminalEvent();
206+
assertEquals(tester.getOnErrorEvents().size(), 1);
207+
}
208+
209+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package itrx.chapter3.error;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.util.Random;
6+
7+
import org.junit.Test;
8+
9+
import rx.Observable;
10+
import rx.observers.TestSubscriber;
11+
12+
public class RetryTest {
13+
14+
public void exampleRetry() {
15+
Random random = new Random();
16+
Observable<Integer> values = Observable.create(o -> {
17+
o.onNext(random.nextInt() % 20);
18+
o.onNext(random.nextInt() % 20);
19+
o.onError(new Exception());
20+
});
21+
22+
values
23+
.retry(1)
24+
.subscribe(v -> System.out.println(v));
25+
26+
// 0
27+
// 13
28+
// 9
29+
// 15
30+
// java.lang.Exception
31+
}
32+
33+
34+
//
35+
// Test
36+
//
37+
38+
@Test
39+
public void testRetry() {
40+
TestSubscriber<Integer> tester = new TestSubscriber<>();
41+
Random random = new Random();
42+
Observable<Integer> values = Observable.create(o -> {
43+
o.onNext(random.nextInt() % 20);
44+
o.onNext(random.nextInt() % 20);
45+
o.onError(new Exception());
46+
});
47+
48+
values
49+
.retry(1)
50+
.subscribe(tester);
51+
52+
assertEquals(tester.getOnNextEvents().size(), 4);
53+
tester.assertTerminalEvent();
54+
assertEquals(tester.getOnErrorEvents().size(), 1);
55+
}
56+
57+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package itrx.chapter3.error;
2+
3+
import static org.junit.Assert.assertEquals;
4+
5+
import java.util.Arrays;
6+
7+
import org.junit.Test;
8+
9+
import rx.Observable;
10+
import rx.observers.TestSubscriber;
11+
12+
public class UsingTest {
13+
14+
public void exampleUsing() {
15+
Observable<Character> values = Observable.using(
16+
() -> {
17+
String resource = "MyResource";
18+
System.out.println("Leased: " + resource);
19+
return resource;
20+
},
21+
(resource) -> {
22+
return Observable.create(o -> {
23+
for (Character c : resource.toCharArray())
24+
o.onNext(c);
25+
o.onCompleted();
26+
});
27+
},
28+
(resource) -> System.out.println("Disposed: " + resource));
29+
30+
values
31+
.subscribe(
32+
v -> System.out.println(v),
33+
e -> System.out.println(e));
34+
35+
// Leased: MyResource
36+
// M
37+
// y
38+
// R
39+
// e
40+
// s
41+
// o
42+
// u
43+
// r
44+
// c
45+
// e
46+
// Disposed: MyResource
47+
}
48+
49+
50+
//
51+
// Test
52+
//
53+
54+
@Test
55+
public void testUsing() {
56+
TestSubscriber<Character> tester = new TestSubscriber<>();
57+
String[] leaseRelease = {"", ""};
58+
59+
Observable<Character> values = Observable.using(
60+
() -> {
61+
String resource = "MyResource";
62+
leaseRelease[0] = resource;
63+
return resource;
64+
},
65+
(resource) -> {
66+
return Observable.create(o -> {
67+
for (Character c : resource.toCharArray())
68+
o.onNext(c);
69+
o.onCompleted();
70+
});
71+
},
72+
(resource) -> leaseRelease[1] = resource);
73+
74+
values
75+
.subscribe(tester);
76+
77+
assertEquals(leaseRelease[0], leaseRelease[1]);
78+
tester.assertReceivedOnNext(Arrays.asList('M','y','R','e','s','o','u','r','c','e'));
79+
tester.assertTerminalEvent();
80+
tester.assertNoErrors();
81+
}
82+
83+
}

0 commit comments

Comments
 (0)
X Tutup