Index: runtime/lib/isolate_patch.dart |
diff --git a/runtime/lib/isolate_patch.dart b/runtime/lib/isolate_patch.dart |
index a2713d10c604716cf53d1aebbe1a7f90248a4519..e9fd3bba5b862f15560a6b8ac4aa1f282b71aa8c 100644 |
--- a/runtime/lib/isolate_patch.dart |
+++ b/runtime/lib/isolate_patch.dart |
@@ -2,85 +2,44 @@ |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-class _CloseToken { |
- /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to |
- /// close themselves. |
- const _CloseToken(); |
-} |
- |
-patch bool _isCloseToken(var object) { |
- // TODO(floitsch): can we compare against const _CloseToken()? |
- return object is _CloseToken; |
-} |
- |
-patch class MessageBox { |
- /* patch */ MessageBox.oneShot() : this._oneShot(new ReceivePort()); |
- MessageBox._oneShot(ReceivePort receivePort) |
- : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort), |
- sink = new _IsolateSink._fromPort(receivePort.toSendPort()); |
- |
- /* patch */ MessageBox() : this._(new ReceivePort()); |
- MessageBox._(ReceivePort receivePort) |
- : stream = new IsolateStream._fromOriginalReceivePort(receivePort), |
- sink = new _IsolateSink._fromPort(receivePort.toSendPort()); |
-} |
- |
-class _IsolateSink implements IsolateSink { |
- bool _isClosed = false; |
- final SendPort _port; |
- _IsolateSink._fromPort(this._port); |
- |
- void add(dynamic message) { |
- _port.send(message); |
- } |
- |
- void addError(Object errorEvent) { |
- throw new UnimplementedError("addError on isolate streams"); |
- } |
- |
- void close() { |
- if (_isClosed) return; |
- add(const _CloseToken()); |
- _isClosed = true; |
- } |
- |
- bool operator==(var other) { |
- return other is IsolateSink && _port == other._port; |
- } |
- |
- int get hashCode => _port.hashCode + 499; |
-} |
- |
-patch IsolateSink streamSpawnFunction( |
- void topLevelFunction(), |
- [bool unhandledExceptionCallback(IsolateUnhandledException e)]) { |
- SendPort sendPort = spawnFunction(topLevelFunction, |
- unhandledExceptionCallback); |
- return new _IsolateSink._fromPort(sendPort); |
-} |
- |
patch class ReceivePort { |
/* patch */ factory ReceivePort() { |
- return new _ReceivePortImpl(); |
+ _ReceivePortImpl result = new _ReceivePortImpl(); |
+ // TODO(floitsch): remove the hack to close receive-ports on cancel. |
+ result._controller = new StreamController(onCancel: result._close); |
+ return result; |
} |
} |
-class _ReceivePortImpl implements ReceivePort { |
+class _ReceivePortImpl extends Stream implements ReceivePort { |
factory _ReceivePortImpl() native "ReceivePortImpl_factory"; |
+ // Deprecated. |
receive(void onMessage(var message, SendPort replyTo)) { |
_onMessage = onMessage; |
} |
+ // Deprecated. |
close() { |
+ _close(); |
+ } |
+ |
+ _close() { |
_portMap.remove(_id); |
_closeInternal(_id); |
} |
- SendPort toSendPort() { |
+ SendPort get sendPort { |
return new _SendPortImpl(_id); |
} |
+ StreamSubscription listen(void onData(var message), |
+ { Function onError, |
+ void onDone(), |
+ bool cancelOnError }) { |
+ return _controller.stream.listen(onData); |
+ } |
+ |
/**** Internal implementation details ****/ |
// Called from the VM to create a new ReceivePort instance. |
static _ReceivePortImpl _get_or_create(int id) { |
@@ -110,13 +69,18 @@ class _ReceivePortImpl implements ReceivePort { |
static void _handleMessage(_ReceivePortImpl port, int replyId, var message) { |
assert(port != null); |
SendPort replyTo = (replyId == 0) ? null : new _SendPortImpl(replyId); |
- (port._onMessage)(message, replyTo); |
+ if (port._onMessage != null) { |
+ (port._onMessage)(message, replyTo); |
+ } else { |
+ port._controller.add(message); |
+ } |
} |
// Call into the VM to close the VM maintained mappings. |
static _closeInternal(int id) native "ReceivePortImpl_closeInternal"; |
final int _id; |
+ StreamController _controller; |
var _onMessage; |
// id to ReceivePort mapping. |
@@ -135,6 +99,7 @@ class _SendPortImpl implements SendPort { |
_sendInternal(_id, replyId, message); |
} |
+ /// Deprecated. |
Future call(var message) { |
final completer = new Completer.sync(); |
final port = new _ReceivePortImpl(); |
@@ -179,6 +144,53 @@ _getPortInternal() native "isolate_getPortInternal"; |
ReceivePort _portInternal; |
+typedef _ZeroArgFunction(); |
+ |
+/** |
+ * Takes the real entry point as argument and invokes it with the initial |
+ * message. |
+ * |
+ * The initial message is (currently) received through the global port variable. |
+ */ |
+void _startIsolate(Function entryPoint) { |
+ bool first = true; |
+ _Isolate.port.receive((message, replyTo) { |
+ assert(first); |
+ first = false; |
+ var initialMessage = message[0]; |
+ var reply = message[1]; |
+ reply.send("started"); |
+ if (entryPoint is _ZeroArgFunction) { |
+ entryPoint(); |
+ } else { |
+ entryPoint(initialMessage); |
+ } |
+ }); |
+} |
+ |
+patch class Isolate { |
+ /* patch */ static Future<Isolate> spawn( |
+ void entryPoint(message), var message, { bool startPaused: false }) { |
+ if (startPaused) throw new UnimplementedError("spawn paused isolate"); |
+ return new Future<Isolate>.sync(() { |
+ // The VM will invoke [_startIsolate] with entryPoint as argument. |
+ SendPort controlPort = _Isolate._spawnFunction(entryPoint); |
+ ReceivePort readyPort = new ReceivePort(); |
+ controlPort.send([message, readyPort.sendPort]); |
+ Completer completer = new Completer<Isolate>(); |
+ readyPort.receive((ignored1, ignored2) { |
+ readyPort.close(); |
+ completer.complete(new Isolate._fromControlPort(controlPort)); |
+ }); |
+ return completer.future; |
+ }); |
+ } |
+ |
+ /* patch */ static Future<Isolate> spawnUri(Uri uri, var message) { |
+ throw new UnimplementedError("Isolate.spawnUri"); |
+ } |
+} |
+ |
patch class _Isolate { |
/* patch */ static ReceivePort get port { |
if (_portInternal == null) { |
@@ -187,8 +199,7 @@ patch class _Isolate { |
return _portInternal; |
} |
- /* patch */ static SendPort spawnFunction(void topLevelFunction(), |
- [bool unhandledExceptionCallback(IsolateUnhandledException e)]) |
+ static SendPort _spawnFunction(Function topLevelFunction) |
native "isolate_spawnFunction"; |
/* patch */ static SendPort spawnUri(String uri) native "isolate_spawnUri"; |