Index: sdk/lib/_internal/lib/isolate_helper.dart |
diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart |
index 8359a30042ffec4faec87402a1da0477f0c11e03..d4d6c70bbf1edb23f371967622cc1529f2bd4a3e 100644 |
--- a/sdk/lib/_internal/lib/isolate_helper.dart |
+++ b/sdk/lib/_internal/lib/isolate_helper.dart |
@@ -471,7 +471,7 @@ class IsolateNatives { |
var replyTo = _deserializeMessage(msg['replyTo']); |
var context = new _IsolateContext(); |
_globalState.topEventLoop.enqueue(context, () { |
- _startIsolate(entryPoint, replyTo); |
+ _startIsolate(entryPoint, null, replyTo); |
}, 'worker-start'); |
// Make sure we always have a current context in this worker. |
// TODO(7907): This is currently needed because we're using |
@@ -483,13 +483,14 @@ class IsolateNatives { |
_globalState.topEventLoop.run(); |
break; |
case 'spawn-worker': |
- _spawnWorker(msg['functionName'], msg['uri'], msg['replyPort']); |
+ _spawnWorker(msg['functionName'], msg['uri'], |
+ msg['msg'], msg['replyPort']); |
break; |
case 'message': |
SendPort port = msg['port']; |
// If the port has been closed, we ignore the message. |
if (port != null) { |
- msg['port'].send(msg['msg'], msg['replyTo']); |
+ msg['port'].send(msg['msg']); |
} |
_globalState.topEventLoop.run(); |
break; |
@@ -550,19 +551,19 @@ class IsolateNatives { |
return JS("", "new #()", ctor); |
} |
- static SendPort spawnFunction(void topLevelFunction()) { |
+ static SendPort spawnFunction(void topLevelFunction(message), message) { |
final name = _getJSFunctionName(topLevelFunction); |
if (name == null) { |
throw new UnsupportedError( |
"only top-level functions can be spawned."); |
} |
- return spawn(name, null, false); |
+ return spawn(name, null, message, false); |
} |
// TODO(sigmund): clean up above, after we make the new API the default: |
/// If [uri] is `null` it is replaced with the current script. |
- static spawn(String functionName, String uri, bool isLight) { |
+ static spawn(String functionName, String uri, message, bool isLight) { |
// Assume that the compiled version of the Dart file lives just next to the |
// dart file. |
// TODO(floitsch): support precompiled version of dart2js output. |
@@ -570,60 +571,61 @@ class IsolateNatives { |
Completer<SendPort> completer = new Completer<SendPort>.sync(); |
ReceivePort port = new ReceivePort(); |
- port.receive((msg, SendPort replyPort) { |
+ port.listen((msg) { |
port.close(); |
- assert(msg == _SPAWNED_SIGNAL); |
- completer.complete(replyPort); |
+ assert(msg[0] == _SPAWNED_SIGNAL); |
+ completer.complete(msg[1]); |
}); |
- SendPort signalReply = port.toSendPort(); |
+ SendPort signalReply = port.sendPort; |
if (_globalState.useWorkers && !isLight) { |
- _startWorker(functionName, uri, signalReply); |
+ _startWorker(functionName, uri, message, signalReply); |
} else { |
- _startNonWorker(functionName, uri, signalReply); |
+ _startNonWorker(functionName, uri, message, signalReply); |
} |
return new _BufferingSendPort( |
_globalState.currentContext.id, completer.future); |
} |
static SendPort _startWorker( |
- String functionName, String uri, SendPort replyPort) { |
+ String functionName, String uri, message, SendPort replyPort) { |
if (_globalState.isWorker) { |
_globalState.mainManager.postMessage(_serializeMessage({ |
'command': 'spawn-worker', |
'functionName': functionName, |
+ 'msg': message, |
'uri': uri, |
'replyPort': replyPort})); |
} else { |
- _spawnWorker(functionName, uri, replyPort); |
+ _spawnWorker(functionName, uri, message, replyPort); |
} |
} |
static SendPort _startNonWorker( |
- String functionName, String uri, SendPort replyPort) { |
+ String functionName, String uri, message, SendPort replyPort) { |
// TODO(eub): support IE9 using an iframe -- Dart issue 1702. |
if (uri != null) throw new UnsupportedError( |
"Currently spawnUri is not supported without web workers."); |
_globalState.topEventLoop.enqueue(new _IsolateContext(), () { |
final func = _getJSFunctionFromName(functionName); |
- _startIsolate(func, replyPort); |
+ _startIsolate(func, message, replyPort); |
}, 'nonworker start'); |
} |
- static void _startIsolate(Function topLevel, SendPort replyTo) { |
+ static void _startIsolate(Function topLevel, message, SendPort replyTo) { |
_IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT(); |
Primitives.initializeStatics(context.id); |
lazyPort = new ReceivePort(); |
- replyTo.send(_SPAWNED_SIGNAL, port.toSendPort()); |
- topLevel(); |
+ replyTo.send([_SPAWNED_SIGNAL, lazyPort.sendPort]); |
+ topLevel(message); |
} |
/** |
* Spawns an isolate in a worker. [factoryName] is the Javascript constructor |
* name for the isolate entry point class. |
*/ |
- static void _spawnWorker(functionName, uri, replyPort) { |
+ static void _spawnWorker(functionName, uri, message, replyPort) { |
if (uri == null) uri = thisScript; |
final worker = JS('var', 'new Worker(#)', uri); |
@@ -644,6 +646,7 @@ class IsolateNatives { |
// the port (port deserialization is sensitive to what is the current |
// workerId). |
'replyTo': _serializeMessage(replyPort), |
+ 'msg': message, |
'functionName': functionName })); |
} |
} |
@@ -668,22 +671,7 @@ class _BaseSendPort implements SendPort { |
} |
} |
- Future call(var message) { |
- final completer = new Completer(); |
- final port = new ReceivePortImpl(); |
- send(message, port.toSendPort()); |
- port.receive((value, ignoreReplyTo) { |
- port.close(); |
- if (value is Exception) { |
- completer.completeError(value); |
- } else { |
- completer.complete(value); |
- } |
- }); |
- return completer.future; |
- } |
- |
- void send(var message, [SendPort replyTo]); |
+ void send(var message); |
bool operator ==(var other); |
int get hashCode; |
} |
@@ -694,13 +682,12 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); |
- void send(var message, [SendPort replyTo = null]) { |
- _waitForPendingPorts([message, replyTo], () { |
- _checkReplyTo(replyTo); |
+ void send(var message, [SendPort replyTo]) { |
+ _waitForPendingPorts(message, () { |
// Check that the isolate still runs and the port is still open |
final isolate = _globalState.isolates[_isolateId]; |
if (isolate == null) return; |
- if (_receivePort._callback == null) return; |
+ if (_receivePort._controller.isClosed) return; |
// We force serialization/deserialization as a simple way to ensure |
// isolate communication restrictions are respected between isolates that |
@@ -712,18 +699,15 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
final shouldSerialize = _globalState.currentContext != null |
&& _globalState.currentContext.id != _isolateId; |
var msg = message; |
- var reply = replyTo; |
if (shouldSerialize) { |
msg = _serializeMessage(msg); |
- reply = _serializeMessage(reply); |
} |
_globalState.topEventLoop.enqueue(isolate, () { |
- if (_receivePort._callback != null) { |
+ if (!_receivePort._controller.isClosed) { |
if (shouldSerialize) { |
msg = _deserializeMessage(msg); |
- reply = _deserializeMessage(reply); |
} |
- _receivePort._callback(msg, reply); |
+ _receivePort._controller.add(msg); |
} |
}, 'receive $message'); |
}); |
@@ -744,14 +728,12 @@ class _WorkerSendPort extends _BaseSendPort implements SendPort { |
const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId) |
: super(isolateId); |
- void send(var message, [SendPort replyTo = null]) { |
- _waitForPendingPorts([message, replyTo], () { |
- _checkReplyTo(replyTo); |
+ void send(var message, [SendPort replyTo]) { |
+ _waitForPendingPorts(message, () { |
final workerMessage = _serializeMessage({ |
'command': 'message', |
'port': this, |
- 'msg': message, |
- 'replyTo': replyTo}); |
+ 'msg': message}); |
if (_globalState.isWorker) { |
// Communication from one worker to another go through the |
@@ -806,7 +788,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort { |
_futurePort.then((p) { |
_port = p; |
for (final item in pending) { |
- p.send(item['message'], item['replyTo']); |
+ p.send(item); |
} |
pending = null; |
}); |
@@ -821,7 +803,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort { |
if (_port != null) { |
_port.send(message, replyTo); |
} else { |
- pending.add({'message': message, 'replyTo': replyTo}); |
+ pending.add(message); |
} |
} |
@@ -831,26 +813,32 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort { |
} |
/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ |
-class ReceivePortImpl implements ReceivePort { |
- int _id; |
- Function _callback; |
+class ReceivePortImpl extends Stream implements ReceivePort { |
static int _nextFreeId = 1; |
+ final int _id; |
+ StreamController _controller; |
ReceivePortImpl() |
: _id = _nextFreeId++ { |
+ _controller = new StreamController(onCancel: close, sync: true); |
_globalState.currentContext.register(_id, this); |
} |
- void receive(void onMessage(var message, SendPort replyTo)) { |
- _callback = onMessage; |
+ StreamSubscription listen(void onData(var event), |
+ {Function onError, |
+ void onDone(), |
+ bool cancelOnError}) { |
+ return _controller.stream.listen(onData, onError: onError, onDone: onDone, |
+ cancelOnError: cancelOnError); |
} |
void close() { |
- _callback = null; |
+ if (_controller.isClosed) return; |
+ _controller.close(); |
_globalState.currentContext.unregister(_id); |
} |
- SendPort toSendPort() { |
+ SendPort get sendPort { |
return new _NativeJsSendPort(this, _globalState.currentContext.id); |
} |
} |
@@ -876,9 +864,7 @@ class _PendingSendPortFinder extends _MessageTraverser { |
final seen = _visited[list]; |
if (seen != null) return; |
_visited[list] = true; |
- // TODO(sigmund): replace with the following: (bug #1660) |
- // list.forEach(_dispatch); |
- list.forEach((e) => _dispatch(e)); |
+ list.forEach(_dispatch); |
} |
visitMap(Map map) { |
@@ -886,9 +872,7 @@ class _PendingSendPortFinder extends _MessageTraverser { |
if (seen != null) return; |
_visited[map] = true; |
- // TODO(sigmund): replace with the following: (bug #1660) |
- // map.values.forEach(_dispatch); |
- map.values.forEach((e) => _dispatch(e)); |
+ map.values.forEach(_dispatch); |
} |
visitSendPort(var port) { |