Index: sdk/lib/_internal/lib/isolate_helper.dart |
diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart |
index 7d9e722c046e07d9b4576ac114ec2d9bf0e47e94..a28b31ece10b72531551aab0c3e3460f78885e52 100644 |
--- a/sdk/lib/_internal/lib/isolate_helper.dart |
+++ b/sdk/lib/_internal/lib/isolate_helper.dart |
@@ -22,38 +22,6 @@ import 'dart:_interceptors' show JSExtendableArray; |
ReceivePort lazyPort; |
-class CloseToken { |
- /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to |
- /// close themselves. |
- const CloseToken(); |
-} |
- |
-class JsIsolateSink extends EventSink<dynamic> implements IsolateSink { |
- bool _isClosed = false; |
- final SendPort _port; |
- JsIsolateSink.fromPort(this._port); |
- |
- void add(dynamic message) { |
- _port.send(message); |
- } |
- |
- void addError(errorEvent) { |
- throw new UnimplementedError("addError on isolate streams"); |
- } |
- |
- void close() { |
- if (_isClosed) return; |
- add(const CloseToken()); |
- _isClosed = true; |
- } |
- |
- bool operator==(var other) { |
- return other is IsolateSink && _port == other._port; |
- } |
- |
- int get hashCode => _port.hashCode + 499; |
-} |
- |
/** |
* Called by the compiler to support switching |
* between isolates when we get a callback from the DOM. |
@@ -90,7 +58,13 @@ void startRootIsolate(entry) { |
// by having a "default" isolate (the first one created). |
_globalState.currentContext = rootContext; |
- rootContext.eval(entry); |
+ if (entry is _MainFunctionArgs) { |
+ rootContext.eval(() { entry([]); }); |
+ } else if (entry is _MainFunctionArgsMessage) { |
+ rootContext.eval(() { entry([], null); }); |
+ } else { |
+ rootContext.eval(entry); |
+ } |
_globalState.topEventLoop.run(); |
} |
@@ -417,6 +391,10 @@ var globalWorker = JS('', "#.Worker", globalThis); |
bool globalPostMessageDefined = |
JS('', "#.postMessage !== (void 0)", globalThis); |
+typedef _MainFunction(); |
+typedef _MainFunctionArgs(args); |
+typedef _MainFunctionArgsMessage(args, message); |
+ |
class IsolateNatives { |
static String thisScript = computeThisScript(); |
@@ -500,10 +478,13 @@ class IsolateNatives { |
Function entryPoint = (functionName == null) |
? _globalState.entry |
: _getJSFunctionFromName(functionName); |
+ var args = msg['args']; |
+ var message = _deserializeMessage(msg['msg']); |
+ var isSpawnUri = msg['isSpawnUri']; |
var replyTo = _deserializeMessage(msg['replyTo']); |
var context = new _IsolateContext(); |
_globalState.topEventLoop.enqueue(context, () { |
- _startIsolate(entryPoint, replyTo); |
+ _startIsolate(entryPoint, args, message, isSpawnUri, replyTo); |
}, 'worker-start'); |
// Make sure we always have a current context in this worker. |
// TODO(7907): This is currently needed because we're using |
@@ -515,13 +496,15 @@ class IsolateNatives { |
_globalState.topEventLoop.run(); |
break; |
case 'spawn-worker': |
- _spawnWorker(msg['functionName'], msg['uri'], msg['replyPort']); |
+ _spawnWorker(msg['functionName'], msg['uri'], |
+ msg['args'], msg['msg'], |
+ msg['isSpawnUri'], msg['replyPort']); |
break; |
case 'message': |
SendPort port = msg['port']; |
// If the port has been closed, we ignore the message. |
if (port != null) { |
- msg['port'].send(msg['msg'], msg['replyTo']); |
+ msg['port'].send(msg['msg']); |
} |
_globalState.topEventLoop.run(); |
break; |
@@ -582,19 +565,25 @@ class IsolateNatives { |
return JS("", "new #()", ctor); |
} |
- static SendPort spawnFunction(void topLevelFunction()) { |
+ static SendPort spawnFunction(void topLevelFunction(message), message) { |
final name = _getJSFunctionName(topLevelFunction); |
if (name == null) { |
throw new UnsupportedError( |
"only top-level functions can be spawned."); |
} |
- return spawn(name, null, false); |
+ return spawn(name, null, null, message, false, false); |
+ } |
+ |
+ static SendPort spawnUri(Uri uri, List<String> args, message) { |
+ return spawn(null, uri.path, args, message, false, true); |
} |
// TODO(sigmund): clean up above, after we make the new API the default: |
/// If [uri] is `null` it is replaced with the current script. |
- static spawn(String functionName, String uri, bool isLight) { |
+ static SendPort spawn(String functionName, String uri, |
+ List<String> args, message, |
+ bool isLight, bool isSpawnUri) { |
// Assume that the compiled version of the Dart file lives just next to the |
// dart file. |
// TODO(floitsch): support precompiled version of dart2js output. |
@@ -602,60 +591,86 @@ class IsolateNatives { |
Completer<SendPort> completer = new Completer<SendPort>.sync(); |
ReceivePort port = new ReceivePort(); |
- port.receive((msg, SendPort replyPort) { |
+ port.listen((msg) { |
port.close(); |
- assert(msg == _SPAWNED_SIGNAL); |
- completer.complete(replyPort); |
+ assert(msg[0] == _SPAWNED_SIGNAL); |
+ completer.complete(msg[1]); |
}); |
- SendPort signalReply = port.toSendPort(); |
+ SendPort signalReply = port.sendPort; |
if (_globalState.useWorkers && !isLight) { |
- _startWorker(functionName, uri, signalReply); |
+ _startWorker(functionName, uri, args, message, isSpawnUri, signalReply); |
} else { |
- _startNonWorker(functionName, uri, signalReply); |
+ _startNonWorker( |
+ functionName, uri, args, message, isSpawnUri, signalReply); |
} |
return new _BufferingSendPort( |
_globalState.currentContext.id, completer.future); |
} |
- static SendPort _startWorker( |
- String functionName, String uri, SendPort replyPort) { |
+ static void _startWorker( |
+ String functionName, String uri, |
+ List<String> args, message, |
+ bool isSpawnUri, |
+ SendPort replyPort) { |
if (_globalState.isWorker) { |
_globalState.mainManager.postMessage(_serializeMessage({ |
'command': 'spawn-worker', |
'functionName': functionName, |
+ 'args': args, |
+ 'msg': message, |
'uri': uri, |
+ 'isSpawnUri': isSpawnUri, |
'replyPort': replyPort})); |
} else { |
- _spawnWorker(functionName, uri, replyPort); |
+ _spawnWorker(functionName, uri, args, message, isSpawnUri, replyPort); |
} |
} |
- static SendPort _startNonWorker( |
- String functionName, String uri, SendPort replyPort) { |
+ static void _startNonWorker( |
+ String functionName, String uri, |
+ List<String> args, message, |
+ bool isSpawnUri, |
+ SendPort replyPort) { |
// TODO(eub): support IE9 using an iframe -- Dart issue 1702. |
- if (uri != null) throw new UnsupportedError( |
- "Currently spawnUri is not supported without web workers."); |
+ if (uri != null) { |
+ throw new UnsupportedError( |
+ "Currently spawnUri is not supported without web workers."); |
+ } |
_globalState.topEventLoop.enqueue(new _IsolateContext(), () { |
final func = _getJSFunctionFromName(functionName); |
- _startIsolate(func, replyPort); |
+ _startIsolate(func, args, message, isSpawnUri, replyPort); |
}, 'nonworker start'); |
} |
- static void _startIsolate(Function topLevel, SendPort replyTo) { |
+ static void _startIsolate(Function topLevel, |
+ List<String> args, message, |
+ bool isSpawnUri, |
+ SendPort replyTo) { |
_IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT(); |
Primitives.initializeStatics(context.id); |
lazyPort = new ReceivePort(); |
- replyTo.send(_SPAWNED_SIGNAL, port.toSendPort()); |
- topLevel(); |
+ replyTo.send([_SPAWNED_SIGNAL, lazyPort.sendPort]); |
+ if (!isSpawnUri) { |
+ topLevel(message); |
+ } else if (topLevel is _MainFunctionArgsMessage) { |
+ topLevel(args, message); |
+ } else if (topLevel is _MainFunctionArgs) { |
+ topLevel(args); |
+ } else { |
+ topLevel(); |
+ } |
} |
/** |
* Spawns an isolate in a worker. [factoryName] is the Javascript constructor |
* name for the isolate entry point class. |
*/ |
- static void _spawnWorker(functionName, uri, replyPort) { |
+ static void _spawnWorker(functionName, String uri, |
+ List<String> args, message, |
+ bool isSpawnUri, |
+ SendPort replyPort) { |
if (uri == null) uri = thisScript; |
final worker = JS('var', 'new Worker(#)', uri); |
@@ -676,6 +691,9 @@ class IsolateNatives { |
// the port (port deserialization is sensitive to what is the current |
// workerId). |
'replyTo': _serializeMessage(replyPort), |
+ 'args': args, |
+ 'msg': _serializeMessage(message), |
+ 'isSpawnUri': isSpawnUri, |
'functionName': functionName })); |
} |
} |
@@ -700,22 +718,7 @@ class _BaseSendPort implements SendPort { |
} |
} |
- Future call(var message) { |
- final completer = new Completer(); |
- final port = new ReceivePortImpl(); |
- send(message, port.toSendPort()); |
- port.receive((value, ignoreReplyTo) { |
- port.close(); |
- if (value is Exception) { |
- completer.completeError(value); |
- } else { |
- completer.complete(value); |
- } |
- }); |
- return completer.future; |
- } |
- |
- void send(var message, [SendPort replyTo]); |
+ void send(var message); |
bool operator ==(var other); |
int get hashCode; |
} |
@@ -726,13 +729,12 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); |
- void send(var message, [SendPort replyTo = null]) { |
- _waitForPendingPorts([message, replyTo], () { |
- _checkReplyTo(replyTo); |
+ void send(var message, [SendPort replyTo]) { |
+ _waitForPendingPorts(message, () { |
// Check that the isolate still runs and the port is still open |
final isolate = _globalState.isolates[_isolateId]; |
if (isolate == null) return; |
- if (_receivePort._callback == null) return; |
+ if (_receivePort._controller.isClosed) return; |
// We force serialization/deserialization as a simple way to ensure |
// isolate communication restrictions are respected between isolates that |
@@ -744,18 +746,15 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
final shouldSerialize = _globalState.currentContext != null |
&& _globalState.currentContext.id != _isolateId; |
var msg = message; |
- var reply = replyTo; |
if (shouldSerialize) { |
msg = _serializeMessage(msg); |
- reply = _serializeMessage(reply); |
} |
_globalState.topEventLoop.enqueue(isolate, () { |
- if (_receivePort._callback != null) { |
+ if (!_receivePort._controller.isClosed) { |
if (shouldSerialize) { |
msg = _deserializeMessage(msg); |
- reply = _deserializeMessage(reply); |
} |
- _receivePort._callback(msg, reply); |
+ _receivePort._controller.add(msg); |
} |
}, 'receive $message'); |
}); |
@@ -776,14 +775,12 @@ class _WorkerSendPort extends _BaseSendPort implements SendPort { |
const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId) |
: super(isolateId); |
- void send(var message, [SendPort replyTo = null]) { |
- _waitForPendingPorts([message, replyTo], () { |
- _checkReplyTo(replyTo); |
+ void send(var message, [SendPort replyTo]) { |
+ _waitForPendingPorts(message, () { |
final workerMessage = _serializeMessage({ |
'command': 'message', |
'port': this, |
- 'msg': message, |
- 'replyTo': replyTo}); |
+ 'msg': message}); |
if (_globalState.isWorker) { |
// Communication from one worker to another go through the |
@@ -838,7 +835,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort { |
_futurePort.then((p) { |
_port = p; |
for (final item in pending) { |
- p.send(item['message'], item['replyTo']); |
+ p.send(item); |
} |
pending = null; |
}); |
@@ -853,7 +850,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort { |
if (_port != null) { |
_port.send(message, replyTo); |
} else { |
- pending.add({'message': message, 'replyTo': replyTo}); |
+ pending.add(message); |
} |
} |
@@ -863,26 +860,32 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort { |
} |
/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ |
-class ReceivePortImpl implements ReceivePort { |
- int _id; |
- Function _callback; |
+class ReceivePortImpl extends Stream implements ReceivePort { |
static int _nextFreeId = 1; |
+ final int _id; |
+ StreamController _controller; |
ReceivePortImpl() |
: _id = _nextFreeId++ { |
+ _controller = new StreamController(onCancel: close, sync: true); |
_globalState.currentContext.register(_id, this); |
} |
- void receive(void onMessage(var message, SendPort replyTo)) { |
- _callback = onMessage; |
+ StreamSubscription listen(void onData(var event), |
+ {Function onError, |
+ void onDone(), |
+ bool cancelOnError}) { |
+ return _controller.stream.listen(onData, onError: onError, onDone: onDone, |
+ cancelOnError: cancelOnError); |
} |
void close() { |
- _callback = null; |
+ if (_controller.isClosed) return; |
+ _controller.close(); |
_globalState.currentContext.unregister(_id); |
} |
- SendPort toSendPort() { |
+ SendPort get sendPort { |
return new _NativeJsSendPort(this, _globalState.currentContext.id); |
} |
} |
@@ -908,9 +911,7 @@ class _PendingSendPortFinder extends _MessageTraverser { |
final seen = _visited[list]; |
if (seen != null) return; |
_visited[list] = true; |
- // TODO(sigmund): replace with the following: (bug #1660) |
- // list.forEach(_dispatch); |
- list.forEach((e) => _dispatch(e)); |
+ list.forEach(_dispatch); |
} |
visitMap(Map map) { |
@@ -918,9 +919,7 @@ class _PendingSendPortFinder extends _MessageTraverser { |
if (seen != null) return; |
_visited[map] = true; |
- // TODO(sigmund): replace with the following: (bug #1660) |
- // map.values.forEach(_dispatch); |
- map.values.forEach((e) => _dispatch(e)); |
+ map.values.forEach(_dispatch); |
} |
visitSendPort(var port) { |
@@ -928,14 +927,6 @@ class _PendingSendPortFinder extends _MessageTraverser { |
ports.add(port._futurePort); |
} |
} |
- |
- visitIsolateSink(JsIsolateSink sink) { |
- visitSendPort(sink._port); |
- } |
- |
- visitCloseToken(CloseToken token) { |
- // Do nothing. |
- } |
} |
/******************************************************** |
@@ -993,16 +984,6 @@ class _JsSerializer extends _Serializer { |
" ports are resolved at this point."; |
} |
} |
- |
- visitIsolateSink(JsIsolateSink sink) { |
- SendPort port = sink._port; |
- bool isClosed = sink._isClosed; |
- return ['isolateSink', visitSendPort(port), isClosed]; |
- } |
- |
- visitCloseToken(CloseToken token) { |
- return ['closeToken']; |
- } |
} |
@@ -1036,18 +1017,6 @@ class _JsCopier extends _Copier { |
" ports are resolved at this point."; |
} |
} |
- |
- IsolateSink visitIsolateSink(JsIsolateSink sink) { |
- SendPort port = sink._port; |
- bool isClosed = sink._isClosed; |
- JsIsolateSink result = new JsIsolateSink.fromPort(visitSendPort(port)); |
- result._isClosed = isClosed; |
- return result; |
- } |
- |
- CloseToken visitCloseToken(CloseToken token) { |
- return token; // Can be shared. |
- } |
} |
class _JsDeserializer extends _Deserializer { |
@@ -1068,18 +1037,6 @@ class _JsDeserializer extends _Deserializer { |
return new _WorkerSendPort(managerId, isolateId, receivePortId); |
} |
} |
- |
- IsolateSink deserializeIsolateSink(List list) { |
- SendPort port = deserializeSendPort(list[1]); |
- bool isClosed = list[2]; |
- JsIsolateSink result = new JsIsolateSink.fromPort(port); |
- result._isClosed = isClosed; |
- return result; |
- } |
- |
- CloseToken deserializeCloseToken(List list) { |
- return const CloseToken(); |
- } |
} |
class _JsVisitedMap implements _MessageTraverserVisitedMap { |
@@ -1177,8 +1134,6 @@ class _MessageTraverser { |
if (x is Map) return visitMap(x); |
if (x is SendPort) return visitSendPort(x); |
if (x is SendPortSync) return visitSendPortSync(x); |
- if (x is JsIsolateSink) return visitIsolateSink(x); |
- if (x is CloseToken) return visitCloseToken(x); |
// Overridable fallback. |
return visitObject(x); |
@@ -1189,8 +1144,6 @@ class _MessageTraverser { |
visitMap(Map x); |
visitSendPort(SendPort x); |
visitSendPortSync(SendPortSync x); |
- visitIsolateSink(IsolateSink x); |
- visitCloseToken(CloseToken x); |
visitObject(Object x) { |
// TODO(floitsch): make this a real exception. (which one)? |
@@ -1302,8 +1255,6 @@ class _Deserializer { |
case 'list': return _deserializeList(x); |
case 'map': return _deserializeMap(x); |
case 'sendport': return deserializeSendPort(x); |
- case 'isolateSink': return deserializeIsolateSink(x); |
- case 'closeToken': return deserializeCloseToken(x); |
default: return deserializeObject(x); |
} |
} |
@@ -1345,10 +1296,6 @@ class _Deserializer { |
deserializeSendPort(List x); |
- deserializeIsolateSink(List x); |
- |
- deserializeCloseToken(List x); |
- |
deserializeObject(List x) { |
// TODO(floitsch): Use real exception (which one?). |
throw "Unexpected serialized object"; |