11package itrx .chapter3 .combining ;
22
3- import static org .junit .Assert .* ;
3+ import static org .junit .Assert .assertTrue ;
44
55import java .util .concurrent .TimeUnit ;
66
77import org .junit .Test ;
88
99import rx .Observable ;
10+ import rx .Subscription ;
1011import rx .observers .TestSubscriber ;
1112import rx .schedulers .Schedulers ;
1213import rx .schedulers .TestScheduler ;
1314
1415public class MergeTest {
15-
16+
1617 public void example () {
1718 Observable .merge (
1819 Observable .interval (250 , TimeUnit .MILLISECONDS ).map (i -> "First" ),
@@ -32,6 +33,24 @@ public void example() {
3233 // First
3334 }
3435
36+ public void exampleMergeWith () {
37+ Observable .interval (250 , TimeUnit .MILLISECONDS ).map (i -> "First" )
38+ .mergeWith (Observable .interval (150 , TimeUnit .MILLISECONDS ).map (i -> "Second" ))
39+ .take (10 )
40+ .subscribe (System .out ::println );
41+
42+ // Second
43+ // First
44+ // Second
45+ // Second
46+ // First
47+ // Second
48+ // First
49+ // Second
50+ // Second
51+ // First
52+ }
53+
3554
3655 //
3756 // Test
@@ -40,24 +59,41 @@ public void example() {
4059 @ Test
4160 public void test () {
4261 TestScheduler scheduler = Schedulers .test ();
43- TestSubscriber <Integer > tester = new TestSubscriber <>();
62+ TestSubscriber <String > tester = new TestSubscriber <>();
4463
45- Observable < String > values = Observable .merge (
64+ Subscription subscription = Observable .merge (
4665 Observable .interval (250 , TimeUnit .MILLISECONDS , scheduler ).map (i -> "First" ),
4766 Observable .interval (150 , TimeUnit .MILLISECONDS , scheduler ).map (i -> "Second" ))
48- .take (10 );
49-
50- values
67+ .take (10 )
5168 .distinctUntilChanged ()
52- .count ()
5369 .subscribe (tester );
70+
5471 // Each time that merge switches between the two sources,
5572 // distinctUntilChanged allows one more value through.
5673 // If more that 2 values comes through, merge is going back and forth
74+ scheduler .advanceTimeBy (1000 , TimeUnit .MILLISECONDS );
75+ assertTrue (tester .getOnNextEvents ().size () > 2 );
5776
77+ subscription .unsubscribe ();
78+ }
79+
80+ @ Test
81+ public void testMergeWith () {
82+ TestScheduler scheduler = Schedulers .test ();
83+ TestSubscriber <String > tester = new TestSubscriber <>();
84+
85+ Subscription subscription = Observable .interval (250 , TimeUnit .MILLISECONDS , scheduler ).map (i -> "First" )
86+ .mergeWith (Observable .interval (150 , TimeUnit .MILLISECONDS , scheduler ).map (i -> "Second" ))
87+ .distinctUntilChanged ()
88+ .subscribe (tester );
89+
90+ // Each time that merge switches between the two sources,
91+ // distinctUntilChanged allows one more value through.
92+ // If more that 2 values comes through, merge is going back and forth
5893 scheduler .advanceTimeBy (1000 , TimeUnit .MILLISECONDS );
94+ assertTrue (tester .getOnNextEvents ().size () > 2 );
5995
60- assertTrue ( tester . getOnNextEvents (). get ( 0 ) > 2 );
96+ subscription . unsubscribe ( );
6197 }
6298
6399}
0 commit comments