Index: runtime/lib/isolate_patch.dart |
diff --git a/runtime/lib/isolate_patch.dart b/runtime/lib/isolate_patch.dart |
index a2713d10c604716cf53d1aebbe1a7f90248a4519..867a031e020ffd31fbad37b9617d02bbb93ba05f 100644 |
--- a/runtime/lib/isolate_patch.dart |
+++ b/runtime/lib/isolate_patch.dart |
@@ -2,125 +2,109 @@ |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-class _CloseToken { |
- /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to |
- /// close themselves. |
- const _CloseToken(); |
-} |
+patch class ReceivePort { |
+ /* patch */ factory ReceivePort() = _ReceivePortImpl; |
-patch bool _isCloseToken(var object) { |
- // TODO(floitsch): can we compare against const _CloseToken()? |
- return object is _CloseToken; |
+ /* patch */ factory ReceivePort.fromRawReceivePort(RawReceivePort rawPort) = |
+ _ReceivePortImpl.fromRawReceivePort; |
} |
-patch class MessageBox { |
- /* patch */ MessageBox.oneShot() : this._oneShot(new ReceivePort()); |
- MessageBox._oneShot(ReceivePort receivePort) |
- : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort), |
- sink = new _IsolateSink._fromPort(receivePort.toSendPort()); |
- |
- /* patch */ MessageBox() : this._(new ReceivePort()); |
- MessageBox._(ReceivePort receivePort) |
- : stream = new IsolateStream._fromOriginalReceivePort(receivePort), |
- sink = new _IsolateSink._fromPort(receivePort.toSendPort()); |
+patch class RawReceivePort { |
+ /** |
+ * Opens a long-lived port for receiving messages. |
+ * |
+ * A [RawReceivePort] is low level and does not work with [Zone]s. It |
+ * can not be paused. The data-handler must be set before the first |
+ * event is received. |
+ */ |
+ /* patch */ factory RawReceivePort([void handler(event)]) { |
+ _RawReceivePortImpl result = new _RawReceivePortImpl(); |
+ result.handler = handler; |
+ return result; |
+ } |
} |
-class _IsolateSink implements IsolateSink { |
- bool _isClosed = false; |
- final SendPort _port; |
- _IsolateSink._fromPort(this._port); |
+class _ReceivePortImpl extends Stream implements ReceivePort { |
+ _ReceivePortImpl() : this.fromRawReceivePort(new RawReceivePort()); |
- void add(dynamic message) { |
- _port.send(message); |
+ _ReceivePortImpl.fromRawReceivePort(this._rawPort) { |
+ _controller = new StreamController(onCancel: close, sync: true); |
+ _rawPort.handler = _controller.add; |
} |
- void addError(Object errorEvent) { |
- throw new UnimplementedError("addError on isolate streams"); |
+ SendPort get sendPort { |
+ return _rawPort.sendPort; |
} |
- void close() { |
- if (_isClosed) return; |
- add(const _CloseToken()); |
- _isClosed = true; |
+ StreamSubscription listen(void onData(var message), |
+ { Function onError, |
+ void onDone(), |
+ bool cancelOnError }) { |
+ return _controller.stream.listen(onData, |
+ onError: onError, |
+ onDone: onDone, |
+ cancelOnError: cancelOnError); |
} |
- bool operator==(var other) { |
- return other is IsolateSink && _port == other._port; |
+ close() { |
+ _rawPort.close(); |
+ _controller.close(); |
} |
- int get hashCode => _port.hashCode + 499; |
+ final RawReceivePort _rawPort; |
+ StreamController _controller; |
} |
-patch IsolateSink streamSpawnFunction( |
- void topLevelFunction(), |
- [bool unhandledExceptionCallback(IsolateUnhandledException e)]) { |
- SendPort sendPort = spawnFunction(topLevelFunction, |
- unhandledExceptionCallback); |
- return new _IsolateSink._fromPort(sendPort); |
-} |
- |
-patch class ReceivePort { |
- /* patch */ factory ReceivePort() { |
- return new _ReceivePortImpl(); |
- } |
-} |
- |
-class _ReceivePortImpl implements ReceivePort { |
- factory _ReceivePortImpl() native "ReceivePortImpl_factory"; |
- |
- receive(void onMessage(var message, SendPort replyTo)) { |
- _onMessage = onMessage; |
- } |
+class _RawReceivePortImpl implements RawReceivePort { |
+ factory _RawReceivePortImpl() native "RawReceivePortImpl_factory"; |
close() { |
_portMap.remove(_id); |
_closeInternal(_id); |
} |
- SendPort toSendPort() { |
+ SendPort get sendPort { |
return new _SendPortImpl(_id); |
} |
/**** Internal implementation details ****/ |
- // Called from the VM to create a new ReceivePort instance. |
- static _ReceivePortImpl _get_or_create(int id) { |
- if (_portMap != null) { |
- _ReceivePortImpl port = _portMap[id]; |
- if (port != null) { |
- return port; |
- } |
+ // Called from the VM to create a new RawReceivePort instance. |
+ static _RawReceivePortImpl _get_or_create(int id) { |
+ _RawReceivePortImpl port = _portMap[id]; |
+ if (port != null) { |
+ return port; |
} |
- return new _ReceivePortImpl._internal(id); |
+ return new _RawReceivePortImpl._internal(id); |
} |
- _ReceivePortImpl._internal(int id) : _id = id { |
- if (_portMap == null) { |
- _portMap = new Map(); |
- } |
+ _RawReceivePortImpl._internal(int id) : _id = id { |
_portMap[id] = this; |
} |
- // Called from the VM to retrieve the ReceivePort for a message. |
- static _ReceivePortImpl _lookupReceivePort(int id) { |
- assert(_portMap != null); |
+ // Called from the VM to retrieve the RawReceivePort for a message. |
+ static _RawReceivePortImpl _lookupReceivePort(int id) { |
return _portMap[id]; |
} |
// Called from the VM to dispatch to the handler. |
- static void _handleMessage(_ReceivePortImpl port, int replyId, var message) { |
+ static void _handleMessage( |
+ _RawReceivePortImpl port, int replyId, var message) { |
assert(port != null); |
- SendPort replyTo = (replyId == 0) ? null : new _SendPortImpl(replyId); |
- (port._onMessage)(message, replyTo); |
+ port._handler(message); |
} |
// Call into the VM to close the VM maintained mappings. |
- static _closeInternal(int id) native "ReceivePortImpl_closeInternal"; |
+ static _closeInternal(int id) native "RawReceivePortImpl_closeInternal"; |
+ |
+ void set handler(Function newHandler) { |
+ this._handler = newHandler; |
+ } |
final int _id; |
- var _onMessage; |
+ Function _handler; |
- // id to ReceivePort mapping. |
- static Map _portMap; |
+ // id to RawReceivePort mapping. |
+ static final Map _portMap = new HashMap(); |
} |
@@ -135,27 +119,19 @@ class _SendPortImpl implements SendPort { |
_sendInternal(_id, replyId, message); |
} |
- Future call(var message) { |
- final completer = new Completer.sync(); |
- final port = new _ReceivePortImpl(); |
- send(message, port.toSendPort()); |
- port.receive((value, ignoreReplyTo) { |
- port.close(); |
- if (value is Exception) { |
- completer.completeError(value); |
- } else { |
- completer.complete(value); |
- } |
- }); |
- return completer.future; |
- } |
- |
bool operator==(var other) { |
return (other is _SendPortImpl) && _id == other._id; |
} |
int get hashCode { |
- return _id; |
+ const int MASK = 0x3FFFFFFF; |
+ int hash = _id; |
+ hash = (hash + ((hash & (MASK >> 10)) << 10)) & MASK; |
+ hash ^= (hash >> 6); |
+ hash = (hash + ((hash & (MASK >> 3)) << 3)) & MASK; |
+ hash ^= (hash >> 11); |
+ hash = (hash + ((hash & (MASK >> 15)) << 15)) & MASK; |
+ return hash; |
} |
/*--- private implementation ---*/ |
@@ -177,19 +153,86 @@ class _SendPortImpl implements SendPort { |
_getPortInternal() native "isolate_getPortInternal"; |
-ReceivePort _portInternal; |
- |
-patch class _Isolate { |
- /* patch */ static ReceivePort get port { |
- if (_portInternal == null) { |
- _portInternal = _getPortInternal(); |
+typedef _MainFunction(); |
+typedef _MainFunctionArgs(args); |
+typedef _MainFunctionArgsMessage(args, message); |
+ |
+/** |
+ * Takes the real entry point as argument and invokes it with the initial |
+ * message. |
+ * |
+ * The initial message is (currently) received through the global port variable. |
+ */ |
+void _startIsolate(Function entryPoint, bool isSpawnUri) { |
+ Isolate._port.first.then((message) { |
+ SendPort replyTo = message[0]; |
+ // TODO(floitsch): don't send ok-message if we can't find the entry point. |
+ replyTo.send("started"); |
+ if (isSpawnUri) { |
+ assert(message.length == 3); |
+ List<String> args = message[1]; |
+ var isolateMessage = message[2]; |
+ if (entryPoint is _MainFunctionArgsMessage) { |
+ entryPoint(args, isolateMessage); |
+ } else if (entryPoint is _MainFunctionArgs) { |
+ entryPoint(args); |
+ } else { |
+ entryPoint(); |
+ } |
+ } else { |
+ assert(message.length == 2); |
+ var entryMessage = message[1]; |
+ entryPoint(entryMessage); |
} |
- return _portInternal; |
+ }); |
+} |
+ |
+patch class Isolate { |
+ /* patch */ static Future<Isolate> spawn( |
+ void entryPoint(message), var message) { |
+ Completer completer = new Completer<Isolate>.sync(); |
+ try { |
+ // The VM will invoke [_startIsolate] with entryPoint as argument. |
+ SendPort controlPort = _spawnFunction(entryPoint); |
+ RawReceivePort readyPort = new RawReceivePort(); |
+ controlPort.send([readyPort.sendPort, message]); |
+ readyPort.handler = (readyMessage) { |
+ assert(readyMessage == 'started'); |
+ readyPort.close(); |
+ completer.complete(new Isolate._fromControlPort(controlPort)); |
+ }; |
+ } catch(e, st) { |
+ // TODO(floitsch): we want errors to go into the returned future. |
+ rethrow; |
+ }; |
+ return completer.future; |
} |
- /* patch */ static SendPort spawnFunction(void topLevelFunction(), |
- [bool unhandledExceptionCallback(IsolateUnhandledException e)]) |
+ /* patch */ static Future<Isolate> spawnUri( |
+ Uri uri, List<String> args, var message) { |
+ Completer completer = new Completer<Isolate>.sync(); |
+ try { |
+ // The VM will invoke [_startIsolate] and not `main`. |
+ SendPort controlPort = _spawnUri(uri.path); |
+ RawReceivePort readyPort = new RawReceivePort(); |
+ controlPort.send([readyPort.sendPort, args, message]); |
+ readyPort.handler = (readyMessage) { |
+ assert(readyMessage == 'started'); |
+ readyPort.close(); |
+ completer.complete(new Isolate._fromControlPort(controlPort)); |
+ }; |
+ } catch(e, st) { |
+ // TODO(floitsch): we want errors to go into the returned future. |
+ rethrow; |
+ }; |
+ return completer.future; |
+ } |
+ |
+ static final ReceivePort _port = |
+ new ReceivePort.fromRawReceivePort(_getPortInternal()); |
+ |
+ static SendPort _spawnFunction(Function topLevelFunction) |
native "isolate_spawnFunction"; |
- /* patch */ static SendPort spawnUri(String uri) native "isolate_spawnUri"; |
+ static SendPort _spawnUri(String uri) native "isolate_spawnUri"; |
} |