| Index: runtime/lib/isolate_patch.dart
|
| diff --git a/runtime/lib/isolate_patch.dart b/runtime/lib/isolate_patch.dart
|
| index a2713d10c604716cf53d1aebbe1a7f90248a4519..a5cb017a7b3d4d9b8b249ae926a387776aa68cc9 100644
|
| --- a/runtime/lib/isolate_patch.dart
|
| +++ b/runtime/lib/isolate_patch.dart
|
| @@ -2,85 +2,32 @@
|
| // for details. All rights reserved. Use of this source code is governed by a
|
| // BSD-style license that can be found in the LICENSE file.
|
|
|
| -class _CloseToken {
|
| - /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to
|
| - /// close themselves.
|
| - const _CloseToken();
|
| -}
|
| -
|
| -patch bool _isCloseToken(var object) {
|
| - // TODO(floitsch): can we compare against const _CloseToken()?
|
| - return object is _CloseToken;
|
| -}
|
| -
|
| -patch class MessageBox {
|
| - /* patch */ MessageBox.oneShot() : this._oneShot(new ReceivePort());
|
| - MessageBox._oneShot(ReceivePort receivePort)
|
| - : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort),
|
| - sink = new _IsolateSink._fromPort(receivePort.toSendPort());
|
| -
|
| - /* patch */ MessageBox() : this._(new ReceivePort());
|
| - MessageBox._(ReceivePort receivePort)
|
| - : stream = new IsolateStream._fromOriginalReceivePort(receivePort),
|
| - sink = new _IsolateSink._fromPort(receivePort.toSendPort());
|
| -}
|
| -
|
| -class _IsolateSink implements IsolateSink {
|
| - bool _isClosed = false;
|
| - final SendPort _port;
|
| - _IsolateSink._fromPort(this._port);
|
| -
|
| - void add(dynamic message) {
|
| - _port.send(message);
|
| - }
|
| -
|
| - void addError(Object errorEvent) {
|
| - throw new UnimplementedError("addError on isolate streams");
|
| - }
|
| -
|
| - void close() {
|
| - if (_isClosed) return;
|
| - add(const _CloseToken());
|
| - _isClosed = true;
|
| - }
|
| -
|
| - bool operator==(var other) {
|
| - return other is IsolateSink && _port == other._port;
|
| - }
|
| -
|
| - int get hashCode => _port.hashCode + 499;
|
| -}
|
| -
|
| -patch IsolateSink streamSpawnFunction(
|
| - void topLevelFunction(),
|
| - [bool unhandledExceptionCallback(IsolateUnhandledException e)]) {
|
| - SendPort sendPort = spawnFunction(topLevelFunction,
|
| - unhandledExceptionCallback);
|
| - return new _IsolateSink._fromPort(sendPort);
|
| -}
|
| -
|
| patch class ReceivePort {
|
| /* patch */ factory ReceivePort() {
|
| return new _ReceivePortImpl();
|
| }
|
| }
|
|
|
| -class _ReceivePortImpl implements ReceivePort {
|
| +class _ReceivePortImpl extends Stream implements ReceivePort {
|
| factory _ReceivePortImpl() native "ReceivePortImpl_factory";
|
|
|
| - receive(void onMessage(var message, SendPort replyTo)) {
|
| - _onMessage = onMessage;
|
| - }
|
| -
|
| close() {
|
| _portMap.remove(_id);
|
| _closeInternal(_id);
|
| + _controller.close();
|
| }
|
|
|
| - SendPort toSendPort() {
|
| + SendPort get sendPort {
|
| return new _SendPortImpl(_id);
|
| }
|
|
|
| + StreamSubscription listen(void onData(var message),
|
| + { Function onError,
|
| + void onDone(),
|
| + bool cancelOnError }) {
|
| + return _controller.stream.listen(onData);
|
| + }
|
| +
|
| /**** Internal implementation details ****/
|
| // Called from the VM to create a new ReceivePort instance.
|
| static _ReceivePortImpl _get_or_create(int id) {
|
| @@ -98,6 +45,9 @@ class _ReceivePortImpl implements ReceivePort {
|
| _portMap = new Map();
|
| }
|
| _portMap[id] = this;
|
| +
|
| + // TODO(floitsch): remove the hack to close receive-ports on cancel.
|
| + _controller = new StreamController(onCancel: close, sync: true);
|
| }
|
|
|
| // Called from the VM to retrieve the ReceivePort for a message.
|
| @@ -109,15 +59,14 @@ class _ReceivePortImpl implements ReceivePort {
|
| // Called from the VM to dispatch to the handler.
|
| static void _handleMessage(_ReceivePortImpl port, int replyId, var message) {
|
| assert(port != null);
|
| - SendPort replyTo = (replyId == 0) ? null : new _SendPortImpl(replyId);
|
| - (port._onMessage)(message, replyTo);
|
| + port._controller.add(message);
|
| }
|
|
|
| // Call into the VM to close the VM maintained mappings.
|
| static _closeInternal(int id) native "ReceivePortImpl_closeInternal";
|
|
|
| final int _id;
|
| - var _onMessage;
|
| + StreamController _controller;
|
|
|
| // id to ReceivePort mapping.
|
| static Map _portMap;
|
| @@ -135,21 +84,6 @@ class _SendPortImpl implements SendPort {
|
| _sendInternal(_id, replyId, message);
|
| }
|
|
|
| - Future call(var message) {
|
| - final completer = new Completer.sync();
|
| - final port = new _ReceivePortImpl();
|
| - send(message, port.toSendPort());
|
| - port.receive((value, ignoreReplyTo) {
|
| - port.close();
|
| - if (value is Exception) {
|
| - completer.completeError(value);
|
| - } else {
|
| - completer.complete(value);
|
| - }
|
| - });
|
| - return completer.future;
|
| - }
|
| -
|
| bool operator==(var other) {
|
| return (other is _SendPortImpl) && _id == other._id;
|
| }
|
| @@ -179,6 +113,42 @@ _getPortInternal() native "isolate_getPortInternal";
|
|
|
| ReceivePort _portInternal;
|
|
|
| +/**
|
| + * Takes the real entry point as argument and invokes it with the initial
|
| + * message.
|
| + *
|
| + * The initial message is (currently) received through the global port variable.
|
| + */
|
| +void _startIsolate(void entryPoint(message)) {
|
| + _Isolate.port.first.then((message) {
|
| + var initialMessage = message[0];
|
| + var reply = message[1];
|
| + reply.send("started");
|
| + entryPoint(initialMessage);
|
| + });
|
| +}
|
| +
|
| +patch class Isolate {
|
| + /* patch */ static Future<Isolate> spawn(
|
| + void entryPoint(message), var message) {
|
| + return new Future<Isolate>.sync(() {
|
| + // The VM will invoke [_startIsolate] with entryPoint as argument.
|
| + SendPort controlPort = _Isolate._spawnFunction(entryPoint);
|
| + ReceivePort readyPort = new ReceivePort();
|
| + controlPort.send([message, readyPort.sendPort]);
|
| + Completer completer = new Completer<Isolate>();
|
| + readyPort.first.then((_) {
|
| + completer.complete(new Isolate._fromControlPort(controlPort));
|
| + });
|
| + return completer.future;
|
| + });
|
| + }
|
| +
|
| + /* patch */ static Future<Isolate> spawnUri(Uri uri, var message) {
|
| + throw new UnimplementedError("Isolate.spawnUri");
|
| + }
|
| +}
|
| +
|
| patch class _Isolate {
|
| /* patch */ static ReceivePort get port {
|
| if (_portInternal == null) {
|
| @@ -187,8 +157,7 @@ patch class _Isolate {
|
| return _portInternal;
|
| }
|
|
|
| - /* patch */ static SendPort spawnFunction(void topLevelFunction(),
|
| - [bool unhandledExceptionCallback(IsolateUnhandledException e)])
|
| + static SendPort _spawnFunction(Function topLevelFunction)
|
| native "isolate_spawnFunction";
|
|
|
| /* patch */ static SendPort spawnUri(String uri) native "isolate_spawnUri";
|
|
|