X Tutup
Skip to content

Commit ead0cbf

Browse files
committed
4.4 Added doOnRequest
1 parent abffc2a commit ead0cbf

File tree

1 file changed

+124
-0
lines changed

1 file changed

+124
-0
lines changed

Part 4 - Concurrency/4. Backpressure.md

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,130 @@ class MySubscriber extends Subscriber<T> {
145145

146146
The `request(1)` in `onStart` establishes backpressure and that the observable should only emit the first value. After processing it in `onNext`, we request the next item to be sent, if and when it is available. Calling `request(Long.MAX_VALUE)` would disable backpressure.
147147

148+
### doOnRequested
149+
150+
Back we where discussing the `doOn_` operators for [side effects](/Part%203%20-%20Taming%20the%20sequence/1.%20Side%20effects.md#do), we left out `doOnRequested`.
151+
```java
152+
public final Observable<T> doOnRequest(Action1<java.lang.Long> onRequest)
153+
```
154+
The `doOnRequested` meta-event happens when a subscriber requests for more items. The value supplied to the action is the number of items requested.
155+
156+
At this moment, `doOnRequest` is in beta. It is the only beta operator that we will discuss in this book. We're making an exception, because it enables us to peek into stable backpressure functionality that is otherwise hidden. Let's see what happens in the most simple observable
157+
158+
```java
159+
Observable.range(0, 3)
160+
.doOnRequest(i -> System.out.println("Requested " + i))
161+
.subscribe(System.out::println);
162+
```
163+
Output
164+
```
165+
Requested 9223372036854775807
166+
0
167+
1
168+
2
169+
```
170+
171+
We see that `subscribe` requests the maximum number of items from the beginning. That means that `subscribe` doesn't resist values at all. Subscribe will only use backpressure if we provide a subscriber that implements backpressure. Here is a complete example for such an implementation
172+
173+
```java
174+
public class ControlledPullSubscriber<T> extends Subscriber<T> {
175+
176+
private final Action1<T> onNextAction;
177+
private final Action1<Throwable> onErrorAction;
178+
private final Action0 onCompletedAction;
179+
180+
public ControlledPullSubscriber(
181+
Action1<T> onNextAction,
182+
Action1<Throwable> onErrorAction,
183+
Action0 onCompletedAction) {
184+
this.onNextAction = onNextAction;
185+
this.onErrorAction = onErrorAction;
186+
this.onCompletedAction = onCompletedAction;
187+
}
188+
189+
public ControlledPullSubscriber(
190+
Action1<T> onNextAction,
191+
Action1<Throwable> onErrorAction) {
192+
this(onNextAction, onErrorAction, () -> {});
193+
}
194+
195+
public ControlledPullSubscriber(Action1<T> onNextAction) {
196+
this(onNextAction, e -> {}, () -> {});
197+
}
198+
199+
@Override
200+
public void onStart() {
201+
request(0);
202+
}
203+
204+
@Override
205+
public void onCompleted() {
206+
onCompletedAction.call();
207+
}
208+
209+
@Override
210+
public void onError(Throwable e) {
211+
onErrorAction.call(e);
212+
}
213+
214+
@Override
215+
public void onNext(T t) {
216+
onNextAction.call(t);
217+
}
218+
219+
public void requestMore(int n) {
220+
request(n);
221+
}
222+
}
223+
```
224+
225+
This simple implementation will not request values unless we manually make it do so with `requestMore`.
226+
227+
```java
228+
ControlledPullSubscriber<Integer> puller =
229+
new ControlledPullSubscriber<Integer>(System.out::println);
230+
231+
Observable.range(0, 3)
232+
.doOnRequest(i -> System.out.println("Requested " + i))
233+
.subscribe(puller);
234+
235+
puller.requestMore(2);
236+
puller.requestMore(1);
237+
```
238+
Output
239+
```
240+
Requested 0
241+
Requested 2
242+
0
243+
1
244+
Requested 1
245+
2
246+
```
247+
248+
First we requested no emissions. Then we requested 2 and we got 2 values.
249+
250+
Rx operators that use queues and buffers internally should use backpressure to avoid storing an infinite amount of values. Large-scale buffering should be left to operators that explicitly serve this purpose, such as `cache`, `buffer` etc. `zip` is one operator that needs to buffer items: the first observable might emit two or more values before the second observable emits its next value. Such small asymmetries are expected and they shouldn't cause the operator to fail. For that reason, `zip` has a small buffer of 128 items.
251+
252+
```java
253+
Observable.range(0, 300)
254+
.doOnRequest(i -> System.out.println("Requested " + i))
255+
.zipWith(
256+
Observable.range(10, 300),
257+
(i1, i2) -> i1 + " - " + i2)
258+
.take(300)
259+
.subscribe();
260+
```
261+
Output
262+
```
263+
Requested 128
264+
Requested 90
265+
Requested 90
266+
Requested 90
267+
```
268+
269+
The `zip` operator starts by requesting enough items to fill its buffer, and requests more when it has consumed enough. The details of how many items `zip` requests isn't interesting. What the reader should take away is the realisation that some buffering and backpressure exist in Rx whether the developer requests for it or not. This gives an Rx pipeline some flexibility where you might expect none. This might trick you into thinking that your code is solid, by silently saving small tests from failing, but you're not safe until you have explicitly declared behaviour with regard to backpressure.
270+
271+
148272
## Backpressure policies
149273

150274
Many Rx operators use backpressure internally to avoid overfilling their internal queues. This way, the problem of a slow consumer is propagated backwards in the chain of operators. Backpressure doesn't make the problem go away. It merely moves it where it may be handled better. We still need to decide what to do with the values of an overproducing observable.

0 commit comments

Comments
 (0)
X Tutup