OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 part of core; | |
6 | |
7 class MojoEventStream extends Stream<int> { | |
8 // The underlying Mojo handle. | |
9 MojoHandle _handle; | |
10 | |
11 // Providing our own stream controller allows us to take custom actions when | |
12 // listeners pause/resume/etc. their StreamSubscription. | |
13 StreamController _controller; | |
14 | |
15 // The send port that we give to the handle watcher to notify us of handle | |
16 // events. | |
17 SendPort _sendPort; | |
18 | |
19 // The receive port on which we listen and receive events from the handle | |
20 // watcher. | |
21 ReceivePort _receivePort; | |
22 | |
23 // The signals on this handle that we're interested in. | |
24 MojoHandleSignals _signals; | |
25 | |
26 // Whether listen has been called. | |
27 bool _isListening; | |
28 | |
29 MojoEventStream(MojoHandle handle, | |
30 [MojoHandleSignals signals = MojoHandleSignals.READABLE]) : | |
31 _handle = handle, | |
32 _signals = signals, | |
33 _isListening = false { | |
34 MojoResult result = MojoHandle.register(this); | |
35 if (!result.isOk) { | |
36 throw "Failed to register the MojoHandle: $result."; | |
37 } | |
38 } | |
39 | |
40 void close() { | |
41 if (_handle != null) { | |
42 MojoHandleWatcher.close(_handle); | |
43 _handle = null; | |
44 } | |
45 if (_receivePort != null) { | |
46 _receivePort.close(); | |
47 _receivePort = null; | |
48 } | |
49 } | |
50 | |
51 StreamSubscription<List<int>> listen( | |
52 void onData(List event), | |
53 {Function onError, void onDone(), bool cancelOnError}) { | |
54 if (_isListening) { | |
55 throw "Listen has already been called: $_handle."; | |
56 } | |
57 _receivePort = new ReceivePort(); | |
58 _sendPort = _receivePort.sendPort; | |
59 _controller = new StreamController(sync: true, | |
60 onListen: _onSubscriptionStateChange, | |
61 onCancel: _onSubscriptionStateChange, | |
62 onPause: _onPauseStateChange, | |
63 onResume: _onPauseStateChange); | |
64 _controller.addStream(_receivePort); | |
65 | |
66 if (_signals != MojoHandleSignals.NONE) { | |
67 var res = MojoHandleWatcher.add(_handle, _sendPort, _signals.value); | |
68 if (!res.isOk) { | |
69 throw "MojoHandleWatcher add failed: $res"; | |
70 } | |
71 } | |
72 | |
73 _isListening = true; | |
74 return _controller.stream.listen( | |
75 onData, | |
76 onError: onError, | |
77 onDone: onDone, | |
78 cancelOnError: cancelOnError); | |
79 } | |
80 | |
81 void enableSignals(MojoHandleSignals signals) { | |
82 _signals = signals; | |
83 if (_isListening) { | |
84 var res = MojoHandleWatcher.add(_handle, _sendPort, signals.value); | |
85 if (!res.isOk) { | |
86 throw "MojoHandleWatcher add failed: $res"; | |
87 } | |
88 } | |
89 } | |
90 | |
91 void enableReadEvents() => enableSignals(MojoHandleSignals.READABLE); | |
92 void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE); | |
93 void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE); | |
94 | |
95 void _onSubscriptionStateChange() { | |
96 if (!_controller.hasListener) { | |
97 close(); | |
98 } | |
99 } | |
100 | |
101 void _onPauseStateChange() { | |
102 if (_controller.isPaused) { | |
103 var res = MojoHandleWatcher.remove(_handle); | |
104 if (!res.isOk) { | |
105 throw "MojoHandleWatcher add failed: $res"; | |
106 } | |
107 } else { | |
108 var res = MojoHandleWatcher.add(_handle, _sendPort, _signals.value); | |
109 if (!res.isOk) { | |
110 throw "MojoHandleWatcher add failed: $res"; | |
111 } | |
112 } | |
113 } | |
114 | |
115 bool get readyRead => _handle.readyRead; | |
116 bool get readyWrite => _handle.readyWrite; | |
117 | |
118 String toString() => "$_handle"; | |
119 } | |
120 | |
121 | |
122 class MojoEventStreamListener { | |
123 MojoMessagePipeEndpoint _endpoint; | |
124 MojoEventStream _eventStream; | |
125 bool _isOpen = false; | |
126 bool _isInHandler = false; | |
127 | |
128 MojoEventStreamListener(MojoMessagePipeEndpoint endpoint) : | |
129 _endpoint = endpoint, | |
130 _eventStream = new MojoEventStream(endpoint.handle), | |
131 _isOpen = false; | |
132 | |
133 MojoEventStreamListener.fromHandle(MojoHandle handle) { | |
134 _endpoint = new MojoMessagePipeEndpoint(handle); | |
135 _eventStream = new MojoEventStream(handle); | |
136 _isOpen = false; | |
137 } | |
138 | |
139 MojoEventStreamListener.unbound() : | |
140 _endpoint = null, | |
141 _eventStream = null, | |
142 _isOpen = false; | |
143 | |
144 void bind(MojoMessagePipeEndpoint endpoint) { | |
145 assert(!isBound); | |
146 _endpoint = endpoint; | |
147 _eventStream = new MojoEventStream(endpoint.handle); | |
148 _isOpen = false; | |
149 } | |
150 | |
151 void bindFromHandle(MojoHandle handle) { | |
152 assert(!isBound); | |
153 _endpoint = new MojoMessagePipeEndpoint(handle); | |
154 _eventStream = new MojoEventStream(handle); | |
155 _isOpen = false; | |
156 } | |
157 | |
158 StreamSubscription<int> listen() { | |
159 _isOpen = true; | |
160 return _eventStream.listen((List<int> event) { | |
161 var signalsWatched = new MojoHandleSignals(event[0]); | |
162 var signalsReceived = new MojoHandleSignals(event[1]); | |
163 if (signalsReceived.isPeerClosed) { | |
164 handlePeerClosed(); | |
165 // The peer being closed obviates any other signal we might | |
166 // have received since we won't be able to read or write the handle. | |
167 // Thus, we just return before invoking other handlers. | |
168 return; | |
169 } | |
170 _isInHandler = true; | |
171 if (signalsReceived.isReadable) { | |
172 assert(_eventStream.readyRead); | |
173 handleRead(); | |
174 } | |
175 if (signalsReceived.isWritable) { | |
176 assert(_eventStream.readyWrite); | |
177 handleWrite(); | |
178 } | |
179 _eventStream.enableSignals(enableSignals( | |
180 signalsWatched, signalsReceived)); | |
181 _isInHandler = false; | |
182 }); | |
183 } | |
184 | |
185 void close() { | |
186 if (_isOpen) { | |
187 _eventStream.close(); | |
188 _isOpen = false; | |
189 _eventStream = null; | |
190 _endpoint = null; | |
191 } | |
192 } | |
193 | |
194 void handleRead() {} | |
195 void handleWrite() {} | |
196 void handlePeerClosed() { | |
197 close(); | |
198 } | |
199 | |
200 MojoHandleSignals enableSignals(MojoHandleSignals watched, | |
201 MojoHandleSignals received) => watched; | |
202 | |
203 MojoMessagePipeEndpoint get endpoint => _endpoint; | |
204 bool get isOpen => _isOpen; | |
205 bool get isInHandler => _isInHandler; | |
206 bool get isBound => _endpoint != null; | |
207 } | |
OLD | NEW |