You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: Part 4 - Concurrency/1. Scheduling and threading.md
+16-12Lines changed: 16 additions & 12 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -46,6 +46,8 @@ public final Observable<T> subscribeOn(Scheduler scheduler)
46
46
47
47
In Rx you don't juggle threads directly. Instead you wrap them in policies called `Scheduler`. We will see more on that later.
48
48
49
+
### subscribeOn
50
+
49
51
With `subscribeOn` you decide on what thread the `Observable.create` is executed. Even if you're not calling `create` yourself, there is an internal equivalent to it. Consider the following example:
50
52
51
53
```java
@@ -73,7 +75,7 @@ Received 2 on 1
73
75
Finished main: 1
74
76
```
75
77
76
-
We see here that, non only is everything executed on the same thread, it is actually sequential: `create` does not unblock until our lambda has completed, and that include the calls to `onNext`, which execute the entire chain of operators. Effectively, `create` is blocking.
78
+
We see here that, non only is everything executed on the same thread, it is actually sequential: `subscribe` does not unblock until it has completed creating (and subscribing to) its observable, which means executing all of `create`'s lambda. The calls to `onNext` within that lambda execute the entire chain of operators, all the way to the `println`. Effectively, `create` is blocking.
77
79
78
80
If you uncomment `.subscribeOn(Schedulers.newThread())`, the output now is
79
81
```
@@ -84,9 +86,9 @@ Received 1 on 11
84
86
Received 2 on 11
85
87
```
86
88
87
-
`Schedulers.newThread()` provided a new thread for our lambda function to run on. `create` no longer blocks and the main thread is free to proceed.
89
+
`Schedulers.newThread()` provided a new thread for our lambda function to run on. `subscribe` no longer blocks until `create`'s lambda is executed and the main thread is free to proceed.
88
90
89
-
Some observables create their own threads regardless of you what you requested. For example, `Observable.interval` is anyway asynchronous. In such cases, `subscribeOn` will dictate on what thread to run the function which creates the resources. It gives you no control over what resources the source of your observable requires.
91
+
Some observables create their own threads regardless of you what you requested. For example, `Observable.interval` is asynchronous regardless. In such cases, `subscribeOn` will dictate on what thread to run the function which creates the resources, which typically won't be helpful. It gives you no control over what resources the source of your observable requires.
`observeOn` controls the other side of the pipeline. The creation and emission of values will work like normal, but the actions of your observer will be invoked on a different thread, as specified by the `Scheduler`.
112
+
### observeOn
113
+
114
+
`observeOn` controls the other side of the pipeline. The creation and emission of values will work like normal, but the actions of your observer will be invoked on a different thread, as specified by the `Scheduler` policy.
111
115
112
116
```java
113
117
Observable.create(o -> {
114
-
System.out.println("Created on "+Thread.currentThread().getId());
115
-
o.onNext(1);
116
-
o.onNext(2);
117
-
o.onCompleted();
118
-
})
119
-
.observeOn(Schedulers.newThread())
120
-
.subscribe(i ->
121
-
System.out.println("Received "+ i +" on "+Thread.currentThread().getId()));
118
+
System.out.println("Created on "+Thread.currentThread().getId());
119
+
o.onNext(1);
120
+
o.onNext(2);
121
+
o.onCompleted();
122
+
})
123
+
.observeOn(Schedulers.newThread())
124
+
.subscribe(i ->
125
+
System.out.println("Received "+ i +" on "+Thread.currentThread().getId()));
0 commit comments