X Tutup
Skip to content

Commit 34a0d4b

Browse files
committed
Examples 3.4 combining sequences
1 parent abf86ec commit 34a0d4b

File tree

8 files changed

+626
-0
lines changed

8 files changed

+626
-0
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package itrx.chapter3.combining;
2+
3+
import java.util.Arrays;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.junit.Test;
7+
8+
import rx.Observable;
9+
import rx.observers.TestSubscriber;
10+
import rx.schedulers.Schedulers;
11+
import rx.schedulers.TestScheduler;
12+
13+
public class AmbTest {
14+
15+
public void example() {
16+
Observable.amb(
17+
Observable.timer(100, TimeUnit.MILLISECONDS).map(i -> "First"),
18+
Observable.timer(50, TimeUnit.MILLISECONDS).map(i -> "Second"))
19+
.subscribe(System.out::println);
20+
21+
// Second
22+
}
23+
24+
25+
//
26+
// Test
27+
//
28+
29+
@Test
30+
public void test() {
31+
TestSubscriber<String> tester = new TestSubscriber<>();
32+
TestScheduler scheduler = Schedulers.test();
33+
34+
Observable.amb(
35+
Observable.timer(100, TimeUnit.MILLISECONDS, scheduler).map(i -> "First"),
36+
Observable.timer(50, TimeUnit.MILLISECONDS, scheduler).map(i -> "Second"))
37+
.subscribe(tester);
38+
39+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
40+
tester.assertReceivedOnNext(Arrays.asList("Second"));
41+
tester.assertTerminalEvent();
42+
tester.assertNoErrors();
43+
}
44+
45+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package itrx.chapter3.combining;
2+
3+
import java.util.Arrays;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.junit.Test;
7+
8+
import rx.Observable;
9+
import rx.observers.TestSubscriber;
10+
import rx.schedulers.Schedulers;
11+
import rx.schedulers.TestScheduler;
12+
13+
public class CombineLatestTest {
14+
15+
public void example() {
16+
Observable.combineLatest(
17+
Observable.interval(100, TimeUnit.MILLISECONDS)
18+
.doOnNext(i -> System.out.println("Left emits")),
19+
Observable.interval(150, TimeUnit.MILLISECONDS)
20+
.doOnNext(i -> System.out.println("Right emits")),
21+
(i1,i2) -> i1 + " - " + i2
22+
)
23+
.take(6)
24+
.subscribe(System.out::println);
25+
26+
// Left emits
27+
// Right emits
28+
// 0 - 0
29+
// Left emits
30+
// 1 - 0
31+
// Left emits
32+
// 2 - 0
33+
// Right emits
34+
// 2 - 1
35+
// Left emits
36+
// 3 - 1
37+
// Right emits
38+
// 3 - 2
39+
}
40+
41+
42+
//
43+
// Test
44+
//
45+
46+
@Test
47+
public void test() {
48+
TestScheduler scheduler = Schedulers.test();
49+
TestSubscriber<String> tester = new TestSubscriber<>();
50+
51+
Observable.combineLatest(
52+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler),
53+
Observable.interval(150, TimeUnit.MILLISECONDS, scheduler),
54+
(i1,i2) -> i1 + " - " + i2
55+
)
56+
.subscribe(tester);
57+
58+
scheduler.advanceTimeTo(100, TimeUnit.MILLISECONDS);
59+
scheduler.advanceTimeTo(150, TimeUnit.MILLISECONDS);
60+
scheduler.advanceTimeTo(200, TimeUnit.MILLISECONDS);
61+
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS);
62+
63+
tester.assertReceivedOnNext(Arrays.asList(
64+
"0 - 0",
65+
"1 - 0",
66+
"1 - 1",
67+
"2 - 1"
68+
));
69+
}
70+
71+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package itrx.chapter3.combining;
2+
3+
import java.util.Arrays;
4+
5+
import org.junit.Test;
6+
7+
import rx.Observable;
8+
import rx.observers.TestSubscriber;
9+
10+
public class ConcatTest {
11+
12+
public void exampleConcat() {
13+
Observable<Integer> seq1 = Observable.range(0, 3);
14+
Observable<Integer> seq2 = Observable.range(10, 3);
15+
16+
Observable.concat(seq1, seq2)
17+
.subscribe(System.out::println);
18+
19+
// 0
20+
// 1
21+
// 2
22+
// 10
23+
// 11
24+
// 12
25+
}
26+
27+
public void exampleConcatDynamic() {
28+
Observable<String> words = Observable.just(
29+
"First",
30+
"Second",
31+
"Third",
32+
"Fourth",
33+
"Fifth",
34+
"Sixth"
35+
);
36+
37+
Observable.concat(words.groupBy(v -> v.charAt(0)))
38+
.subscribe(System.out::println);
39+
40+
// First
41+
// Fourth
42+
// Fifth
43+
// Second
44+
// Sixth
45+
// Third
46+
}
47+
48+
49+
//
50+
// Tests
51+
//
52+
53+
@Test
54+
public void testConcat() {
55+
TestSubscriber<Integer> tester = new TestSubscriber<>();
56+
57+
Observable<Integer> seq1 = Observable.range(0, 3);
58+
Observable<Integer> seq2 = Observable.range(10, 3);
59+
60+
Observable.concat(seq1, seq2)
61+
.subscribe(tester);
62+
63+
tester.assertReceivedOnNext(Arrays.asList(0,1,2,10,11,12));
64+
tester.assertTerminalEvent();
65+
tester.assertNoErrors();
66+
}
67+
68+
@Test
69+
public void testConcatDynamic() {
70+
TestSubscriber<String> tester = new TestSubscriber<>();
71+
72+
Observable<String> words = Observable.just(
73+
"First",
74+
"Second",
75+
"Third",
76+
"Fourth",
77+
"Fifth",
78+
"Sixth"
79+
);
80+
81+
Observable.concat(words.groupBy(v -> v.charAt(0)))
82+
.subscribe(tester);
83+
84+
tester.assertReceivedOnNext(Arrays.asList(
85+
"First",
86+
"Fourth",
87+
"Fifth",
88+
"Second",
89+
"Sixth",
90+
"Third"));
91+
tester.assertTerminalEvent();
92+
tester.assertNoErrors();
93+
}
94+
95+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package itrx.chapter3.combining;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.util.concurrent.TimeUnit;
6+
7+
import org.junit.Test;
8+
9+
import rx.Observable;
10+
import rx.observers.TestSubscriber;
11+
import rx.schedulers.Schedulers;
12+
import rx.schedulers.TestScheduler;
13+
14+
public class MergeTest {
15+
16+
public void example() {
17+
Observable.merge(
18+
Observable.interval(250, TimeUnit.MILLISECONDS).map(i -> "First"),
19+
Observable.interval(150, TimeUnit.MILLISECONDS).map(i -> "Second"))
20+
.take(10)
21+
.subscribe(System.out::println);
22+
23+
// Second
24+
// First
25+
// Second
26+
// Second
27+
// First
28+
// Second
29+
// Second
30+
// First
31+
// Second
32+
// First
33+
}
34+
35+
36+
//
37+
// Test
38+
//
39+
40+
@Test
41+
public void test() {
42+
TestScheduler scheduler = Schedulers.test();
43+
TestSubscriber<Integer> tester = new TestSubscriber<>();
44+
45+
Observable<String> values = Observable.merge(
46+
Observable.interval(250, TimeUnit.MILLISECONDS, scheduler).map(i -> "First"),
47+
Observable.interval(150, TimeUnit.MILLISECONDS, scheduler).map(i -> "Second"))
48+
.take(10);
49+
50+
values
51+
.distinctUntilChanged()
52+
.count()
53+
.subscribe(tester);
54+
// Each time that merge switches between the two sources,
55+
// distinctUntilChanged allows one more value through.
56+
// If more that 2 values comes through, merge is going back and forth
57+
58+
scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
59+
60+
assertTrue(tester.getOnNextEvents().get(0) > 2);
61+
}
62+
63+
}

0 commit comments

Comments
 (0)
X Tutup