Chromium Code Reviews| Index: runtime/lib/isolate_patch.dart |
| diff --git a/runtime/lib/isolate_patch.dart b/runtime/lib/isolate_patch.dart |
| index a2713d10c604716cf53d1aebbe1a7f90248a4519..88312681963135c57b400bb62b60e6c3e6bc1fec 100644 |
| --- a/runtime/lib/isolate_patch.dart |
| +++ b/runtime/lib/isolate_patch.dart |
| @@ -2,125 +2,109 @@ |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| -class _CloseToken { |
| - /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to |
| - /// close themselves. |
| - const _CloseToken(); |
| -} |
| +patch class ReceivePort { |
| + /* patch */ factory ReceivePort() = _ReceivePortImpl; |
| -patch bool _isCloseToken(var object) { |
| - // TODO(floitsch): can we compare against const _CloseToken()? |
| - return object is _CloseToken; |
| + /* patch */ factory ReceivePort.fromRawReceivePort(RawReceivePort rawPort) = |
| + _ReceivePortImpl.fromRawReceivePort; |
| } |
| -patch class MessageBox { |
| - /* patch */ MessageBox.oneShot() : this._oneShot(new ReceivePort()); |
| - MessageBox._oneShot(ReceivePort receivePort) |
| - : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort), |
| - sink = new _IsolateSink._fromPort(receivePort.toSendPort()); |
| - |
| - /* patch */ MessageBox() : this._(new ReceivePort()); |
| - MessageBox._(ReceivePort receivePort) |
| - : stream = new IsolateStream._fromOriginalReceivePort(receivePort), |
| - sink = new _IsolateSink._fromPort(receivePort.toSendPort()); |
| +patch class RawReceivePort { |
| + /** |
| + * Opens a long-lived port for receiving messages. |
| + * |
| + * A [RawReceivePort] is low level and does not work with [Zone]s. It |
| + * can not be paused. The data-handler must be set before the first |
| + * event is received. |
| + */ |
| + /* patch */ factory RawReceivePort([void handler(event)]) { |
| + _RawReceivePortImpl result = new _RawReceivePortImpl(); |
| + result.handler = handler; |
| + return result; |
| + } |
| } |
| -class _IsolateSink implements IsolateSink { |
| - bool _isClosed = false; |
| - final SendPort _port; |
| - _IsolateSink._fromPort(this._port); |
| +class _ReceivePortImpl extends Stream implements ReceivePort { |
| + _ReceivePortImpl() : this.fromRawReceivePort(new RawReceivePort()); |
| - void add(dynamic message) { |
| - _port.send(message); |
| + _ReceivePortImpl.fromRawReceivePort(this._rawPort) { |
| + _controller = new StreamController(onCancel: close, sync: true); |
| + _rawPort.handler = _controller.add; |
| } |
| - void addError(Object errorEvent) { |
| - throw new UnimplementedError("addError on isolate streams"); |
| + SendPort get sendPort { |
| + return _rawPort.sendPort; |
| } |
| - void close() { |
| - if (_isClosed) return; |
| - add(const _CloseToken()); |
| - _isClosed = true; |
| + StreamSubscription listen(void onData(var message), |
| + { Function onError, |
| + void onDone(), |
| + bool cancelOnError }) { |
| + return _controller.stream.listen(onData, |
| + onError: onError, |
| + onDone: onDone, |
| + cancelOnError: cancelOnError); |
| } |
| - bool operator==(var other) { |
| - return other is IsolateSink && _port == other._port; |
| + close() { |
| + _rawPort.close(); |
| + _controller.close(); |
| } |
| - int get hashCode => _port.hashCode + 499; |
| + final RawReceivePort _rawPort; |
| + StreamController _controller; |
| } |
| -patch IsolateSink streamSpawnFunction( |
| - void topLevelFunction(), |
| - [bool unhandledExceptionCallback(IsolateUnhandledException e)]) { |
| - SendPort sendPort = spawnFunction(topLevelFunction, |
| - unhandledExceptionCallback); |
| - return new _IsolateSink._fromPort(sendPort); |
| -} |
| - |
| -patch class ReceivePort { |
| - /* patch */ factory ReceivePort() { |
| - return new _ReceivePortImpl(); |
| - } |
| -} |
| - |
| -class _ReceivePortImpl implements ReceivePort { |
| - factory _ReceivePortImpl() native "ReceivePortImpl_factory"; |
| - |
| - receive(void onMessage(var message, SendPort replyTo)) { |
| - _onMessage = onMessage; |
| - } |
| +class _RawReceivePortImpl implements RawReceivePort { |
| + factory _RawReceivePortImpl() native "RawReceivePortImpl_factory"; |
| close() { |
| _portMap.remove(_id); |
| _closeInternal(_id); |
| } |
| - SendPort toSendPort() { |
| + SendPort get sendPort { |
| return new _SendPortImpl(_id); |
| } |
| /**** Internal implementation details ****/ |
| - // Called from the VM to create a new ReceivePort instance. |
| - static _ReceivePortImpl _get_or_create(int id) { |
| - if (_portMap != null) { |
| - _ReceivePortImpl port = _portMap[id]; |
| - if (port != null) { |
| - return port; |
| - } |
| + // Called from the VM to create a new RawReceivePort instance. |
| + static _RawReceivePortImpl _get_or_create(int id) { |
| + _RawReceivePortImpl port = _portMap[id]; |
| + if (port != null) { |
| + return port; |
| } |
| - return new _ReceivePortImpl._internal(id); |
| + return new _RawReceivePortImpl._internal(id); |
| } |
| - _ReceivePortImpl._internal(int id) : _id = id { |
| - if (_portMap == null) { |
| - _portMap = new Map(); |
| - } |
| + _RawReceivePortImpl._internal(int id) : _id = id { |
| _portMap[id] = this; |
| } |
| - // Called from the VM to retrieve the ReceivePort for a message. |
| - static _ReceivePortImpl _lookupReceivePort(int id) { |
| - assert(_portMap != null); |
| + // Called from the VM to retrieve the RawReceivePort for a message. |
| + static _RawReceivePortImpl _lookupReceivePort(int id) { |
| return _portMap[id]; |
| } |
| // Called from the VM to dispatch to the handler. |
| - static void _handleMessage(_ReceivePortImpl port, int replyId, var message) { |
| + static void _handleMessage( |
| + _RawReceivePortImpl port, int replyId, var message) { |
| assert(port != null); |
| - SendPort replyTo = (replyId == 0) ? null : new _SendPortImpl(replyId); |
| - (port._onMessage)(message, replyTo); |
| + port._handler(message); |
| } |
| // Call into the VM to close the VM maintained mappings. |
| - static _closeInternal(int id) native "ReceivePortImpl_closeInternal"; |
| + static _closeInternal(int id) native "RawReceivePortImpl_closeInternal"; |
| + |
| + void set handler(Function newHandler) { |
| + this._handler = newHandler; |
| + } |
| final int _id; |
| - var _onMessage; |
| + Function _handler; |
| - // id to ReceivePort mapping. |
| - static Map _portMap; |
| + // id two RawReceivePort mapping. |
|
Ivan Posva
2013/10/25 18:41:39
?
floitsch
2013/10/25 18:55:54
don't ask...
I guess I removed the "to" by acciden
|
| + static final Map _portMap = new HashMap(); |
| } |
| @@ -135,27 +119,19 @@ class _SendPortImpl implements SendPort { |
| _sendInternal(_id, replyId, message); |
| } |
| - Future call(var message) { |
| - final completer = new Completer.sync(); |
| - final port = new _ReceivePortImpl(); |
| - send(message, port.toSendPort()); |
| - port.receive((value, ignoreReplyTo) { |
| - port.close(); |
| - if (value is Exception) { |
| - completer.completeError(value); |
| - } else { |
| - completer.complete(value); |
| - } |
| - }); |
| - return completer.future; |
| - } |
| - |
| bool operator==(var other) { |
| return (other is _SendPortImpl) && _id == other._id; |
| } |
| int get hashCode { |
| - return _id; |
| + const int MASK = 0x3FFFFFFF; |
| + int hash = _id; |
| + hash = (hash + ((hash & (MASK >> 10)) << 10)) & MASK; |
| + hash ^= (hash >> 6); |
| + hash = (hash + ((hash & (MASK >> 3)) << 3)) & MASK; |
| + hash ^= (hash >> 11); |
| + hash = (hash + ((hash & (MASK >> 15)) << 15)) & MASK; |
| + return hash; |
| } |
| /*--- private implementation ---*/ |
| @@ -177,19 +153,95 @@ class _SendPortImpl implements SendPort { |
| _getPortInternal() native "isolate_getPortInternal"; |
| -ReceivePort _portInternal; |
| - |
| -patch class _Isolate { |
| - /* patch */ static ReceivePort get port { |
| - if (_portInternal == null) { |
| - _portInternal = _getPortInternal(); |
| +typedef _MainFunction(); |
| +typedef _MainFunctionArgs(args); |
| +typedef _MainFunctionArgsMessage(args, message); |
| + |
| +/** |
| + * Takes the real entry point as argument and invokes it with the initial |
| + * message. |
| + * |
| + * The initial message is (currently) received through the global port variable. |
| + */ |
| +void _startIsolate(Function entryPoint, bool isSpawnUri) { |
| + Isolate._port.first.then((message) { |
| + SendPort replyTo = message[0]; |
| + // TODO(floitsch): don't send ok-message if we can't find the entry point. |
| + replyTo.send("started"); |
| + if (isSpawnUri) { |
| + assert(message.length == 3); |
| + List<String> args = message[1]; |
| + var isolateMessage = message[2]; |
| + if (entryPoint is _MainFunctionArgsMessage) { |
| + entryPoint(args, isolateMessage); |
| + } else if (entryPoint is _MainFunctionArgs) { |
| + entryPoint(args); |
| + } else { |
| + entryPoint(); |
| + } |
| + } else { |
| + assert(message.length == 2); |
| + var entryMessage = message[1]; |
| + entryPoint(entryMessage); |
| } |
| - return _portInternal; |
| + }); |
| +} |
| + |
| +patch class Isolate { |
| + /* patch */ static Future<Isolate> spawn( |
| + void entryPoint(message), var message) { |
| + Completer completer = new Completer<Isolate>.sync(); |
| + try { |
| + // The VM will invoke [_startIsolate] with entryPoint as argument. |
| + SendPort controlPort = _spawnFunction(entryPoint); |
| + RawReceivePort readyPort = new RawReceivePort(); |
| + controlPort.send([readyPort.sendPort, message]); |
| + readyPort.handler = (readyMessage) { |
| + assert(readyMessage == 'started'); |
| + readyPort.close(); |
| + completer.complete(new Isolate._fromControlPort(controlPort)); |
| + }; |
| + } catch(e, st) { |
| + // TODO(floitsch): we want errors to go into the returned future. |
| + rethrow; |
| + }; |
| + return completer.future; |
| + } |
| + |
| + /* patch */ static Future<Isolate> spawnUri( |
| + Uri uri, List<String> args, var message) { |
| + Completer completer = new Completer<Isolate>.sync(); |
| + try { |
| + if (args is List<String>) { |
|
Ivan Posva
2013/10/25 18:41:39
I strongly disagree on this point. It should be up
floitsch
2013/10/25 18:55:54
The reason is that dart2js would otherwise need to
|
| + for (int i = 0; i < args.length; i++) { |
| + if (args[i] is! String) { |
| + throw new ArgumentError("Args must be a list of Strings $args"); |
| + } |
| + } |
| + } else if (args != null) { |
| + throw new ArgumentError("Args must be a list of Strings $args"); |
| + } |
| + // The VM will invoke [_startIsolate] and not `main`. |
| + SendPort controlPort = _spawnUri(uri.path); |
| + RawReceivePort readyPort = new RawReceivePort(); |
| + controlPort.send([readyPort.sendPort, args, message]); |
| + readyPort.handler = (readyMessage) { |
| + assert(readyMessage == 'started'); |
| + readyPort.close(); |
| + completer.complete(new Isolate._fromControlPort(controlPort)); |
| + }; |
| + } catch(e, st) { |
| + // TODO(floitsch): we want errors to go into the returned future. |
| + rethrow; |
| + }; |
| + return completer.future; |
| } |
| - /* patch */ static SendPort spawnFunction(void topLevelFunction(), |
| - [bool unhandledExceptionCallback(IsolateUnhandledException e)]) |
| + static final ReceivePort _port = |
| + new ReceivePort.fromRawReceivePort(_getPortInternal()); |
| + |
| + static SendPort _spawnFunction(Function topLevelFunction) |
| native "isolate_spawnFunction"; |
| - /* patch */ static SendPort spawnUri(String uri) native "isolate_spawnUri"; |
| + static SendPort _spawnUri(String uri) native "isolate_spawnUri"; |
| } |