Chromium Code Reviews| Index: sdk/lib/_internal/lib/isolate_helper.dart |
| diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart |
| index 8359a30042ffec4faec87402a1da0477f0c11e03..6838f162d2ecc92ecdb5b2eb1cb6c17d1c79166b 100644 |
| --- a/sdk/lib/_internal/lib/isolate_helper.dart |
| +++ b/sdk/lib/_internal/lib/isolate_helper.dart |
| @@ -385,6 +385,10 @@ var globalWorker = JS('', "#.Worker", globalThis); |
| bool globalPostMessageDefined = |
| JS('', "#.postMessage !== (void 0)", globalThis); |
| +typedef _MainFunction(); |
| +typedef _MainFunctionArgs(args); |
| +typedef _MainFunctionArgsMessage(args, message); |
| + |
| class IsolateNatives { |
| static String thisScript = computeThisScript(); |
| @@ -468,10 +472,13 @@ class IsolateNatives { |
| Function entryPoint = (functionName == null) |
| ? _globalState.entry |
| : _getJSFunctionFromName(functionName); |
| + var args = msg['args']; |
|
Lasse Reichstein Nielsen
2013/10/25 12:45:59
Are we sure we don't need to deserializer args too
floitsch
2013/10/25 13:52:56
We need to check, because dart2js will (or should)
|
| + var message = _deserializeMessage(msg['msg']); |
| + var isSpawnUri = msg['isSpawnUri']; |
| var replyTo = _deserializeMessage(msg['replyTo']); |
| var context = new _IsolateContext(); |
| _globalState.topEventLoop.enqueue(context, () { |
| - _startIsolate(entryPoint, replyTo); |
| + _startIsolate(entryPoint, args, message, isSpawnUri, replyTo); |
| }, 'worker-start'); |
| // Make sure we always have a current context in this worker. |
| // TODO(7907): This is currently needed because we're using |
| @@ -483,13 +490,15 @@ class IsolateNatives { |
| _globalState.topEventLoop.run(); |
| break; |
| case 'spawn-worker': |
| - _spawnWorker(msg['functionName'], msg['uri'], msg['replyPort']); |
| + _spawnWorker(msg['functionName'], msg['uri'], |
| + msg['args'], msg['msg'], |
| + msg['isSpawnUri'], msg['replyPort']); |
| break; |
| case 'message': |
| SendPort port = msg['port']; |
| // If the port has been closed, we ignore the message. |
| if (port != null) { |
| - msg['port'].send(msg['msg'], msg['replyTo']); |
| + msg['port'].send(msg['msg']); |
| } |
| _globalState.topEventLoop.run(); |
| break; |
| @@ -550,19 +559,25 @@ class IsolateNatives { |
| return JS("", "new #()", ctor); |
| } |
| - static SendPort spawnFunction(void topLevelFunction()) { |
| + static SendPort spawnFunction(void topLevelFunction(message), message) { |
| final name = _getJSFunctionName(topLevelFunction); |
| if (name == null) { |
| throw new UnsupportedError( |
| "only top-level functions can be spawned."); |
| } |
| - return spawn(name, null, false); |
| + return spawn(name, null, null, message, false, false); |
| + } |
| + |
| + static SendPort spawnUri(Uri uri, List<String> args, message) { |
| + return spawn(null, uri.path, args, message, false, true); |
| } |
| // TODO(sigmund): clean up above, after we make the new API the default: |
| /// If [uri] is `null` it is replaced with the current script. |
| - static spawn(String functionName, String uri, bool isLight) { |
| + static SendPort spawn(String functionName, String uri, |
| + List<String> args, message, |
| + bool isLight, bool isSpawnUri) { |
| // Assume that the compiled version of the Dart file lives just next to the |
| // dart file. |
| // TODO(floitsch): support precompiled version of dart2js output. |
| @@ -570,60 +585,85 @@ class IsolateNatives { |
| Completer<SendPort> completer = new Completer<SendPort>.sync(); |
| ReceivePort port = new ReceivePort(); |
| - port.receive((msg, SendPort replyPort) { |
| + port.listen((msg) { |
| port.close(); |
| - assert(msg == _SPAWNED_SIGNAL); |
| - completer.complete(replyPort); |
| + assert(msg[0] == _SPAWNED_SIGNAL); |
| + completer.complete(msg[1]); |
| }); |
| - SendPort signalReply = port.toSendPort(); |
| + SendPort signalReply = port.sendPort; |
| if (_globalState.useWorkers && !isLight) { |
| - _startWorker(functionName, uri, signalReply); |
| + _startWorker(functionName, uri, args, message, isSpawnUri, signalReply); |
| } else { |
| - _startNonWorker(functionName, uri, signalReply); |
| + _startNonWorker( |
| + functionName, uri, args, message, isSpawnUri, signalReply); |
| } |
| return new _BufferingSendPort( |
| _globalState.currentContext.id, completer.future); |
| } |
| - static SendPort _startWorker( |
| - String functionName, String uri, SendPort replyPort) { |
| + static void _startWorker( |
| + String functionName, String uri, |
| + List<String> args, message, |
| + bool isSpawnUri, |
| + SendPort replyPort) { |
| if (_globalState.isWorker) { |
| _globalState.mainManager.postMessage(_serializeMessage({ |
| 'command': 'spawn-worker', |
| 'functionName': functionName, |
| + 'args': args, |
| + 'msg': message, |
| 'uri': uri, |
| + 'isSpawnUri': isSpawnUri, |
| 'replyPort': replyPort})); |
| } else { |
| - _spawnWorker(functionName, uri, replyPort); |
| + _spawnWorker(functionName, uri, args, message, isSpawnUri, replyPort); |
| } |
| } |
| - static SendPort _startNonWorker( |
| - String functionName, String uri, SendPort replyPort) { |
| + static void _startNonWorker( |
| + String functionName, String uri, |
| + List<String> args, message, |
| + bool isSpawnUri, |
| + SendPort replyPort) { |
| // TODO(eub): support IE9 using an iframe -- Dart issue 1702. |
| if (uri != null) throw new UnsupportedError( |
|
Lasse Reichstein Nielsen
2013/10/25 12:45:59
Multi-line if needs braces.
floitsch
2013/10/25 13:52:56
Done.
|
| "Currently spawnUri is not supported without web workers."); |
| _globalState.topEventLoop.enqueue(new _IsolateContext(), () { |
| final func = _getJSFunctionFromName(functionName); |
| - _startIsolate(func, replyPort); |
| + _startIsolate(func, args, message, isSpawnUri, replyPort); |
| }, 'nonworker start'); |
| } |
| - static void _startIsolate(Function topLevel, SendPort replyTo) { |
| + static void _startIsolate(Function topLevel, |
| + List<String> args, message, |
| + bool isSpawnUri, |
| + SendPort replyTo) { |
| _IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT(); |
| Primitives.initializeStatics(context.id); |
| lazyPort = new ReceivePort(); |
| - replyTo.send(_SPAWNED_SIGNAL, port.toSendPort()); |
| - topLevel(); |
| + replyTo.send([_SPAWNED_SIGNAL, lazyPort.sendPort]); |
| + if (!isSpawnUri) { |
| + topLevel(message); |
| + } else if (topLevel is _MainFunctionArgsMessage) { |
|
Lasse Reichstein Nielsen
2013/10/25 12:45:59
If message was null, should we have preferred to c
floitsch
2013/10/25 13:52:56
I prefer providing the argument. This is the expli
|
| + topLevel(args, message); |
| + } else if (topLevel is _MainFunctionArgs) { |
| + topLevel(args); |
| + } else { |
| + topLevel(); |
| + } |
| } |
| /** |
| * Spawns an isolate in a worker. [factoryName] is the Javascript constructor |
| * name for the isolate entry point class. |
| */ |
| - static void _spawnWorker(functionName, uri, replyPort) { |
| + static void _spawnWorker(functionName, String uri, |
| + List<String> args, message, |
| + bool isSpawnUri, |
| + SendPort replyPort) { |
| + print("message: $message"); |
|
Lasse Reichstein Nielsen
2013/10/25 12:45:59
Debug print.
Was that mine?
floitsch
2013/10/25 13:52:56
probably mine.
|
| if (uri == null) uri = thisScript; |
| final worker = JS('var', 'new Worker(#)', uri); |
| @@ -644,6 +684,9 @@ class IsolateNatives { |
| // the port (port deserialization is sensitive to what is the current |
| // workerId). |
| 'replyTo': _serializeMessage(replyPort), |
| + 'args': args, |
| + 'msg': _serializeMessage(message), |
| + 'isSpawnUri': isSpawnUri, |
| 'functionName': functionName })); |
| } |
| } |
| @@ -668,22 +711,7 @@ class _BaseSendPort implements SendPort { |
| } |
| } |
| - Future call(var message) { |
| - final completer = new Completer(); |
| - final port = new ReceivePortImpl(); |
| - send(message, port.toSendPort()); |
| - port.receive((value, ignoreReplyTo) { |
| - port.close(); |
| - if (value is Exception) { |
| - completer.completeError(value); |
| - } else { |
| - completer.complete(value); |
| - } |
| - }); |
| - return completer.future; |
| - } |
| - |
| - void send(var message, [SendPort replyTo]); |
| + void send(var message); |
| bool operator ==(var other); |
| int get hashCode; |
| } |
| @@ -694,13 +722,12 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
| const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); |
| - void send(var message, [SendPort replyTo = null]) { |
| - _waitForPendingPorts([message, replyTo], () { |
| - _checkReplyTo(replyTo); |
| + void send(var message, [SendPort replyTo]) { |
| + _waitForPendingPorts(message, () { |
| // Check that the isolate still runs and the port is still open |
| final isolate = _globalState.isolates[_isolateId]; |
| if (isolate == null) return; |
| - if (_receivePort._callback == null) return; |
| + if (_receivePort._controller.isClosed) return; |
| // We force serialization/deserialization as a simple way to ensure |
| // isolate communication restrictions are respected between isolates that |
| @@ -712,18 +739,15 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
| final shouldSerialize = _globalState.currentContext != null |
| && _globalState.currentContext.id != _isolateId; |
| var msg = message; |
| - var reply = replyTo; |
| if (shouldSerialize) { |
| msg = _serializeMessage(msg); |
| - reply = _serializeMessage(reply); |
| } |
| _globalState.topEventLoop.enqueue(isolate, () { |
| - if (_receivePort._callback != null) { |
| + if (!_receivePort._controller.isClosed) { |
| if (shouldSerialize) { |
| msg = _deserializeMessage(msg); |
| - reply = _deserializeMessage(reply); |
| } |
| - _receivePort._callback(msg, reply); |
| + _receivePort._controller.add(msg); |
| } |
| }, 'receive $message'); |
| }); |
| @@ -744,14 +768,12 @@ class _WorkerSendPort extends _BaseSendPort implements SendPort { |
| const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId) |
| : super(isolateId); |
| - void send(var message, [SendPort replyTo = null]) { |
| - _waitForPendingPorts([message, replyTo], () { |
| - _checkReplyTo(replyTo); |
| + void send(var message, [SendPort replyTo]) { |
| + _waitForPendingPorts(message, () { |
| final workerMessage = _serializeMessage({ |
| 'command': 'message', |
| 'port': this, |
| - 'msg': message, |
| - 'replyTo': replyTo}); |
| + 'msg': message}); |
| if (_globalState.isWorker) { |
| // Communication from one worker to another go through the |
| @@ -806,7 +828,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort { |
| _futurePort.then((p) { |
| _port = p; |
| for (final item in pending) { |
| - p.send(item['message'], item['replyTo']); |
| + p.send(item); |
| } |
| pending = null; |
| }); |
| @@ -821,7 +843,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort { |
| if (_port != null) { |
| _port.send(message, replyTo); |
| } else { |
| - pending.add({'message': message, 'replyTo': replyTo}); |
| + pending.add(message); |
| } |
| } |
| @@ -831,26 +853,32 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort { |
| } |
| /** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ |
| -class ReceivePortImpl implements ReceivePort { |
| - int _id; |
| - Function _callback; |
| +class ReceivePortImpl extends Stream implements ReceivePort { |
| static int _nextFreeId = 1; |
| + final int _id; |
| + StreamController _controller; |
| ReceivePortImpl() |
| : _id = _nextFreeId++ { |
| + _controller = new StreamController(onCancel: close, sync: true); |
| _globalState.currentContext.register(_id, this); |
| } |
| - void receive(void onMessage(var message, SendPort replyTo)) { |
| - _callback = onMessage; |
| + StreamSubscription listen(void onData(var event), |
| + {Function onError, |
| + void onDone(), |
| + bool cancelOnError}) { |
| + return _controller.stream.listen(onData, onError: onError, onDone: onDone, |
| + cancelOnError: cancelOnError); |
| } |
| void close() { |
| - _callback = null; |
| + if (_controller.isClosed) return; |
| + _controller.close(); |
| _globalState.currentContext.unregister(_id); |
| } |
| - SendPort toSendPort() { |
| + SendPort get sendPort { |
| return new _NativeJsSendPort(this, _globalState.currentContext.id); |
| } |
| } |
| @@ -876,9 +904,7 @@ class _PendingSendPortFinder extends _MessageTraverser { |
| final seen = _visited[list]; |
| if (seen != null) return; |
| _visited[list] = true; |
| - // TODO(sigmund): replace with the following: (bug #1660) |
| - // list.forEach(_dispatch); |
| - list.forEach((e) => _dispatch(e)); |
| + list.forEach(_dispatch); |
| } |
| visitMap(Map map) { |
| @@ -886,9 +912,7 @@ class _PendingSendPortFinder extends _MessageTraverser { |
| if (seen != null) return; |
| _visited[map] = true; |
| - // TODO(sigmund): replace with the following: (bug #1660) |
| - // map.values.forEach(_dispatch); |
| - map.values.forEach((e) => _dispatch(e)); |
| + map.values.forEach(_dispatch); |
| } |
| visitSendPort(var port) { |