X Tutup
Skip to content
This repository was archived by the owner on Nov 18, 2024. It is now read-only.

Commit 73cbaf7

Browse files
committed
4.4 Added doOnRequest examples
1 parent 5ef22e6 commit 73cbaf7

File tree

1 file changed

+118
-0
lines changed

1 file changed

+118
-0
lines changed
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package itrx.chapter4.backpressure;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertTrue;
5+
6+
import java.util.ArrayList;
7+
import java.util.Arrays;
8+
import java.util.List;
9+
10+
import org.junit.Test;
11+
12+
import rx.Observable;
13+
14+
public class OnRequestTest {
15+
16+
public void exampleOnRequest() {
17+
Observable.range(0, 3)
18+
.doOnRequest(i -> System.out.println("Requested " + i))
19+
.subscribe(System.out::println);
20+
21+
// Requested 9223372036854775807
22+
// 0
23+
// 1
24+
// 2
25+
}
26+
27+
public void exampleOnRequestZip() {
28+
Observable.range(0, 300)
29+
.doOnRequest(i -> System.out.println("Requested " + i))
30+
.zipWith(
31+
Observable.range(10, 300),
32+
(i1, i2) -> i1 + " - " + i2)
33+
.take(300)
34+
.subscribe();
35+
36+
// Requested 128
37+
// Requested 90
38+
// Requested 90
39+
// Requested 90
40+
41+
}
42+
43+
public void exampleOnRequestManual() {
44+
ControlledPullSubscriber<Integer> puller =
45+
new ControlledPullSubscriber<Integer>(System.out::println);
46+
47+
Observable.range(0, 3)
48+
.doOnRequest(i -> System.out.println("Requested " + i))
49+
.subscribe(puller);
50+
51+
puller.requestMore(2);
52+
puller.requestMore(1);
53+
54+
// Requested 0
55+
// Requested 2
56+
// 0
57+
// 1
58+
// Requested 1
59+
// 2
60+
}
61+
62+
63+
//
64+
// Tests
65+
//
66+
67+
@Test
68+
public void testOnRequest() {
69+
List<Long> requests = new ArrayList<Long>();
70+
71+
Observable.range(0, 3)
72+
.doOnRequest(requests::add)
73+
.subscribe();
74+
75+
assertEquals(Arrays.asList(Long.MAX_VALUE), requests);
76+
}
77+
78+
@Test
79+
public void testOnRequestZip() {
80+
List<Long> requests = new ArrayList<Long>();
81+
82+
Observable.range(0, 300)
83+
.doOnRequest(requests::add)
84+
.zipWith(
85+
Observable.range(10, 300),
86+
(i1, i2) -> i1 + " - " + i2)
87+
.take(300)
88+
.subscribe();
89+
90+
assertTrue("zip makes subsequent requests",
91+
requests.size() > 1);
92+
assertEquals("zip uses a buffer of 128",
93+
requests.get(0), new Long(128));
94+
}
95+
96+
@Test
97+
public void testOnRequestManual() {
98+
List<Integer> received = new ArrayList<Integer>();
99+
List<Long> requests = new ArrayList<Long>();
100+
101+
ControlledPullSubscriber<Integer> puller =
102+
new ControlledPullSubscriber<Integer>(received::add);
103+
104+
Observable.range(0, 3)
105+
.doOnRequest(requests::add)
106+
.subscribe(puller);
107+
108+
assertEquals(Arrays.asList(0L), requests);
109+
assertEquals(Arrays.asList(), received);
110+
puller.requestMore(2);
111+
assertEquals(Arrays.asList(0L, 2L), requests);
112+
assertEquals(Arrays.asList(0, 1), received);
113+
puller.requestMore(1);
114+
assertEquals(Arrays.asList(0L, 2L, 1L), requests);
115+
assertEquals(Arrays.asList(0, 1, 2), received);
116+
}
117+
118+
}

0 commit comments

Comments
 (0)
X Tutup