You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: Part 4 - Concurrency/4. Backpressure.md
+124Lines changed: 124 additions & 0 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -145,6 +145,130 @@ class MySubscriber extends Subscriber<T> {
145
145
146
146
The `request(1)` in `onStart` establishes backpressure and that the observable should only emit the first value. After processing it in `onNext`, we request the next item to be sent, if and when it is available. Calling `request(Long.MAX_VALUE)` would disable backpressure.
147
147
148
+
### doOnRequested
149
+
150
+
Back we where discussing the `doOn_` operators for [side effects](/Part%203%20-%20Taming%20the%20sequence/1.%20Side%20effects.md#do), we left out `doOnRequested`.
The `doOnRequested` meta-event happens when a subscriber requests for more items. The value supplied to the action is the number of items requested.
155
+
156
+
At this moment, `doOnRequest` is in beta. It is the only beta operator that we will discuss in this book. We're making an exception, because it enables us to peek into stable backpressure functionality that is otherwise hidden. Let's see what happens in the most simple observable
We see that `subscribe` requests the maximum number of items from the beginning. That means that `subscribe` doesn't resist values at all. Subscribe will only use backpressure if we provide a subscriber that implements backpressure. Here is a complete example for such an implementation
First we requested no emissions. Then we requested 2 and we got 2 values.
249
+
250
+
Rx operators that use queues and buffers internally should use backpressure to avoid storing an infinite amount of values. Large-scale buffering should be left to operators that explicitly serve this purpose, such as `cache`, `buffer` etc. `zip` is one operator that needs to buffer items: the first observable might emit two or more values before the second observable emits its next value. Such small asymmetries are expected and they shouldn't cause the operator to fail. For that reason, `zip` has a small buffer of 128 items.
The `zip` operator starts by requesting enough items to fill its buffer, and requests more when it has consumed enough. The details of how many items `zip` requests isn't interesting. What the reader should take away is the realisation that some buffering and backpressure exist in Rx whether the developer requests for it or not. This gives an Rx pipeline some flexibility where you might expect none. This might trick you into thinking that your code is solid, by silently saving small tests from failing, but you're not safe until you have explicitly declared behaviour with regard to backpressure.
270
+
271
+
148
272
## Backpressure policies
149
273
150
274
Many Rx operators use backpressure internally to avoid overfilling their internal queues. This way, the problem of a slow consumer is propagated backwards in the chain of operators. Backpressure doesn't make the problem go away. It merely moves it where it may be handled better. We still need to decide what to do with the values of an overproducing observable.
0 commit comments