Chromium Code Reviews| Index: runtime/lib/isolate_patch.dart |
| diff --git a/runtime/lib/isolate_patch.dart b/runtime/lib/isolate_patch.dart |
| index a2713d10c604716cf53d1aebbe1a7f90248a4519..e7814533d71396eb14f5867cb13d11c64c045d13 100644 |
| --- a/runtime/lib/isolate_patch.dart |
| +++ b/runtime/lib/isolate_patch.dart |
| @@ -2,124 +2,110 @@ |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| -class _CloseToken { |
| - /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to |
| - /// close themselves. |
| - const _CloseToken(); |
| -} |
| +patch class ReceivePort { |
| + /* patch */ factory ReceivePort() = _ReceivePortImpl; |
| -patch bool _isCloseToken(var object) { |
| - // TODO(floitsch): can we compare against const _CloseToken()? |
| - return object is _CloseToken; |
| + /* patch */ factory ReceivePort.fromRawReceivePort(RawReceivePort rawPort) = |
| + _ReceivePortImpl.fromRawReceivePort; |
| } |
| -patch class MessageBox { |
| - /* patch */ MessageBox.oneShot() : this._oneShot(new ReceivePort()); |
| - MessageBox._oneShot(ReceivePort receivePort) |
| - : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort), |
| - sink = new _IsolateSink._fromPort(receivePort.toSendPort()); |
| - |
| - /* patch */ MessageBox() : this._(new ReceivePort()); |
| - MessageBox._(ReceivePort receivePort) |
| - : stream = new IsolateStream._fromOriginalReceivePort(receivePort), |
| - sink = new _IsolateSink._fromPort(receivePort.toSendPort()); |
| +patch class RawReceivePort { |
| + /** |
| + * Opens a long-lived port for receiving messages. |
| + * |
| + * A [RawReceivePort] is low level and does not work with [Zone]s. It |
| + * can not be paused. The data-handler must be set before the first |
| + * event is received. |
| + */ |
| + /* patch */ factory RawReceivePort([void handler(event)]) { |
| + _RawReceivePortImpl result = new _RawReceivePortImpl(); |
| + result.handler = handler; |
| + return result; |
| + } |
| } |
| -class _IsolateSink implements IsolateSink { |
| - bool _isClosed = false; |
| - final SendPort _port; |
| - _IsolateSink._fromPort(this._port); |
| +class _ReceivePortImpl extends Stream implements ReceivePort { |
| + _ReceivePortImpl() : this.fromRawReceivePort(new RawReceivePort()); |
| - void add(dynamic message) { |
| - _port.send(message); |
| + _ReceivePortImpl.fromRawReceivePort(this._rawPort) { |
| + _controller = new StreamController(onCancel: close, sync: true); |
| + _rawPort.handler = _controller.add; |
| } |
| - void addError(Object errorEvent) { |
| - throw new UnimplementedError("addError on isolate streams"); |
| + SendPort get sendPort { |
| + return _rawPort.sendPort; |
| } |
| - void close() { |
| - if (_isClosed) return; |
| - add(const _CloseToken()); |
| - _isClosed = true; |
| + StreamSubscription listen(void onData(var message), |
| + { Function onError, |
| + void onDone(), |
| + bool cancelOnError }) { |
| + return _controller.stream.listen(onData); |
| } |
| - bool operator==(var other) { |
| - return other is IsolateSink && _port == other._port; |
| + close() { |
| + _rawPort.close(); |
| + _controller.close(); |
| } |
| - int get hashCode => _port.hashCode + 499; |
| + final RawReceivePort _rawPort; |
| + StreamController _controller; |
| } |
| -patch IsolateSink streamSpawnFunction( |
| - void topLevelFunction(), |
| - [bool unhandledExceptionCallback(IsolateUnhandledException e)]) { |
| - SendPort sendPort = spawnFunction(topLevelFunction, |
| - unhandledExceptionCallback); |
| - return new _IsolateSink._fromPort(sendPort); |
| -} |
| - |
| -patch class ReceivePort { |
| - /* patch */ factory ReceivePort() { |
| - return new _ReceivePortImpl(); |
| - } |
| -} |
| - |
| -class _ReceivePortImpl implements ReceivePort { |
| - factory _ReceivePortImpl() native "ReceivePortImpl_factory"; |
| - |
| - receive(void onMessage(var message, SendPort replyTo)) { |
| - _onMessage = onMessage; |
| - } |
| +class _RawReceivePortImpl implements RawReceivePort { |
| + factory _RawReceivePortImpl() native "ReceivePortImpl_factory"; |
|
Ivan Posva
2013/10/24 06:48:38
"RawReceivePortImpl_factory"
floitsch
2013/10/24 16:15:58
Done.
|
| close() { |
| _portMap.remove(_id); |
| _closeInternal(_id); |
| } |
| - SendPort toSendPort() { |
| + SendPort get sendPort { |
| return new _SendPortImpl(_id); |
| } |
| /**** Internal implementation details ****/ |
| - // Called from the VM to create a new ReceivePort instance. |
| - static _ReceivePortImpl _get_or_create(int id) { |
| + // Called from the VM to create a new RawReceivePort instance. |
| + static _RawReceivePortImpl _get_or_create(int id) { |
| if (_portMap != null) { |
| - _ReceivePortImpl port = _portMap[id]; |
| + _RawReceivePortImpl port = _portMap[id]; |
| if (port != null) { |
| return port; |
| } |
| } |
| - return new _ReceivePortImpl._internal(id); |
| + return new _RawReceivePortImpl._internal(id); |
| } |
| - _ReceivePortImpl._internal(int id) : _id = id { |
| + _RawReceivePortImpl._internal(int id) : _id = id { |
| if (_portMap == null) { |
| _portMap = new Map(); |
| } |
| _portMap[id] = this; |
| } |
| - // Called from the VM to retrieve the ReceivePort for a message. |
| - static _ReceivePortImpl _lookupReceivePort(int id) { |
| + // Called from the VM to retrieve the RawReceivePort for a message. |
| + static _RawReceivePortImpl _lookupReceivePort(int id) { |
| assert(_portMap != null); |
| return _portMap[id]; |
| } |
| // Called from the VM to dispatch to the handler. |
| - static void _handleMessage(_ReceivePortImpl port, int replyId, var message) { |
| + static void _handleMessage(_RawReceivePortImpl port, int replyId, var message) { |
| assert(port != null); |
| - SendPort replyTo = (replyId == 0) ? null : new _SendPortImpl(replyId); |
| - (port._onMessage)(message, replyTo); |
| + port._handler(message); |
| } |
| // Call into the VM to close the VM maintained mappings. |
| static _closeInternal(int id) native "ReceivePortImpl_closeInternal"; |
| + void set handler(Function newHandler) { |
| + this._handler = newHandler; |
| + } |
| + |
| final int _id; |
| - var _onMessage; |
| + Function _handler; |
| - // id to ReceivePort mapping. |
| + // id to RawReceivePort mapping. |
| static Map _portMap; |
| } |
| @@ -135,21 +121,6 @@ class _SendPortImpl implements SendPort { |
| _sendInternal(_id, replyId, message); |
| } |
| - Future call(var message) { |
| - final completer = new Completer.sync(); |
| - final port = new _ReceivePortImpl(); |
| - send(message, port.toSendPort()); |
| - port.receive((value, ignoreReplyTo) { |
| - port.close(); |
| - if (value is Exception) { |
| - completer.completeError(value); |
| - } else { |
| - completer.complete(value); |
| - } |
| - }); |
| - return completer.future; |
| - } |
| - |
| bool operator==(var other) { |
| return (other is _SendPortImpl) && _id == other._id; |
| } |
| @@ -179,17 +150,85 @@ _getPortInternal() native "isolate_getPortInternal"; |
| ReceivePort _portInternal; |
| +typedef _MainFunction(); |
| +typedef _MainFunctionArgs(args); |
| +typedef _MainFunctionMessage({message}); |
| +typedef _MainFunctionArgsMessage(args, {message}); |
| + |
| +/** |
| + * Takes the real entry point as argument and invokes it with the initial |
| + * message. |
| + * |
| + * The initial message is (currently) received through the global port variable. |
| + */ |
| +void _startIsolate(Function entryPoint, bool isFunctionSpawn) { |
| + _Isolate.port.first.then((message) { |
| + var initialMessage = message[0]; |
| + var reply = message[1]; |
| + reply.send("started"); |
| + if (isFunctionSpawn) { |
| + entryPoint(initialMessage); |
| + } else if (entryPoint is _MainFunctionArgsMessage) { |
| + entryPoint(const [], message: initialMessage); |
| + } else if (entryPoint is _MainFunctionMessage) { |
| + entryPoint(message: initialMessage); |
| + } else if (entryPoint is _MainFunctionArgs) { |
| + entryPoint(const []); |
| + } else { |
| + entryPoint(); |
| + } |
| + }); |
| +} |
| + |
| +patch class Isolate { |
| + /* patch */ static Future<Isolate> spawn( |
| + void entryPoint(message), var message) { |
| + Completer completer = new Completer<Isolate>.sync(); |
| + try { |
| + // The VM will invoke [_startIsolate] with entryPoint as argument. |
| + SendPort controlPort = _Isolate._spawnFunction(entryPoint); |
| + RawReceivePort readyPort = new RawReceivePort(); |
| + controlPort.send([message, readyPort.sendPort]); |
| + readyPort.handler = (_) { |
| + readyPort.close(); |
| + completer.complete(new Isolate._fromControlPort(controlPort)); |
| + }; |
| + } catch(e, st) { |
| + // TODO(floitsch): we want errors to go into the returned future. |
| + rethrow; |
| + }; |
| + return completer.future; |
| + } |
| + |
| + /* patch */ static Future<Isolate> spawnUri(Uri uri, var message) { |
| + Completer completer = new Completer<Isolate>.sync(); |
| + try { |
| + // The VM will invoke [_startIsolate]. |
| + SendPort controlPort = _Isolate._spawnUri(uri.path); |
| + RawReceivePort readyPort = new RawReceivePort(); |
| + controlPort.send([message, readyPort.sendPort]); |
| + readyPort.handler = (_) { |
| + readyPort.close(); |
| + completer.complete(new Isolate._fromControlPort(controlPort)); |
| + }; |
| + } catch(e, st) { |
| + // TODO(floitsch): we want errors to go into the returned future. |
| + rethrow; |
| + }; |
| + return completer.future; |
| + } |
| +} |
| + |
| patch class _Isolate { |
| /* patch */ static ReceivePort get port { |
| if (_portInternal == null) { |
| - _portInternal = _getPortInternal(); |
| + _portInternal = new ReceivePort.fromRawReceivePort(_getPortInternal()); |
| } |
| return _portInternal; |
| } |
| - /* patch */ static SendPort spawnFunction(void topLevelFunction(), |
| - [bool unhandledExceptionCallback(IsolateUnhandledException e)]) |
| + static SendPort _spawnFunction(Function topLevelFunction) |
| native "isolate_spawnFunction"; |
| - /* patch */ static SendPort spawnUri(String uri) native "isolate_spawnUri"; |
| + /* patch */ static SendPort _spawnUri(String uri) native "isolate_spawnUri"; |
| } |