Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(101)

Side by Side Diff: mojo/public/dart/src/event_stream.dart

Issue 1060193002: Provide mechanism to close immediately to Dart bindings (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 5 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/dart/src/application_connection.dart ('k') | mojo/public/dart/src/proxy.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 part of core; 5 part of core;
6 6
7 class MojoEventStream extends Stream<List<int>> { 7 class MojoEventStream extends Stream<List<int>> {
8 // The underlying Mojo handle. 8 // The underlying Mojo handle.
9 MojoHandle _handle; 9 MojoHandle _handle;
10 10
(...skipping 19 matching lines...) Expand all
30 [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE]) 30 [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE])
31 : _handle = handle, 31 : _handle = handle,
32 _signals = signals, 32 _signals = signals,
33 _isListening = false { 33 _isListening = false {
34 MojoResult result = MojoHandle.register(this); 34 MojoResult result = MojoHandle.register(this);
35 if (!result.isOk) { 35 if (!result.isOk) {
36 throw "Failed to register the MojoHandle: $result."; 36 throw "Failed to register the MojoHandle: $result.";
37 } 37 }
38 } 38 }
39 39
40 Future close() { 40 Future close({bool immediate: false}) {
41 if (_handle != null) { 41 if (_handle != null) {
42 if (_isListening) { 42 if (_isListening) {
43 return _handleWatcherClose(); 43 return _handleWatcherClose(immediate: immediate);
44 } else { 44 } else {
45 _localClose(); 45 _localClose();
46 return new Future.value(null); 46 return new Future.value(null);
47 } 47 }
48 } 48 }
49 } 49 }
50 50
51 StreamSubscription<List<int>> listen(void onData(List event), 51 StreamSubscription<List<int>> listen(void onData(List event),
52 {Function onError, void onDone(), bool cancelOnError}) { 52 {Function onError, void onDone(), bool cancelOnError}) {
53 if (_isListening) { 53 if (_isListening) {
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
85 throw "MojoHandleWatcher add failed: $res"; 85 throw "MojoHandleWatcher add failed: $res";
86 } 86 }
87 } 87 }
88 } 88 }
89 89
90 void enableReadEvents() => 90 void enableReadEvents() =>
91 enableSignals(MojoHandleSignals.PEER_CLOSED_READABLE); 91 enableSignals(MojoHandleSignals.PEER_CLOSED_READABLE);
92 void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE); 92 void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE);
93 void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE); 93 void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE);
94 94
95 Future _handleWatcherClose() { 95 Future _handleWatcherClose({bool immediate: false}) {
96 assert(_handle != null); 96 assert(_handle != null);
97 assert(MojoHandle._removeUnclosedHandle(_handle)); 97 assert(MojoHandle._removeUnclosedHandle(_handle));
98 return MojoHandleWatcher.close(_handle.h, wait: true).then((r) { 98 return MojoHandleWatcher.close(_handle.h, wait: !immediate).then((r) {
99 if (_receivePort != null) { 99 if (_receivePort != null) {
100 _receivePort.close(); 100 _receivePort.close();
101 _receivePort = null; 101 _receivePort = null;
102 } 102 }
103 return new MojoResult(r); 103 return new MojoResult(r);
104 }); 104 });
105 } 105 }
106 106
107 void _localClose() { 107 void _localClose() {
108 assert(_handle != null); 108 assert(_handle != null);
109 _handle.close(); 109 _handle.close();
110 _handle = null; 110 _handle = null;
111 if (_receivePort != null) { 111 if (_receivePort != null) {
112 _receivePort.close(); 112 _receivePort.close();
113 _receivePort = null; 113 _receivePort = null;
114 } 114 }
115 } 115 }
116 116
117 void _onSubscriptionStateChange() { 117 void _onSubscriptionStateChange() {
118 if (!_controller.hasListener) { 118 if (!_controller.hasListener) {
119 close(); 119 // No one is listening, close it immediately.
120 close(immediate: true);
120 } 121 }
121 } 122 }
122 123
123 void _onPauseStateChange() { 124 void _onPauseStateChange() {
124 if (_controller.isPaused) { 125 if (_controller.isPaused) {
125 var res = new MojoResult(MojoHandleWatcher.remove(_handle.h)); 126 var res = new MojoResult(MojoHandleWatcher.remove(_handle.h));
126 if (!res.isOk) { 127 if (!res.isOk) {
127 throw "MojoHandleWatcher add failed: $res"; 128 throw "MojoHandleWatcher add failed: $res";
128 } 129 }
129 } else { 130 } else {
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after
203 } 204 }
204 if (signalsReceived.isWritable) { 205 if (signalsReceived.isWritable) {
205 assert(_eventStream.readyWrite); 206 assert(_eventStream.readyWrite);
206 handleWrite(); 207 handleWrite();
207 } 208 }
208 if (!signalsReceived.isPeerClosed) { 209 if (!signalsReceived.isPeerClosed) {
209 _eventStream.enableSignals(signalsWatched); 210 _eventStream.enableSignals(signalsWatched);
210 } 211 }
211 _isInHandler = false; 212 _isInHandler = false;
212 if (signalsReceived.isPeerClosed) { 213 if (signalsReceived.isPeerClosed) {
213 // nodefer is true here because there is no need to wait to close until 214 // immediate is true here because there is no need to wait to close
214 // outstanding messages are sent. The other side is gone. 215 // until outstanding messages are sent. The other side is gone.
215 close(nodefer: true).then((_) { 216 close(immediate: true).then((_) {
216 if (onError != null) { 217 if (onError != null) {
217 onError(); 218 onError();
218 } 219 }
219 }); 220 });
220 } 221 }
221 }, onDone: close); 222 }, onDone: close);
222 return subscription; 223 return subscription;
223 } 224 }
224 225
225 Future close({bool nodefer: false}) { 226 Future close({bool immediate: false}) {
226 var result; 227 var result;
227 _isOpen = false; 228 _isOpen = false;
228 _endpoint = null; 229 _endpoint = null;
229 subscription = null; 230 subscription = null;
230 if (_eventStream != null) { 231 if (_eventStream != null) {
231 result = _eventStream.close().then((_) { 232 result = _eventStream.close(immediate: immediate).then((_) {
232 _eventStream = null; 233 _eventStream = null;
233 }); 234 });
234 } 235 }
235 return result != null ? result : new Future.value(null); 236 return result != null ? result : new Future.value(null);
236 } 237 }
237 238
238 void handleRead() {} 239 void handleRead() {}
239 void handleWrite() {} 240 void handleWrite() {}
240 241
241 MojoMessagePipeEndpoint get endpoint => _endpoint; 242 MojoMessagePipeEndpoint get endpoint => _endpoint;
242 bool get isOpen => _isOpen; 243 bool get isOpen => _isOpen;
243 bool get isInHandler => _isInHandler; 244 bool get isInHandler => _isInHandler;
244 bool get isBound => _endpoint != null; 245 bool get isBound => _endpoint != null;
245 246
246 String toString() => "MojoEventStreamListener(" 247 String toString() => "MojoEventStreamListener("
247 "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)"; 248 "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)";
248 } 249 }
OLDNEW
« no previous file with comments | « mojo/public/dart/src/application_connection.dart ('k') | mojo/public/dart/src/proxy.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698