Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(140)

Unified Diff: sdk/lib/_internal/lib/isolate_helper.dart

Issue 40023002: Make dart2js Isolate.spawn work. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | sdk/lib/_internal/lib/isolate_patch.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/_internal/lib/isolate_helper.dart
diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart
index 8359a30042ffec4faec87402a1da0477f0c11e03..d4d6c70bbf1edb23f371967622cc1529f2bd4a3e 100644
--- a/sdk/lib/_internal/lib/isolate_helper.dart
+++ b/sdk/lib/_internal/lib/isolate_helper.dart
@@ -471,7 +471,7 @@ class IsolateNatives {
var replyTo = _deserializeMessage(msg['replyTo']);
var context = new _IsolateContext();
_globalState.topEventLoop.enqueue(context, () {
- _startIsolate(entryPoint, replyTo);
+ _startIsolate(entryPoint, null, replyTo);
}, 'worker-start');
// Make sure we always have a current context in this worker.
// TODO(7907): This is currently needed because we're using
@@ -483,13 +483,14 @@ class IsolateNatives {
_globalState.topEventLoop.run();
break;
case 'spawn-worker':
- _spawnWorker(msg['functionName'], msg['uri'], msg['replyPort']);
+ _spawnWorker(msg['functionName'], msg['uri'],
+ msg['msg'], msg['replyPort']);
break;
case 'message':
SendPort port = msg['port'];
// If the port has been closed, we ignore the message.
if (port != null) {
- msg['port'].send(msg['msg'], msg['replyTo']);
+ msg['port'].send(msg['msg']);
}
_globalState.topEventLoop.run();
break;
@@ -550,19 +551,19 @@ class IsolateNatives {
return JS("", "new #()", ctor);
}
- static SendPort spawnFunction(void topLevelFunction()) {
+ static SendPort spawnFunction(void topLevelFunction(message), message) {
final name = _getJSFunctionName(topLevelFunction);
if (name == null) {
throw new UnsupportedError(
"only top-level functions can be spawned.");
}
- return spawn(name, null, false);
+ return spawn(name, null, message, false);
}
// TODO(sigmund): clean up above, after we make the new API the default:
/// If [uri] is `null` it is replaced with the current script.
- static spawn(String functionName, String uri, bool isLight) {
+ static spawn(String functionName, String uri, message, bool isLight) {
// Assume that the compiled version of the Dart file lives just next to the
// dart file.
// TODO(floitsch): support precompiled version of dart2js output.
@@ -570,60 +571,61 @@ class IsolateNatives {
Completer<SendPort> completer = new Completer<SendPort>.sync();
ReceivePort port = new ReceivePort();
- port.receive((msg, SendPort replyPort) {
+ port.listen((msg) {
port.close();
- assert(msg == _SPAWNED_SIGNAL);
- completer.complete(replyPort);
+ assert(msg[0] == _SPAWNED_SIGNAL);
+ completer.complete(msg[1]);
});
- SendPort signalReply = port.toSendPort();
+ SendPort signalReply = port.sendPort;
if (_globalState.useWorkers && !isLight) {
- _startWorker(functionName, uri, signalReply);
+ _startWorker(functionName, uri, message, signalReply);
} else {
- _startNonWorker(functionName, uri, signalReply);
+ _startNonWorker(functionName, uri, message, signalReply);
}
return new _BufferingSendPort(
_globalState.currentContext.id, completer.future);
}
static SendPort _startWorker(
- String functionName, String uri, SendPort replyPort) {
+ String functionName, String uri, message, SendPort replyPort) {
if (_globalState.isWorker) {
_globalState.mainManager.postMessage(_serializeMessage({
'command': 'spawn-worker',
'functionName': functionName,
+ 'msg': message,
'uri': uri,
'replyPort': replyPort}));
} else {
- _spawnWorker(functionName, uri, replyPort);
+ _spawnWorker(functionName, uri, message, replyPort);
}
}
static SendPort _startNonWorker(
- String functionName, String uri, SendPort replyPort) {
+ String functionName, String uri, message, SendPort replyPort) {
// TODO(eub): support IE9 using an iframe -- Dart issue 1702.
if (uri != null) throw new UnsupportedError(
"Currently spawnUri is not supported without web workers.");
_globalState.topEventLoop.enqueue(new _IsolateContext(), () {
final func = _getJSFunctionFromName(functionName);
- _startIsolate(func, replyPort);
+ _startIsolate(func, message, replyPort);
}, 'nonworker start');
}
- static void _startIsolate(Function topLevel, SendPort replyTo) {
+ static void _startIsolate(Function topLevel, message, SendPort replyTo) {
_IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT();
Primitives.initializeStatics(context.id);
lazyPort = new ReceivePort();
- replyTo.send(_SPAWNED_SIGNAL, port.toSendPort());
- topLevel();
+ replyTo.send([_SPAWNED_SIGNAL, lazyPort.sendPort]);
+ topLevel(message);
}
/**
* Spawns an isolate in a worker. [factoryName] is the Javascript constructor
* name for the isolate entry point class.
*/
- static void _spawnWorker(functionName, uri, replyPort) {
+ static void _spawnWorker(functionName, uri, message, replyPort) {
if (uri == null) uri = thisScript;
final worker = JS('var', 'new Worker(#)', uri);
@@ -644,6 +646,7 @@ class IsolateNatives {
// the port (port deserialization is sensitive to what is the current
// workerId).
'replyTo': _serializeMessage(replyPort),
+ 'msg': message,
'functionName': functionName }));
}
}
@@ -668,22 +671,7 @@ class _BaseSendPort implements SendPort {
}
}
- Future call(var message) {
- final completer = new Completer();
- final port = new ReceivePortImpl();
- send(message, port.toSendPort());
- port.receive((value, ignoreReplyTo) {
- port.close();
- if (value is Exception) {
- completer.completeError(value);
- } else {
- completer.complete(value);
- }
- });
- return completer.future;
- }
-
- void send(var message, [SendPort replyTo]);
+ void send(var message);
bool operator ==(var other);
int get hashCode;
}
@@ -694,13 +682,12 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId);
- void send(var message, [SendPort replyTo = null]) {
- _waitForPendingPorts([message, replyTo], () {
- _checkReplyTo(replyTo);
+ void send(var message, [SendPort replyTo]) {
+ _waitForPendingPorts(message, () {
// Check that the isolate still runs and the port is still open
final isolate = _globalState.isolates[_isolateId];
if (isolate == null) return;
- if (_receivePort._callback == null) return;
+ if (_receivePort._controller.isClosed) return;
// We force serialization/deserialization as a simple way to ensure
// isolate communication restrictions are respected between isolates that
@@ -712,18 +699,15 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
final shouldSerialize = _globalState.currentContext != null
&& _globalState.currentContext.id != _isolateId;
var msg = message;
- var reply = replyTo;
if (shouldSerialize) {
msg = _serializeMessage(msg);
- reply = _serializeMessage(reply);
}
_globalState.topEventLoop.enqueue(isolate, () {
- if (_receivePort._callback != null) {
+ if (!_receivePort._controller.isClosed) {
if (shouldSerialize) {
msg = _deserializeMessage(msg);
- reply = _deserializeMessage(reply);
}
- _receivePort._callback(msg, reply);
+ _receivePort._controller.add(msg);
}
}, 'receive $message');
});
@@ -744,14 +728,12 @@ class _WorkerSendPort extends _BaseSendPort implements SendPort {
const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId)
: super(isolateId);
- void send(var message, [SendPort replyTo = null]) {
- _waitForPendingPorts([message, replyTo], () {
- _checkReplyTo(replyTo);
+ void send(var message, [SendPort replyTo]) {
+ _waitForPendingPorts(message, () {
final workerMessage = _serializeMessage({
'command': 'message',
'port': this,
- 'msg': message,
- 'replyTo': replyTo});
+ 'msg': message});
if (_globalState.isWorker) {
// Communication from one worker to another go through the
@@ -806,7 +788,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
_futurePort.then((p) {
_port = p;
for (final item in pending) {
- p.send(item['message'], item['replyTo']);
+ p.send(item);
}
pending = null;
});
@@ -821,7 +803,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
if (_port != null) {
_port.send(message, replyTo);
} else {
- pending.add({'message': message, 'replyTo': replyTo});
+ pending.add(message);
}
}
@@ -831,26 +813,32 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
}
/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */
-class ReceivePortImpl implements ReceivePort {
- int _id;
- Function _callback;
+class ReceivePortImpl extends Stream implements ReceivePort {
static int _nextFreeId = 1;
+ final int _id;
+ StreamController _controller;
ReceivePortImpl()
: _id = _nextFreeId++ {
+ _controller = new StreamController(onCancel: close, sync: true);
_globalState.currentContext.register(_id, this);
}
- void receive(void onMessage(var message, SendPort replyTo)) {
- _callback = onMessage;
+ StreamSubscription listen(void onData(var event),
+ {Function onError,
+ void onDone(),
+ bool cancelOnError}) {
+ return _controller.stream.listen(onData, onError: onError, onDone: onDone,
+ cancelOnError: cancelOnError);
}
void close() {
- _callback = null;
+ if (_controller.isClosed) return;
+ _controller.close();
_globalState.currentContext.unregister(_id);
}
- SendPort toSendPort() {
+ SendPort get sendPort {
return new _NativeJsSendPort(this, _globalState.currentContext.id);
}
}
@@ -876,9 +864,7 @@ class _PendingSendPortFinder extends _MessageTraverser {
final seen = _visited[list];
if (seen != null) return;
_visited[list] = true;
- // TODO(sigmund): replace with the following: (bug #1660)
- // list.forEach(_dispatch);
- list.forEach((e) => _dispatch(e));
+ list.forEach(_dispatch);
}
visitMap(Map map) {
@@ -886,9 +872,7 @@ class _PendingSendPortFinder extends _MessageTraverser {
if (seen != null) return;
_visited[map] = true;
- // TODO(sigmund): replace with the following: (bug #1660)
- // map.values.forEach(_dispatch);
- map.values.forEach((e) => _dispatch(e));
+ map.values.forEach(_dispatch);
}
visitSendPort(var port) {
« no previous file with comments | « no previous file | sdk/lib/_internal/lib/isolate_patch.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698