Index: sdk/lib/_internal/lib/isolate_helper.dart |
diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart |
index 05208648a9a3f0e46e579ed2cf703337350684ee..cd3ee4ad56d5108a7bea28b98c0d61e420773756 100644 |
--- a/sdk/lib/_internal/lib/isolate_helper.dart |
+++ b/sdk/lib/_internal/lib/isolate_helper.dart |
@@ -22,8 +22,6 @@ import 'dart:_foreign_helper' show DART_CLOSURE_TO_JS, |
IsolateContext; |
import 'dart:_interceptors' show JSExtendableArray; |
-ReceivePort controlPort; |
- |
/** |
* Called by the compiler to support switching |
* between isolates when we get a callback from the DOM. |
@@ -236,6 +234,39 @@ class _IsolateContext implements IsolateContext { |
// native object containing all globals of an isolate. |
final isolateStatics = JS_CREATE_ISOLATE(); |
+ final RawReceivePortImpl controlPort = new RawReceivePortImpl._controlPort(); |
+ |
+ final Capability pauseCapability = new Capability(); |
+ |
+ // TODO(lrn): Store these in single "pausestate" object, so they don't take |
+ // up as much room when not pausing. |
+ bool isPaused = false; |
+ List<_IsolateEvent> delayedEvents = []; |
+ Set<Capability> pauseTokens = new Set(); |
+ |
+ _IsolateContext() { |
+ this.registerWeak(controlPort._id, controlPort); |
+ } |
+ |
+ void addPause(Capability authentification, Capability resume) { |
+ if (pauseCapability != authentification) return; |
+ if (pauseTokens.add(resume) && !isPaused) { |
+ isPaused = true; |
+ } |
+ } |
+ |
+ void removePause(Capability resume) { |
+ if (!isPaused) return; |
+ pauseTokens.remove(resume); |
+ if (pauseTokens.isEmpty) { |
+ while(delayedEvents.isNotEmpty) { |
+ _IsolateEvent event = delayedEvents.removeLast(); |
+ _globalState.topEventLoop.prequeue(event); |
+ } |
+ isPaused = false; |
+ } |
+ } |
+ |
/** |
* Run [code] in the context of the isolate represented by [this]. |
*/ |
@@ -257,15 +288,32 @@ class _IsolateContext implements IsolateContext { |
JS_SET_CURRENT_ISOLATE(isolateStatics); |
} |
+ void handleControlMessage(message) { |
+ switch (message[0]) { |
+ case "pause": |
+ addPause(message[1], message[2]); |
+ break; |
+ case "resume": |
+ removePause(message[1]); |
+ break; |
+ default: |
+ print("UNKOWN MESSAGE: $message"); |
+ } |
+ } |
+ |
/** Looks up a port registered for this isolate. */ |
RawReceivePortImpl lookup(int portId) => ports[portId]; |
- /** Registers a port on this isolate. */ |
- void register(int portId, RawReceivePortImpl port) { |
+ void _addRegistration(int portId, RawReceivePortImpl port) { |
if (ports.containsKey(portId)) { |
throw new Exception("Registry: ports must be registered only once."); |
} |
ports[portId] = port; |
+ } |
+ |
+ /** Registers a port on this isolate. */ |
+ void register(int portId, RawReceivePortImpl port) { |
+ _addRegistration(portId, port); |
_updateGlobalState(); |
} |
@@ -276,8 +324,7 @@ class _IsolateContext implements IsolateContext { |
*/ |
void registerWeak(int portId, RawReceivePortImpl port) { |
weakPorts.add(portId); |
- // 'register' updates the global state. |
- register(portId, port); |
+ _addRegistration(portId, port); |
} |
_updateGlobalState() { |
@@ -291,6 +338,7 @@ class _IsolateContext implements IsolateContext { |
/** Unregister a port on this isolate. */ |
void unregister(int portId) { |
ports.remove(portId); |
+ weakPorts.remove(portId); |
_updateGlobalState(); |
} |
} |
@@ -306,6 +354,10 @@ class _EventLoop { |
events.addLast(new _IsolateEvent(isolate, fn, msg)); |
} |
+ void prequeue(_IsolateEvent event) { |
+ events.addFirst(event); |
+ } |
+ |
_IsolateEvent dequeue() { |
if (events.isEmpty) return null; |
return events.removeFirst(); |
@@ -383,6 +435,10 @@ class _IsolateEvent { |
_IsolateEvent(this.isolate, this.fn, this.message); |
void process() { |
+ if (isolate.isPaused) { |
+ isolate.delayedEvents.add(this); |
+ return; |
+ } |
isolate.eval(fn); |
} |
} |
@@ -582,7 +638,7 @@ class IsolateNatives { |
return JS("", "new #()", ctor); |
} |
- static Future<SendPort> spawnFunction(void topLevelFunction(message), |
+ static Future<List> spawnFunction(void topLevelFunction(message), |
message) { |
final name = _getJSFunctionName(topLevelFunction); |
if (name == null) { |
@@ -592,25 +648,25 @@ class IsolateNatives { |
return spawn(name, null, null, message, false, false); |
} |
- static Future<SendPort> spawnUri(Uri uri, List<String> args, message) { |
+ static Future<List> spawnUri(Uri uri, List<String> args, message) { |
return spawn(null, uri.toString(), args, message, false, true); |
} |
// TODO(sigmund): clean up above, after we make the new API the default: |
/// If [uri] is `null` it is replaced with the current script. |
- static Future<SendPort> spawn(String functionName, String uri, |
- List<String> args, message, |
- bool isLight, bool isSpawnUri) { |
+ static Future<List> spawn(String functionName, String uri, |
+ List<String> args, message, |
+ bool isLight, bool isSpawnUri) { |
// Assume that the compiled version of the Dart file lives just next to the |
// dart file. |
// TODO(floitsch): support precompiled version of dart2js output. |
if (uri != null && uri.endsWith(".dart")) uri += ".js"; |
ReceivePort port = new ReceivePort(); |
- Future<SendPort> result = port.first.then((msg) { |
+ Future<List> result = port.first.then((msg) { |
assert(msg[0] == _SPAWNED_SIGNAL); |
- return msg[1]; |
+ return msg; |
}); |
SendPort signalReply = port.sendPort; |
@@ -666,8 +722,9 @@ class IsolateNatives { |
_IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT(); |
Primitives.initializeStatics(context.id); |
// The isolate's port does not keep the isolate open. |
- controlPort = new ReceivePortImpl.weak(); |
- replyTo.send([_SPAWNED_SIGNAL, controlPort.sendPort]); |
+ replyTo.send([_SPAWNED_SIGNAL, |
+ context.controlPort.sendPort, |
+ context.pauseCapability]); |
if (!isSpawnUri) { |
topLevel(message); |
} else if (topLevel is _MainFunctionArgsMessage) { |
@@ -763,6 +820,10 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
if (shouldSerialize) { |
msg = _serializeMessage(msg); |
} |
+ if (isolate.controlPort == _receivePort) { |
+ isolate.handleControlMessage(msg); |
+ return; |
+ } |
_globalState.topEventLoop.enqueue(isolate, () { |
if (!_receivePort._isClosed) { |
if (shouldSerialize) { |
@@ -823,18 +884,23 @@ class _WorkerSendPort extends _BaseSendPort implements SendPort { |
class RawReceivePortImpl implements RawReceivePort { |
static int _nextFreeId = 1; |
- final int _id = _nextFreeId++; |
+ final int _id; |
Function _handler; |
bool _isClosed = false; |
- RawReceivePortImpl(this._handler) { |
+ RawReceivePortImpl(this._handler) : _id = _nextFreeId++ { |
_globalState.currentContext.register(_id, this); |
} |
- RawReceivePortImpl.weak(this._handler) { |
+ RawReceivePortImpl.weak(this._handler) : _id = _nextFreeId++ { |
_globalState.currentContext.registerWeak(_id, this); |
} |
+ // Creates the control port of an isolate. |
+ // This is created before the isolate context object itself, |
+ // so it cannot access the static _nextFreeId field. |
+ RawReceivePortImpl._controlPort() : _handler = null, _id = 0; |
+ |
void set handler(Function newHandler) { |
_handler = newHandler; |
} |