| Index: mojo/public/dart/src/interface.dart
|
| diff --git a/mojo/public/dart/src/interface.dart b/mojo/public/dart/src/interface.dart
|
| index 0a8cfd4be6163af9bb275a3c9b895410bb51025b..37331a6fc0e04e724afd1bac37c60bc15db88386 100644
|
| --- a/mojo/public/dart/src/interface.dart
|
| +++ b/mojo/public/dart/src/interface.dart
|
| @@ -9,6 +9,9 @@ abstract class Interface {
|
| core.MojoEventStream _eventStream;
|
| List _sendQueue;
|
| bool _isOpen;
|
| + bool _isClosing;
|
| + bool _isInHandler;
|
| + int _outstandingResponseFutures;
|
|
|
| Future<Message> handleMessage(MessageReader reader);
|
|
|
| @@ -16,7 +19,10 @@ abstract class Interface {
|
| _endpoint = endpoint,
|
| _sendQueue = [],
|
| _eventStream = new core.MojoEventStream(endpoint.handle),
|
| - _isOpen = false;
|
| + _isOpen = false,
|
| + _isClosing = false,
|
| + _isInHandler = false,
|
| + _outstandingResponseFutures = 0;
|
|
|
| Interface.fromHandle(int handle) {
|
| _endpoint =
|
| @@ -24,6 +30,9 @@ abstract class Interface {
|
| _sendQueue = [];
|
| _eventStream = new core.MojoEventStream(_endpoint.handle);
|
| _isOpen = false;
|
| + _isClosing = false;
|
| + _isInHandler = false;
|
| + _outstandingResponseFutures = 0;
|
| }
|
|
|
| void _doRead() {
|
| @@ -41,17 +50,25 @@ abstract class Interface {
|
| var message = new Message(bytes, handles);
|
| var reader = new MessageReader(message);
|
|
|
| - // Prepare the response.
|
| - var responseFuture = handleMessage(reader);
|
| + // Prepare the response. Drop messages if we are closing.
|
| + var responseFuture = _isClosing ? null : handleMessage(reader);
|
|
|
| // If there's a response, queue it up for sending.
|
| if (responseFuture != null) {
|
| + _outstandingResponseFutures++;
|
| responseFuture.then((response) {
|
| + _outstandingResponseFutures--;
|
| _sendQueue.add(response);
|
| if (_sendQueue.length == 1) {
|
| _eventStream.enableWriteEvents();
|
| }
|
| });
|
| + } else if (_isClosing &&
|
| + (_sendQueue.length == 0) &&
|
| + (_outstandingResponseFutures == 0)) {
|
| + // We are closing, there is no response to send for this message, the send
|
| + // queue is empty, and there are no outstanding futures. Do the close now.
|
| + _close();
|
| }
|
| }
|
|
|
| @@ -65,6 +82,10 @@ abstract class Interface {
|
| if (!_endpoint.status.isOk) {
|
| throw "message pipe write failed: ${_endpoint.status}";
|
| }
|
| + } else if (_isClosing && (_outstandingResponseFutures == 0)) {
|
| + // We are closing, the send queue is empty, and there are no outstanding
|
| + // response futures. Really do the close, now.
|
| + _close();
|
| }
|
| }
|
|
|
| @@ -74,10 +95,12 @@ abstract class Interface {
|
| var signalsWatched = new core.MojoHandleSignals(event[0]);
|
| var signalsReceived = new core.MojoHandleSignals(event[1]);
|
| if (signalsReceived.isPeerClosed) {
|
| - close();
|
| + // If the peer is closed, we can close the interface immediately.
|
| + _close();
|
| return;
|
| }
|
|
|
| + _isInHandler = true;
|
| if (signalsReceived.isReadable) {
|
| _doRead();
|
| }
|
| @@ -92,15 +115,27 @@ abstract class Interface {
|
| } else {
|
| _eventStream.enableSignals(signalsWatched);
|
| }
|
| + _isInHandler = false;
|
| });
|
| }
|
|
|
| + void _close() {
|
| + _eventStream.close();
|
| + _isClosing = false;
|
| + _isOpen = false;
|
| + _eventStream = null;
|
| + }
|
| +
|
| void close() {
|
| - // TODO(zra): Cancel outstanding Futures started in _doRead?
|
| - if (_isOpen) {
|
| - _eventStream.close();
|
| - _isOpen = false;
|
| - _eventStream = null;
|
| + if (!_isOpen) return;
|
| + if (_isInHandler || (_sendQueue.length > 0) ||
|
| + (_outstandingResponseFutures > 0)) {
|
| + // If close is called from within the event handler, or if there are
|
| + // outstanding responses to send, then defer the close call until all
|
| + // everything is finished.
|
| + _isClosing = true;
|
| + } else {
|
| + _close();
|
| }
|
| }
|
|
|
|
|