X Tutup
Skip to content

Commit 06a4dca

Browse files
committed
3.4 Added mergeWith examples
1 parent 761c8f6 commit 06a4dca

File tree

1 file changed

+45
-9
lines changed

1 file changed

+45
-9
lines changed

tests/java/itrx/chapter3/combining/MergeTest.java

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
package itrx.chapter3.combining;
22

3-
import static org.junit.Assert.*;
3+
import static org.junit.Assert.assertTrue;
44

55
import java.util.concurrent.TimeUnit;
66

77
import org.junit.Test;
88

99
import rx.Observable;
10+
import rx.Subscription;
1011
import rx.observers.TestSubscriber;
1112
import rx.schedulers.Schedulers;
1213
import rx.schedulers.TestScheduler;
1314

1415
public class MergeTest {
15-
16+
1617
public void example() {
1718
Observable.merge(
1819
Observable.interval(250, TimeUnit.MILLISECONDS).map(i -> "First"),
@@ -32,6 +33,24 @@ public void example() {
3233
// First
3334
}
3435

36+
public void exampleMergeWith() {
37+
Observable.interval(250, TimeUnit.MILLISECONDS).map(i -> "First")
38+
.mergeWith(Observable.interval(150, TimeUnit.MILLISECONDS).map(i -> "Second"))
39+
.take(10)
40+
.subscribe(System.out::println);
41+
42+
// Second
43+
// First
44+
// Second
45+
// Second
46+
// First
47+
// Second
48+
// First
49+
// Second
50+
// Second
51+
// First
52+
}
53+
3554

3655
//
3756
// Test
@@ -40,24 +59,41 @@ public void example() {
4059
@Test
4160
public void test() {
4261
TestScheduler scheduler = Schedulers.test();
43-
TestSubscriber<Integer> tester = new TestSubscriber<>();
62+
TestSubscriber<String> tester = new TestSubscriber<>();
4463

45-
Observable<String> values = Observable.merge(
64+
Subscription subscription = Observable.merge(
4665
Observable.interval(250, TimeUnit.MILLISECONDS, scheduler).map(i -> "First"),
4766
Observable.interval(150, TimeUnit.MILLISECONDS, scheduler).map(i -> "Second"))
48-
.take(10);
49-
50-
values
67+
.take(10)
5168
.distinctUntilChanged()
52-
.count()
5369
.subscribe(tester);
70+
5471
// Each time that merge switches between the two sources,
5572
// distinctUntilChanged allows one more value through.
5673
// If more that 2 values comes through, merge is going back and forth
74+
scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
75+
assertTrue(tester.getOnNextEvents().size() > 2);
5776

77+
subscription.unsubscribe();
78+
}
79+
80+
@Test
81+
public void testMergeWith() {
82+
TestScheduler scheduler = Schedulers.test();
83+
TestSubscriber<String> tester = new TestSubscriber<>();
84+
85+
Subscription subscription = Observable.interval(250, TimeUnit.MILLISECONDS, scheduler).map(i -> "First")
86+
.mergeWith(Observable.interval(150, TimeUnit.MILLISECONDS, scheduler).map(i -> "Second"))
87+
.distinctUntilChanged()
88+
.subscribe(tester);
89+
90+
// Each time that merge switches between the two sources,
91+
// distinctUntilChanged allows one more value through.
92+
// If more that 2 values comes through, merge is going back and forth
5893
scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
94+
assertTrue(tester.getOnNextEvents().size() > 2);
5995

60-
assertTrue(tester.getOnNextEvents().get(0) > 2);
96+
subscription.unsubscribe();
6197
}
6298

6399
}

0 commit comments

Comments
 (0)
X Tutup