Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(72)

Unified Diff: sdk/lib/_internal/lib/isolate_helper.dart

Issue 153553009: Implement Isolate.pause in dart2js. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | sdk/lib/_internal/lib/isolate_patch.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/_internal/lib/isolate_helper.dart
diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart
index 05208648a9a3f0e46e579ed2cf703337350684ee..cd3ee4ad56d5108a7bea28b98c0d61e420773756 100644
--- a/sdk/lib/_internal/lib/isolate_helper.dart
+++ b/sdk/lib/_internal/lib/isolate_helper.dart
@@ -22,8 +22,6 @@ import 'dart:_foreign_helper' show DART_CLOSURE_TO_JS,
IsolateContext;
import 'dart:_interceptors' show JSExtendableArray;
-ReceivePort controlPort;
-
/**
* Called by the compiler to support switching
* between isolates when we get a callback from the DOM.
@@ -236,6 +234,39 @@ class _IsolateContext implements IsolateContext {
// native object containing all globals of an isolate.
final isolateStatics = JS_CREATE_ISOLATE();
+ final RawReceivePortImpl controlPort = new RawReceivePortImpl._controlPort();
+
+ final Capability pauseCapability = new Capability();
+
+ // TODO(lrn): Store these in single "pausestate" object, so they don't take
+ // up as much room when not pausing.
+ bool isPaused = false;
+ List<_IsolateEvent> delayedEvents = [];
+ Set<Capability> pauseTokens = new Set();
+
+ _IsolateContext() {
+ this.registerWeak(controlPort._id, controlPort);
+ }
+
+ void addPause(Capability authentification, Capability resume) {
+ if (pauseCapability != authentification) return;
+ if (pauseTokens.add(resume) && !isPaused) {
+ isPaused = true;
+ }
+ }
+
+ void removePause(Capability resume) {
+ if (!isPaused) return;
+ pauseTokens.remove(resume);
+ if (pauseTokens.isEmpty) {
+ while(delayedEvents.isNotEmpty) {
+ _IsolateEvent event = delayedEvents.removeLast();
+ _globalState.topEventLoop.prequeue(event);
+ }
+ isPaused = false;
+ }
+ }
+
/**
* Run [code] in the context of the isolate represented by [this].
*/
@@ -257,15 +288,32 @@ class _IsolateContext implements IsolateContext {
JS_SET_CURRENT_ISOLATE(isolateStatics);
}
+ void handleControlMessage(message) {
+ switch (message[0]) {
+ case "pause":
+ addPause(message[1], message[2]);
+ break;
+ case "resume":
+ removePause(message[1]);
+ break;
+ default:
+ print("UNKOWN MESSAGE: $message");
+ }
+ }
+
/** Looks up a port registered for this isolate. */
RawReceivePortImpl lookup(int portId) => ports[portId];
- /** Registers a port on this isolate. */
- void register(int portId, RawReceivePortImpl port) {
+ void _addRegistration(int portId, RawReceivePortImpl port) {
if (ports.containsKey(portId)) {
throw new Exception("Registry: ports must be registered only once.");
}
ports[portId] = port;
+ }
+
+ /** Registers a port on this isolate. */
+ void register(int portId, RawReceivePortImpl port) {
+ _addRegistration(portId, port);
_updateGlobalState();
}
@@ -276,8 +324,7 @@ class _IsolateContext implements IsolateContext {
*/
void registerWeak(int portId, RawReceivePortImpl port) {
weakPorts.add(portId);
- // 'register' updates the global state.
- register(portId, port);
+ _addRegistration(portId, port);
}
_updateGlobalState() {
@@ -291,6 +338,7 @@ class _IsolateContext implements IsolateContext {
/** Unregister a port on this isolate. */
void unregister(int portId) {
ports.remove(portId);
+ weakPorts.remove(portId);
_updateGlobalState();
}
}
@@ -306,6 +354,10 @@ class _EventLoop {
events.addLast(new _IsolateEvent(isolate, fn, msg));
}
+ void prequeue(_IsolateEvent event) {
+ events.addFirst(event);
+ }
+
_IsolateEvent dequeue() {
if (events.isEmpty) return null;
return events.removeFirst();
@@ -383,6 +435,10 @@ class _IsolateEvent {
_IsolateEvent(this.isolate, this.fn, this.message);
void process() {
+ if (isolate.isPaused) {
+ isolate.delayedEvents.add(this);
+ return;
+ }
isolate.eval(fn);
}
}
@@ -582,7 +638,7 @@ class IsolateNatives {
return JS("", "new #()", ctor);
}
- static Future<SendPort> spawnFunction(void topLevelFunction(message),
+ static Future<List> spawnFunction(void topLevelFunction(message),
message) {
final name = _getJSFunctionName(topLevelFunction);
if (name == null) {
@@ -592,25 +648,25 @@ class IsolateNatives {
return spawn(name, null, null, message, false, false);
}
- static Future<SendPort> spawnUri(Uri uri, List<String> args, message) {
+ static Future<List> spawnUri(Uri uri, List<String> args, message) {
return spawn(null, uri.toString(), args, message, false, true);
}
// TODO(sigmund): clean up above, after we make the new API the default:
/// If [uri] is `null` it is replaced with the current script.
- static Future<SendPort> spawn(String functionName, String uri,
- List<String> args, message,
- bool isLight, bool isSpawnUri) {
+ static Future<List> spawn(String functionName, String uri,
+ List<String> args, message,
+ bool isLight, bool isSpawnUri) {
// Assume that the compiled version of the Dart file lives just next to the
// dart file.
// TODO(floitsch): support precompiled version of dart2js output.
if (uri != null && uri.endsWith(".dart")) uri += ".js";
ReceivePort port = new ReceivePort();
- Future<SendPort> result = port.first.then((msg) {
+ Future<List> result = port.first.then((msg) {
assert(msg[0] == _SPAWNED_SIGNAL);
- return msg[1];
+ return msg;
});
SendPort signalReply = port.sendPort;
@@ -666,8 +722,9 @@ class IsolateNatives {
_IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT();
Primitives.initializeStatics(context.id);
// The isolate's port does not keep the isolate open.
- controlPort = new ReceivePortImpl.weak();
- replyTo.send([_SPAWNED_SIGNAL, controlPort.sendPort]);
+ replyTo.send([_SPAWNED_SIGNAL,
+ context.controlPort.sendPort,
+ context.pauseCapability]);
if (!isSpawnUri) {
topLevel(message);
} else if (topLevel is _MainFunctionArgsMessage) {
@@ -763,6 +820,10 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
if (shouldSerialize) {
msg = _serializeMessage(msg);
}
+ if (isolate.controlPort == _receivePort) {
+ isolate.handleControlMessage(msg);
+ return;
+ }
_globalState.topEventLoop.enqueue(isolate, () {
if (!_receivePort._isClosed) {
if (shouldSerialize) {
@@ -823,18 +884,23 @@ class _WorkerSendPort extends _BaseSendPort implements SendPort {
class RawReceivePortImpl implements RawReceivePort {
static int _nextFreeId = 1;
- final int _id = _nextFreeId++;
+ final int _id;
Function _handler;
bool _isClosed = false;
- RawReceivePortImpl(this._handler) {
+ RawReceivePortImpl(this._handler) : _id = _nextFreeId++ {
_globalState.currentContext.register(_id, this);
}
- RawReceivePortImpl.weak(this._handler) {
+ RawReceivePortImpl.weak(this._handler) : _id = _nextFreeId++ {
_globalState.currentContext.registerWeak(_id, this);
}
+ // Creates the control port of an isolate.
+ // This is created before the isolate context object itself,
+ // so it cannot access the static _nextFreeId field.
+ RawReceivePortImpl._controlPort() : _handler = null, _id = 0;
+
void set handler(Function newHandler) {
_handler = newHandler;
}
« no previous file with comments | « no previous file | sdk/lib/_internal/lib/isolate_patch.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698