|
| 1 | +# Sequences of coincidence |
| 2 | + |
| 3 | +Rx tries to avoid state outside of the pipeline. However, some things are inherently stateful. A server can be up or down, a mobile device may have access to wifi, a button is held down. In Rx, we see those as events with a duration. We call them windows. Other events that happen within those windows may need to be treated differently. For example, a mobile device will postpone network requests with low priority while using more expensive channels of communication. |
| 4 | + |
| 5 | +## Window |
| 6 | + |
| 7 | +With [buffer](https://github.com/Froussios/New-Intro-To-Rx/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md#buffer) we saw an operator that can take a sequence and group values into chunks, based on a variety of overloads. The `window` operator has a one for one relationship with `buffer`. The main difference is that it doesn't return the groups in buffered chunks. Instead it returns a sequence of sequences, each sequence corresponding to what would have been a buffer. This means that every emitted observable emits its values as soon as they appear in the source observable, rather than producing them all at the end of the window. That relationship between `buffer` and `window` is immediately apparent by a quick look on the marble diagrams of two corresponding overloads: |
| 8 | + |
| 9 | + |
| 10 | +With `window` this becomes: |
| 11 | + |
| 12 | + |
| 13 | +If you are not already familiar with `buffer`, I strongly recommend that you begin with that. The overloads and resulting groupings are the same in both operators, but `buffer` is easier to understand and present examples for. Every `buffer` overload can be contructed from the `window` overload with the same arguments as such: |
| 14 | +```java |
| 15 | +source.buffer(...) |
| 16 | +// same as |
| 17 | +source.window(...).flatMap(w -> w.toList()) |
| 18 | +``` |
| 19 | + |
| 20 | +### Window by count |
| 21 | + |
| 22 | +You can have windows with a fixed number of elements. Once the window has emitted the required number of elements, the observable terminates and a new one starts. |
| 23 | + |
| 24 | + |
| 25 | + |
| 26 | +You can also have skipping and overlapping windows like you do in `buffer` with `window(int count, int skip)`. When windows overlap they will be emitting values simultaneously, as can be seen in the next example. |
| 27 | + |
| 28 | +```java |
| 29 | +Observable |
| 30 | + .merge( |
| 31 | + Observable.range(0, 5) |
| 32 | + .window(3,1)) |
| 33 | + .subscribe(new PrintSubscriber("Window")); |
| 34 | +``` |
| 35 | +Output |
| 36 | +``` |
| 37 | +Window: 0 |
| 38 | +Window: 1 |
| 39 | +Window: 1 |
| 40 | +Window: 2 |
| 41 | +Window: 2 |
| 42 | +Window: 2 |
| 43 | +Window: 3 |
| 44 | +Window: 3 |
| 45 | +Window: 3 |
| 46 | +Window: 4 |
| 47 | +Window: 4 |
| 48 | +Window: 4 |
| 49 | +Window: Completed |
| 50 | +``` |
| 51 | + |
| 52 | +We can see here that the inner observables are emitting the same value simultaneously, if that value belongs to multiple windows. |
| 53 | + |
| 54 | + |
| 55 | +### Window by time |
| 56 | + |
| 57 | +Rather than having windows of fixed size, you can have windows of a fixed duration in time. |
| 58 | + |
| 59 | + |
| 60 | +You can contruct windows that overlap or skip elements with `public final Observable<Observable<T>> window(long timespan, long timeshift, java.util.concurrent.TimeUnit unit)`, just like you would with `buffer`. |
| 61 | + |
| 62 | +### Window with signal |
| 63 | + |
| 64 | +Lastly, you can define windows using another observable. Every time your signaling observable emits a value, the old window closes and a new one starts. |
| 65 | + |
| 66 | +. |
| 67 | + |
| 68 | +Alternatively, to have overlapping windows, you can provide a function that uses the values emitted by your signaling observable to contruct another observable that will signal the closing of the window. When the observable corresponding to a window terminates, the window closes. |
| 69 | + |
| 70 | + |
| 71 | + |
| 72 | + |
| 73 | +## Join |
| 74 | + |
| 75 | +`join` allow you to pair items from two sequences together. We've already seen `zip`, which pairs value based on their index. `join` allows you to pair values based on durations. Let's see the signature first: |
| 76 | + |
| 77 | +```java |
| 78 | +public final <TRight,TLeftDuration,TRightDuration,R> Observable<R> join( |
| 79 | + Observable<TRight> right, |
| 80 | + Func1<T,Observable<TLeftDuration>> leftDurationSelector, |
| 81 | + Func1<TRight,Observable<TRightDuration>> rightDurationSelector, |
| 82 | + Func2<T,TRight,R> resultSelector) |
| 83 | +``` |
| 84 | + |
| 85 | +`join` combines two sequences, called "left" and "right". The method is not static and the left sequence is the one that `join` is being called on. In the signature, we can see two methods called `leftDurationSelector` and `rightDurationSelector` which take as an argument an item of the left and right sequence respectively. They return an observable that defines a duration (i.e. a window), just like in the last overload of `window`. These windows are used to select values to be paired together. Values that are paired are passed to the `resultSelector` function which will combine them into a single value, like `zip` does. That value will be emitted by `join`. |
| 86 | + |
| 87 | +The thing that makes `join` powerful, but also complicated to grasp, is how values are selected to be paired. Every value that arrives in a sequence begins a window for itself. The corresponding duration selector decides when the window for each value will terminate. While the window is open, any value arriving in the opposite sequence will be paired with it. The process is symmetrical, so let's just consider a case where the items of only one sequence have windows. |
| 88 | + |
| 89 | +In the first example, the windows in the left sequence never close, while the windows in the right sequence are 0. |
| 90 | + |
| 91 | + |
| 92 | + |
| 93 | +```java |
| 94 | +Observable<String> left = |
| 95 | + Observable.interval(100, TimeUnit.MILLISECONDS) |
| 96 | + .map(i -> "L" + i); |
| 97 | +Observable<String> right = |
| 98 | + Observable.interval(200, TimeUnit.MILLISECONDS) |
| 99 | + .map(i -> "R" + i); |
| 100 | + |
| 101 | +left |
| 102 | + .join( |
| 103 | + right, |
| 104 | + i -> Observable.never(), |
| 105 | + i -> Observable.timer(0, TimeUnit.MILLISECONDS), |
| 106 | + (l,r) -> l + " - " + r |
| 107 | + ) |
| 108 | + .take(10) |
| 109 | + .subscribe(System.out::println); |
| 110 | +``` |
| 111 | +Output |
| 112 | +``` |
| 113 | +L0 - R0 |
| 114 | +L1 - R0 |
| 115 | +L0 - R1 |
| 116 | +L1 - R1 |
| 117 | +L2 - R1 |
| 118 | +L3 - R1 |
| 119 | +L0 - R2 |
| 120 | +L1 - R2 |
| 121 | +L2 - R2 |
| 122 | +L3 - R2 |
| 123 | +``` |
| 124 | + |
| 125 | +When a window for a left value never ends, what that means is every value emitted by the right sequence for here on will be paired with the left value. Because here the right sequence has half the frequence of the left sequence, between two right values, two more windows have opened on the left. The first right value is paired with the first 2 left values, the second right value is paired with the first 4 left values, the third with 6 and so on. |
| 126 | + |
| 127 | +Lets change the example and see what happens when left and right emit every 100ms and left windows close after 150ms. What happens then is that every left window remains open long enough to catch two right values: one that is emitted at the same time and another after 100ms. |
| 128 | + |
| 129 | +```java |
| 130 | +Observable<String> left = |
| 131 | + Observable.interval(100, TimeUnit.MILLISECONDS) |
| 132 | + .map(i -> "L" + i); |
| 133 | +Observable<String> right = |
| 134 | + Observable.interval(100, TimeUnit.MILLISECONDS) |
| 135 | + .map(i -> "R" + i); |
| 136 | + |
| 137 | +left |
| 138 | + .join( |
| 139 | + right, |
| 140 | + i -> Observable.timer(150, TimeUnit.MILLISECONDS), |
| 141 | + i -> Observable.timer(0, TimeUnit.MILLISECONDS), |
| 142 | + (l,r) -> l + " - " + r |
| 143 | + ) |
| 144 | + .take(10) |
| 145 | + .subscribe(System.out::println); |
| 146 | +``` |
| 147 | +Output |
| 148 | +``` |
| 149 | +L0 - R0 |
| 150 | +L0 - R1 |
| 151 | +L1 - R1 |
| 152 | +L1 - R2 |
| 153 | +L2 - R2 |
| 154 | +L2 - R3 |
| 155 | +L3 - R3 |
| 156 | +L3 - R4 |
| 157 | +L4 - R4 |
| 158 | +L4 - R5 |
| 159 | +``` |
| 160 | + |
| 161 | +Both sequences have windows. Every value of a sequence is paired with: |
| 162 | +* Any older value of the opposite sequence, if the window of the older sequence is still open |
| 163 | +* Any newer value of the opposite sequence, if the window for this value is still open |
| 164 | + |
| 165 | + |
| 166 | +## groupJoin |
| 167 | + |
| 168 | +As soon as it detected a pair, `join` passed the two values to the result selector and emitted the result. `groupJoin` takes it one step further. Let's start with the signature |
| 169 | + |
| 170 | +```java |
| 171 | +public final <T2,D1,D2,R> Observable<R> groupJoin( |
| 172 | + Observable<T2> right, |
| 173 | + Func1<? super T,? extends Observable<D1>> leftDuration, |
| 174 | + Func1<? super T2,? extends Observable<D2>> rightDuration, |
| 175 | + Func2<? super T,? super Observable<T2>,? extends R> resultSelector) |
| 176 | +``` |
| 177 | + |
| 178 | +The signature is the same as `join` exept for the `resultSelector`. Now the result selector takes item from the left sequence and an observable of values from the right sequence. That observable will emit every right value that the left value is paired with. The pairing in `groupJoin` is symmetrical, just like `join`, but the contruction of results isn't. Instead of the way it is now, the argument of the `resultSelect` could just as well have been a single `GroupedObservable` with the left value as a key and the right values being emitted. |
| 179 | + |
| 180 | +Lets revisit our example from `join` where the windows on the left never close. |
| 181 | + |
| 182 | +```java |
| 183 | +Observable<String> left = |
| 184 | + Observable.interval(100, TimeUnit.MILLISECONDS) |
| 185 | + .map(i -> "L" + i) |
| 186 | + .take(6); |
| 187 | +Observable<String> right = |
| 188 | + Observable.interval(200, TimeUnit.MILLISECONDS) |
| 189 | + .map(i -> "R" + i) |
| 190 | + .take(3); |
| 191 | + |
| 192 | +left |
| 193 | + .groupJoin( |
| 194 | + right, |
| 195 | + i -> Observable.never(), |
| 196 | + i -> Observable.timer(0, TimeUnit.MILLISECONDS), |
| 197 | + (l, rs) -> rs.toList().subscribe(list -> System.out.println(l + ": " + list)) |
| 198 | + ) |
| 199 | + .subscribe(); |
| 200 | +``` |
| 201 | +Output |
| 202 | +``` |
| 203 | +L0: [R0, R1, R2] |
| 204 | +L1: [R0, R1, R2] |
| 205 | +L2: [R1, R2] |
| 206 | +L3: [R1, R2] |
| 207 | +L4: [R2] |
| 208 | +L5: [R2] |
| 209 | +``` |
| 210 | + |
| 211 | +In the result selector, we a left value and an observable of right values. We used that to print all the values from the right that were paired to each right value. If you go back to the example using `join`, you'll see that the pairs are the same. What is changes is how they are made available to us to process. |
| 212 | + |
| 213 | +You can implement `join` with `groupJoin` and `flatMap` |
| 214 | +```java |
| 215 | +.join( |
| 216 | + right, |
| 217 | + leftDuration |
| 218 | + rightDuration, |
| 219 | + (l,r) -> joinResultSelector(l,r) |
| 220 | +) |
| 221 | +// same as |
| 222 | +.groupJoin( |
| 223 | + right, |
| 224 | + leftDuration |
| 225 | + rightDuration, |
| 226 | + (l, rs) -> rs.map(r -> joinResultSelector(l,r)) |
| 227 | +) |
| 228 | +.flatMap(i -> i) |
| 229 | +``` |
| 230 | + |
| 231 | +You can also implement `groupJoin` with `join` and `groupBy`. Doing so would require you to contruct tuples as a result and do `groupBy` on the left part of the tuple. Since Java doesn't provide default tuples, we will leave an example to the reader's imagination. |
1 | 232 |
|
2 | 233 |
|
3 | 234 | #### Continue reading |
|
0 commit comments