Index: mojo/public/dart/src/handle_watcher.dart |
diff --git a/mojo/public/dart/src/handle_watcher.dart b/mojo/public/dart/src/handle_watcher.dart |
index 459e705acb93fbbbf1cee561f99cbfb9e1663652..297f72d8fe6416725a89e812260f15400cff64da 100644 |
--- a/mojo/public/dart/src/handle_watcher.dart |
+++ b/mojo/public/dart/src/handle_watcher.dart |
@@ -33,23 +33,18 @@ class _MojoHandleWatcherNatives { |
// from the set of handles it watches. This allows the application isolate |
// to, e.g., pause the stream of events. |
// |
-// toggleWrite(handle) - Modifies the set of signals that an application isolate |
-// wishes to be notified about. Whether the application isolate should be |
-// be notified about a handle ready for writing is made the opposite. |
-// |
// close(handle) - Notifies the HandleWatcherIsolate that a handle it is |
// watching should be removed from its set and closed. |
class MojoHandleWatcher { |
// Control commands. |
static const int ADD = 0; |
static const int REMOVE = 1; |
- static const int TOGGLE_WRITE = 2; |
- static const int CLOSE = 3; |
- static const int TIMER = 4; |
- static const int SHUTDOWN = 5; |
+ static const int CLOSE = 2; |
+ static const int TIMER = 3; |
+ static const int SHUTDOWN = 4; |
static int _encodeCommand(int cmd, [int signals = 0]) => |
- (cmd << 2) | (signals & MojoHandleSignals.READWRITE); |
+ (cmd << 2) | (signals & MojoHandleSignals.kReadWrite); |
static int _decodeCommand(int cmd) => cmd >> 2; |
// The Mojo handle over which control messages are sent. |
@@ -73,8 +68,8 @@ class MojoHandleWatcher { |
Map<int, int> _handleIndices; |
// Since we are not storing wrapped handles, a dummy handle for when we need |
- // a RawMojoHandle. |
- RawMojoHandle _tempHandle; |
+ // a MojoHandle. |
+ MojoHandle _tempHandle; |
// Priority queue of timers registered with the watcher. |
TimerQueue _timerQueue; |
@@ -86,12 +81,12 @@ class MojoHandleWatcher { |
_signals = new List<int>(), |
_handleIndices = new Map<int, int>(), |
_handleCount = 1, |
- _tempHandle = new RawMojoHandle(RawMojoHandle.INVALID), |
+ _tempHandle = new MojoHandle(MojoHandle.INVALID), |
_timerQueue = new TimerQueue() { |
// Setup control handle. |
_handles.add(_controlHandle); |
_ports.add(null); // There is no port for the control handle. |
- _signals.add(MojoHandleSignals.READABLE); |
+ _signals.add(MojoHandleSignals.kReadable); |
_handleIndices[_controlHandle] = 0; |
} |
@@ -99,43 +94,38 @@ class MojoHandleWatcher { |
MojoHandleWatcher watcher = new MojoHandleWatcher(consumerHandle); |
while (!watcher._shutdown) { |
int deadline = watcher._processTimerDeadlines(); |
- MojoWaitManyResult res = RawMojoHandle.waitMany(watcher._handles, |
- watcher._signals, |
- deadline); |
- if (res.result.isOk && res.index == 0) { |
+ MojoWaitManyResult mwmr = MojoHandle.waitMany( |
+ watcher._handles, watcher._signals, deadline); |
+ if (mwmr.result.isOk && mwmr.index == 0) { |
watcher._handleControlMessage(); |
- } else if (res.result.isOk && res.index > 0) { |
- int handle = watcher._handles[res.index]; |
+ } else if (mwmr.result.isOk && (mwmr.index > 0)) { |
+ int handle = watcher._handles[mwmr.index]; |
// Route event. |
- watcher._routeEvent(res.index); |
+ watcher._routeEvent(mwmr.index); |
// Remove the handle from the list. |
watcher._removeHandle(handle); |
- } else if (res.areSignalStatesValid) { |
+ } else if (!mwmr.result.isDeadlineExceeded) { |
// Some handle was closed, but not by us. |
// Find it and close it on our side. |
- watcher._pruneClosedHandles(res.states); |
+ watcher._pruneClosedHandles(mwmr.states); |
} |
} |
} |
void _routeEvent(int idx) { |
int client_handle = _handles[idx]; |
- int signals = _signals[idx]; |
+ var signals = new MojoHandleSignals(_signals[idx]); |
SendPort port = _ports[idx]; |
_tempHandle.h = client_handle; |
- bool readyWrite = |
- MojoHandleSignals.isWritable(signals) && _tempHandle.readyWrite(); |
- bool readyRead = |
- MojoHandleSignals.isReadable(signals) && _tempHandle.readyRead(); |
- if (readyRead && readyWrite) { |
- port.send(MojoHandleSignals.READWRITE); |
- } else if (readyWrite) { |
- port.send(MojoHandleSignals.WRITABLE); |
- } else if (readyRead) { |
- port.send(MojoHandleSignals.READABLE); |
- } |
- _tempHandle.h = RawMojoHandle.INVALID; |
+ bool readyWrite = signals.isWritable && _tempHandle.readyWrite; |
+ bool readyRead = signals.isReadable && _tempHandle.readyRead; |
+ _tempHandle.h = MojoHandle.INVALID; |
+ |
+ var event = MojoHandleSignals.NONE; |
+ event += readyRead ? MojoHandleSignals.READABLE : MojoHandleSignals.NONE; |
+ event += readyWrite ? MojoHandleSignals.WRITABLE : MojoHandleSignals.NONE; |
+ port.send([signals.value, event.value]); |
} |
void _handleControlMessage() { |
@@ -144,7 +134,8 @@ class MojoHandleWatcher { |
// result[1] = SendPort if any. |
// result[2] = command << 2 | WRITABLE | READABLE |
- int signals = result[2] & MojoHandleSignals.READWRITE; |
+ var signals = new MojoHandleSignals( |
+ result[2] & MojoHandleSignals.kReadWrite); |
int command = _decodeCommand(result[2]); |
switch (command) { |
case ADD: |
@@ -153,9 +144,6 @@ class MojoHandleWatcher { |
case REMOVE: |
_removeHandle(result[0]); |
break; |
- case TOGGLE_WRITE: |
- _toggleWrite(result[0]); |
- break; |
case CLOSE: |
_close(result[0]); |
break; |
@@ -163,29 +151,36 @@ class MojoHandleWatcher { |
_timer(result[1], result[0]); |
break; |
case SHUTDOWN: |
- _shutdownHandleWatcher(); |
+ _shutdownHandleWatcher(result[1]); |
break; |
default: |
- throw new Exception("Invalid Command: $command"); |
+ throw "Invalid Command: $command"; |
break; |
} |
} |
- void _addHandle(int mojoHandle, SendPort port, int signals) { |
- _handles.add(mojoHandle); |
- _ports.add(port); |
- _signals.add(signals & MojoHandleSignals.READWRITE); |
- _handleIndices[mojoHandle] = _handleCount; |
- _handleCount++; |
+ void _addHandle(int mojoHandle, SendPort port, MojoHandleSignals signals) { |
+ int idx = _handleIndices[mojoHandle]; |
+ if (idx == null) { |
+ _handles.add(mojoHandle); |
+ _ports.add(port); |
+ _signals.add(signals.value); |
+ _handleIndices[mojoHandle] = _handleCount; |
+ _handleCount++; |
+ } else { |
+ assert(_ports[idx] == port); |
+ assert(_handles[idx] == mojoHandle); |
+ _signals[idx] |= signals.value; |
+ } |
} |
void _removeHandle(int mojoHandle) { |
int idx = _handleIndices[mojoHandle]; |
if (idx == null) { |
- throw new Exception("Remove on a non-existent handle: idx = $idx."); |
+ throw "Remove on a non-existent handle: idx = $idx."; |
} |
if (idx == 0) { |
- throw new Exception("The control handle (idx = 0) cannot be removed."); |
+ throw "The control handle (idx = 0) cannot be removed."; |
} |
// We don't use List.removeAt so that we know how to fix-up _handleIndices. |
if (idx == _handleCount - 1) { |
@@ -197,6 +192,7 @@ class MojoHandleWatcher { |
_handleCount--; |
} else { |
int last = _handleCount - 1; |
+ _handleIndices[_handles[idx]] = null; |
_handles[idx] = _handles[last]; |
_signals[idx] = _signals[last]; |
_ports[idx] = _ports[last]; |
@@ -211,18 +207,21 @@ class MojoHandleWatcher { |
void _close(int mojoHandle, {bool pruning : false}) { |
int idx = _handleIndices[mojoHandle]; |
if (idx == null) { |
- throw new Exception("Close on a non-existent handle: idx = $idx."); |
+ // A client may request to close a handle that has already been closed on |
+ // the other side and pruned, but before receiving notification from the |
+ // handle watcher. |
+ return; |
} |
if (idx == 0) { |
- throw new Exception("The control handle (idx = 0) cannot be closed."); |
+ throw "The control handle (idx = 0) cannot be closed."; |
} |
_tempHandle.h = _handles[idx]; |
_tempHandle.close(); |
- _tempHandle.h = RawMojoHandle.INVALID; |
+ _tempHandle.h = MojoHandle.INVALID; |
if (pruning) { |
// If this handle is being pruned, notify the application isolate |
- // by sending MojoHandleSignals.NONE. |
- _ports[idx].send(MojoHandleSignals.NONE); |
+ // by sending MojoHandleSignals.PEER_CLOSED. |
+ _ports[idx].send([_signals[idx], MojoHandleSignals.kPeerClosed]); |
} |
_removeHandle(mojoHandle); |
} |
@@ -236,55 +235,54 @@ class MojoHandleWatcher { |
now = (new DateTime.now()).millisecondsSinceEpoch; |
} |
return _timerQueue.hasTimer ? (_timerQueue.currentTimeout - now) * 1000 |
- : RawMojoHandle.DEADLINE_INDEFINITE; |
+ : MojoHandle.DEADLINE_INDEFINITE; |
} |
void _timer(SendPort port, int deadline) { |
_timerQueue.updateTimer(port, deadline); |
} |
- void _toggleWrite(int mojoHandle) { |
- int idx = _handleIndices[mojoHandle]; |
- if (idx == null) { |
- throw new Exception( |
- "Toggle write on a non-existent handle: $mojoHandle."); |
- } |
- if (idx == 0) { |
- throw new Exception("The control handle (idx = 0) cannot be toggled."); |
- } |
- _signals[idx] = MojoHandleSignals.toggleWrite(_signals[idx]); |
- } |
- |
void _pruneClosedHandles(List<MojoHandleSignalsState> states) { |
List<int> closed = new List(); |
- for (var i=0; i<_handles.length; i++) { |
- if (MojoHandleSignals.isPeerClosed(states[i].satisfied_signals)) { |
- closed.add(_handles[i]); |
+ for (var i = 0; i < _handles.length; i++) { |
+ if (states != null) { |
+ var signals = new MojoHandleSignals(states[i].satisfied_signals); |
+ if (signals.isPeerClosed) { |
+ closed.add(_handles[i]); |
+ } |
+ } else { |
+ _tempHandle.h = h; |
+ MojoResult res = _tempHandle.wait(MojoHandleSignals.kReadWrite, 0); |
+ if ((!res.isOk) && (!res.isDeadlineExceeded)) { |
+ closed.add(h); |
+ } |
+ _tempHandle.h = MojoHandle.INVALID; |
} |
} |
for (var h in closed) { |
- _close(h, pruning: true); |
+ _close(h, pruning: true); |
} |
// '_close' updated the '_handles' array, so at this point the '_handles' |
// array and the caller's 'states' array are mismatched. |
} |
- void _shutdownHandleWatcher() { |
+ void _shutdownHandleWatcher(SendPort shutdownSendPort) { |
_shutdown = true; |
_tempHandle.h = _controlHandle; |
_tempHandle.close(); |
- _tempHandle.h = RawMojoHandle.INVALID; |
+ _tempHandle.h = MojoHandle.INVALID; |
+ shutdownSendPort.send(null); |
} |
- static MojoResult _sendControlData(RawMojoHandle mojoHandle, |
+ static MojoResult _sendControlData(MojoHandle mojoHandle, |
SendPort port, |
int data) { |
int controlHandle = _MojoHandleWatcherNatives.getControlHandle(); |
- if (controlHandle == RawMojoHandle.INVALID) { |
- throw new Exception("Found invalid control handle"); |
+ if (controlHandle == MojoHandle.INVALID) { |
+ return MojoResult.FAILED_PRECONDITION; |
} |
- int rawHandle = RawMojoHandle.INVALID; |
+ int rawHandle = MojoHandle.INVALID; |
if (mojoHandle != null) { |
rawHandle = mojoHandle.h; |
} |
@@ -300,6 +298,7 @@ class MojoHandleWatcher { |
int producerHandle = pipe.endpoints[1].handle.h; |
// Call setControlHandle with the other end. |
+ assert(producerHandle != MojoHandle.INVALID); |
_MojoHandleWatcherNatives.setControlHandle(producerHandle); |
// Spawn the handle watcher isolate with the MojoHandleWatcher, |
@@ -307,37 +306,42 @@ class MojoHandleWatcher { |
} |
static void Stop() { |
+ // Create a port for notification that the handle watcher has shutdown. |
+ var shutdownReceivePort = new ReceivePort(); |
+ var shutdownSendPort = shutdownReceivePort.sendPort; |
+ |
// Send the shutdown command. |
- _sendControlData(null, null, _encodeCommand(SHUTDOWN)); |
+ _sendControlData(null, shutdownSendPort, _encodeCommand(SHUTDOWN)); |
// Close the control handle. |
int controlHandle = _MojoHandleWatcherNatives.getControlHandle(); |
- var handle = new RawMojoHandle(controlHandle); |
+ var handle = new MojoHandle(controlHandle); |
handle.close(); |
// Invalidate the control handle. |
- _MojoHandleWatcherNatives.setControlHandle(RawMojoHandle.INVALID); |
- } |
+ _MojoHandleWatcherNatives.setControlHandle(MojoHandle.INVALID); |
- static MojoResult close(RawMojoHandle mojoHandle) { |
- return _sendControlData(mojoHandle, null, _encodeCommand(CLOSE)); |
+ // Wait for the handle watcher isolate to exit. |
+ shutdownReceivePort.first.then((_) { |
+ shutdownReceivePort.close(); |
+ }); |
} |
- static MojoResult toggleWrite(RawMojoHandle mojoHandle) { |
- return _sendControlData(mojoHandle, null, _encodeCommand(TOGGLE_WRITE)); |
+ static MojoResult close(MojoHandle mojoHandle) { |
+ return _sendControlData(mojoHandle, null, _encodeCommand(CLOSE)); |
} |
- static MojoResult add(RawMojoHandle mojoHandle, SendPort port, int signals) { |
+ static MojoResult add(MojoHandle mojoHandle, SendPort port, int signals) { |
return _sendControlData(mojoHandle, port, _encodeCommand(ADD, signals)); |
} |
- static MojoResult remove(RawMojoHandle mojoHandle) { |
+ static MojoResult remove(MojoHandle mojoHandle) { |
return _sendControlData(mojoHandle, null, _encodeCommand(REMOVE)); |
} |
static MojoResult timer(SendPort port, int deadline) { |
// The deadline will be unwrapped before sending to the handle watcher. |
return _sendControlData( |
- new RawMojoHandle(deadline), port, _encodeCommand(TIMER)); |
+ new MojoHandle(deadline), port, _encodeCommand(TIMER)); |
} |
} |