Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(571)

Side by Side Diff: mojo/public/dart/src/interface.dart

Issue 816113004: Dart: Adds a content handler and a test. (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 part of bindings; 5 part of bindings;
6 6
7 abstract class Interface { 7 abstract class Interface {
8 core.MojoMessagePipeEndpoint _endpoint; 8 core.MojoMessagePipeEndpoint _endpoint;
9 core.MojoEventStream _eventStream; 9 core.MojoEventStream _eventStream;
10 List _sendQueue; 10 List _sendQueue;
11 bool _isOpen; 11 bool _isOpen;
12 bool _isClosing;
13 bool _isInHandler;
14 int _outstandingResponseFutures;
12 15
13 Future<Message> handleMessage(MessageReader reader); 16 Future<Message> handleMessage(MessageReader reader);
14 17
15 Interface(core.MojoMessagePipeEndpoint endpoint) : 18 Interface(core.MojoMessagePipeEndpoint endpoint) :
16 _endpoint = endpoint, 19 _endpoint = endpoint,
17 _sendQueue = [], 20 _sendQueue = [],
18 _eventStream = new core.MojoEventStream(endpoint.handle), 21 _eventStream = new core.MojoEventStream(endpoint.handle),
19 _isOpen = false; 22 _isOpen = false,
23 _isClosing = false,
24 _isInHandler = false,
25 _outstandingResponseFutures = 0;
20 26
21 Interface.fromHandle(int handle) { 27 Interface.fromHandle(int handle) {
22 _endpoint = 28 _endpoint =
23 new core.MojoMessagePipeEndpoint(new core.MojoHandle(handle)); 29 new core.MojoMessagePipeEndpoint(new core.MojoHandle(handle));
24 _sendQueue = []; 30 _sendQueue = [];
25 _eventStream = new core.MojoEventStream(_endpoint.handle); 31 _eventStream = new core.MojoEventStream(_endpoint.handle);
26 _isOpen = false; 32 _isOpen = false;
33 _isClosing = false;
34 _isInHandler = false;
35 _outstandingResponseFutures = 0;
27 } 36 }
28 37
29 void _doRead() { 38 void _doRead() {
30 assert(_eventStream.readyRead); 39 assert(_eventStream.readyRead);
31 40
32 // Query how many bytes are available. 41 // Query how many bytes are available.
33 var result = _endpoint.query(); 42 var result = _endpoint.query();
34 assert(result.status.isOk || result.status.isResourceExhausted); 43 assert(result.status.isOk || result.status.isResourceExhausted);
35 44
36 // Read the data and view as a message. 45 // Read the data and view as a message.
37 var bytes = new ByteData(result.bytesRead); 46 var bytes = new ByteData(result.bytesRead);
38 var handles = new List<core.MojoHandle>(result.handlesRead); 47 var handles = new List<core.MojoHandle>(result.handlesRead);
39 result = _endpoint.read(bytes, result.bytesRead, handles); 48 result = _endpoint.read(bytes, result.bytesRead, handles);
40 assert(result.status.isOk || result.status.isResourceExhausted); 49 assert(result.status.isOk || result.status.isResourceExhausted);
41 var message = new Message(bytes, handles); 50 var message = new Message(bytes, handles);
42 var reader = new MessageReader(message); 51 var reader = new MessageReader(message);
43 52
44 // Prepare the response. 53 // Prepare the response. Drop messages if we are closing.
45 var responseFuture = handleMessage(reader); 54 var responseFuture = _isClosing ? null : handleMessage(reader);
46 55
47 // If there's a response, queue it up for sending. 56 // If there's a response, queue it up for sending.
48 if (responseFuture != null) { 57 if (responseFuture != null) {
58 _outstandingResponseFutures++;
49 responseFuture.then((response) { 59 responseFuture.then((response) {
60 _outstandingResponseFutures--;
50 _sendQueue.add(response); 61 _sendQueue.add(response);
51 if (_sendQueue.length == 1) { 62 if (_sendQueue.length == 1) {
52 _eventStream.enableWriteEvents(); 63 _eventStream.enableWriteEvents();
53 } 64 }
54 }); 65 });
66 } else if (_isClosing &&
67 (_sendQueue.length == 0) &&
68 (_outstandingResponseFutures == 0)) {
69 // We are closing, there is no response to send for this message, the send
70 // queue is empty, and there are no outstanding futures. Do the close now.
71 _close();
55 } 72 }
56 } 73 }
57 74
58 void _doWrite() { 75 void _doWrite() {
59 if (_sendQueue.length > 0) { 76 if (_sendQueue.length > 0) {
60 assert(_eventStream.readyWrite); 77 assert(_eventStream.readyWrite);
61 var responseMessage = _sendQueue.removeAt(0); 78 var responseMessage = _sendQueue.removeAt(0);
62 _endpoint.write(responseMessage.buffer, 79 _endpoint.write(responseMessage.buffer,
63 responseMessage.buffer.lengthInBytes, 80 responseMessage.buffer.lengthInBytes,
64 responseMessage.handles); 81 responseMessage.handles);
65 if (!_endpoint.status.isOk) { 82 if (!_endpoint.status.isOk) {
66 throw "message pipe write failed: ${_endpoint.status}"; 83 throw "message pipe write failed: ${_endpoint.status}";
67 } 84 }
85 } else if (_isClosing && (_outstandingResponseFutures == 0)) {
86 // We are closing, the send queue is empty, and there are no outstanding
87 // response futures. Really do the close, now.
88 _close();
68 } 89 }
69 } 90 }
70 91
71 StreamSubscription<int> listen() { 92 StreamSubscription<int> listen() {
72 _isOpen = true; 93 _isOpen = true;
73 return _eventStream.listen((List<int> event) { 94 return _eventStream.listen((List<int> event) {
74 var signalsWatched = new core.MojoHandleSignals(event[0]); 95 var signalsWatched = new core.MojoHandleSignals(event[0]);
75 var signalsReceived = new core.MojoHandleSignals(event[1]); 96 var signalsReceived = new core.MojoHandleSignals(event[1]);
76 if (signalsReceived.isPeerClosed) { 97 if (signalsReceived.isPeerClosed) {
77 close(); 98 // If the peer is closed, we can close the interface immediately.
99 _close();
78 return; 100 return;
79 } 101 }
80 102
103 _isInHandler = true;
81 if (signalsReceived.isReadable) { 104 if (signalsReceived.isReadable) {
82 _doRead(); 105 _doRead();
83 } 106 }
84 107
85 if (signalsReceived.isWritable) { 108 if (signalsReceived.isWritable) {
86 _doWrite(); 109 _doWrite();
87 } 110 }
88 111
89 if (_sendQueue.length == 0) { 112 if (_sendQueue.length == 0) {
90 var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE; 113 var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE;
91 _eventStream.enableSignals(withoutWritable); 114 _eventStream.enableSignals(withoutWritable);
92 } else { 115 } else {
93 _eventStream.enableSignals(signalsWatched); 116 _eventStream.enableSignals(signalsWatched);
94 } 117 }
118 _isInHandler = false;
95 }); 119 });
96 } 120 }
97 121
122 void _close() {
123 _eventStream.close();
124 _isClosing = false;
125 _isOpen = false;
126 _eventStream = null;
127 }
128
98 void close() { 129 void close() {
99 // TODO(zra): Cancel outstanding Futures started in _doRead? 130 if (!_isOpen) return;
100 if (_isOpen) { 131 if (_isInHandler || (_sendQueue.length > 0) ||
101 _eventStream.close(); 132 (_outstandingResponseFutures > 0)) {
102 _isOpen = false; 133 // If close is called from within the event handler, or if there are
103 _eventStream = null; 134 // outstanding responses to send, then defer the close call until all
135 // everything is finished.
136 _isClosing = true;
137 } else {
138 _close();
104 } 139 }
105 } 140 }
106 141
107 Message buildResponse(Type t, int name, Object response) { 142 Message buildResponse(Type t, int name, Object response) {
108 var builder = new MessageBuilder(name, align(getEncodedSize(t))); 143 var builder = new MessageBuilder(name, align(getEncodedSize(t)));
109 builder.encodeStruct(t, response); 144 builder.encodeStruct(t, response);
110 return builder.finish(); 145 return builder.finish();
111 } 146 }
112 147
113 Message buildResponseWithID( 148 Message buildResponseWithID(
(...skipping 13 matching lines...) Expand all
127 } 162 }
128 163
129 Future enqueueMessageWithRequestID(Type t, int name, int id, Object msg) { 164 Future enqueueMessageWithRequestID(Type t, int name, int id, Object msg) {
130 // TODO(zra): Is this correct? 165 // TODO(zra): Is this correct?
131 throw "The client interface should not expect a response"; 166 throw "The client interface should not expect a response";
132 } 167 }
133 168
134 bool get isOpen => _isOpen; 169 bool get isOpen => _isOpen;
135 core.MojoMessagePipeEndpoint get endpoint => _endpoint; 170 core.MojoMessagePipeEndpoint get endpoint => _endpoint;
136 } 171 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698