X Tutup
Skip to content

Commit 5ef22e6

Browse files
committed
3.6 Added cache examples
1 parent 6102960 commit 5ef22e6

File tree

1 file changed

+114
-0
lines changed

1 file changed

+114
-0
lines changed
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package itrx.chapter3.hotandcold;
2+
3+
import java.util.Arrays;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.junit.Test;
7+
8+
import rx.Observable;
9+
import rx.Subscription;
10+
import rx.observers.TestSubscriber;
11+
import rx.schedulers.Schedulers;
12+
import rx.schedulers.TestScheduler;
13+
14+
public class CacheTest {
15+
16+
public void exampleCache() throws InterruptedException {
17+
Observable<Long> obs = Observable.interval(100, TimeUnit.MILLISECONDS)
18+
.take(5)
19+
.cache();
20+
21+
Thread.sleep(500);
22+
obs.subscribe(i -> System.out.println("First: " + i));
23+
Thread.sleep(300);
24+
obs.subscribe(i -> System.out.println("Second: " + i));
25+
26+
// First: 0
27+
// First: 1
28+
// First: 2
29+
// Second: 0
30+
// Second: 1
31+
// Second: 2
32+
// First: 3
33+
// Second: 3
34+
// First: 4
35+
// Second: 4
36+
}
37+
38+
public void exampleCacheUnsubscribe() throws InterruptedException {
39+
Observable<Long> obs = Observable.interval(100, TimeUnit.MILLISECONDS)
40+
.take(5)
41+
.doOnNext(System.out::println)
42+
.cache()
43+
.doOnSubscribe(() -> System.out.println("Subscribed"))
44+
.doOnUnsubscribe(() -> System.out.println("Unsubscribed"));
45+
46+
Subscription subscription = obs.subscribe();
47+
Thread.sleep(150);
48+
subscription.unsubscribe();
49+
50+
// Subscribed
51+
// 0
52+
// Unsubscribed
53+
// 1
54+
// 2
55+
// 3
56+
// 4
57+
}
58+
59+
60+
//
61+
// Tests
62+
//
63+
64+
@Test
65+
public void testCache() throws InterruptedException {
66+
TestSubscriber<Long> tester1 = new TestSubscriber<Long>();
67+
TestSubscriber<Long> tester2 = new TestSubscriber<Long>();
68+
TestScheduler scheduler = Schedulers.test();
69+
70+
Observable<Long> obs = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
71+
.take(5)
72+
.cache();
73+
74+
tester1.assertReceivedOnNext(Arrays.asList());
75+
tester2.assertReceivedOnNext(Arrays.asList());
76+
77+
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
78+
obs.subscribe(tester1);
79+
tester1.assertReceivedOnNext(Arrays.asList());
80+
tester2.assertReceivedOnNext(Arrays.asList());
81+
82+
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
83+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L));
84+
tester2.assertReceivedOnNext(Arrays.asList());
85+
86+
obs.subscribe(tester2);
87+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L));
88+
tester2.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L));
89+
90+
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
91+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L));
92+
tester2.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L));
93+
}
94+
95+
@Test
96+
public void testCacheUnsubscribe() throws InterruptedException {
97+
TestSubscriber<Long> tester = new TestSubscriber<Long>();
98+
TestScheduler scheduler = Schedulers.test();
99+
100+
Observable<Long> obs = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
101+
.take(5)
102+
.doOnEach(tester)
103+
.cache();
104+
105+
Subscription subscription = obs.subscribe();
106+
scheduler.advanceTimeBy(150, TimeUnit.MILLISECONDS);
107+
tester.assertReceivedOnNext(Arrays.asList(0L));
108+
109+
subscription.unsubscribe();
110+
scheduler.advanceTimeBy(350, TimeUnit.MILLISECONDS);
111+
tester.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L));
112+
}
113+
114+
}

0 commit comments

Comments
 (0)
X Tutup