Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(96)

Unified Diff: sdk/lib/_internal/lib/isolate_helper.dart

Issue 27215002: Very simple version of Isolates. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/_internal/lib/io_patch.dart ('k') | sdk/lib/_internal/lib/isolate_patch.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/_internal/lib/isolate_helper.dart
diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart
index 7d9e722c046e07d9b4576ac114ec2d9bf0e47e94..a28b31ece10b72531551aab0c3e3460f78885e52 100644
--- a/sdk/lib/_internal/lib/isolate_helper.dart
+++ b/sdk/lib/_internal/lib/isolate_helper.dart
@@ -22,38 +22,6 @@ import 'dart:_interceptors' show JSExtendableArray;
ReceivePort lazyPort;
-class CloseToken {
- /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to
- /// close themselves.
- const CloseToken();
-}
-
-class JsIsolateSink extends EventSink<dynamic> implements IsolateSink {
- bool _isClosed = false;
- final SendPort _port;
- JsIsolateSink.fromPort(this._port);
-
- void add(dynamic message) {
- _port.send(message);
- }
-
- void addError(errorEvent) {
- throw new UnimplementedError("addError on isolate streams");
- }
-
- void close() {
- if (_isClosed) return;
- add(const CloseToken());
- _isClosed = true;
- }
-
- bool operator==(var other) {
- return other is IsolateSink && _port == other._port;
- }
-
- int get hashCode => _port.hashCode + 499;
-}
-
/**
* Called by the compiler to support switching
* between isolates when we get a callback from the DOM.
@@ -90,7 +58,13 @@ void startRootIsolate(entry) {
// by having a "default" isolate (the first one created).
_globalState.currentContext = rootContext;
- rootContext.eval(entry);
+ if (entry is _MainFunctionArgs) {
+ rootContext.eval(() { entry([]); });
+ } else if (entry is _MainFunctionArgsMessage) {
+ rootContext.eval(() { entry([], null); });
+ } else {
+ rootContext.eval(entry);
+ }
_globalState.topEventLoop.run();
}
@@ -417,6 +391,10 @@ var globalWorker = JS('', "#.Worker", globalThis);
bool globalPostMessageDefined =
JS('', "#.postMessage !== (void 0)", globalThis);
+typedef _MainFunction();
+typedef _MainFunctionArgs(args);
+typedef _MainFunctionArgsMessage(args, message);
+
class IsolateNatives {
static String thisScript = computeThisScript();
@@ -500,10 +478,13 @@ class IsolateNatives {
Function entryPoint = (functionName == null)
? _globalState.entry
: _getJSFunctionFromName(functionName);
+ var args = msg['args'];
+ var message = _deserializeMessage(msg['msg']);
+ var isSpawnUri = msg['isSpawnUri'];
var replyTo = _deserializeMessage(msg['replyTo']);
var context = new _IsolateContext();
_globalState.topEventLoop.enqueue(context, () {
- _startIsolate(entryPoint, replyTo);
+ _startIsolate(entryPoint, args, message, isSpawnUri, replyTo);
}, 'worker-start');
// Make sure we always have a current context in this worker.
// TODO(7907): This is currently needed because we're using
@@ -515,13 +496,15 @@ class IsolateNatives {
_globalState.topEventLoop.run();
break;
case 'spawn-worker':
- _spawnWorker(msg['functionName'], msg['uri'], msg['replyPort']);
+ _spawnWorker(msg['functionName'], msg['uri'],
+ msg['args'], msg['msg'],
+ msg['isSpawnUri'], msg['replyPort']);
break;
case 'message':
SendPort port = msg['port'];
// If the port has been closed, we ignore the message.
if (port != null) {
- msg['port'].send(msg['msg'], msg['replyTo']);
+ msg['port'].send(msg['msg']);
}
_globalState.topEventLoop.run();
break;
@@ -582,19 +565,25 @@ class IsolateNatives {
return JS("", "new #()", ctor);
}
- static SendPort spawnFunction(void topLevelFunction()) {
+ static SendPort spawnFunction(void topLevelFunction(message), message) {
final name = _getJSFunctionName(topLevelFunction);
if (name == null) {
throw new UnsupportedError(
"only top-level functions can be spawned.");
}
- return spawn(name, null, false);
+ return spawn(name, null, null, message, false, false);
+ }
+
+ static SendPort spawnUri(Uri uri, List<String> args, message) {
+ return spawn(null, uri.path, args, message, false, true);
}
// TODO(sigmund): clean up above, after we make the new API the default:
/// If [uri] is `null` it is replaced with the current script.
- static spawn(String functionName, String uri, bool isLight) {
+ static SendPort spawn(String functionName, String uri,
+ List<String> args, message,
+ bool isLight, bool isSpawnUri) {
// Assume that the compiled version of the Dart file lives just next to the
// dart file.
// TODO(floitsch): support precompiled version of dart2js output.
@@ -602,60 +591,86 @@ class IsolateNatives {
Completer<SendPort> completer = new Completer<SendPort>.sync();
ReceivePort port = new ReceivePort();
- port.receive((msg, SendPort replyPort) {
+ port.listen((msg) {
port.close();
- assert(msg == _SPAWNED_SIGNAL);
- completer.complete(replyPort);
+ assert(msg[0] == _SPAWNED_SIGNAL);
+ completer.complete(msg[1]);
});
- SendPort signalReply = port.toSendPort();
+ SendPort signalReply = port.sendPort;
if (_globalState.useWorkers && !isLight) {
- _startWorker(functionName, uri, signalReply);
+ _startWorker(functionName, uri, args, message, isSpawnUri, signalReply);
} else {
- _startNonWorker(functionName, uri, signalReply);
+ _startNonWorker(
+ functionName, uri, args, message, isSpawnUri, signalReply);
}
return new _BufferingSendPort(
_globalState.currentContext.id, completer.future);
}
- static SendPort _startWorker(
- String functionName, String uri, SendPort replyPort) {
+ static void _startWorker(
+ String functionName, String uri,
+ List<String> args, message,
+ bool isSpawnUri,
+ SendPort replyPort) {
if (_globalState.isWorker) {
_globalState.mainManager.postMessage(_serializeMessage({
'command': 'spawn-worker',
'functionName': functionName,
+ 'args': args,
+ 'msg': message,
'uri': uri,
+ 'isSpawnUri': isSpawnUri,
'replyPort': replyPort}));
} else {
- _spawnWorker(functionName, uri, replyPort);
+ _spawnWorker(functionName, uri, args, message, isSpawnUri, replyPort);
}
}
- static SendPort _startNonWorker(
- String functionName, String uri, SendPort replyPort) {
+ static void _startNonWorker(
+ String functionName, String uri,
+ List<String> args, message,
+ bool isSpawnUri,
+ SendPort replyPort) {
// TODO(eub): support IE9 using an iframe -- Dart issue 1702.
- if (uri != null) throw new UnsupportedError(
- "Currently spawnUri is not supported without web workers.");
+ if (uri != null) {
+ throw new UnsupportedError(
+ "Currently spawnUri is not supported without web workers.");
+ }
_globalState.topEventLoop.enqueue(new _IsolateContext(), () {
final func = _getJSFunctionFromName(functionName);
- _startIsolate(func, replyPort);
+ _startIsolate(func, args, message, isSpawnUri, replyPort);
}, 'nonworker start');
}
- static void _startIsolate(Function topLevel, SendPort replyTo) {
+ static void _startIsolate(Function topLevel,
+ List<String> args, message,
+ bool isSpawnUri,
+ SendPort replyTo) {
_IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT();
Primitives.initializeStatics(context.id);
lazyPort = new ReceivePort();
- replyTo.send(_SPAWNED_SIGNAL, port.toSendPort());
- topLevel();
+ replyTo.send([_SPAWNED_SIGNAL, lazyPort.sendPort]);
+ if (!isSpawnUri) {
+ topLevel(message);
+ } else if (topLevel is _MainFunctionArgsMessage) {
+ topLevel(args, message);
+ } else if (topLevel is _MainFunctionArgs) {
+ topLevel(args);
+ } else {
+ topLevel();
+ }
}
/**
* Spawns an isolate in a worker. [factoryName] is the Javascript constructor
* name for the isolate entry point class.
*/
- static void _spawnWorker(functionName, uri, replyPort) {
+ static void _spawnWorker(functionName, String uri,
+ List<String> args, message,
+ bool isSpawnUri,
+ SendPort replyPort) {
if (uri == null) uri = thisScript;
final worker = JS('var', 'new Worker(#)', uri);
@@ -676,6 +691,9 @@ class IsolateNatives {
// the port (port deserialization is sensitive to what is the current
// workerId).
'replyTo': _serializeMessage(replyPort),
+ 'args': args,
+ 'msg': _serializeMessage(message),
+ 'isSpawnUri': isSpawnUri,
'functionName': functionName }));
}
}
@@ -700,22 +718,7 @@ class _BaseSendPort implements SendPort {
}
}
- Future call(var message) {
- final completer = new Completer();
- final port = new ReceivePortImpl();
- send(message, port.toSendPort());
- port.receive((value, ignoreReplyTo) {
- port.close();
- if (value is Exception) {
- completer.completeError(value);
- } else {
- completer.complete(value);
- }
- });
- return completer.future;
- }
-
- void send(var message, [SendPort replyTo]);
+ void send(var message);
bool operator ==(var other);
int get hashCode;
}
@@ -726,13 +729,12 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId);
- void send(var message, [SendPort replyTo = null]) {
- _waitForPendingPorts([message, replyTo], () {
- _checkReplyTo(replyTo);
+ void send(var message, [SendPort replyTo]) {
+ _waitForPendingPorts(message, () {
// Check that the isolate still runs and the port is still open
final isolate = _globalState.isolates[_isolateId];
if (isolate == null) return;
- if (_receivePort._callback == null) return;
+ if (_receivePort._controller.isClosed) return;
// We force serialization/deserialization as a simple way to ensure
// isolate communication restrictions are respected between isolates that
@@ -744,18 +746,15 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
final shouldSerialize = _globalState.currentContext != null
&& _globalState.currentContext.id != _isolateId;
var msg = message;
- var reply = replyTo;
if (shouldSerialize) {
msg = _serializeMessage(msg);
- reply = _serializeMessage(reply);
}
_globalState.topEventLoop.enqueue(isolate, () {
- if (_receivePort._callback != null) {
+ if (!_receivePort._controller.isClosed) {
if (shouldSerialize) {
msg = _deserializeMessage(msg);
- reply = _deserializeMessage(reply);
}
- _receivePort._callback(msg, reply);
+ _receivePort._controller.add(msg);
}
}, 'receive $message');
});
@@ -776,14 +775,12 @@ class _WorkerSendPort extends _BaseSendPort implements SendPort {
const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId)
: super(isolateId);
- void send(var message, [SendPort replyTo = null]) {
- _waitForPendingPorts([message, replyTo], () {
- _checkReplyTo(replyTo);
+ void send(var message, [SendPort replyTo]) {
+ _waitForPendingPorts(message, () {
final workerMessage = _serializeMessage({
'command': 'message',
'port': this,
- 'msg': message,
- 'replyTo': replyTo});
+ 'msg': message});
if (_globalState.isWorker) {
// Communication from one worker to another go through the
@@ -838,7 +835,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
_futurePort.then((p) {
_port = p;
for (final item in pending) {
- p.send(item['message'], item['replyTo']);
+ p.send(item);
}
pending = null;
});
@@ -853,7 +850,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
if (_port != null) {
_port.send(message, replyTo);
} else {
- pending.add({'message': message, 'replyTo': replyTo});
+ pending.add(message);
}
}
@@ -863,26 +860,32 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
}
/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */
-class ReceivePortImpl implements ReceivePort {
- int _id;
- Function _callback;
+class ReceivePortImpl extends Stream implements ReceivePort {
static int _nextFreeId = 1;
+ final int _id;
+ StreamController _controller;
ReceivePortImpl()
: _id = _nextFreeId++ {
+ _controller = new StreamController(onCancel: close, sync: true);
_globalState.currentContext.register(_id, this);
}
- void receive(void onMessage(var message, SendPort replyTo)) {
- _callback = onMessage;
+ StreamSubscription listen(void onData(var event),
+ {Function onError,
+ void onDone(),
+ bool cancelOnError}) {
+ return _controller.stream.listen(onData, onError: onError, onDone: onDone,
+ cancelOnError: cancelOnError);
}
void close() {
- _callback = null;
+ if (_controller.isClosed) return;
+ _controller.close();
_globalState.currentContext.unregister(_id);
}
- SendPort toSendPort() {
+ SendPort get sendPort {
return new _NativeJsSendPort(this, _globalState.currentContext.id);
}
}
@@ -908,9 +911,7 @@ class _PendingSendPortFinder extends _MessageTraverser {
final seen = _visited[list];
if (seen != null) return;
_visited[list] = true;
- // TODO(sigmund): replace with the following: (bug #1660)
- // list.forEach(_dispatch);
- list.forEach((e) => _dispatch(e));
+ list.forEach(_dispatch);
}
visitMap(Map map) {
@@ -918,9 +919,7 @@ class _PendingSendPortFinder extends _MessageTraverser {
if (seen != null) return;
_visited[map] = true;
- // TODO(sigmund): replace with the following: (bug #1660)
- // map.values.forEach(_dispatch);
- map.values.forEach((e) => _dispatch(e));
+ map.values.forEach(_dispatch);
}
visitSendPort(var port) {
@@ -928,14 +927,6 @@ class _PendingSendPortFinder extends _MessageTraverser {
ports.add(port._futurePort);
}
}
-
- visitIsolateSink(JsIsolateSink sink) {
- visitSendPort(sink._port);
- }
-
- visitCloseToken(CloseToken token) {
- // Do nothing.
- }
}
/********************************************************
@@ -993,16 +984,6 @@ class _JsSerializer extends _Serializer {
" ports are resolved at this point.";
}
}
-
- visitIsolateSink(JsIsolateSink sink) {
- SendPort port = sink._port;
- bool isClosed = sink._isClosed;
- return ['isolateSink', visitSendPort(port), isClosed];
- }
-
- visitCloseToken(CloseToken token) {
- return ['closeToken'];
- }
}
@@ -1036,18 +1017,6 @@ class _JsCopier extends _Copier {
" ports are resolved at this point.";
}
}
-
- IsolateSink visitIsolateSink(JsIsolateSink sink) {
- SendPort port = sink._port;
- bool isClosed = sink._isClosed;
- JsIsolateSink result = new JsIsolateSink.fromPort(visitSendPort(port));
- result._isClosed = isClosed;
- return result;
- }
-
- CloseToken visitCloseToken(CloseToken token) {
- return token; // Can be shared.
- }
}
class _JsDeserializer extends _Deserializer {
@@ -1068,18 +1037,6 @@ class _JsDeserializer extends _Deserializer {
return new _WorkerSendPort(managerId, isolateId, receivePortId);
}
}
-
- IsolateSink deserializeIsolateSink(List list) {
- SendPort port = deserializeSendPort(list[1]);
- bool isClosed = list[2];
- JsIsolateSink result = new JsIsolateSink.fromPort(port);
- result._isClosed = isClosed;
- return result;
- }
-
- CloseToken deserializeCloseToken(List list) {
- return const CloseToken();
- }
}
class _JsVisitedMap implements _MessageTraverserVisitedMap {
@@ -1177,8 +1134,6 @@ class _MessageTraverser {
if (x is Map) return visitMap(x);
if (x is SendPort) return visitSendPort(x);
if (x is SendPortSync) return visitSendPortSync(x);
- if (x is JsIsolateSink) return visitIsolateSink(x);
- if (x is CloseToken) return visitCloseToken(x);
// Overridable fallback.
return visitObject(x);
@@ -1189,8 +1144,6 @@ class _MessageTraverser {
visitMap(Map x);
visitSendPort(SendPort x);
visitSendPortSync(SendPortSync x);
- visitIsolateSink(IsolateSink x);
- visitCloseToken(CloseToken x);
visitObject(Object x) {
// TODO(floitsch): make this a real exception. (which one)?
@@ -1302,8 +1255,6 @@ class _Deserializer {
case 'list': return _deserializeList(x);
case 'map': return _deserializeMap(x);
case 'sendport': return deserializeSendPort(x);
- case 'isolateSink': return deserializeIsolateSink(x);
- case 'closeToken': return deserializeCloseToken(x);
default: return deserializeObject(x);
}
}
@@ -1345,10 +1296,6 @@ class _Deserializer {
deserializeSendPort(List x);
- deserializeIsolateSink(List x);
-
- deserializeCloseToken(List x);
-
deserializeObject(List x) {
// TODO(floitsch): Use real exception (which one?).
throw "Unexpected serialized object";
« no previous file with comments | « sdk/lib/_internal/lib/io_patch.dart ('k') | sdk/lib/_internal/lib/isolate_patch.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698