Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(185)

Side by Side Diff: mojo/public/dart/src/event_stream.dart

Issue 1132063007: Rationalize Dart mojo and sky package structure (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/dart/src/drain_data.dart ('k') | mojo/public/dart/src/handle.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 part of core;
6
7 class MojoEventStream extends Stream<List<int>> {
8 // The underlying Mojo handle.
9 MojoHandle _handle;
10
11 // Providing our own stream controller allows us to take custom actions when
12 // listeners pause/resume/etc. their StreamSubscription.
13 StreamController _controller;
14
15 // The send port that we give to the handle watcher to notify us of handle
16 // events.
17 SendPort _sendPort;
18
19 // The receive port on which we listen and receive events from the handle
20 // watcher.
21 ReceivePort _receivePort;
22
23 // The signals on this handle that we're interested in.
24 MojoHandleSignals _signals;
25
26 // Whether listen has been called.
27 bool _isListening;
28
29 MojoEventStream(MojoHandle handle,
30 [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE])
31 : _handle = handle,
32 _signals = signals,
33 _isListening = false {
34 MojoResult result = MojoHandle.register(this);
35 if (!result.isOk) {
36 throw "Failed to register the MojoHandle: $result.";
37 }
38 }
39
40 Future close({bool immediate: false}) {
41 if (_handle != null) {
42 if (_isListening) {
43 return _handleWatcherClose(immediate: immediate);
44 } else {
45 _localClose();
46 return new Future.value(null);
47 }
48 }
49 }
50
51 StreamSubscription<List<int>> listen(void onData(List event),
52 {Function onError, void onDone(), bool cancelOnError}) {
53 if (_isListening) {
54 throw "Listen has already been called: $_handle.";
55 }
56 _receivePort = new ReceivePort();
57 _sendPort = _receivePort.sendPort;
58 _controller = new StreamController(
59 sync: true,
60 onListen: _onSubscriptionStateChange,
61 onCancel: _onSubscriptionStateChange,
62 onPause: _onPauseStateChange,
63 onResume: _onPauseStateChange);
64 _controller.addStream(_receivePort).whenComplete(_controller.close);
65
66 if (_signals != MojoHandleSignals.NONE) {
67 var res = new MojoResult(
68 MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value));
69 if (!res.isOk) {
70 throw "MojoHandleWatcher add failed: $res";
71 }
72 }
73
74 _isListening = true;
75 return _controller.stream.listen(onData,
76 onError: onError, onDone: onDone, cancelOnError: cancelOnError);
77 }
78
79 void enableSignals(MojoHandleSignals signals) {
80 _signals = signals;
81 if (_isListening) {
82 var res = new MojoResult(
83 MojoHandleWatcher.add(_handle.h, _sendPort, signals.value));
84 if (!res.isOk) {
85 throw "MojoHandleWatcher add failed: $res";
86 }
87 }
88 }
89
90 void enableReadEvents() =>
91 enableSignals(MojoHandleSignals.PEER_CLOSED_READABLE);
92 void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE);
93 void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE);
94
95 Future _handleWatcherClose({bool immediate: false}) {
96 assert(_handle != null);
97 assert(MojoHandle._removeUnclosedHandle(_handle));
98 return MojoHandleWatcher.close(_handle.h, wait: !immediate).then((r) {
99 if (_receivePort != null) {
100 _receivePort.close();
101 _receivePort = null;
102 }
103 return new MojoResult(r);
104 });
105 }
106
107 void _localClose() {
108 assert(_handle != null);
109 _handle.close();
110 _handle = null;
111 if (_receivePort != null) {
112 _receivePort.close();
113 _receivePort = null;
114 }
115 }
116
117 void _onSubscriptionStateChange() {
118 if (!_controller.hasListener) {
119 // No one is listening, close it immediately.
120 close(immediate: true);
121 }
122 }
123
124 void _onPauseStateChange() {
125 if (_controller.isPaused) {
126 var res = new MojoResult(MojoHandleWatcher.remove(_handle.h));
127 if (!res.isOk) {
128 throw "MojoHandleWatcher add failed: $res";
129 }
130 } else {
131 var res = new MojoResult(
132 MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value));
133 if (!res.isOk) {
134 throw "MojoHandleWatcher add failed: $res";
135 }
136 }
137 }
138
139 bool get readyRead => _handle.readyRead;
140 bool get readyWrite => _handle.readyWrite;
141
142 String toString() => "$_handle";
143 }
144
145 typedef void ErrorHandler();
146
147 class MojoEventStreamListener {
148 MojoMessagePipeEndpoint _endpoint;
149 MojoEventStream _eventStream;
150 bool _isOpen = false;
151 bool _isInHandler = false;
152 StreamSubscription subscription;
153 ErrorHandler onError;
154
155 MojoEventStreamListener.fromEndpoint(MojoMessagePipeEndpoint endpoint)
156 : _endpoint = endpoint,
157 _eventStream = new MojoEventStream(endpoint.handle),
158 _isOpen = false {
159 listen();
160 }
161
162 MojoEventStreamListener.fromHandle(MojoHandle handle) {
163 _endpoint = new MojoMessagePipeEndpoint(handle);
164 _eventStream = new MojoEventStream(handle);
165 _isOpen = false;
166 listen();
167 }
168
169 MojoEventStreamListener.unbound()
170 : _endpoint = null,
171 _eventStream = null,
172 _isOpen = false;
173
174 void bind(MojoMessagePipeEndpoint endpoint) {
175 assert(!isBound);
176 _endpoint = endpoint;
177 _eventStream = new MojoEventStream(endpoint.handle);
178 _isOpen = false;
179 }
180
181 void bindFromHandle(MojoHandle handle) {
182 assert(!isBound);
183 _endpoint = new MojoMessagePipeEndpoint(handle);
184 _eventStream = new MojoEventStream(handle);
185 _isOpen = false;
186 }
187
188 StreamSubscription<List<int>> listen() {
189 assert(isBound && (subscription == null));
190 _isOpen = true;
191 subscription = _eventStream.listen((List<int> event) {
192 if (!_isOpen) {
193 // The actual close of the underlying stream happens asynchronously
194 // after the call to close. However, we start to ignore incoming events
195 // immediately.
196 return;
197 }
198 var signalsWatched = new MojoHandleSignals(event[0]);
199 var signalsReceived = new MojoHandleSignals(event[1]);
200 _isInHandler = true;
201 if (signalsReceived.isReadable) {
202 assert(_eventStream.readyRead);
203 handleRead();
204 }
205 if (signalsReceived.isWritable) {
206 assert(_eventStream.readyWrite);
207 handleWrite();
208 }
209 if (!signalsReceived.isPeerClosed) {
210 _eventStream.enableSignals(signalsWatched);
211 }
212 _isInHandler = false;
213 if (signalsReceived.isPeerClosed) {
214 // immediate is true here because there is no need to wait to close
215 // until outstanding messages are sent. The other side is gone.
216 close(immediate: true).then((_) {
217 if (onError != null) {
218 onError();
219 }
220 });
221 }
222 }, onDone: close);
223 return subscription;
224 }
225
226 Future close({bool immediate: false}) {
227 var result;
228 _isOpen = false;
229 _endpoint = null;
230 subscription = null;
231 if (_eventStream != null) {
232 result = _eventStream.close(immediate: immediate).then((_) {
233 _eventStream = null;
234 });
235 }
236 return result != null ? result : new Future.value(null);
237 }
238
239 void handleRead() {}
240 void handleWrite() {}
241
242 MojoMessagePipeEndpoint get endpoint => _endpoint;
243 bool get isOpen => _isOpen;
244 bool get isInHandler => _isInHandler;
245 bool get isBound => _endpoint != null;
246
247 String toString() => "MojoEventStreamListener("
248 "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)";
249 }
OLDNEW
« no previous file with comments | « mojo/public/dart/src/drain_data.dart ('k') | mojo/public/dart/src/handle.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698