Index: runtime/lib/isolate_patch.dart |
diff --git a/runtime/lib/isolate_patch.dart b/runtime/lib/isolate_patch.dart |
index a2713d10c604716cf53d1aebbe1a7f90248a4519..a5cb017a7b3d4d9b8b249ae926a387776aa68cc9 100644 |
--- a/runtime/lib/isolate_patch.dart |
+++ b/runtime/lib/isolate_patch.dart |
@@ -2,85 +2,32 @@ |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-class _CloseToken { |
- /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to |
- /// close themselves. |
- const _CloseToken(); |
-} |
- |
-patch bool _isCloseToken(var object) { |
- // TODO(floitsch): can we compare against const _CloseToken()? |
- return object is _CloseToken; |
-} |
- |
-patch class MessageBox { |
- /* patch */ MessageBox.oneShot() : this._oneShot(new ReceivePort()); |
- MessageBox._oneShot(ReceivePort receivePort) |
- : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort), |
- sink = new _IsolateSink._fromPort(receivePort.toSendPort()); |
- |
- /* patch */ MessageBox() : this._(new ReceivePort()); |
- MessageBox._(ReceivePort receivePort) |
- : stream = new IsolateStream._fromOriginalReceivePort(receivePort), |
- sink = new _IsolateSink._fromPort(receivePort.toSendPort()); |
-} |
- |
-class _IsolateSink implements IsolateSink { |
- bool _isClosed = false; |
- final SendPort _port; |
- _IsolateSink._fromPort(this._port); |
- |
- void add(dynamic message) { |
- _port.send(message); |
- } |
- |
- void addError(Object errorEvent) { |
- throw new UnimplementedError("addError on isolate streams"); |
- } |
- |
- void close() { |
- if (_isClosed) return; |
- add(const _CloseToken()); |
- _isClosed = true; |
- } |
- |
- bool operator==(var other) { |
- return other is IsolateSink && _port == other._port; |
- } |
- |
- int get hashCode => _port.hashCode + 499; |
-} |
- |
-patch IsolateSink streamSpawnFunction( |
- void topLevelFunction(), |
- [bool unhandledExceptionCallback(IsolateUnhandledException e)]) { |
- SendPort sendPort = spawnFunction(topLevelFunction, |
- unhandledExceptionCallback); |
- return new _IsolateSink._fromPort(sendPort); |
-} |
- |
patch class ReceivePort { |
/* patch */ factory ReceivePort() { |
return new _ReceivePortImpl(); |
} |
} |
-class _ReceivePortImpl implements ReceivePort { |
+class _ReceivePortImpl extends Stream implements ReceivePort { |
factory _ReceivePortImpl() native "ReceivePortImpl_factory"; |
- receive(void onMessage(var message, SendPort replyTo)) { |
- _onMessage = onMessage; |
- } |
- |
close() { |
_portMap.remove(_id); |
_closeInternal(_id); |
+ _controller.close(); |
} |
- SendPort toSendPort() { |
+ SendPort get sendPort { |
return new _SendPortImpl(_id); |
} |
+ StreamSubscription listen(void onData(var message), |
+ { Function onError, |
+ void onDone(), |
+ bool cancelOnError }) { |
+ return _controller.stream.listen(onData); |
+ } |
+ |
/**** Internal implementation details ****/ |
// Called from the VM to create a new ReceivePort instance. |
static _ReceivePortImpl _get_or_create(int id) { |
@@ -98,6 +45,9 @@ class _ReceivePortImpl implements ReceivePort { |
_portMap = new Map(); |
} |
_portMap[id] = this; |
+ |
+ // TODO(floitsch): remove the hack to close receive-ports on cancel. |
+ _controller = new StreamController(onCancel: close, sync: true); |
} |
// Called from the VM to retrieve the ReceivePort for a message. |
@@ -109,15 +59,14 @@ class _ReceivePortImpl implements ReceivePort { |
// Called from the VM to dispatch to the handler. |
static void _handleMessage(_ReceivePortImpl port, int replyId, var message) { |
assert(port != null); |
- SendPort replyTo = (replyId == 0) ? null : new _SendPortImpl(replyId); |
- (port._onMessage)(message, replyTo); |
+ port._controller.add(message); |
} |
// Call into the VM to close the VM maintained mappings. |
static _closeInternal(int id) native "ReceivePortImpl_closeInternal"; |
final int _id; |
- var _onMessage; |
+ StreamController _controller; |
// id to ReceivePort mapping. |
static Map _portMap; |
@@ -135,21 +84,6 @@ class _SendPortImpl implements SendPort { |
_sendInternal(_id, replyId, message); |
} |
- Future call(var message) { |
- final completer = new Completer.sync(); |
- final port = new _ReceivePortImpl(); |
- send(message, port.toSendPort()); |
- port.receive((value, ignoreReplyTo) { |
- port.close(); |
- if (value is Exception) { |
- completer.completeError(value); |
- } else { |
- completer.complete(value); |
- } |
- }); |
- return completer.future; |
- } |
- |
bool operator==(var other) { |
return (other is _SendPortImpl) && _id == other._id; |
} |
@@ -179,6 +113,42 @@ _getPortInternal() native "isolate_getPortInternal"; |
ReceivePort _portInternal; |
+/** |
+ * Takes the real entry point as argument and invokes it with the initial |
+ * message. |
+ * |
+ * The initial message is (currently) received through the global port variable. |
+ */ |
+void _startIsolate(void entryPoint(message)) { |
+ _Isolate.port.first.then((message) { |
+ var initialMessage = message[0]; |
+ var reply = message[1]; |
+ reply.send("started"); |
+ entryPoint(initialMessage); |
+ }); |
+} |
+ |
+patch class Isolate { |
+ /* patch */ static Future<Isolate> spawn( |
+ void entryPoint(message), var message) { |
+ return new Future<Isolate>.sync(() { |
+ // The VM will invoke [_startIsolate] with entryPoint as argument. |
+ SendPort controlPort = _Isolate._spawnFunction(entryPoint); |
+ ReceivePort readyPort = new ReceivePort(); |
+ controlPort.send([message, readyPort.sendPort]); |
+ Completer completer = new Completer<Isolate>(); |
+ readyPort.first.then((_) { |
+ completer.complete(new Isolate._fromControlPort(controlPort)); |
+ }); |
+ return completer.future; |
+ }); |
+ } |
+ |
+ /* patch */ static Future<Isolate> spawnUri(Uri uri, var message) { |
+ throw new UnimplementedError("Isolate.spawnUri"); |
+ } |
+} |
+ |
patch class _Isolate { |
/* patch */ static ReceivePort get port { |
if (_portInternal == null) { |
@@ -187,8 +157,7 @@ patch class _Isolate { |
return _portInternal; |
} |
- /* patch */ static SendPort spawnFunction(void topLevelFunction(), |
- [bool unhandledExceptionCallback(IsolateUnhandledException e)]) |
+ static SendPort _spawnFunction(Function topLevelFunction) |
native "isolate_spawnFunction"; |
/* patch */ static SendPort spawnUri(String uri) native "isolate_spawnUri"; |