| Index: sdk/lib/_internal/lib/isolate_helper.dart
|
| diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart
|
| index 7d9e722c046e07d9b4576ac114ec2d9bf0e47e94..34bdfbe4606e6ae158c5d350dbe75bdd85517b71 100644
|
| --- a/sdk/lib/_internal/lib/isolate_helper.dart
|
| +++ b/sdk/lib/_internal/lib/isolate_helper.dart
|
| @@ -22,38 +22,6 @@ import 'dart:_interceptors' show JSExtendableArray;
|
|
|
| ReceivePort lazyPort;
|
|
|
| -class CloseToken {
|
| - /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to
|
| - /// close themselves.
|
| - const CloseToken();
|
| -}
|
| -
|
| -class JsIsolateSink extends EventSink<dynamic> implements IsolateSink {
|
| - bool _isClosed = false;
|
| - final SendPort _port;
|
| - JsIsolateSink.fromPort(this._port);
|
| -
|
| - void add(dynamic message) {
|
| - _port.send(message);
|
| - }
|
| -
|
| - void addError(errorEvent) {
|
| - throw new UnimplementedError("addError on isolate streams");
|
| - }
|
| -
|
| - void close() {
|
| - if (_isClosed) return;
|
| - add(const CloseToken());
|
| - _isClosed = true;
|
| - }
|
| -
|
| - bool operator==(var other) {
|
| - return other is IsolateSink && _port == other._port;
|
| - }
|
| -
|
| - int get hashCode => _port.hashCode + 499;
|
| -}
|
| -
|
| /**
|
| * Called by the compiler to support switching
|
| * between isolates when we get a callback from the DOM.
|
| @@ -90,7 +58,13 @@ void startRootIsolate(entry) {
|
| // by having a "default" isolate (the first one created).
|
| _globalState.currentContext = rootContext;
|
|
|
| - rootContext.eval(entry);
|
| + if (entry is _MainFunctionArgs) {
|
| + rootContext.eval(() { entry([]); });
|
| + } else if (entry is _MainFunctionArgsMessage) {
|
| + rootContext.eval(() { entry([], null); });
|
| + } else {
|
| + rootContext.eval(entry);
|
| + }
|
| _globalState.topEventLoop.run();
|
| }
|
|
|
| @@ -417,6 +391,10 @@ var globalWorker = JS('', "#.Worker", globalThis);
|
| bool globalPostMessageDefined =
|
| JS('', "#.postMessage !== (void 0)", globalThis);
|
|
|
| +typedef _MainFunction();
|
| +typedef _MainFunctionArgs(args);
|
| +typedef _MainFunctionArgsMessage(args, message);
|
| +
|
| class IsolateNatives {
|
|
|
| static String thisScript = computeThisScript();
|
| @@ -500,10 +478,13 @@ class IsolateNatives {
|
| Function entryPoint = (functionName == null)
|
| ? _globalState.entry
|
| : _getJSFunctionFromName(functionName);
|
| + var args = msg['args'];
|
| + var message = _deserializeMessage(msg['msg']);
|
| + var isSpawnUri = msg['isSpawnUri'];
|
| var replyTo = _deserializeMessage(msg['replyTo']);
|
| var context = new _IsolateContext();
|
| _globalState.topEventLoop.enqueue(context, () {
|
| - _startIsolate(entryPoint, replyTo);
|
| + _startIsolate(entryPoint, args, message, isSpawnUri, replyTo);
|
| }, 'worker-start');
|
| // Make sure we always have a current context in this worker.
|
| // TODO(7907): This is currently needed because we're using
|
| @@ -515,13 +496,15 @@ class IsolateNatives {
|
| _globalState.topEventLoop.run();
|
| break;
|
| case 'spawn-worker':
|
| - _spawnWorker(msg['functionName'], msg['uri'], msg['replyPort']);
|
| + _spawnWorker(msg['functionName'], msg['uri'],
|
| + msg['args'], msg['msg'],
|
| + msg['isSpawnUri'], msg['replyPort']);
|
| break;
|
| case 'message':
|
| SendPort port = msg['port'];
|
| // If the port has been closed, we ignore the message.
|
| if (port != null) {
|
| - msg['port'].send(msg['msg'], msg['replyTo']);
|
| + msg['port'].send(msg['msg']);
|
| }
|
| _globalState.topEventLoop.run();
|
| break;
|
| @@ -582,19 +565,25 @@ class IsolateNatives {
|
| return JS("", "new #()", ctor);
|
| }
|
|
|
| - static SendPort spawnFunction(void topLevelFunction()) {
|
| + static SendPort spawnFunction(void topLevelFunction(message), message) {
|
| final name = _getJSFunctionName(topLevelFunction);
|
| if (name == null) {
|
| throw new UnsupportedError(
|
| "only top-level functions can be spawned.");
|
| }
|
| - return spawn(name, null, false);
|
| + return spawn(name, null, null, message, false, false);
|
| + }
|
| +
|
| + static SendPort spawnUri(Uri uri, List<String> args, message) {
|
| + return spawn(null, uri.path, args, message, false, true);
|
| }
|
|
|
| // TODO(sigmund): clean up above, after we make the new API the default:
|
|
|
| /// If [uri] is `null` it is replaced with the current script.
|
| - static spawn(String functionName, String uri, bool isLight) {
|
| + static SendPort spawn(String functionName, String uri,
|
| + List<String> args, message,
|
| + bool isLight, bool isSpawnUri) {
|
| // Assume that the compiled version of the Dart file lives just next to the
|
| // dart file.
|
| // TODO(floitsch): support precompiled version of dart2js output.
|
| @@ -602,60 +591,86 @@ class IsolateNatives {
|
|
|
| Completer<SendPort> completer = new Completer<SendPort>.sync();
|
| ReceivePort port = new ReceivePort();
|
| - port.receive((msg, SendPort replyPort) {
|
| + port.listen((msg) {
|
| port.close();
|
| - assert(msg == _SPAWNED_SIGNAL);
|
| - completer.complete(replyPort);
|
| + assert(msg[0] == _SPAWNED_SIGNAL);
|
| + completer.complete(msg[1]);
|
| });
|
|
|
| - SendPort signalReply = port.toSendPort();
|
| + SendPort signalReply = port.sendPort;
|
|
|
| if (_globalState.useWorkers && !isLight) {
|
| - _startWorker(functionName, uri, signalReply);
|
| + _startWorker(functionName, uri, args, message, isSpawnUri, signalReply);
|
| } else {
|
| - _startNonWorker(functionName, uri, signalReply);
|
| + _startNonWorker(
|
| + functionName, uri, args, message, isSpawnUri, signalReply);
|
| }
|
| return new _BufferingSendPort(
|
| _globalState.currentContext.id, completer.future);
|
| }
|
|
|
| - static SendPort _startWorker(
|
| - String functionName, String uri, SendPort replyPort) {
|
| + static void _startWorker(
|
| + String functionName, String uri,
|
| + List<String> args, message,
|
| + bool isSpawnUri,
|
| + SendPort replyPort) {
|
| if (_globalState.isWorker) {
|
| _globalState.mainManager.postMessage(_serializeMessage({
|
| 'command': 'spawn-worker',
|
| 'functionName': functionName,
|
| + 'args': args,
|
| + 'msg': message,
|
| 'uri': uri,
|
| + 'isSpawnUri': isSpawnUri,
|
| 'replyPort': replyPort}));
|
| } else {
|
| - _spawnWorker(functionName, uri, replyPort);
|
| + _spawnWorker(functionName, uri, args, message, isSpawnUri, replyPort);
|
| }
|
| }
|
|
|
| - static SendPort _startNonWorker(
|
| - String functionName, String uri, SendPort replyPort) {
|
| + static void _startNonWorker(
|
| + String functionName, String uri,
|
| + List<String> args, message,
|
| + bool isSpawnUri,
|
| + SendPort replyPort) {
|
| // TODO(eub): support IE9 using an iframe -- Dart issue 1702.
|
| - if (uri != null) throw new UnsupportedError(
|
| - "Currently spawnUri is not supported without web workers.");
|
| + if (uri != null) {
|
| + throw new UnsupportedError(
|
| + "Currently spawnUri is not supported without web workers.");
|
| + }
|
| _globalState.topEventLoop.enqueue(new _IsolateContext(), () {
|
| final func = _getJSFunctionFromName(functionName);
|
| - _startIsolate(func, replyPort);
|
| + _startIsolate(func, args, message, isSpawnUri, replyPort);
|
| }, 'nonworker start');
|
| }
|
|
|
| - static void _startIsolate(Function topLevel, SendPort replyTo) {
|
| + static void _startIsolate(Function topLevel,
|
| + List<String> args, message,
|
| + bool isSpawnUri,
|
| + SendPort replyTo) {
|
| _IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT();
|
| Primitives.initializeStatics(context.id);
|
| lazyPort = new ReceivePort();
|
| - replyTo.send(_SPAWNED_SIGNAL, port.toSendPort());
|
| - topLevel();
|
| + replyTo.send([_SPAWNED_SIGNAL, lazyPort.sendPort]);
|
| + if (!isSpawnUri) {
|
| + topLevel(message);
|
| + } else if (topLevel is _MainFunctionArgsMessage) {
|
| + topLevel(args, message);
|
| + } else if (topLevel is _MainFunctionArgs) {
|
| + topLevel(args);
|
| + } else {
|
| + topLevel();
|
| + }
|
| }
|
|
|
| /**
|
| * Spawns an isolate in a worker. [factoryName] is the Javascript constructor
|
| * name for the isolate entry point class.
|
| */
|
| - static void _spawnWorker(functionName, uri, replyPort) {
|
| + static void _spawnWorker(functionName, String uri,
|
| + List<String> args, message,
|
| + bool isSpawnUri,
|
| + SendPort replyPort) {
|
| if (uri == null) uri = thisScript;
|
| final worker = JS('var', 'new Worker(#)', uri);
|
|
|
| @@ -676,6 +691,9 @@ class IsolateNatives {
|
| // the port (port deserialization is sensitive to what is the current
|
| // workerId).
|
| 'replyTo': _serializeMessage(replyPort),
|
| + 'args': args,
|
| + 'msg': _serializeMessage(message),
|
| + 'isSpawnUri': isSpawnUri,
|
| 'functionName': functionName }));
|
| }
|
| }
|
| @@ -700,22 +718,7 @@ class _BaseSendPort implements SendPort {
|
| }
|
| }
|
|
|
| - Future call(var message) {
|
| - final completer = new Completer();
|
| - final port = new ReceivePortImpl();
|
| - send(message, port.toSendPort());
|
| - port.receive((value, ignoreReplyTo) {
|
| - port.close();
|
| - if (value is Exception) {
|
| - completer.completeError(value);
|
| - } else {
|
| - completer.complete(value);
|
| - }
|
| - });
|
| - return completer.future;
|
| - }
|
| -
|
| - void send(var message, [SendPort replyTo]);
|
| + void send(var message);
|
| bool operator ==(var other);
|
| int get hashCode;
|
| }
|
| @@ -726,13 +729,12 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
|
|
|
| const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId);
|
|
|
| - void send(var message, [SendPort replyTo = null]) {
|
| - _waitForPendingPorts([message, replyTo], () {
|
| - _checkReplyTo(replyTo);
|
| + void send(var message, [SendPort replyTo]) {
|
| + _waitForPendingPorts(message, () {
|
| // Check that the isolate still runs and the port is still open
|
| final isolate = _globalState.isolates[_isolateId];
|
| if (isolate == null) return;
|
| - if (_receivePort._callback == null) return;
|
| + if (_receivePort._controller.isClosed) return;
|
|
|
| // We force serialization/deserialization as a simple way to ensure
|
| // isolate communication restrictions are respected between isolates that
|
| @@ -744,18 +746,15 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
|
| final shouldSerialize = _globalState.currentContext != null
|
| && _globalState.currentContext.id != _isolateId;
|
| var msg = message;
|
| - var reply = replyTo;
|
| if (shouldSerialize) {
|
| msg = _serializeMessage(msg);
|
| - reply = _serializeMessage(reply);
|
| }
|
| _globalState.topEventLoop.enqueue(isolate, () {
|
| - if (_receivePort._callback != null) {
|
| + if (!_receivePort._controller.isClosed) {
|
| if (shouldSerialize) {
|
| msg = _deserializeMessage(msg);
|
| - reply = _deserializeMessage(reply);
|
| }
|
| - _receivePort._callback(msg, reply);
|
| + _receivePort._controller.add(msg);
|
| }
|
| }, 'receive $message');
|
| });
|
| @@ -776,14 +775,12 @@ class _WorkerSendPort extends _BaseSendPort implements SendPort {
|
| const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId)
|
| : super(isolateId);
|
|
|
| - void send(var message, [SendPort replyTo = null]) {
|
| - _waitForPendingPorts([message, replyTo], () {
|
| - _checkReplyTo(replyTo);
|
| + void send(var message, [SendPort replyTo]) {
|
| + _waitForPendingPorts(message, () {
|
| final workerMessage = _serializeMessage({
|
| 'command': 'message',
|
| 'port': this,
|
| - 'msg': message,
|
| - 'replyTo': replyTo});
|
| + 'msg': message});
|
|
|
| if (_globalState.isWorker) {
|
| // Communication from one worker to another go through the
|
| @@ -838,7 +835,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
|
| _futurePort.then((p) {
|
| _port = p;
|
| for (final item in pending) {
|
| - p.send(item['message'], item['replyTo']);
|
| + p.send(item);
|
| }
|
| pending = null;
|
| });
|
| @@ -853,7 +850,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
|
| if (_port != null) {
|
| _port.send(message, replyTo);
|
| } else {
|
| - pending.add({'message': message, 'replyTo': replyTo});
|
| + pending.add(message);
|
| }
|
| }
|
|
|
| @@ -863,26 +860,32 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
|
| }
|
|
|
| /** Implementation of a multi-use [ReceivePort] on top of JavaScript. */
|
| -class ReceivePortImpl implements ReceivePort {
|
| - int _id;
|
| - Function _callback;
|
| +class ReceivePortImpl extends Stream implements ReceivePort {
|
| static int _nextFreeId = 1;
|
| + final int _id;
|
| + StreamController _controller;
|
|
|
| ReceivePortImpl()
|
| : _id = _nextFreeId++ {
|
| + _controller = new StreamController(onCancel: close, sync: true);
|
| _globalState.currentContext.register(_id, this);
|
| }
|
|
|
| - void receive(void onMessage(var message, SendPort replyTo)) {
|
| - _callback = onMessage;
|
| + StreamSubscription listen(void onData(var event),
|
| + {Function onError,
|
| + void onDone(),
|
| + bool cancelOnError}) {
|
| + return _controller.stream.listen(onData, onError: onError, onDone: onDone,
|
| + cancelOnError: cancelOnError);
|
| }
|
|
|
| void close() {
|
| - _callback = null;
|
| + if (_controller.isClosed) return;
|
| + _controller.close();
|
| _globalState.currentContext.unregister(_id);
|
| }
|
|
|
| - SendPort toSendPort() {
|
| + SendPort get sendPort {
|
| return new _NativeJsSendPort(this, _globalState.currentContext.id);
|
| }
|
| }
|
| @@ -908,9 +911,7 @@ class _PendingSendPortFinder extends _MessageTraverser {
|
| final seen = _visited[list];
|
| if (seen != null) return;
|
| _visited[list] = true;
|
| - // TODO(sigmund): replace with the following: (bug #1660)
|
| - // list.forEach(_dispatch);
|
| - list.forEach((e) => _dispatch(e));
|
| + list.forEach(_dispatch);
|
| }
|
|
|
| visitMap(Map map) {
|
| @@ -918,9 +919,7 @@ class _PendingSendPortFinder extends _MessageTraverser {
|
| if (seen != null) return;
|
|
|
| _visited[map] = true;
|
| - // TODO(sigmund): replace with the following: (bug #1660)
|
| - // map.values.forEach(_dispatch);
|
| - map.values.forEach((e) => _dispatch(e));
|
| + map.values.forEach(_dispatch);
|
| }
|
|
|
| visitSendPort(var port) {
|
| @@ -928,14 +927,6 @@ class _PendingSendPortFinder extends _MessageTraverser {
|
| ports.add(port._futurePort);
|
| }
|
| }
|
| -
|
| - visitIsolateSink(JsIsolateSink sink) {
|
| - visitSendPort(sink._port);
|
| - }
|
| -
|
| - visitCloseToken(CloseToken token) {
|
| - // Do nothing.
|
| - }
|
| }
|
|
|
| /********************************************************
|
| @@ -993,16 +984,6 @@ class _JsSerializer extends _Serializer {
|
| " ports are resolved at this point.";
|
| }
|
| }
|
| -
|
| - visitIsolateSink(JsIsolateSink sink) {
|
| - SendPort port = sink._port;
|
| - bool isClosed = sink._isClosed;
|
| - return ['isolateSink', visitSendPort(port), isClosed];
|
| - }
|
| -
|
| - visitCloseToken(CloseToken token) {
|
| - return ['closeToken'];
|
| - }
|
| }
|
|
|
|
|
| @@ -1036,18 +1017,6 @@ class _JsCopier extends _Copier {
|
| " ports are resolved at this point.";
|
| }
|
| }
|
| -
|
| - IsolateSink visitIsolateSink(JsIsolateSink sink) {
|
| - SendPort port = sink._port;
|
| - bool isClosed = sink._isClosed;
|
| - JsIsolateSink result = new JsIsolateSink.fromPort(visitSendPort(port));
|
| - result._isClosed = isClosed;
|
| - return result;
|
| - }
|
| -
|
| - CloseToken visitCloseToken(CloseToken token) {
|
| - return token; // Can be shared.
|
| - }
|
| }
|
|
|
| class _JsDeserializer extends _Deserializer {
|
| @@ -1068,18 +1037,6 @@ class _JsDeserializer extends _Deserializer {
|
| return new _WorkerSendPort(managerId, isolateId, receivePortId);
|
| }
|
| }
|
| -
|
| - IsolateSink deserializeIsolateSink(List list) {
|
| - SendPort port = deserializeSendPort(list[1]);
|
| - bool isClosed = list[2];
|
| - JsIsolateSink result = new JsIsolateSink.fromPort(port);
|
| - result._isClosed = isClosed;
|
| - return result;
|
| - }
|
| -
|
| - CloseToken deserializeCloseToken(List list) {
|
| - return const CloseToken();
|
| - }
|
| }
|
|
|
| class _JsVisitedMap implements _MessageTraverserVisitedMap {
|
| @@ -1176,9 +1133,6 @@ class _MessageTraverser {
|
| if (x is List) return visitList(x);
|
| if (x is Map) return visitMap(x);
|
| if (x is SendPort) return visitSendPort(x);
|
| - if (x is SendPortSync) return visitSendPortSync(x);
|
| - if (x is JsIsolateSink) return visitIsolateSink(x);
|
| - if (x is CloseToken) return visitCloseToken(x);
|
|
|
| // Overridable fallback.
|
| return visitObject(x);
|
| @@ -1188,9 +1142,6 @@ class _MessageTraverser {
|
| visitList(List x);
|
| visitMap(Map x);
|
| visitSendPort(SendPort x);
|
| - visitSendPortSync(SendPortSync x);
|
| - visitIsolateSink(IsolateSink x);
|
| - visitCloseToken(CloseToken x);
|
|
|
| visitObject(Object x) {
|
| // TODO(floitsch): make this a real exception. (which one)?
|
| @@ -1302,8 +1253,6 @@ class _Deserializer {
|
| case 'list': return _deserializeList(x);
|
| case 'map': return _deserializeMap(x);
|
| case 'sendport': return deserializeSendPort(x);
|
| - case 'isolateSink': return deserializeIsolateSink(x);
|
| - case 'closeToken': return deserializeCloseToken(x);
|
| default: return deserializeObject(x);
|
| }
|
| }
|
| @@ -1345,10 +1294,6 @@ class _Deserializer {
|
|
|
| deserializeSendPort(List x);
|
|
|
| - deserializeIsolateSink(List x);
|
| -
|
| - deserializeCloseToken(List x);
|
| -
|
| deserializeObject(List x) {
|
| // TODO(floitsch): Use real exception (which one?).
|
| throw "Unexpected serialized object";
|
|
|