X Tutup
Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions modules/angular2/src/core/zone/ng_zone.dart
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,15 @@ class NgZone {
_inVmTurnDone = true;
parent.run(_innerZone, _onTurnDone);

if (_pendingMicrotasks == 0 && _onEventDone != null) {
runOutsideAngular(_onEventDone);
}
} finally {
_inVmTurnDone = false;
_hasExecutedCodeInInnerZone = false;
}
}

if (_pendingMicrotasks == 0 && _onEventDone != null) {
runOutsideAngular(_onEventDone);
}
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions modules/angular2/src/core/zone/ng_zone.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export class NgZone {
*
* This hook is useful for validating application state (e.g. in a test).
*/
overrideOnEventDone(onEventDoneFn: Function, opt_waitForAsync: boolean): void {
overrideOnEventDone(onEventDoneFn: Function, opt_waitForAsync: boolean = false): void {
var normalizedOnEventDone = normalizeBlank(onEventDoneFn);
if (opt_waitForAsync) {
this._onEventDone = () => {
Expand Down Expand Up @@ -212,14 +212,15 @@ export class NgZone {
try {
this._inVmTurnDone = true;
parentRun.call(ngZone._innerZone, ngZone._onTurnDone);
if (ngZone._pendingMicrotasks === 0 && isPresent(ngZone._onEventDone)) {
ngZone.runOutsideAngular(ngZone._onEventDone);
}
} finally {
this._inVmTurnDone = false;
ngZone._hasExecutedCodeInInnerZone = false;
}
}

if (ngZone._pendingMicrotasks === 0 && isPresent(ngZone._onEventDone)) {
ngZone.runOutsideAngular(ngZone._onEventDone);
}
}
}
};
Expand Down
8 changes: 8 additions & 0 deletions modules/angular2/src/mock/ng_zone_mock.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import {NgZone} from 'angular2/src/core/zone/ng_zone';

export class MockNgZone extends NgZone {
_onEventDone: () => void;

constructor() { super({enableLongStackTrace: false}); }

run(fn: Function): any { return fn(); }

runOutsideAngular(fn: Function): any { return fn(); }

overrideOnEventDone(fn: () => void, opt_waitForAsync: boolean = false): void {
this._onEventDone = fn;
}

simulateZoneExit(): void { this._onEventDone(); }
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
library angular2.src.web_workers.debug_tools.multi_client_server_message_bus;

import "package:angular2/src/web_workers/shared/message_bus.dart"
show MessageBus, MessageBusSink, MessageBusSource;
import 'dart:io';
import 'dart:convert' show JSON;
import 'dart:async';
import 'package:angular2/src/core/facade/async.dart' show EventEmitter;
import 'package:angular2/src/web_workers/shared/messaging_api.dart';
import 'package:angular2/src/web_workers/shared/generic_message_bus.dart';

// TODO(jteplitz602): Remove hard coded result type and
// clear messageHistory once app is done with it #3859
class MultiClientServerMessageBus implements MessageBus {
final MultiClientServerMessageBusSink sink;
MultiClientServerMessageBusSource source;
class MultiClientServerMessageBus extends GenericMessageBus {
bool hasPrimary = false;

MultiClientServerMessageBus(this.sink, this.source);
@override
MultiClientServerMessageBusSink get sink => super.sink;
@override
MultiClientServerMessageBusSource get source => super.source;

MultiClientServerMessageBus(MultiClientServerMessageBusSink sink,
MultiClientServerMessageBusSource source)
: super(sink, source);

MultiClientServerMessageBus.fromHttpServer(HttpServer server)
: sink = new MultiClientServerMessageBusSink() {
source = new MultiClientServerMessageBusSource(resultReceived);
: super(new MultiClientServerMessageBusSink(),
new MultiClientServerMessageBusSource()) {
source.onResult.listen(_resultReceived);
server.listen((HttpRequest request) {
if (request.uri.path == "/ws") {
WebSocketTransformer.upgrade(request).then((WebSocket socket) {
Expand All @@ -38,18 +42,10 @@ class MultiClientServerMessageBus implements MessageBus {
});
}

void resultReceived() {
void _resultReceived(_) {
sink.resultReceived();
}

EventEmitter from(String channel) {
return source.from(channel);
}

EventEmitter to(String channel) {
return sink.to(channel);
}

Function _handleDisconnect(WebSocketWrapper wrapper) {
return () {
sink.removeConnection(wrapper);
Expand All @@ -72,12 +68,15 @@ class WebSocketWrapper {
WebSocketWrapper(this._messageHistory, this._resultMarkers, this.socket) {
stream = socket.asBroadcastStream();
stream.listen((encodedMessage) {
var message = JSON.decode(encodedMessage)['message'];
if (message is Map && message.containsKey("type")) {
if (message['type'] == 'result') {
resultReceived();
var messages = JSON.decode(encodedMessage);
messages.forEach((data) {
var message = data['message'];
if (message is Map && message.containsKey("type")) {
if (message['type'] == 'result') {
resultReceived();
}
}
}
});
});
}

Expand Down Expand Up @@ -121,10 +120,9 @@ class WebSocketWrapper {
}
}

class MultiClientServerMessageBusSink implements MessageBusSink {
class MultiClientServerMessageBusSink extends GenericMessageBusSink {
final List<String> messageHistory = new List<String>();
final Set<WebSocketWrapper> openConnections = new Set<WebSocketWrapper>();
final Map<String, EventEmitter> _channels = new Map<String, EventEmitter>();
final List<int> resultMarkers = new List<int>();

void resultReceived() {
Expand All @@ -141,76 +139,77 @@ class MultiClientServerMessageBusSink implements MessageBusSink {
openConnections.remove(webSocket);
}

EventEmitter to(String channel) {
if (_channels.containsKey(channel)) {
return _channels[channel];
} else {
var emitter = new EventEmitter();
emitter.listen((message) {
_send({'channel': channel, 'message': message});
});
return emitter;
}
}

void _send(dynamic message) {
String encodedMessage = JSON.encode(message);
@override
void sendMessages(List<dynamic> messages) {
String encodedMessages = JSON.encode(messages);
openConnections.forEach((WebSocketWrapper webSocket) {
if (webSocket.caughtUp) {
webSocket.socket.add(encodedMessage);
webSocket.socket.add(encodedMessages);
}
});
messageHistory.add(encodedMessage);
messageHistory.add(encodedMessages);
}
}

class MultiClientServerMessageBusSource implements MessageBusSource {
final Map<String, EventEmitter> _channels = new Map<String, EventEmitter>();
class MultiClientServerMessageBusSource extends GenericMessageBusSource {
Function onResultReceived;
final StreamController mainController;
final StreamController resultController = new StreamController();

MultiClientServerMessageBusSource(this.onResultReceived);
MultiClientServerMessageBusSource._(controller)
: mainController = controller,
super(controller.stream);

EventEmitter from(String channel) {
if (_channels.containsKey(channel)) {
return _channels[channel];
} else {
var emitter = new EventEmitter();
_channels[channel] = emitter;
return emitter;
}
factory MultiClientServerMessageBusSource() {
return new MultiClientServerMessageBusSource._(
new StreamController.broadcast());
}

Stream get onResult => resultController.stream;

void addConnection(WebSocketWrapper webSocket) {
if (webSocket.isPrimary) {
webSocket.stream.listen((encodedMessage) {
var decodedMessage = decodeMessage(encodedMessage);
var channel = decodedMessage['channel'];
var message = decodedMessage['message'];
if (message is Map && message.containsKey("type")) {
if (message['type'] == 'result') {
// tell the bus that a result was received on the primary
onResultReceived();
webSocket.stream.listen((encodedMessages) {
var decodedMessages = _decodeMessages(encodedMessages);
decodedMessages.forEach((decodedMessage) {
var message = decodedMessage['message'];
if (message is Map && message.containsKey("type")) {
if (message['type'] == 'result') {
// tell the bus that a result was received on the primary
resultController.add(message);
}
}
}
});

if (_channels.containsKey(channel)) {
_channels[channel].add(message);
}
mainController.add(decodedMessages);
});
} else {
webSocket.stream.listen((encodedMessage) {
// handle events from non-primary browser
var decodedMessage = decodeMessage(encodedMessage);
var channel = decodedMessage['channel'];
var message = decodedMessage['message'];
if (_channels.containsKey(EVENT_CHANNEL) && channel == EVENT_CHANNEL) {
_channels[channel].add(message);
webSocket.stream.listen((encodedMessages) {
// handle events from non-primary connection.
var decodedMessages = _decodeMessages(encodedMessages);
var eventMessages = new List<Map<String, dynamic>>();
decodedMessages.forEach((decodedMessage) {
var channel = decodedMessage['channel'];
if (channel == EVENT_CHANNEL) {
eventMessages.add(decodedMessage);
}
});
if (eventMessages.length > 0) {
mainController.add(eventMessages);
}
});
}
}

Map<String, dynamic> decodeMessage(dynamic message) {
return JSON.decode(message);
List<dynamic> _decodeMessages(dynamic messages) {
return JSON.decode(messages);
}

// This is a noop for the MultiClientBus because it has to decode the JSON messages before
// the generic bus receives them in order to check for results and forward events
// from the non-primary connection.
@override
List<dynamic> decodeMessages(dynamic messages) {
return messages;
}
}
Loading
X Tutup