OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 part of bindings; | 5 part of bindings; |
6 | 6 |
7 abstract class Interface { | 7 abstract class Interface { |
8 core.MojoMessagePipeEndpoint _endpoint; | 8 core.MojoMessagePipeEndpoint _endpoint; |
9 core.MojoHandle _handle; | 9 core.MojoEventStream _eventStream; |
10 List _sendQueue; | 10 List _sendQueue; |
11 bool _isOpen; | 11 bool _isOpen; |
12 | 12 |
13 Message handleMessage(MessageReader reader); | 13 Future<Message> handleMessage(ServiceMessage message); |
14 | 14 |
15 Interface(this._endpoint) { | 15 Interface(core.MojoMessagePipeEndpoint endpoint) : |
| 16 _endpoint = endpoint, |
| 17 _sendQueue = [], |
| 18 _eventStream = new core.MojoEventStream(endpoint.handle), |
| 19 _isOpen = false; |
| 20 |
| 21 Interface.fromHandle(int handle) { |
| 22 _endpoint = |
| 23 new core.MojoMessagePipeEndpoint(new core.MojoHandle(handle)); |
16 _sendQueue = []; | 24 _sendQueue = []; |
17 _handle = new core.MojoHandle(_endpoint.handle); | 25 _eventStream = new core.MojoEventStream(_endpoint.handle); |
18 _isOpen = false; | 26 _isOpen = false; |
19 } | 27 } |
20 | 28 |
| 29 void _doRead() { |
| 30 assert(_eventStream.readyRead); |
| 31 |
| 32 // Query how many bytes are available. |
| 33 var result = _endpoint.query(); |
| 34 assert(result.status.isOk || result.status.isResourceExhausted); |
| 35 |
| 36 // Read the data and view as a message. |
| 37 var bytes = new ByteData(result.bytesRead); |
| 38 var handles = new List<core.MojoHandle>(result.handlesRead); |
| 39 result = _endpoint.read(bytes, result.bytesRead, handles); |
| 40 assert(result.status.isOk || result.status.isResourceExhausted); |
| 41 |
| 42 // Prepare the response. |
| 43 var message = new ServiceMessage.fromMessage(new Message(bytes, handles)); |
| 44 var responseFuture = handleMessage(message); |
| 45 |
| 46 // If there's a response, queue it up for sending. |
| 47 if (responseFuture != null) { |
| 48 responseFuture.then((response) { |
| 49 _sendQueue.add(response); |
| 50 if (_sendQueue.length == 1) { |
| 51 _eventStream.enableWriteEvents(); |
| 52 } |
| 53 }); |
| 54 } |
| 55 } |
| 56 |
| 57 void _doWrite() { |
| 58 if (_sendQueue.length > 0) { |
| 59 assert(_eventStream.readyWrite); |
| 60 var responseMessage = _sendQueue.removeAt(0); |
| 61 _endpoint.write(responseMessage.buffer, |
| 62 responseMessage.buffer.lengthInBytes, |
| 63 responseMessage.handles); |
| 64 if (!_endpoint.status.isOk) { |
| 65 throw "message pipe write failed: ${_endpoint.status}"; |
| 66 } |
| 67 } |
| 68 } |
| 69 |
21 StreamSubscription<int> listen() { | 70 StreamSubscription<int> listen() { |
22 _isOpen = true; | 71 _isOpen = true; |
23 return _handle.listen((int mojoSignal) { | 72 return _eventStream.listen((List<int> event) { |
24 if (core.MojoHandleSignals.isReadable(mojoSignal)) { | 73 var signalsWatched = new core.MojoHandleSignals(event[0]); |
25 // Query how many bytes are available. | 74 var signalsReceived = new core.MojoHandleSignals(event[1]); |
26 var result = _endpoint.query(); | 75 if (signalsReceived.isPeerClosed) { |
27 if (!result.status.isOk && !result.status.isResourceExhausted) { | 76 close(); |
28 // If something else happens, it means the handle wasn't really ready | 77 return; |
29 // for reading, which indicates a bug in MojoHandle or the | 78 } |
30 // event listener. | |
31 throw new Exception("message pipe query failed: ${result.status}"); | |
32 } | |
33 | 79 |
34 // Read the data and view as a message. | 80 if (signalsReceived.isReadable) { |
35 var bytes = new ByteData(result.bytesRead); | 81 _doRead(); |
36 var handles = new List<core.RawMojoHandle>(result.handlesRead); | 82 } |
37 result = _endpoint.read(bytes, result.bytesRead, handles); | |
38 if (!result.status.isOk && !result.status.isResourceExhausted) { | |
39 // If something else happens, it means the handle wasn't really ready | |
40 // for reading, which indicates a bug in MojoHandle or the | |
41 // event listener. | |
42 throw new Exception("message pipe read failed: ${result.status}"); | |
43 } | |
44 var message = new Message(bytes, handles); | |
45 var reader = new MessageReader(message); | |
46 | 83 |
47 // Prepare the response. | 84 if (signalsReceived.isWritable) { |
48 var responseMessage = handleMessage(reader); | 85 _doWrite(); |
49 // If there's a response, queue it up for sending. | |
50 if (responseMessage != null) { | |
51 _sendQueue.add(responseMessage); | |
52 if ((_sendQueue.length > 0) && !_handle.writeEnabled()) { | |
53 _handle.enableWriteEvents(); | |
54 } | |
55 } | |
56 } | 86 } |
57 if (core.MojoHandleSignals.isWritable(mojoSignal)) { | 87 |
58 if (_sendQueue.length > 0) { | 88 if (_sendQueue.length == 0) { |
59 var responseMessage = _sendQueue.removeAt(0); | 89 var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE; |
60 _endpoint.write(responseMessage.buffer, | 90 _eventStream.enableSignals(withoutWritable); |
61 responseMessage.buffer.lengthInBytes, | 91 } else { |
62 responseMessage.handles); | 92 _eventStream.enableSignals(signalsWatched); |
63 if (!_endpoint.status.isOk) { | |
64 throw new Exception("message pipe write failed"); | |
65 } | |
66 } | |
67 if ((_sendQueue.length == 0) && _handle.writeEnabled()) { | |
68 _handle.disableWriteEvents(); | |
69 } | |
70 } | |
71 if (core.MojoHandleSignals.isNone(mojoSignal)) { | |
72 // The handle watcher will send MojoHandleSignals.NONE when the other | |
73 // endpoint of the pipe is closed. | |
74 _handle.close(); | |
75 } | 93 } |
76 }); | 94 }); |
77 } | 95 } |
78 | 96 |
79 Message buildResponse(Type t, int name, Object response) { | 97 void close() { |
80 var builder = new MessageBuilder(name, align(getEncodedSize(t))); | 98 // TODO(zra): Cancel outstanding Futures started in _doRead? |
81 builder.encodeStruct(t, response); | 99 if (_isOpen) { |
82 return builder.finish(); | 100 _eventStream.close(); |
83 } | 101 _isOpen = false; |
84 | 102 _eventStream = null; |
85 Message buildResponseWithID( | |
86 Type t, int name, int id, int flags, Object response) { | |
87 var builder = new MessageWithRequestIDBuilder( | |
88 name, align(getEncodedSize(t)), id, flags); | |
89 builder.encodeStruct(t, response); | |
90 return builder.finish(); | |
91 } | |
92 | |
93 void enqueueMessage(Type t, int name, Object msg) { | |
94 var builder = new MessageBuilder(name, align(getEncodedSize(t))); | |
95 builder.encodeStruct(t, msg); | |
96 var message = builder.finish(); | |
97 _sendQueue.add(message); | |
98 if (!_handle.writeEnabled()) { | |
99 _handle.enableWriteEvents(); | |
100 } | 103 } |
101 } | 104 } |
102 | 105 |
103 Future enqueueMessageWithRequestID(Type t, int name, int id, Object msg) { | 106 Message buildResponse(Struct response, int name) { |
104 throw new Exception("The client Mixin should not expect a response"); | 107 var header = new MessageHeader(name); |
| 108 return response.serializeWithHeader(header); |
| 109 } |
| 110 |
| 111 Message buildResponseWithId(Struct response, int name, int id, int flags) { |
| 112 var header = new MessageHeader.withRequestId(name, flags, id); |
| 113 return response.serializeWithHeader(header); |
| 114 } |
| 115 |
| 116 void enqueueMessage(Struct message, int name) { |
| 117 var header = new MessageHeader(name); |
| 118 var serviceMessage = message.serializeWithHeader(header); |
| 119 _sendQueue.add(serviceMessage); |
| 120 _eventStream.enableWriteEvents(); |
| 121 } |
| 122 |
| 123 Future enqueueMessageWithRequestId(Struct response, int name, int id) { |
| 124 // TODO(zra): Is this correct? |
| 125 throw "The client interface should not expect a response"; |
105 } | 126 } |
106 | 127 |
107 bool get isOpen => _isOpen; | 128 bool get isOpen => _isOpen; |
| 129 core.MojoMessagePipeEndpoint get endpoint => _endpoint; |
108 } | 130 } |
OLD | NEW |