forked from angular/angular
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync.dart
More file actions
132 lines (104 loc) · 3.01 KB
/
async.dart
File metadata and controls
132 lines (104 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
library angular2.core.facade.async;
import 'dart:async';
export 'dart:async' show Stream, StreamController, StreamSubscription;
export 'promise.dart';
/// Static helpers for creating and cancelling `dart:async` [Timer]s.
class TimerWrapper {
  /// Schedules [fn] to run once after [millis] milliseconds.
  ///
  /// Returns the created [Timer], which can be handed to [clearTimeout].
  static Timer setTimeout(fn(), int millis) {
    var delay = new Duration(milliseconds: millis);
    return new Timer(delay, fn);
  }

  /// Cancels [timer].
  static void clearTimeout(Timer timer) => timer.cancel();

  /// Schedules [fn] to run repeatedly, once every [millis] milliseconds.
  ///
  /// Returns the periodic [Timer], which can be handed to [clearInterval].
  static Timer setInterval(fn(), int millis) =>
      new Timer.periodic(new Duration(milliseconds: millis), (Timer ticker) {
        // The periodic callback receives the timer; `fn` takes no arguments.
        fn();
      });

  /// Cancels [timer].
  static void clearInterval(Timer timer) => timer.cancel();
}
/// Static helpers for working with [Stream]s and [EventEmitter]s.
class ObservableWrapper {
  /// Listens to [s], passing each event to [onNext].
  ///
  /// [onError] and [onComplete] are optional and are forwarded to
  /// [Stream.listen] as `onError` and `onDone`. The subscription is always
  /// created with `cancelOnError: true`.
  static StreamSubscription subscribe/*<T>*/(Stream s, onNext(/*=T*/ value),
          [onError, onComplete]) =>
      s.listen(onNext,
          onError: onError, onDone: onComplete, cancelOnError: true);

  /// Returns whether [obs] is a [Stream].
  static bool isObservable(obs) => obs is Stream;

  /// Returns whether `emitter` has any subscribers listening to events.
  static bool hasSubscribers(EventEmitter emitter) =>
      emitter._controller.hasListener;

  /// Cancels the subscription [s].
  static void dispose(StreamSubscription s) => s.cancel();

  /// Adds [value] to [emitter]; identical in effect to [callEmit].
  @Deprecated('Use callEmit() instead')
  static void callNext(EventEmitter emitter, value) =>
      callEmit(emitter, value);

  /// Adds [value] to [emitter].
  static void callEmit(EventEmitter emitter, value) => emitter.add(value);

  /// Adds [error] to [emitter] as an error event.
  static void callError(EventEmitter emitter, error) =>
      emitter.addError(error);

  /// Closes [emitter].
  static void callComplete(EventEmitter emitter) => emitter.close();

  /// Wraps the future [f] in a [Stream] via [Stream.fromFuture].
  static Stream fromPromise(Future f) => new Stream.fromFuture(f);

  /// Returns the [Stream.single] future of [s].
  static Future toPromise(Stream s) => s.single;
}
/// A [Stream] backed by a broadcast [StreamController], with methods for
/// pushing data events ([add], [emit]), error events ([addError]) and for
/// closing the stream ([close]).
class EventEmitter<T> extends Stream<T> {
  StreamController<T> _controller;

  /// Creates an instance of [EventEmitter], which depending on [isAsync],
  /// delivers events synchronously or asynchronously.
  EventEmitter([bool isAsync = true])
      : _controller = new StreamController<T>.broadcast(sync: !isAsync);

  /// Subscribes to the underlying controller's stream, forwarding all
  /// arguments unchanged.
  @override
  StreamSubscription<T> listen(void onData(T event),
      {Function onError, void onDone(), bool cancelOnError}) {
    var source = _controller.stream;
    return source.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }

  /// Adds [value] to the underlying controller as a data event.
  void add(value) => _controller.add(value);

  /// Adds [value] to the underlying controller; same effect as [add].
  void emit(value) => _controller.add(value);

  /// Adds [error] to the underlying controller as an error event.
  void addError(error) => _controller.addError(error);

  /// Closes the underlying controller.
  void close() => _controller.close();
}
// TODO(robwormald): maybe fix in ts2dart?
/// A [Stream] backed by a broadcast [StreamController], with methods for
/// pushing data events ([add]), error events ([addError]) and for closing
/// the stream ([close]).
class Subject<T> extends Stream<T> {
  StreamController<T> _controller;

  /// Creates a [Subject] whose controller delivers events asynchronously
  /// when [isAsync] is true (the default) and synchronously otherwise.
  Subject([bool isAsync = true])
      : _controller = new StreamController<T>.broadcast(sync: !isAsync);

  /// Subscribes to the underlying controller's stream, forwarding all
  /// arguments unchanged.
  @override
  StreamSubscription<T> listen(void onData(T data),
      {Function onError, void onDone(), bool cancelOnError}) {
    var source = _controller.stream;
    return source.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }

  /// Adds [value] to the underlying controller as a data event.
  void add(value) => _controller.add(value);

  /// Adds [error] to the underlying controller as an error event.
  void addError(error) => _controller.addError(error);

  /// Closes the underlying controller.
  void close() => _controller.close();
}