| Index: runtime/lib/isolate_patch.dart
|
| diff --git a/runtime/lib/isolate_patch.dart b/runtime/lib/isolate_patch.dart
|
| index a2713d10c604716cf53d1aebbe1a7f90248a4519..867a031e020ffd31fbad37b9617d02bbb93ba05f 100644
|
| --- a/runtime/lib/isolate_patch.dart
|
| +++ b/runtime/lib/isolate_patch.dart
|
| @@ -2,125 +2,109 @@
|
| // for details. All rights reserved. Use of this source code is governed by a
|
| // BSD-style license that can be found in the LICENSE file.
|
|
|
| -class _CloseToken {
|
| - /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to
|
| - /// close themselves.
|
| - const _CloseToken();
|
| -}
|
| +patch class ReceivePort {
|
| + /* patch */ factory ReceivePort() = _ReceivePortImpl;
|
|
|
| -patch bool _isCloseToken(var object) {
|
| - // TODO(floitsch): can we compare against const _CloseToken()?
|
| - return object is _CloseToken;
|
| + /* patch */ factory ReceivePort.fromRawReceivePort(RawReceivePort rawPort) =
|
| + _ReceivePortImpl.fromRawReceivePort;
|
| }
|
|
|
| -patch class MessageBox {
|
| - /* patch */ MessageBox.oneShot() : this._oneShot(new ReceivePort());
|
| - MessageBox._oneShot(ReceivePort receivePort)
|
| - : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort),
|
| - sink = new _IsolateSink._fromPort(receivePort.toSendPort());
|
| -
|
| - /* patch */ MessageBox() : this._(new ReceivePort());
|
| - MessageBox._(ReceivePort receivePort)
|
| - : stream = new IsolateStream._fromOriginalReceivePort(receivePort),
|
| - sink = new _IsolateSink._fromPort(receivePort.toSendPort());
|
| +patch class RawReceivePort {
|
| + /**
|
| + * Opens a long-lived port for receiving messages.
|
| + *
|
| + * A [RawReceivePort] is low level and does not work with [Zone]s. It
|
| + * can not be paused. The data-handler must be set before the first
|
| + * event is received.
|
| + */
|
| + /* patch */ factory RawReceivePort([void handler(event)]) {
|
| + _RawReceivePortImpl result = new _RawReceivePortImpl();
|
| + result.handler = handler;
|
| + return result;
|
| + }
|
| }
|
|
|
| -class _IsolateSink implements IsolateSink {
|
| - bool _isClosed = false;
|
| - final SendPort _port;
|
| - _IsolateSink._fromPort(this._port);
|
| +class _ReceivePortImpl extends Stream implements ReceivePort {
|
| + _ReceivePortImpl() : this.fromRawReceivePort(new RawReceivePort());
|
|
|
| - void add(dynamic message) {
|
| - _port.send(message);
|
| + _ReceivePortImpl.fromRawReceivePort(this._rawPort) {
|
| + _controller = new StreamController(onCancel: close, sync: true);
|
| + _rawPort.handler = _controller.add;
|
| }
|
|
|
| - void addError(Object errorEvent) {
|
| - throw new UnimplementedError("addError on isolate streams");
|
| + SendPort get sendPort {
|
| + return _rawPort.sendPort;
|
| }
|
|
|
| - void close() {
|
| - if (_isClosed) return;
|
| - add(const _CloseToken());
|
| - _isClosed = true;
|
| + StreamSubscription listen(void onData(var message),
|
| + { Function onError,
|
| + void onDone(),
|
| + bool cancelOnError }) {
|
| + return _controller.stream.listen(onData,
|
| + onError: onError,
|
| + onDone: onDone,
|
| + cancelOnError: cancelOnError);
|
| }
|
|
|
| - bool operator==(var other) {
|
| - return other is IsolateSink && _port == other._port;
|
| + close() {
|
| + _rawPort.close();
|
| + _controller.close();
|
| }
|
|
|
| - int get hashCode => _port.hashCode + 499;
|
| + final RawReceivePort _rawPort;
|
| + StreamController _controller;
|
| }
|
|
|
| -patch IsolateSink streamSpawnFunction(
|
| - void topLevelFunction(),
|
| - [bool unhandledExceptionCallback(IsolateUnhandledException e)]) {
|
| - SendPort sendPort = spawnFunction(topLevelFunction,
|
| - unhandledExceptionCallback);
|
| - return new _IsolateSink._fromPort(sendPort);
|
| -}
|
| -
|
| -patch class ReceivePort {
|
| - /* patch */ factory ReceivePort() {
|
| - return new _ReceivePortImpl();
|
| - }
|
| -}
|
| -
|
| -class _ReceivePortImpl implements ReceivePort {
|
| - factory _ReceivePortImpl() native "ReceivePortImpl_factory";
|
| -
|
| - receive(void onMessage(var message, SendPort replyTo)) {
|
| - _onMessage = onMessage;
|
| - }
|
| +class _RawReceivePortImpl implements RawReceivePort {
|
| + factory _RawReceivePortImpl() native "RawReceivePortImpl_factory";
|
|
|
| close() {
|
| _portMap.remove(_id);
|
| _closeInternal(_id);
|
| }
|
|
|
| - SendPort toSendPort() {
|
| + SendPort get sendPort {
|
| return new _SendPortImpl(_id);
|
| }
|
|
|
| /**** Internal implementation details ****/
|
| - // Called from the VM to create a new ReceivePort instance.
|
| - static _ReceivePortImpl _get_or_create(int id) {
|
| - if (_portMap != null) {
|
| - _ReceivePortImpl port = _portMap[id];
|
| - if (port != null) {
|
| - return port;
|
| - }
|
| + // Called from the VM to create a new RawReceivePort instance.
|
| + static _RawReceivePortImpl _get_or_create(int id) {
|
| + _RawReceivePortImpl port = _portMap[id];
|
| + if (port != null) {
|
| + return port;
|
| }
|
| - return new _ReceivePortImpl._internal(id);
|
| + return new _RawReceivePortImpl._internal(id);
|
| }
|
|
|
| - _ReceivePortImpl._internal(int id) : _id = id {
|
| - if (_portMap == null) {
|
| - _portMap = new Map();
|
| - }
|
| + _RawReceivePortImpl._internal(int id) : _id = id {
|
| _portMap[id] = this;
|
| }
|
|
|
| - // Called from the VM to retrieve the ReceivePort for a message.
|
| - static _ReceivePortImpl _lookupReceivePort(int id) {
|
| - assert(_portMap != null);
|
| + // Called from the VM to retrieve the RawReceivePort for a message.
|
| + static _RawReceivePortImpl _lookupReceivePort(int id) {
|
| return _portMap[id];
|
| }
|
|
|
| // Called from the VM to dispatch to the handler.
|
| - static void _handleMessage(_ReceivePortImpl port, int replyId, var message) {
|
| + static void _handleMessage(
|
| + _RawReceivePortImpl port, int replyId, var message) {
|
| assert(port != null);
|
| - SendPort replyTo = (replyId == 0) ? null : new _SendPortImpl(replyId);
|
| - (port._onMessage)(message, replyTo);
|
| + port._handler(message);
|
| }
|
|
|
| // Call into the VM to close the VM maintained mappings.
|
| - static _closeInternal(int id) native "ReceivePortImpl_closeInternal";
|
| + static _closeInternal(int id) native "RawReceivePortImpl_closeInternal";
|
| +
|
| + void set handler(Function newHandler) {
|
| + this._handler = newHandler;
|
| + }
|
|
|
| final int _id;
|
| - var _onMessage;
|
| + Function _handler;
|
|
|
| - // id to ReceivePort mapping.
|
| - static Map _portMap;
|
| + // id to RawReceivePort mapping.
|
| + static final Map _portMap = new HashMap();
|
| }
|
|
|
|
|
| @@ -135,27 +119,19 @@ class _SendPortImpl implements SendPort {
|
| _sendInternal(_id, replyId, message);
|
| }
|
|
|
| - Future call(var message) {
|
| - final completer = new Completer.sync();
|
| - final port = new _ReceivePortImpl();
|
| - send(message, port.toSendPort());
|
| - port.receive((value, ignoreReplyTo) {
|
| - port.close();
|
| - if (value is Exception) {
|
| - completer.completeError(value);
|
| - } else {
|
| - completer.complete(value);
|
| - }
|
| - });
|
| - return completer.future;
|
| - }
|
| -
|
| bool operator==(var other) {
|
| return (other is _SendPortImpl) && _id == other._id;
|
| }
|
|
|
| int get hashCode {
|
| - return _id;
|
| + const int MASK = 0x3FFFFFFF;
|
| + int hash = _id;
|
| + hash = (hash + ((hash & (MASK >> 10)) << 10)) & MASK;
|
| + hash ^= (hash >> 6);
|
| + hash = (hash + ((hash & (MASK >> 3)) << 3)) & MASK;
|
| + hash ^= (hash >> 11);
|
| + hash = (hash + ((hash & (MASK >> 15)) << 15)) & MASK;
|
| + return hash;
|
| }
|
|
|
| /*--- private implementation ---*/
|
| @@ -177,19 +153,86 @@ class _SendPortImpl implements SendPort {
|
|
|
| _getPortInternal() native "isolate_getPortInternal";
|
|
|
| -ReceivePort _portInternal;
|
| -
|
| -patch class _Isolate {
|
| - /* patch */ static ReceivePort get port {
|
| - if (_portInternal == null) {
|
| - _portInternal = _getPortInternal();
|
| +typedef _MainFunction();
|
| +typedef _MainFunctionArgs(args);
|
| +typedef _MainFunctionArgsMessage(args, message);
|
| +
|
| +/**
|
| + * Takes the real entry point as argument and invokes it with the initial
|
| + * message.
|
| + *
|
| + * The initial message is (currently) received through the global port variable.
|
| + */
|
| +void _startIsolate(Function entryPoint, bool isSpawnUri) {
|
| + Isolate._port.first.then((message) {
|
| + SendPort replyTo = message[0];
|
| + // TODO(floitsch): don't send ok-message if we can't find the entry point.
|
| + replyTo.send("started");
|
| + if (isSpawnUri) {
|
| + assert(message.length == 3);
|
| + List<String> args = message[1];
|
| + var isolateMessage = message[2];
|
| + if (entryPoint is _MainFunctionArgsMessage) {
|
| + entryPoint(args, isolateMessage);
|
| + } else if (entryPoint is _MainFunctionArgs) {
|
| + entryPoint(args);
|
| + } else {
|
| + entryPoint();
|
| + }
|
| + } else {
|
| + assert(message.length == 2);
|
| + var entryMessage = message[1];
|
| + entryPoint(entryMessage);
|
| }
|
| - return _portInternal;
|
| + });
|
| +}
|
| +
|
| +patch class Isolate {
|
| + /* patch */ static Future<Isolate> spawn(
|
| + void entryPoint(message), var message) {
|
| + Completer completer = new Completer<Isolate>.sync();
|
| + try {
|
| + // The VM will invoke [_startIsolate] with entryPoint as argument.
|
| + SendPort controlPort = _spawnFunction(entryPoint);
|
| + RawReceivePort readyPort = new RawReceivePort();
|
| + controlPort.send([readyPort.sendPort, message]);
|
| + readyPort.handler = (readyMessage) {
|
| + assert(readyMessage == 'started');
|
| + readyPort.close();
|
| + completer.complete(new Isolate._fromControlPort(controlPort));
|
| + };
|
| + } catch(e, st) {
|
| + // TODO(floitsch): we want errors to go into the returned future.
|
| + rethrow;
|
| + };
|
| + return completer.future;
|
| }
|
|
|
| - /* patch */ static SendPort spawnFunction(void topLevelFunction(),
|
| - [bool unhandledExceptionCallback(IsolateUnhandledException e)])
|
| + /* patch */ static Future<Isolate> spawnUri(
|
| + Uri uri, List<String> args, var message) {
|
| + Completer completer = new Completer<Isolate>.sync();
|
| + try {
|
| + // The VM will invoke [_startIsolate] and not `main`.
|
| + SendPort controlPort = _spawnUri(uri.path);
|
| + RawReceivePort readyPort = new RawReceivePort();
|
| + controlPort.send([readyPort.sendPort, args, message]);
|
| + readyPort.handler = (readyMessage) {
|
| + assert(readyMessage == 'started');
|
| + readyPort.close();
|
| + completer.complete(new Isolate._fromControlPort(controlPort));
|
| + };
|
| + } catch(e, st) {
|
| + // TODO(floitsch): we want errors to go into the returned future.
|
| + rethrow;
|
| + };
|
| + return completer.future;
|
| + }
|
| +
|
| + static final ReceivePort _port =
|
| + new ReceivePort.fromRawReceivePort(_getPortInternal());
|
| +
|
| + static SendPort _spawnFunction(Function topLevelFunction)
|
| native "isolate_spawnFunction";
|
|
|
| - /* patch */ static SendPort spawnUri(String uri) native "isolate_spawnUri";
|
| + static SendPort _spawnUri(String uri) native "isolate_spawnUri";
|
| }
|
|
|