 Chromium Code Reviews
 Chromium Code Reviews Issue 27215002:
  Very simple version of Isolates.  (Closed) 
  Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
    
  
    Issue 27215002:
  Very simple version of Isolates.  (Closed) 
  Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart| Index: runtime/lib/isolate_patch.dart | 
| diff --git a/runtime/lib/isolate_patch.dart b/runtime/lib/isolate_patch.dart | 
| index a2713d10c604716cf53d1aebbe1a7f90248a4519..88312681963135c57b400bb62b60e6c3e6bc1fec 100644 | 
| --- a/runtime/lib/isolate_patch.dart | 
| +++ b/runtime/lib/isolate_patch.dart | 
| @@ -2,125 +2,109 @@ | 
| // for details. All rights reserved. Use of this source code is governed by a | 
| // BSD-style license that can be found in the LICENSE file. | 
| -class _CloseToken { | 
| - /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to | 
| - /// close themselves. | 
| - const _CloseToken(); | 
| -} | 
| +patch class ReceivePort { | 
| + /* patch */ factory ReceivePort() = _ReceivePortImpl; | 
| -patch bool _isCloseToken(var object) { | 
| - // TODO(floitsch): can we compare against const _CloseToken()? | 
| - return object is _CloseToken; | 
| + /* patch */ factory ReceivePort.fromRawReceivePort(RawReceivePort rawPort) = | 
| + _ReceivePortImpl.fromRawReceivePort; | 
| } | 
| -patch class MessageBox { | 
| - /* patch */ MessageBox.oneShot() : this._oneShot(new ReceivePort()); | 
| - MessageBox._oneShot(ReceivePort receivePort) | 
| - : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort), | 
| - sink = new _IsolateSink._fromPort(receivePort.toSendPort()); | 
| - | 
| - /* patch */ MessageBox() : this._(new ReceivePort()); | 
| - MessageBox._(ReceivePort receivePort) | 
| - : stream = new IsolateStream._fromOriginalReceivePort(receivePort), | 
| - sink = new _IsolateSink._fromPort(receivePort.toSendPort()); | 
| +patch class RawReceivePort { | 
| + /** | 
| + * Opens a long-lived port for receiving messages. | 
| + * | 
| + * A [RawReceivePort] is low level and does not work with [Zone]s. It | 
| + * can not be paused. The data-handler must be set before the first | 
| + * event is received. | 
| + */ | 
| + /* patch */ factory RawReceivePort([void handler(event)]) { | 
| + _RawReceivePortImpl result = new _RawReceivePortImpl(); | 
| + result.handler = handler; | 
| + return result; | 
| + } | 
| } | 
| -class _IsolateSink implements IsolateSink { | 
| - bool _isClosed = false; | 
| - final SendPort _port; | 
| - _IsolateSink._fromPort(this._port); | 
| +class _ReceivePortImpl extends Stream implements ReceivePort { | 
| + _ReceivePortImpl() : this.fromRawReceivePort(new RawReceivePort()); | 
| - void add(dynamic message) { | 
| - _port.send(message); | 
| + _ReceivePortImpl.fromRawReceivePort(this._rawPort) { | 
| + _controller = new StreamController(onCancel: close, sync: true); | 
| + _rawPort.handler = _controller.add; | 
| } | 
| - void addError(Object errorEvent) { | 
| - throw new UnimplementedError("addError on isolate streams"); | 
| + SendPort get sendPort { | 
| + return _rawPort.sendPort; | 
| } | 
| - void close() { | 
| - if (_isClosed) return; | 
| - add(const _CloseToken()); | 
| - _isClosed = true; | 
| + StreamSubscription listen(void onData(var message), | 
| + { Function onError, | 
| + void onDone(), | 
| + bool cancelOnError }) { | 
| + return _controller.stream.listen(onData, | 
| + onError: onError, | 
| + onDone: onDone, | 
| + cancelOnError: cancelOnError); | 
| } | 
| - bool operator==(var other) { | 
| - return other is IsolateSink && _port == other._port; | 
| + close() { | 
| + _rawPort.close(); | 
| + _controller.close(); | 
| } | 
| - int get hashCode => _port.hashCode + 499; | 
| + final RawReceivePort _rawPort; | 
| + StreamController _controller; | 
| } | 
| -patch IsolateSink streamSpawnFunction( | 
| - void topLevelFunction(), | 
| - [bool unhandledExceptionCallback(IsolateUnhandledException e)]) { | 
| - SendPort sendPort = spawnFunction(topLevelFunction, | 
| - unhandledExceptionCallback); | 
| - return new _IsolateSink._fromPort(sendPort); | 
| -} | 
| - | 
| -patch class ReceivePort { | 
| - /* patch */ factory ReceivePort() { | 
| - return new _ReceivePortImpl(); | 
| - } | 
| -} | 
| - | 
| -class _ReceivePortImpl implements ReceivePort { | 
| - factory _ReceivePortImpl() native "ReceivePortImpl_factory"; | 
| - | 
| - receive(void onMessage(var message, SendPort replyTo)) { | 
| - _onMessage = onMessage; | 
| - } | 
| +class _RawReceivePortImpl implements RawReceivePort { | 
| + factory _RawReceivePortImpl() native "RawReceivePortImpl_factory"; | 
| close() { | 
| _portMap.remove(_id); | 
| _closeInternal(_id); | 
| } | 
| - SendPort toSendPort() { | 
| + SendPort get sendPort { | 
| return new _SendPortImpl(_id); | 
| } | 
| /**** Internal implementation details ****/ | 
| - // Called from the VM to create a new ReceivePort instance. | 
| - static _ReceivePortImpl _get_or_create(int id) { | 
| - if (_portMap != null) { | 
| - _ReceivePortImpl port = _portMap[id]; | 
| - if (port != null) { | 
| - return port; | 
| - } | 
| + // Called from the VM to create a new RawReceivePort instance. | 
| + static _RawReceivePortImpl _get_or_create(int id) { | 
| + _RawReceivePortImpl port = _portMap[id]; | 
| + if (port != null) { | 
| + return port; | 
| } | 
| - return new _ReceivePortImpl._internal(id); | 
| + return new _RawReceivePortImpl._internal(id); | 
| } | 
| - _ReceivePortImpl._internal(int id) : _id = id { | 
| - if (_portMap == null) { | 
| - _portMap = new Map(); | 
| - } | 
| + _RawReceivePortImpl._internal(int id) : _id = id { | 
| _portMap[id] = this; | 
| } | 
| - // Called from the VM to retrieve the ReceivePort for a message. | 
| - static _ReceivePortImpl _lookupReceivePort(int id) { | 
| - assert(_portMap != null); | 
| + // Called from the VM to retrieve the RawReceivePort for a message. | 
| + static _RawReceivePortImpl _lookupReceivePort(int id) { | 
| return _portMap[id]; | 
| } | 
| // Called from the VM to dispatch to the handler. | 
| - static void _handleMessage(_ReceivePortImpl port, int replyId, var message) { | 
| + static void _handleMessage( | 
| + _RawReceivePortImpl port, int replyId, var message) { | 
| assert(port != null); | 
| - SendPort replyTo = (replyId == 0) ? null : new _SendPortImpl(replyId); | 
| - (port._onMessage)(message, replyTo); | 
| + port._handler(message); | 
| } | 
| // Call into the VM to close the VM maintained mappings. | 
| - static _closeInternal(int id) native "ReceivePortImpl_closeInternal"; | 
| + static _closeInternal(int id) native "RawReceivePortImpl_closeInternal"; | 
| + | 
| + void set handler(Function newHandler) { | 
| + this._handler = newHandler; | 
| + } | 
| final int _id; | 
| - var _onMessage; | 
| + Function _handler; | 
| - // id to ReceivePort mapping. | 
| - static Map _portMap; | 
| + // id two RawReceivePort mapping. | 
| 
Ivan Posva
2013/10/25 18:41:39
?
 
floitsch
2013/10/25 18:55:54
don't ask...
I guess I removed the "to" by acciden
 | 
| + static final Map _portMap = new HashMap(); | 
| } | 
| @@ -135,27 +119,19 @@ class _SendPortImpl implements SendPort { | 
| _sendInternal(_id, replyId, message); | 
| } | 
| - Future call(var message) { | 
| - final completer = new Completer.sync(); | 
| - final port = new _ReceivePortImpl(); | 
| - send(message, port.toSendPort()); | 
| - port.receive((value, ignoreReplyTo) { | 
| - port.close(); | 
| - if (value is Exception) { | 
| - completer.completeError(value); | 
| - } else { | 
| - completer.complete(value); | 
| - } | 
| - }); | 
| - return completer.future; | 
| - } | 
| - | 
| bool operator==(var other) { | 
| return (other is _SendPortImpl) && _id == other._id; | 
| } | 
| int get hashCode { | 
| - return _id; | 
| + const int MASK = 0x3FFFFFFF; | 
| + int hash = _id; | 
| + hash = (hash + ((hash & (MASK >> 10)) << 10)) & MASK; | 
| + hash ^= (hash >> 6); | 
| + hash = (hash + ((hash & (MASK >> 3)) << 3)) & MASK; | 
| + hash ^= (hash >> 11); | 
| + hash = (hash + ((hash & (MASK >> 15)) << 15)) & MASK; | 
| + return hash; | 
| } | 
| /*--- private implementation ---*/ | 
| @@ -177,19 +153,95 @@ class _SendPortImpl implements SendPort { | 
| _getPortInternal() native "isolate_getPortInternal"; | 
| -ReceivePort _portInternal; | 
| - | 
| -patch class _Isolate { | 
| - /* patch */ static ReceivePort get port { | 
| - if (_portInternal == null) { | 
| - _portInternal = _getPortInternal(); | 
| +typedef _MainFunction(); | 
| +typedef _MainFunctionArgs(args); | 
| +typedef _MainFunctionArgsMessage(args, message); | 
| + | 
| +/** | 
| + * Takes the real entry point as argument and invokes it with the initial | 
| + * message. | 
| + * | 
| + * The initial message is (currently) received through the global port variable. | 
| + */ | 
| +void _startIsolate(Function entryPoint, bool isSpawnUri) { | 
| + Isolate._port.first.then((message) { | 
| + SendPort replyTo = message[0]; | 
| + // TODO(floitsch): don't send ok-message if we can't find the entry point. | 
| + replyTo.send("started"); | 
| + if (isSpawnUri) { | 
| + assert(message.length == 3); | 
| + List<String> args = message[1]; | 
| + var isolateMessage = message[2]; | 
| + if (entryPoint is _MainFunctionArgsMessage) { | 
| + entryPoint(args, isolateMessage); | 
| + } else if (entryPoint is _MainFunctionArgs) { | 
| + entryPoint(args); | 
| + } else { | 
| + entryPoint(); | 
| + } | 
| + } else { | 
| + assert(message.length == 2); | 
| + var entryMessage = message[1]; | 
| + entryPoint(entryMessage); | 
| } | 
| - return _portInternal; | 
| + }); | 
| +} | 
| + | 
| +patch class Isolate { | 
| + /* patch */ static Future<Isolate> spawn( | 
| + void entryPoint(message), var message) { | 
| + Completer completer = new Completer<Isolate>.sync(); | 
| + try { | 
| + // The VM will invoke [_startIsolate] with entryPoint as argument. | 
| + SendPort controlPort = _spawnFunction(entryPoint); | 
| + RawReceivePort readyPort = new RawReceivePort(); | 
| + controlPort.send([readyPort.sendPort, message]); | 
| + readyPort.handler = (readyMessage) { | 
| + assert(readyMessage == 'started'); | 
| + readyPort.close(); | 
| + completer.complete(new Isolate._fromControlPort(controlPort)); | 
| + }; | 
| + } catch(e, st) { | 
| + // TODO(floitsch): we want errors to go into the returned future. | 
| + rethrow; | 
| + }; | 
| + return completer.future; | 
| + } | 
| + | 
| + /* patch */ static Future<Isolate> spawnUri( | 
| + Uri uri, List<String> args, var message) { | 
| + Completer completer = new Completer<Isolate>.sync(); | 
| + try { | 
| + if (args is List<String>) { | 
| 
Ivan Posva
2013/10/25 18:41:39
I strongly disagree on this point. It should be up
 
floitsch
2013/10/25 18:55:54
The reason is that dart2js would otherwise need to
 | 
| + for (int i = 0; i < args.length; i++) { | 
| + if (args[i] is! String) { | 
| + throw new ArgumentError("Args must be a list of Strings $args"); | 
| + } | 
| + } | 
| + } else if (args != null) { | 
| + throw new ArgumentError("Args must be a list of Strings $args"); | 
| + } | 
| + // The VM will invoke [_startIsolate] and not `main`. | 
| + SendPort controlPort = _spawnUri(uri.path); | 
| + RawReceivePort readyPort = new RawReceivePort(); | 
| + controlPort.send([readyPort.sendPort, args, message]); | 
| + readyPort.handler = (readyMessage) { | 
| + assert(readyMessage == 'started'); | 
| + readyPort.close(); | 
| + completer.complete(new Isolate._fromControlPort(controlPort)); | 
| + }; | 
| + } catch(e, st) { | 
| + // TODO(floitsch): we want errors to go into the returned future. | 
| + rethrow; | 
| + }; | 
| + return completer.future; | 
| } | 
| - /* patch */ static SendPort spawnFunction(void topLevelFunction(), | 
| - [bool unhandledExceptionCallback(IsolateUnhandledException e)]) | 
| + static final ReceivePort _port = | 
| + new ReceivePort.fromRawReceivePort(_getPortInternal()); | 
| + | 
| + static SendPort _spawnFunction(Function topLevelFunction) | 
| native "isolate_spawnFunction"; | 
| - /* patch */ static SendPort spawnUri(String uri) native "isolate_spawnUri"; | 
| + static SendPort _spawnUri(String uri) native "isolate_spawnUri"; | 
| } |