Index: mojo/public/dart/src/interface.dart |
diff --git a/mojo/public/dart/src/interface.dart b/mojo/public/dart/src/interface.dart |
index 0a8cfd4be6163af9bb275a3c9b895410bb51025b..37331a6fc0e04e724afd1bac37c60bc15db88386 100644 |
--- a/mojo/public/dart/src/interface.dart |
+++ b/mojo/public/dart/src/interface.dart |
@@ -9,6 +9,9 @@ abstract class Interface { |
core.MojoEventStream _eventStream; |
List _sendQueue; |
bool _isOpen; |
+ bool _isClosing; |
+ bool _isInHandler; |
+ int _outstandingResponseFutures; |
Future<Message> handleMessage(MessageReader reader); |
@@ -16,7 +19,10 @@ abstract class Interface { |
_endpoint = endpoint, |
_sendQueue = [], |
_eventStream = new core.MojoEventStream(endpoint.handle), |
- _isOpen = false; |
+ _isOpen = false, |
+ _isClosing = false, |
+ _isInHandler = false, |
+ _outstandingResponseFutures = 0; |
Interface.fromHandle(int handle) { |
_endpoint = |
@@ -24,6 +30,9 @@ abstract class Interface { |
_sendQueue = []; |
_eventStream = new core.MojoEventStream(_endpoint.handle); |
_isOpen = false; |
+ _isClosing = false; |
+ _isInHandler = false; |
+ _outstandingResponseFutures = 0; |
} |
void _doRead() { |
@@ -41,17 +50,25 @@ abstract class Interface { |
var message = new Message(bytes, handles); |
var reader = new MessageReader(message); |
- // Prepare the response. |
- var responseFuture = handleMessage(reader); |
+ // Prepare the response. Drop messages if we are closing. |
+ var responseFuture = _isClosing ? null : handleMessage(reader); |
// If there's a response, queue it up for sending. |
if (responseFuture != null) { |
+ _outstandingResponseFutures++; |
responseFuture.then((response) { |
+ _outstandingResponseFutures--; |
_sendQueue.add(response); |
if (_sendQueue.length == 1) { |
_eventStream.enableWriteEvents(); |
} |
}); |
+ } else if (_isClosing && |
+ (_sendQueue.length == 0) && |
+ (_outstandingResponseFutures == 0)) { |
+ // We are closing, there is no response to send for this message, the send |
+ // queue is empty, and there are no outstanding futures. Do the close now. |
+ _close(); |
} |
} |
@@ -65,6 +82,10 @@ abstract class Interface { |
if (!_endpoint.status.isOk) { |
throw "message pipe write failed: ${_endpoint.status}"; |
} |
+ } else if (_isClosing && (_outstandingResponseFutures == 0)) { |
+ // We are closing, the send queue is empty, and there are no outstanding |
+ // response futures. Really do the close, now. |
+ _close(); |
} |
} |
@@ -74,10 +95,12 @@ abstract class Interface { |
var signalsWatched = new core.MojoHandleSignals(event[0]); |
var signalsReceived = new core.MojoHandleSignals(event[1]); |
if (signalsReceived.isPeerClosed) { |
- close(); |
+ // If the peer is closed, we can close the interface immediately. |
+ _close(); |
return; |
} |
+ _isInHandler = true; |
if (signalsReceived.isReadable) { |
_doRead(); |
} |
@@ -92,15 +115,27 @@ abstract class Interface { |
} else { |
_eventStream.enableSignals(signalsWatched); |
} |
+ _isInHandler = false; |
}); |
} |
+ void _close() { |
+ _eventStream.close(); |
+ _isClosing = false; |
+ _isOpen = false; |
+ _eventStream = null; |
+ } |
+ |
void close() { |
- // TODO(zra): Cancel outstanding Futures started in _doRead? |
- if (_isOpen) { |
- _eventStream.close(); |
- _isOpen = false; |
- _eventStream = null; |
+ if (!_isOpen) return; |
+ if (_isInHandler || (_sendQueue.length > 0) || |
+ (_outstandingResponseFutures > 0)) { |
+ // If close is called from within the event handler, or if there are |
+ // outstanding responses to send, then defer the close call until all |
+ // everything is finished. |
+ _isClosing = true; |
+ } else { |
+ _close(); |
} |
} |