X Tutup
Skip to content

Commit 9bd4441

Browse files
committed
Examples 3.6 Hot and cold observables
1 parent 917078c commit 9bd4441

File tree

4 files changed

+495
-0
lines changed

4 files changed

+495
-0
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package itrx.chapter3.hotandcold;
2+
3+
import java.util.Arrays;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.junit.Test;
7+
8+
import rx.Observable;
9+
import rx.observers.TestSubscriber;
10+
import rx.schedulers.Schedulers;
11+
import rx.schedulers.TestScheduler;
12+
13+
public class ColdExample {
14+
15+
public void example() throws InterruptedException {
16+
Observable<Long> cold =
17+
Observable
18+
.interval(200, TimeUnit.MILLISECONDS)
19+
.take(5);
20+
21+
cold.subscribe(i -> System.out.println("First: " + i));
22+
Thread.sleep(500);
23+
cold.subscribe(i -> System.out.println("Second: " + i));
24+
25+
// First: 0
26+
// First: 1
27+
// First: 2
28+
// Second: 0
29+
// First: 3
30+
// Second: 1
31+
// First: 4
32+
// Second: 2
33+
// Second: 3
34+
// Second: 4
35+
}
36+
37+
38+
//
39+
// Test
40+
//
41+
42+
@Test
43+
public void test() {
44+
TestScheduler scheduler = Schedulers.test();
45+
TestSubscriber<Long> tester1 = new TestSubscriber<>();
46+
TestSubscriber<Long> tester2 = new TestSubscriber<>();
47+
48+
Observable<Long> cold =
49+
Observable
50+
.interval(200, TimeUnit.MILLISECONDS, scheduler)
51+
.take(5);
52+
53+
cold.subscribe(tester1);
54+
tester1.assertReceivedOnNext(Arrays.asList());
55+
tester2.assertReceivedOnNext(Arrays.asList());
56+
57+
scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS);
58+
cold.subscribe(tester2);
59+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L));
60+
tester2.assertReceivedOnNext(Arrays.asList());
61+
62+
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
63+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L));
64+
tester2.assertReceivedOnNext(Arrays.asList(0L, 1L));
65+
66+
scheduler.advanceTimeTo(1500, TimeUnit.MILLISECONDS);
67+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L));
68+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L));
69+
}
70+
71+
}
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
package itrx.chapter3.hotandcold;
2+
3+
import java.util.Arrays;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.junit.Test;
7+
8+
import rx.Observable;
9+
import rx.Subscription;
10+
import rx.observables.ConnectableObservable;
11+
import rx.observers.TestSubscriber;
12+
import rx.schedulers.Schedulers;
13+
import rx.schedulers.TestScheduler;
14+
15+
public class ConnectableObservableTest {
16+
17+
public void exampleConnect() throws InterruptedException {
18+
ConnectableObservable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
19+
cold.connect();
20+
21+
cold.subscribe(i -> System.out.println("First: " + i));
22+
Thread.sleep(500);
23+
cold.subscribe(i -> System.out.println("Second: " + i));
24+
25+
// First: 0
26+
// First: 1
27+
// First: 2
28+
// Second: 2
29+
// First: 3
30+
// Second: 3
31+
// First: 4
32+
// Second: 4
33+
// First: 5
34+
// Second: 5
35+
}
36+
37+
public void exampleDisconnect() throws InterruptedException {
38+
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
39+
Subscription s = connectable.connect();
40+
41+
connectable.subscribe(i -> System.out.println(i));
42+
43+
Thread.sleep(1000);
44+
System.out.println("Closing connection");
45+
s.unsubscribe();
46+
47+
Thread.sleep(1000);
48+
System.out.println("Reconnecting");
49+
s = connectable.connect();
50+
51+
// 0
52+
// 1
53+
// 2
54+
// 3
55+
// 4
56+
// Closing connection
57+
// Reconnecting
58+
// 0
59+
// 1
60+
// 2
61+
}
62+
63+
public void exampleUnsubscribe() throws InterruptedException {
64+
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
65+
connectable.connect();
66+
67+
connectable.subscribe(i -> System.out.println("First: " + i));
68+
Thread.sleep(500);
69+
Subscription s2 = connectable.subscribe(i -> System.out.println("Seconds: " + i));
70+
71+
Thread.sleep(500);
72+
System.out.println("Unsubscribing second");
73+
s2.unsubscribe();
74+
75+
// First: 0
76+
// First: 1
77+
// First: 2
78+
// Seconds: 2
79+
// First: 3
80+
// Seconds: 3
81+
// First: 4
82+
// Seconds: 4
83+
// Unsubscribing second
84+
// First: 5
85+
// First: 6
86+
}
87+
88+
public void exampleRefcount() throws InterruptedException {
89+
Observable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).publish().refCount();
90+
91+
Subscription s1 = cold.subscribe(i -> System.out.println("First: " + i));
92+
Thread.sleep(500);
93+
Subscription s2 = cold.subscribe(i -> System.out.println("Second: " + i));
94+
Thread.sleep(500);
95+
System.out.println("Unsubscribe first");
96+
s2.unsubscribe();
97+
Thread.sleep(500);
98+
System.out.println("Unsubscribe first");
99+
s1.unsubscribe();
100+
101+
System.out.println("First connection again");
102+
Thread.sleep(500);
103+
s1 = cold.subscribe(i -> System.out.println("First: " + i));
104+
105+
// First: 0
106+
// First: 1
107+
// First: 2
108+
// Second: 2
109+
// First: 3
110+
// Second: 3
111+
// Unsubscribe first
112+
// First: 4
113+
// First: 5
114+
// First: 6
115+
// Unsubscribe first
116+
// First connection again
117+
// First: 0
118+
// First: 1
119+
// First: 2
120+
// First: 3
121+
// First: 4
122+
}
123+
124+
125+
//
126+
// Test
127+
//
128+
129+
@Test
130+
public void testConnect() throws InterruptedException {
131+
TestScheduler scheduler = Schedulers.test();
132+
TestSubscriber<Long> tester1 = new TestSubscriber<Long>();
133+
TestSubscriber<Long> tester2 = new TestSubscriber<Long>();
134+
135+
ConnectableObservable<Long> cold =
136+
Observable
137+
.interval(200, TimeUnit.MILLISECONDS, scheduler)
138+
.publish();
139+
Subscription connection = cold.connect();
140+
141+
cold.subscribe(tester1);
142+
scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS);
143+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L));
144+
tester2.assertReceivedOnNext(Arrays.asList());
145+
146+
cold.subscribe(tester2);
147+
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
148+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L));
149+
tester2.assertReceivedOnNext(Arrays.asList(2L, 3L, 4L));
150+
151+
connection.unsubscribe();
152+
}
153+
154+
@Test
155+
public void testDisconnect() throws InterruptedException {
156+
TestScheduler scheduler = Schedulers.test();
157+
TestSubscriber<Long> tester = new TestSubscriber<Long>();
158+
159+
ConnectableObservable<Long> connectable =
160+
Observable
161+
.interval(200, TimeUnit.MILLISECONDS, scheduler)
162+
.publish();
163+
Subscription connection = connectable.connect();
164+
connectable.subscribe(tester);
165+
166+
scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
167+
tester.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L));
168+
169+
connection.unsubscribe();
170+
scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
171+
tester.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L));
172+
173+
connection = connectable.connect();
174+
scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
175+
tester.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L, 0L, 1L, 2L, 3L, 4L));
176+
177+
connection.unsubscribe();
178+
}
179+
180+
@Test
181+
public void testUnsubscribe() throws InterruptedException {
182+
TestScheduler scheduler = Schedulers.test();
183+
TestSubscriber<Long> tester1 = new TestSubscriber<Long>();
184+
TestSubscriber<Long> tester2 = new TestSubscriber<Long>();
185+
186+
ConnectableObservable<Long> connectable =
187+
Observable
188+
.interval(200, TimeUnit.MILLISECONDS, scheduler)
189+
.publish();
190+
Subscription conSubscription = connectable.connect();
191+
192+
connectable.subscribe(tester1);
193+
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
194+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L));
195+
tester2.assertReceivedOnNext(Arrays.asList());
196+
197+
Subscription s2 = connectable.subscribe(tester2);
198+
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
199+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L));
200+
tester2.assertReceivedOnNext(Arrays.asList(2L, 3L, 4L));
201+
202+
s2.unsubscribe();
203+
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
204+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L));
205+
tester2.assertReceivedOnNext(Arrays.asList(2L, 3L, 4L));
206+
207+
conSubscription.unsubscribe();
208+
}
209+
210+
@Test
211+
public void testRefcount() throws InterruptedException {
212+
TestScheduler scheduler = Schedulers.test();
213+
TestSubscriber<Long> tester1 = new TestSubscriber<Long>();
214+
TestSubscriber<Long> tester2 = new TestSubscriber<Long>();
215+
TestSubscriber<Long> tester3 = new TestSubscriber<Long>();
216+
217+
Observable<Long> cold =
218+
Observable
219+
.interval(200, TimeUnit.MILLISECONDS, scheduler)
220+
.publish()
221+
.refCount();
222+
223+
Subscription s1 = cold.subscribe(tester1);
224+
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
225+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L));
226+
tester2.assertReceivedOnNext(Arrays.asList());
227+
tester3.assertReceivedOnNext(Arrays.asList());
228+
229+
Subscription s2 = cold.subscribe(tester2);
230+
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
231+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L));
232+
tester2.assertReceivedOnNext(Arrays.asList(2L, 3L, 4L));
233+
tester3.assertReceivedOnNext(Arrays.asList());
234+
235+
s2.unsubscribe();
236+
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
237+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L));
238+
tester2.assertReceivedOnNext(Arrays.asList(2L, 3L, 4L));
239+
tester3.assertReceivedOnNext(Arrays.asList());
240+
241+
s1.unsubscribe();
242+
Subscription s3 = cold.subscribe(tester3);
243+
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
244+
tester1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L));
245+
tester2.assertReceivedOnNext(Arrays.asList(2L, 3L, 4L));
246+
tester3.assertReceivedOnNext(Arrays.asList(0L, 1L));
247+
248+
s3.unsubscribe();
249+
}
250+
251+
}

0 commit comments

Comments
 (0)
X Tutup