| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 part of core; | 5 part of core; |
| 6 | 6 |
| 7 class MojoEventStream extends Stream<List<int>> { | 7 class MojoEventStream extends Stream<List<int>> { |
| 8 // The underlying Mojo handle. | 8 // The underlying Mojo handle. |
| 9 MojoHandle _handle; | 9 MojoHandle _handle; |
| 10 | 10 |
| (...skipping 19 matching lines...) Expand all Loading... |
| 30 [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE]) | 30 [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE]) |
| 31 : _handle = handle, | 31 : _handle = handle, |
| 32 _signals = signals, | 32 _signals = signals, |
| 33 _isListening = false { | 33 _isListening = false { |
| 34 MojoResult result = MojoHandle.register(this); | 34 MojoResult result = MojoHandle.register(this); |
| 35 if (!result.isOk) { | 35 if (!result.isOk) { |
| 36 throw "Failed to register the MojoHandle: $result."; | 36 throw "Failed to register the MojoHandle: $result."; |
| 37 } | 37 } |
| 38 } | 38 } |
| 39 | 39 |
| 40 void close() { | 40 Future close() { |
| 41 if (_handle != null) { | 41 if (_handle != null) { |
| 42 if (_isListening) { | 42 if (_isListening) { |
| 43 MojoHandleWatcher.close(_handle); | 43 return _handleWatcherClose(); |
| 44 } else { | 44 } else { |
| 45 _handle.close(); | 45 _localClose(); |
| 46 return new Future.value(null); |
| 46 } | 47 } |
| 47 _handle = null; | |
| 48 } | |
| 49 if (_receivePort != null) { | |
| 50 _receivePort.close(); | |
| 51 _receivePort = null; | |
| 52 } | 48 } |
| 53 } | 49 } |
| 54 | 50 |
| 55 StreamSubscription<List<int>> listen(void onData(List event), | 51 StreamSubscription<List<int>> listen(void onData(List event), |
| 56 {Function onError, void onDone(), bool cancelOnError}) { | 52 {Function onError, void onDone(), bool cancelOnError}) { |
| 57 if (_isListening) { | 53 if (_isListening) { |
| 58 throw "Listen has already been called: $_handle."; | 54 throw "Listen has already been called: $_handle."; |
| 59 } | 55 } |
| 60 _receivePort = new ReceivePort(); | 56 _receivePort = new ReceivePort(); |
| 61 _sendPort = _receivePort.sendPort; | 57 _sendPort = _receivePort.sendPort; |
| (...skipping 25 matching lines...) Expand all Loading... |
| 87 throw "MojoHandleWatcher add failed: $res"; | 83 throw "MojoHandleWatcher add failed: $res"; |
| 88 } | 84 } |
| 89 } | 85 } |
| 90 } | 86 } |
| 91 | 87 |
| 92 void enableReadEvents() => | 88 void enableReadEvents() => |
| 93 enableSignals(MojoHandleSignals.PEER_CLOSED_READABLE); | 89 enableSignals(MojoHandleSignals.PEER_CLOSED_READABLE); |
| 94 void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE); | 90 void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE); |
| 95 void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE); | 91 void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE); |
| 96 | 92 |
| 93 Future _handleWatcherClose() { |
| 94 assert(_handle != null); |
| 95 return MojoHandleWatcher.close(_handle, wait: true).then((_) { |
| 96 if (_receivePort != null) { |
| 97 _receivePort.close(); |
| 98 _receivePort = null; |
| 99 } |
| 100 }); |
| 101 } |
| 102 |
| 103 void _localClose() { |
| 104 assert(_handle != null); |
| 105 _handle.close(); |
| 106 _handle = null; |
| 107 if (_receivePort != null) { |
| 108 _receivePort.close(); |
| 109 _receivePort = null; |
| 110 } |
| 111 } |
| 112 |
| 97 void _onSubscriptionStateChange() { | 113 void _onSubscriptionStateChange() { |
| 98 if (!_controller.hasListener) { | 114 if (!_controller.hasListener) { |
| 99 close(); | 115 close(); |
| 100 } | 116 } |
| 101 } | 117 } |
| 102 | 118 |
| 103 void _onPauseStateChange() { | 119 void _onPauseStateChange() { |
| 104 if (_controller.isPaused) { | 120 if (_controller.isPaused) { |
| 105 var res = MojoHandleWatcher.remove(_handle); | 121 var res = MojoHandleWatcher.remove(_handle); |
| 106 if (!res.isOk) { | 122 if (!res.isOk) { |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 160 assert(!isBound); | 176 assert(!isBound); |
| 161 _endpoint = new MojoMessagePipeEndpoint(handle); | 177 _endpoint = new MojoMessagePipeEndpoint(handle); |
| 162 _eventStream = new MojoEventStream(handle); | 178 _eventStream = new MojoEventStream(handle); |
| 163 _isOpen = false; | 179 _isOpen = false; |
| 164 } | 180 } |
| 165 | 181 |
| 166 StreamSubscription<List<int>> listen() { | 182 StreamSubscription<List<int>> listen() { |
| 167 assert(isBound && (subscription == null)); | 183 assert(isBound && (subscription == null)); |
| 168 _isOpen = true; | 184 _isOpen = true; |
| 169 subscription = _eventStream.listen((List<int> event) { | 185 subscription = _eventStream.listen((List<int> event) { |
| 186 if (!_isOpen) { |
| 187 // The actual close of the underlying stream happens asynchronously |
| 188 // after the call to close. However, we start to ignore incoming events |
| 189 // immediately. |
| 190 return; |
| 191 } |
| 170 var signalsWatched = new MojoHandleSignals(event[0]); | 192 var signalsWatched = new MojoHandleSignals(event[0]); |
| 171 var signalsReceived = new MojoHandleSignals(event[1]); | 193 var signalsReceived = new MojoHandleSignals(event[1]); |
| 172 _isInHandler = true; | 194 _isInHandler = true; |
| 173 if (signalsReceived.isReadable) { | 195 if (signalsReceived.isReadable) { |
| 174 assert(_eventStream.readyRead); | 196 assert(_eventStream.readyRead); |
| 175 handleRead(); | 197 handleRead(); |
| 176 } | 198 } |
| 177 if (signalsReceived.isWritable) { | 199 if (signalsReceived.isWritable) { |
| 178 assert(_eventStream.readyWrite); | 200 assert(_eventStream.readyWrite); |
| 179 handleWrite(); | 201 handleWrite(); |
| 180 } | 202 } |
| 181 if (_isOpen) { | 203 if (_isOpen) { |
| 182 _eventStream.enableSignals(signalsWatched); | 204 _eventStream.enableSignals(signalsWatched); |
| 183 } | 205 } |
| 184 _isInHandler = false; | 206 _isInHandler = false; |
| 185 if (signalsReceived.isPeerClosed) { | 207 if (signalsReceived.isPeerClosed) { |
| 186 if (onError != null) onError(); | 208 if (onError != null) { |
| 209 onError(); |
| 210 } |
| 187 close(); | 211 close(); |
| 188 // The peer being closed obviates any other signal we might | |
| 189 // have received since we won't be able to read or write the handle. | |
| 190 // Thus, we just return before invoking other handlers. | |
| 191 return; | |
| 192 } | 212 } |
| 193 }, onDone: close); | 213 }, onDone: close); |
| 194 return subscription; | 214 return subscription; |
| 195 } | 215 } |
| 196 | 216 |
| 197 void close() { | 217 Future close() { |
| 218 var result; |
| 198 _isOpen = false; | 219 _isOpen = false; |
| 199 _endpoint = null; | 220 _endpoint = null; |
| 200 subscription = null; | 221 subscription = null; |
| 201 if (_eventStream != null) { | 222 if (_eventStream != null) { |
| 202 _eventStream.close(); | 223 result = _eventStream.close().then((_) { |
| 203 _eventStream = null; | 224 _eventStream = null; |
| 225 }); |
| 204 } | 226 } |
| 227 return result != null ? result : new Future.value(null); |
| 205 } | 228 } |
| 206 | 229 |
| 207 void handleRead() {} | 230 void handleRead() {} |
| 208 void handleWrite() {} | 231 void handleWrite() {} |
| 209 | 232 |
| 210 MojoMessagePipeEndpoint get endpoint => _endpoint; | 233 MojoMessagePipeEndpoint get endpoint => _endpoint; |
| 211 bool get isOpen => _isOpen; | 234 bool get isOpen => _isOpen; |
| 212 bool get isInHandler => _isInHandler; | 235 bool get isInHandler => _isInHandler; |
| 213 bool get isBound => _endpoint != null; | 236 bool get isBound => _endpoint != null; |
| 214 | 237 |
| 215 String toString() => "MojoEventStreamListener(" | 238 String toString() => "MojoEventStreamListener(" |
| 216 "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)"; | 239 "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)"; |
| 217 } | 240 } |
| OLD | NEW |