X Tutup
Skip to content

Commit 04e235f

Browse files
committed
3.6 Added replay overloads
1 parent 716a276 commit 04e235f

File tree

1 file changed

+59
-1
lines changed

1 file changed

+59
-1
lines changed

Part 3 - Taming the sequence/6. Hot and Cold observables.md

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ There is a variant that takes a selector that transforms a sequence before publi
5656
```java
5757
public final <R> Observable<R> publish(Func1<? super Observable<T>,? extends Observable<R>> selector)
5858
```
59-
The `selector` can do anything that we've learned to do on observables. This convenience method returns an `Observable<T>` instead of a `ConnectableObservable<T>`, so the connection functionality we are about to discuss does not apply there.
59+
The `selector` can do anything that we've learned to do on observables. The usefulness of this is that a single subscription is made for the selector, which can be reused as much as needed. Without this overload, reusing the observable could lead to multiple subscriptions. There is no way to guarantee that the subscriptions would happen at the same exact time and therefore see the exact same sequence.
60+
61+
This method returns an `Observable<T>` instead of a `ConnectableObservable<T>`, so the connection functionality we are about to discuss does not apply there.
6062

6163
### connect
6264

@@ -229,6 +231,62 @@ Second: 3
229231

230232
`replay` returns an `ConnectableObservable` like `publish`, so we can use the same ways to unsubscribe or create a `refCount` observable.
231233

234+
There are 8 overloads for `replay`.
235+
```java
236+
ConnectableObservable<T> replay()
237+
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector)
238+
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, int bufferSize)
239+
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, int bufferSize, long time, java.util.concurrent.TimeUnit unit)
240+
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, long time, java.util.concurrent.TimeUnit unit)
241+
ConnectableObservable<T> replay(int bufferSize)
242+
ConnectableObservable<T> replay(int bufferSize, long time, java.util.concurrent.TimeUnit unit)
243+
ConnectableObservable<T> replay(long time, java.util.concurrent.TimeUnit unit)
244+
```
245+
246+
They are different ways of providing one or more of 3 parameters: `bufferSize`, `selector` and `time` (plus `unit` for time).
247+
* `bufferSize` determines the maximum amount of items to be stored and replayed. Upon subscription, the observable will replay the last `bufferSize` number of items. Older items are forgotten. This is useful for conserving memory.
248+
* `time`, `unit` determines how old an element can be before being forgotten. Uopn subscription, the observable will replay items that are newer than `time`.
249+
* `selector` will transform the replayed observable, in the same way that `publish(selector)` works.
250+
251+
Here's an example with `bufferSize`
252+
253+
```java
254+
ConnectableObservable<Long> source = Observable.interval(1000, TimeUnit.MILLISECONDS)
255+
.take(5)
256+
.replay(2);
257+
258+
source.connect();
259+
Thread.sleep(4500);
260+
source.subscribe(System.out::println);
261+
```
262+
Output
263+
```
264+
2
265+
3
266+
4
267+
```
268+
269+
When we `connect`, the source begins emitting the sequence 0,1,2,3,4 in 1s intervals. We sleep for 4.5s before subscribing, which means that the source has emitted 0,1,2,3. 0 and 1 have fallen off the buffer, so only 2 and 3 are replayed. When 4 is emitted, we receive it normally.
270+
271+
When providing a time window, items fall off the buffer based on time
272+
273+
```java
274+
ConnectableObservable<Long> source = Observable.interval(1000, TimeUnit.MILLISECONDS)
275+
.take(5)
276+
.replay(2000, TimeUnit.MILLISECONDS);
277+
278+
source.connect();
279+
Thread.sleep(4500);
280+
source.subscribe(System.out::println);
281+
```
282+
Output
283+
```
284+
2
285+
3
286+
4
287+
```
288+
289+
232290
## Multicast
233291

234292
The `share` method is an alias for `Observable.publish().refCount()`. It allows your subscribers to share a subscription. The subscription is kept for as long as there are subscribers.

0 commit comments

Comments
 (0)
X Tutup