| Index: sdk/lib/_internal/lib/isolate_helper.dart
|
| diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart
|
| index 8359a30042ffec4faec87402a1da0477f0c11e03..d4d6c70bbf1edb23f371967622cc1529f2bd4a3e 100644
|
| --- a/sdk/lib/_internal/lib/isolate_helper.dart
|
| +++ b/sdk/lib/_internal/lib/isolate_helper.dart
|
| @@ -471,7 +471,7 @@ class IsolateNatives {
|
| var replyTo = _deserializeMessage(msg['replyTo']);
|
| var context = new _IsolateContext();
|
| _globalState.topEventLoop.enqueue(context, () {
|
| - _startIsolate(entryPoint, replyTo);
|
| + _startIsolate(entryPoint, null, replyTo);
|
| }, 'worker-start');
|
| // Make sure we always have a current context in this worker.
|
| // TODO(7907): This is currently needed because we're using
|
| @@ -483,13 +483,14 @@ class IsolateNatives {
|
| _globalState.topEventLoop.run();
|
| break;
|
| case 'spawn-worker':
|
| - _spawnWorker(msg['functionName'], msg['uri'], msg['replyPort']);
|
| + _spawnWorker(msg['functionName'], msg['uri'],
|
| + msg['msg'], msg['replyPort']);
|
| break;
|
| case 'message':
|
| SendPort port = msg['port'];
|
| // If the port has been closed, we ignore the message.
|
| if (port != null) {
|
| - msg['port'].send(msg['msg'], msg['replyTo']);
|
| + msg['port'].send(msg['msg']);
|
| }
|
| _globalState.topEventLoop.run();
|
| break;
|
| @@ -550,19 +551,19 @@ class IsolateNatives {
|
| return JS("", "new #()", ctor);
|
| }
|
|
|
| - static SendPort spawnFunction(void topLevelFunction()) {
|
| + static SendPort spawnFunction(void topLevelFunction(message), message) {
|
| final name = _getJSFunctionName(topLevelFunction);
|
| if (name == null) {
|
| throw new UnsupportedError(
|
| "only top-level functions can be spawned.");
|
| }
|
| - return spawn(name, null, false);
|
| + return spawn(name, null, message, false);
|
| }
|
|
|
| // TODO(sigmund): clean up above, after we make the new API the default:
|
|
|
| /// If [uri] is `null` it is replaced with the current script.
|
| - static spawn(String functionName, String uri, bool isLight) {
|
| + static spawn(String functionName, String uri, message, bool isLight) {
|
| // Assume that the compiled version of the Dart file lives just next to the
|
| // dart file.
|
| // TODO(floitsch): support precompiled version of dart2js output.
|
| @@ -570,60 +571,61 @@ class IsolateNatives {
|
|
|
| Completer<SendPort> completer = new Completer<SendPort>.sync();
|
| ReceivePort port = new ReceivePort();
|
| - port.receive((msg, SendPort replyPort) {
|
| + port.listen((msg) {
|
| port.close();
|
| - assert(msg == _SPAWNED_SIGNAL);
|
| - completer.complete(replyPort);
|
| + assert(msg[0] == _SPAWNED_SIGNAL);
|
| + completer.complete(msg[1]);
|
| });
|
|
|
| - SendPort signalReply = port.toSendPort();
|
| + SendPort signalReply = port.sendPort;
|
|
|
| if (_globalState.useWorkers && !isLight) {
|
| - _startWorker(functionName, uri, signalReply);
|
| + _startWorker(functionName, uri, message, signalReply);
|
| } else {
|
| - _startNonWorker(functionName, uri, signalReply);
|
| + _startNonWorker(functionName, uri, message, signalReply);
|
| }
|
| return new _BufferingSendPort(
|
| _globalState.currentContext.id, completer.future);
|
| }
|
|
|
| static SendPort _startWorker(
|
| - String functionName, String uri, SendPort replyPort) {
|
| + String functionName, String uri, message, SendPort replyPort) {
|
| if (_globalState.isWorker) {
|
| _globalState.mainManager.postMessage(_serializeMessage({
|
| 'command': 'spawn-worker',
|
| 'functionName': functionName,
|
| + 'msg': message,
|
| 'uri': uri,
|
| 'replyPort': replyPort}));
|
| } else {
|
| - _spawnWorker(functionName, uri, replyPort);
|
| + _spawnWorker(functionName, uri, message, replyPort);
|
| }
|
| }
|
|
|
| static SendPort _startNonWorker(
|
| - String functionName, String uri, SendPort replyPort) {
|
| + String functionName, String uri, message, SendPort replyPort) {
|
| // TODO(eub): support IE9 using an iframe -- Dart issue 1702.
|
| if (uri != null) throw new UnsupportedError(
|
| "Currently spawnUri is not supported without web workers.");
|
| _globalState.topEventLoop.enqueue(new _IsolateContext(), () {
|
| final func = _getJSFunctionFromName(functionName);
|
| - _startIsolate(func, replyPort);
|
| + _startIsolate(func, message, replyPort);
|
| }, 'nonworker start');
|
| }
|
|
|
| - static void _startIsolate(Function topLevel, SendPort replyTo) {
|
| + static void _startIsolate(Function topLevel, message, SendPort replyTo) {
|
| _IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT();
|
| Primitives.initializeStatics(context.id);
|
| lazyPort = new ReceivePort();
|
| - replyTo.send(_SPAWNED_SIGNAL, port.toSendPort());
|
| - topLevel();
|
| + replyTo.send([_SPAWNED_SIGNAL, lazyPort.sendPort]);
|
| + topLevel(message);
|
| }
|
|
|
| /**
|
| * Spawns an isolate in a worker. [factoryName] is the Javascript constructor
|
| * name for the isolate entry point class.
|
| */
|
| - static void _spawnWorker(functionName, uri, replyPort) {
|
| + static void _spawnWorker(functionName, uri, message, replyPort) {
|
| if (uri == null) uri = thisScript;
|
| final worker = JS('var', 'new Worker(#)', uri);
|
|
|
| @@ -644,6 +646,7 @@ class IsolateNatives {
|
| // the port (port deserialization is sensitive to what is the current
|
| // workerId).
|
| 'replyTo': _serializeMessage(replyPort),
|
| + 'msg': message,
|
| 'functionName': functionName }));
|
| }
|
| }
|
| @@ -668,22 +671,7 @@ class _BaseSendPort implements SendPort {
|
| }
|
| }
|
|
|
| - Future call(var message) {
|
| - final completer = new Completer();
|
| - final port = new ReceivePortImpl();
|
| - send(message, port.toSendPort());
|
| - port.receive((value, ignoreReplyTo) {
|
| - port.close();
|
| - if (value is Exception) {
|
| - completer.completeError(value);
|
| - } else {
|
| - completer.complete(value);
|
| - }
|
| - });
|
| - return completer.future;
|
| - }
|
| -
|
| - void send(var message, [SendPort replyTo]);
|
| + void send(var message);
|
| bool operator ==(var other);
|
| int get hashCode;
|
| }
|
| @@ -694,13 +682,12 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
|
|
|
| const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId);
|
|
|
| - void send(var message, [SendPort replyTo = null]) {
|
| - _waitForPendingPorts([message, replyTo], () {
|
| - _checkReplyTo(replyTo);
|
| + void send(var message, [SendPort replyTo]) {
|
| + _waitForPendingPorts(message, () {
|
| // Check that the isolate still runs and the port is still open
|
| final isolate = _globalState.isolates[_isolateId];
|
| if (isolate == null) return;
|
| - if (_receivePort._callback == null) return;
|
| + if (_receivePort._controller.isClosed) return;
|
|
|
| // We force serialization/deserialization as a simple way to ensure
|
| // isolate communication restrictions are respected between isolates that
|
| @@ -712,18 +699,15 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
|
| final shouldSerialize = _globalState.currentContext != null
|
| && _globalState.currentContext.id != _isolateId;
|
| var msg = message;
|
| - var reply = replyTo;
|
| if (shouldSerialize) {
|
| msg = _serializeMessage(msg);
|
| - reply = _serializeMessage(reply);
|
| }
|
| _globalState.topEventLoop.enqueue(isolate, () {
|
| - if (_receivePort._callback != null) {
|
| + if (!_receivePort._controller.isClosed) {
|
| if (shouldSerialize) {
|
| msg = _deserializeMessage(msg);
|
| - reply = _deserializeMessage(reply);
|
| }
|
| - _receivePort._callback(msg, reply);
|
| + _receivePort._controller.add(msg);
|
| }
|
| }, 'receive $message');
|
| });
|
| @@ -744,14 +728,12 @@ class _WorkerSendPort extends _BaseSendPort implements SendPort {
|
| const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId)
|
| : super(isolateId);
|
|
|
| - void send(var message, [SendPort replyTo = null]) {
|
| - _waitForPendingPorts([message, replyTo], () {
|
| - _checkReplyTo(replyTo);
|
| + void send(var message, [SendPort replyTo]) {
|
| + _waitForPendingPorts(message, () {
|
| final workerMessage = _serializeMessage({
|
| 'command': 'message',
|
| 'port': this,
|
| - 'msg': message,
|
| - 'replyTo': replyTo});
|
| + 'msg': message});
|
|
|
| if (_globalState.isWorker) {
|
| // Communication from one worker to another go through the
|
| @@ -806,7 +788,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
|
| _futurePort.then((p) {
|
| _port = p;
|
| for (final item in pending) {
|
| - p.send(item['message'], item['replyTo']);
|
| + p.send(item);
|
| }
|
| pending = null;
|
| });
|
| @@ -821,7 +803,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
|
| if (_port != null) {
|
| _port.send(message, replyTo);
|
| } else {
|
| - pending.add({'message': message, 'replyTo': replyTo});
|
| + pending.add(message);
|
| }
|
| }
|
|
|
| @@ -831,26 +813,32 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
|
| }
|
|
|
| /** Implementation of a multi-use [ReceivePort] on top of JavaScript. */
|
| -class ReceivePortImpl implements ReceivePort {
|
| - int _id;
|
| - Function _callback;
|
| +class ReceivePortImpl extends Stream implements ReceivePort {
|
| static int _nextFreeId = 1;
|
| + final int _id;
|
| + StreamController _controller;
|
|
|
| ReceivePortImpl()
|
| : _id = _nextFreeId++ {
|
| + _controller = new StreamController(onCancel: close, sync: true);
|
| _globalState.currentContext.register(_id, this);
|
| }
|
|
|
| - void receive(void onMessage(var message, SendPort replyTo)) {
|
| - _callback = onMessage;
|
| + StreamSubscription listen(void onData(var event),
|
| + {Function onError,
|
| + void onDone(),
|
| + bool cancelOnError}) {
|
| + return _controller.stream.listen(onData, onError: onError, onDone: onDone,
|
| + cancelOnError: cancelOnError);
|
| }
|
|
|
| void close() {
|
| - _callback = null;
|
| + if (_controller.isClosed) return;
|
| + _controller.close();
|
| _globalState.currentContext.unregister(_id);
|
| }
|
|
|
| - SendPort toSendPort() {
|
| + SendPort get sendPort {
|
| return new _NativeJsSendPort(this, _globalState.currentContext.id);
|
| }
|
| }
|
| @@ -876,9 +864,7 @@ class _PendingSendPortFinder extends _MessageTraverser {
|
| final seen = _visited[list];
|
| if (seen != null) return;
|
| _visited[list] = true;
|
| - // TODO(sigmund): replace with the following: (bug #1660)
|
| - // list.forEach(_dispatch);
|
| - list.forEach((e) => _dispatch(e));
|
| + list.forEach(_dispatch);
|
| }
|
|
|
| visitMap(Map map) {
|
| @@ -886,9 +872,7 @@ class _PendingSendPortFinder extends _MessageTraverser {
|
| if (seen != null) return;
|
|
|
| _visited[map] = true;
|
| - // TODO(sigmund): replace with the following: (bug #1660)
|
| - // map.values.forEach(_dispatch);
|
| - map.values.forEach((e) => _dispatch(e));
|
| + map.values.forEach(_dispatch);
|
| }
|
|
|
| visitSendPort(var port) {
|
|
|