| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 part of core; | 5 part of core; |
| 6 | 6 |
| 7 class MojoEventStream extends Stream<List<int>> { | 7 class MojoEventStream extends Stream<List<int>> { |
| 8 // The underlying Mojo handle. | 8 // The underlying Mojo handle. |
| 9 MojoHandle _handle; | 9 MojoHandle _handle; |
| 10 | 10 |
| (...skipping 19 matching lines...) Expand all Loading... |
| 30 [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE]) | 30 [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE]) |
| 31 : _handle = handle, | 31 : _handle = handle, |
| 32 _signals = signals, | 32 _signals = signals, |
| 33 _isListening = false { | 33 _isListening = false { |
| 34 MojoResult result = MojoHandle.register(this); | 34 MojoResult result = MojoHandle.register(this); |
| 35 if (!result.isOk) { | 35 if (!result.isOk) { |
| 36 throw "Failed to register the MojoHandle: $result."; | 36 throw "Failed to register the MojoHandle: $result."; |
| 37 } | 37 } |
| 38 } | 38 } |
| 39 | 39 |
| 40 Future close() { | 40 Future close({bool immediate: false}) { |
| 41 if (_handle != null) { | 41 if (_handle != null) { |
| 42 if (_isListening) { | 42 if (_isListening) { |
| 43 return _handleWatcherClose(); | 43 return _handleWatcherClose(immediate: immediate); |
| 44 } else { | 44 } else { |
| 45 _localClose(); | 45 _localClose(); |
| 46 return new Future.value(null); | 46 return new Future.value(null); |
| 47 } | 47 } |
| 48 } | 48 } |
| 49 } | 49 } |
| 50 | 50 |
| 51 StreamSubscription<List<int>> listen(void onData(List event), | 51 StreamSubscription<List<int>> listen(void onData(List event), |
| 52 {Function onError, void onDone(), bool cancelOnError}) { | 52 {Function onError, void onDone(), bool cancelOnError}) { |
| 53 if (_isListening) { | 53 if (_isListening) { |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 85 throw "MojoHandleWatcher add failed: $res"; | 85 throw "MojoHandleWatcher add failed: $res"; |
| 86 } | 86 } |
| 87 } | 87 } |
| 88 } | 88 } |
| 89 | 89 |
| 90 void enableReadEvents() => | 90 void enableReadEvents() => |
| 91 enableSignals(MojoHandleSignals.PEER_CLOSED_READABLE); | 91 enableSignals(MojoHandleSignals.PEER_CLOSED_READABLE); |
| 92 void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE); | 92 void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE); |
| 93 void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE); | 93 void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE); |
| 94 | 94 |
| 95 Future _handleWatcherClose() { | 95 Future _handleWatcherClose({bool immediate: false}) { |
| 96 assert(_handle != null); | 96 assert(_handle != null); |
| 97 assert(MojoHandle._removeUnclosedHandle(_handle)); | 97 assert(MojoHandle._removeUnclosedHandle(_handle)); |
| 98 return MojoHandleWatcher.close(_handle.h, wait: true).then((r) { | 98 return MojoHandleWatcher.close(_handle.h, wait: !immediate).then((r) { |
| 99 if (_receivePort != null) { | 99 if (_receivePort != null) { |
| 100 _receivePort.close(); | 100 _receivePort.close(); |
| 101 _receivePort = null; | 101 _receivePort = null; |
| 102 } | 102 } |
| 103 return new MojoResult(r); | 103 return new MojoResult(r); |
| 104 }); | 104 }); |
| 105 } | 105 } |
| 106 | 106 |
| 107 void _localClose() { | 107 void _localClose() { |
| 108 assert(_handle != null); | 108 assert(_handle != null); |
| 109 _handle.close(); | 109 _handle.close(); |
| 110 _handle = null; | 110 _handle = null; |
| 111 if (_receivePort != null) { | 111 if (_receivePort != null) { |
| 112 _receivePort.close(); | 112 _receivePort.close(); |
| 113 _receivePort = null; | 113 _receivePort = null; |
| 114 } | 114 } |
| 115 } | 115 } |
| 116 | 116 |
| 117 void _onSubscriptionStateChange() { | 117 void _onSubscriptionStateChange() { |
| 118 if (!_controller.hasListener) { | 118 if (!_controller.hasListener) { |
| 119 close(); | 119 // No one is listening, close it immediately. |
| 120 close(immediate: true); |
| 120 } | 121 } |
| 121 } | 122 } |
| 122 | 123 |
| 123 void _onPauseStateChange() { | 124 void _onPauseStateChange() { |
| 124 if (_controller.isPaused) { | 125 if (_controller.isPaused) { |
| 125 var res = new MojoResult(MojoHandleWatcher.remove(_handle.h)); | 126 var res = new MojoResult(MojoHandleWatcher.remove(_handle.h)); |
| 126 if (!res.isOk) { | 127 if (!res.isOk) { |
| 127 throw "MojoHandleWatcher add failed: $res"; | 128 throw "MojoHandleWatcher add failed: $res"; |
| 128 } | 129 } |
| 129 } else { | 130 } else { |
| (...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 203 } | 204 } |
| 204 if (signalsReceived.isWritable) { | 205 if (signalsReceived.isWritable) { |
| 205 assert(_eventStream.readyWrite); | 206 assert(_eventStream.readyWrite); |
| 206 handleWrite(); | 207 handleWrite(); |
| 207 } | 208 } |
| 208 if (!signalsReceived.isPeerClosed) { | 209 if (!signalsReceived.isPeerClosed) { |
| 209 _eventStream.enableSignals(signalsWatched); | 210 _eventStream.enableSignals(signalsWatched); |
| 210 } | 211 } |
| 211 _isInHandler = false; | 212 _isInHandler = false; |
| 212 if (signalsReceived.isPeerClosed) { | 213 if (signalsReceived.isPeerClosed) { |
| 213 // nodefer is true here because there is no need to wait to close until | 214 // immediate is true here because there is no need to wait to close |
| 214 // outstanding messages are sent. The other side is gone. | 215 // until outstanding messages are sent. The other side is gone. |
| 215 close(nodefer: true).then((_) { | 216 close(immediate: true).then((_) { |
| 216 if (onError != null) { | 217 if (onError != null) { |
| 217 onError(); | 218 onError(); |
| 218 } | 219 } |
| 219 }); | 220 }); |
| 220 } | 221 } |
| 221 }, onDone: close); | 222 }, onDone: close); |
| 222 return subscription; | 223 return subscription; |
| 223 } | 224 } |
| 224 | 225 |
| 225 Future close({bool nodefer: false}) { | 226 Future close({bool immediate: false}) { |
| 226 var result; | 227 var result; |
| 227 _isOpen = false; | 228 _isOpen = false; |
| 228 _endpoint = null; | 229 _endpoint = null; |
| 229 subscription = null; | 230 subscription = null; |
| 230 if (_eventStream != null) { | 231 if (_eventStream != null) { |
| 231 result = _eventStream.close().then((_) { | 232 result = _eventStream.close(immediate: immediate).then((_) { |
| 232 _eventStream = null; | 233 _eventStream = null; |
| 233 }); | 234 }); |
| 234 } | 235 } |
| 235 return result != null ? result : new Future.value(null); | 236 return result != null ? result : new Future.value(null); |
| 236 } | 237 } |
| 237 | 238 |
| 238 void handleRead() {} | 239 void handleRead() {} |
| 239 void handleWrite() {} | 240 void handleWrite() {} |
| 240 | 241 |
| 241 MojoMessagePipeEndpoint get endpoint => _endpoint; | 242 MojoMessagePipeEndpoint get endpoint => _endpoint; |
| 242 bool get isOpen => _isOpen; | 243 bool get isOpen => _isOpen; |
| 243 bool get isInHandler => _isInHandler; | 244 bool get isInHandler => _isInHandler; |
| 244 bool get isBound => _endpoint != null; | 245 bool get isBound => _endpoint != null; |
| 245 | 246 |
| 246 String toString() => "MojoEventStreamListener(" | 247 String toString() => "MojoEventStreamListener(" |
| 247 "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)"; | 248 "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)"; |
| 248 } | 249 } |
| OLD | NEW |