OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 part of core; | |
6 | |
7 class MojoEventStream extends Stream<List<int>> { | |
8 // The underlying Mojo handle. | |
9 MojoHandle _handle; | |
10 | |
11 // Providing our own stream controller allows us to take custom actions when | |
12 // listeners pause/resume/etc. their StreamSubscription. | |
13 StreamController _controller; | |
14 | |
15 // The send port that we give to the handle watcher to notify us of handle | |
16 // events. | |
17 SendPort _sendPort; | |
18 | |
19 // The receive port on which we listen and receive events from the handle | |
20 // watcher. | |
21 ReceivePort _receivePort; | |
22 | |
23 // The signals on this handle that we're interested in. | |
24 MojoHandleSignals _signals; | |
25 | |
26 // Whether listen has been called. | |
27 bool _isListening; | |
28 | |
29 MojoEventStream(MojoHandle handle, | |
30 [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE]) | |
31 : _handle = handle, | |
32 _signals = signals, | |
33 _isListening = false { | |
34 MojoResult result = MojoHandle.register(this); | |
35 if (!result.isOk) { | |
36 throw "Failed to register the MojoHandle: $result."; | |
37 } | |
38 } | |
39 | |
40 Future close({bool immediate: false}) { | |
41 if (_handle != null) { | |
42 if (_isListening) { | |
43 return _handleWatcherClose(immediate: immediate); | |
44 } else { | |
45 _localClose(); | |
46 return new Future.value(null); | |
47 } | |
48 } | |
49 } | |
50 | |
51 StreamSubscription<List<int>> listen(void onData(List event), | |
52 {Function onError, void onDone(), bool cancelOnError}) { | |
53 if (_isListening) { | |
54 throw "Listen has already been called: $_handle."; | |
55 } | |
56 _receivePort = new ReceivePort(); | |
57 _sendPort = _receivePort.sendPort; | |
58 _controller = new StreamController( | |
59 sync: true, | |
60 onListen: _onSubscriptionStateChange, | |
61 onCancel: _onSubscriptionStateChange, | |
62 onPause: _onPauseStateChange, | |
63 onResume: _onPauseStateChange); | |
64 _controller.addStream(_receivePort).whenComplete(_controller.close); | |
65 | |
66 if (_signals != MojoHandleSignals.NONE) { | |
67 var res = new MojoResult( | |
68 MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value)); | |
69 if (!res.isOk) { | |
70 throw "MojoHandleWatcher add failed: $res"; | |
71 } | |
72 } | |
73 | |
74 _isListening = true; | |
75 return _controller.stream.listen(onData, | |
76 onError: onError, onDone: onDone, cancelOnError: cancelOnError); | |
77 } | |
78 | |
79 void enableSignals(MojoHandleSignals signals) { | |
80 _signals = signals; | |
81 if (_isListening) { | |
82 var res = new MojoResult( | |
83 MojoHandleWatcher.add(_handle.h, _sendPort, signals.value)); | |
84 if (!res.isOk) { | |
85 throw "MojoHandleWatcher add failed: $res"; | |
86 } | |
87 } | |
88 } | |
89 | |
90 void enableReadEvents() => | |
91 enableSignals(MojoHandleSignals.PEER_CLOSED_READABLE); | |
92 void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE); | |
93 void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE); | |
94 | |
95 Future _handleWatcherClose({bool immediate: false}) { | |
96 assert(_handle != null); | |
97 assert(MojoHandle._removeUnclosedHandle(_handle)); | |
98 return MojoHandleWatcher.close(_handle.h, wait: !immediate).then((r) { | |
99 if (_receivePort != null) { | |
100 _receivePort.close(); | |
101 _receivePort = null; | |
102 } | |
103 return new MojoResult(r); | |
104 }); | |
105 } | |
106 | |
107 void _localClose() { | |
108 assert(_handle != null); | |
109 _handle.close(); | |
110 _handle = null; | |
111 if (_receivePort != null) { | |
112 _receivePort.close(); | |
113 _receivePort = null; | |
114 } | |
115 } | |
116 | |
117 void _onSubscriptionStateChange() { | |
118 if (!_controller.hasListener) { | |
119 // No one is listening, close it immediately. | |
120 close(immediate: true); | |
121 } | |
122 } | |
123 | |
124 void _onPauseStateChange() { | |
125 if (_controller.isPaused) { | |
126 var res = new MojoResult(MojoHandleWatcher.remove(_handle.h)); | |
127 if (!res.isOk) { | |
128 throw "MojoHandleWatcher add failed: $res"; | |
129 } | |
130 } else { | |
131 var res = new MojoResult( | |
132 MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value)); | |
133 if (!res.isOk) { | |
134 throw "MojoHandleWatcher add failed: $res"; | |
135 } | |
136 } | |
137 } | |
138 | |
139 bool get readyRead => _handle.readyRead; | |
140 bool get readyWrite => _handle.readyWrite; | |
141 | |
142 String toString() => "$_handle"; | |
143 } | |
144 | |
145 typedef void ErrorHandler(); | |
146 | |
147 class MojoEventStreamListener { | |
148 MojoMessagePipeEndpoint _endpoint; | |
149 MojoEventStream _eventStream; | |
150 bool _isOpen = false; | |
151 bool _isInHandler = false; | |
152 StreamSubscription subscription; | |
153 ErrorHandler onError; | |
154 | |
155 MojoEventStreamListener.fromEndpoint(MojoMessagePipeEndpoint endpoint) | |
156 : _endpoint = endpoint, | |
157 _eventStream = new MojoEventStream(endpoint.handle), | |
158 _isOpen = false { | |
159 listen(); | |
160 } | |
161 | |
162 MojoEventStreamListener.fromHandle(MojoHandle handle) { | |
163 _endpoint = new MojoMessagePipeEndpoint(handle); | |
164 _eventStream = new MojoEventStream(handle); | |
165 _isOpen = false; | |
166 listen(); | |
167 } | |
168 | |
169 MojoEventStreamListener.unbound() | |
170 : _endpoint = null, | |
171 _eventStream = null, | |
172 _isOpen = false; | |
173 | |
174 void bind(MojoMessagePipeEndpoint endpoint) { | |
175 assert(!isBound); | |
176 _endpoint = endpoint; | |
177 _eventStream = new MojoEventStream(endpoint.handle); | |
178 _isOpen = false; | |
179 } | |
180 | |
181 void bindFromHandle(MojoHandle handle) { | |
182 assert(!isBound); | |
183 _endpoint = new MojoMessagePipeEndpoint(handle); | |
184 _eventStream = new MojoEventStream(handle); | |
185 _isOpen = false; | |
186 } | |
187 | |
188 StreamSubscription<List<int>> listen() { | |
189 assert(isBound && (subscription == null)); | |
190 _isOpen = true; | |
191 subscription = _eventStream.listen((List<int> event) { | |
192 if (!_isOpen) { | |
193 // The actual close of the underlying stream happens asynchronously | |
194 // after the call to close. However, we start to ignore incoming events | |
195 // immediately. | |
196 return; | |
197 } | |
198 var signalsWatched = new MojoHandleSignals(event[0]); | |
199 var signalsReceived = new MojoHandleSignals(event[1]); | |
200 _isInHandler = true; | |
201 if (signalsReceived.isReadable) { | |
202 assert(_eventStream.readyRead); | |
203 handleRead(); | |
204 } | |
205 if (signalsReceived.isWritable) { | |
206 assert(_eventStream.readyWrite); | |
207 handleWrite(); | |
208 } | |
209 if (!signalsReceived.isPeerClosed) { | |
210 _eventStream.enableSignals(signalsWatched); | |
211 } | |
212 _isInHandler = false; | |
213 if (signalsReceived.isPeerClosed) { | |
214 // immediate is true here because there is no need to wait to close | |
215 // until outstanding messages are sent. The other side is gone. | |
216 close(immediate: true).then((_) { | |
217 if (onError != null) { | |
218 onError(); | |
219 } | |
220 }); | |
221 } | |
222 }, onDone: close); | |
223 return subscription; | |
224 } | |
225 | |
226 Future close({bool immediate: false}) { | |
227 var result; | |
228 _isOpen = false; | |
229 _endpoint = null; | |
230 subscription = null; | |
231 if (_eventStream != null) { | |
232 result = _eventStream.close(immediate: immediate).then((_) { | |
233 _eventStream = null; | |
234 }); | |
235 } | |
236 return result != null ? result : new Future.value(null); | |
237 } | |
238 | |
239 void handleRead() {} | |
240 void handleWrite() {} | |
241 | |
242 MojoMessagePipeEndpoint get endpoint => _endpoint; | |
243 bool get isOpen => _isOpen; | |
244 bool get isInHandler => _isInHandler; | |
245 bool get isBound => _endpoint != null; | |
246 | |
247 String toString() => "MojoEventStreamListener(" | |
248 "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)"; | |
249 } | |
OLD | NEW |