Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(73)

Side by Side Diff: lib/src/stream_events.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.streams.stream_events;
6
7 import 'dart:async';
8 import 'dart:collection';
9
10 import "subscription_stream.dart";
11 import "stream_completer.dart";
12
13 /// An asynchronous pull-based interface for accessing stream events.
14 ///
15 /// Wraps a stream and makes individual events available on request.
16 class StreamEvents<T> {
ahe 2015/06/08 16:55:16 It took me quite some time to realize this is the
Lasse Reichstein Nielsen 2015/06/09 07:33:53 This is a little hard to name well. PullStream wo
17 /// In the initial state, the stream has not been listened to yet.
18 /// It will be listened to when the first event is requested.
19 /// The `stateData` field holds the stream and the request queue is empty.
20 static const int _INITIAL = 0;
21
22 /// Listening on the stream.
23 /// If the request queue is empty and the subscription isn't done,
24 /// the subscription is paused.
25 /// The `stateData` field holds the subscription.
26 static const int _LISTENING = 1;
27
28 /// The stream has completed.
29 /// The `stateData` field is `null` and the request queue is empty.
30 static const int _DONE = 2;
31
32 /// Flag set when [close] is called.
33 /// The `StreamEvents` is closed and no further events can be requested.
34 /// While set, the last elmement of the request queue is an
35 /// [_EventCloseAction].
36 static const int _CLOSED = 8;
37
38 /// Flag set when [rest] is called.
39 /// Only used for error reporting, otherwise equivalent to [_CLOSED].
40 static const int _CLOSED_REST = 12;
41
42 /// Current state.
43 ///
44 /// Use getters below to check if the state is [_isListening] or [_isDone],
45 /// and whether the stream events object [_isClosed].
46 int _state = _INITIAL;
47
48 /// Value depending on state. Use getters below to get the value and assert
49 /// the expected state.
50 var _stateData;
51
52 /// Queue of pending requests while state is [_LISTENING].
53 /// Access through methods below to ensure consistency.
54 Queue<_EventAction> _requestQueue = new Queue();
55
56 StreamEvents(Stream source) : _stateData = source;
57
58 bool get _isListening => (_state & _LISTENING) != 0;
59 bool get _isClosed => (_state & _CLOSED) != 0;
60 bool get _isDone => (_state & _DONE) != 0;
61
62 /// Whether the underlying stream is spent.
63 ///
64 /// This may return true before all events have been delivered.
65 /// Requesting a new event when [isDone] returns true,
66 /// for example using [next], will always fail.
67 bool get isDone => _isDone;
68
69 /// Return the stream subscription while state is listening.
70 StreamSubscription get _subscription {
71 assert(_isListening);
72 return _stateData;
73 }
74
75 /// Return the source stream while state is initial.
76 Stream get _sourceStream {
77 assert(!_isListening);
78 assert(!_isDone);
79 return _stateData;
80 }
81
82 // Set the subscription and transition to listening state.
83 void set _subscription(StreamSubscription subscription) {
84 assert(!_isListening);
85 assert(!_isDone);
86 _stateData = subscription;
87 _state |= _LISTENING;
88 }
89
90 void _setDone() {
91 assert(!_isDone);
92 _state = (_state & _CLOSED_REST) | _DONE;
93 _stateData = null;
94 }
95
96 /// Request the next (yet unrequested) event from the stream.
97 ///
98 /// When the requested event arrives, the returned future is completed with
99 /// the event. This is independent of whether the event is a data event or
100 /// an error event.
101 ///
102 /// If the stream closed before an event arrives, the future is completed
103 /// with a [StateError].
104 Future<T> get next {
105 if (!_isClosed) {
106 Completer completer = new Completer<T>();
107 _addAction(new _NextAction(completer));
108 return completer.future;
109 }
110 throw _failClosed();
111 }
112
113 /// Request a stream of all the remaning events of the source stream.
114 ///
115 /// All requested [next], [skip] or [take] operations are completed
116 /// first, and then any remaining events are provided as events of
117 /// the returned stream.
118 ///
119 /// Using `rest` closes the stream events object. After getting the
120 /// `rest` it is no longer allowed to request other events, like
121 /// after calling [close].
122 Stream<T> get rest {
123 if (!_isClosed) {
124 _state |= _CLOSED_REST;
125 if (_isListening) {
126 // We have an active subscription that we want to take over.
127 var delayStream = new StreamCompleter<T>();
128 _addAction(new _RestAction<T>(delayStream));
129 return delayStream.stream;
130 }
131 assert(_requestQueue.isEmpty);
132 if (isDone) {
133 // TODO(lrn): Add Stream.empty() constructor.
134 return new Stream<T>.fromIterable(const []);
135 }
136 // We have never listened the source stream, so just return that directly.
137 Stream result = _sourceStream;
138 _setDone();
139 return result;
140 }
141 throw _failClosed();
142 }
143
144 /// Requests to skip the next [count] *data* events.
145 ///
146 /// The [count] value must be greater than zero.
147 ///
148 /// When successful, this is equivalent to using [take]
149 /// and ignoring the result.
150 ///
151 /// If an error occurs before `count` data events has
152 /// been skipped, the returned future completes with
153 /// that error instead.
154 ///
155 /// If the stream closes before `count` data events,
156 /// the remaining unskipped event count is returned.
157 /// If the returned future completes with the integer `0`,
158 /// then all events were succssfully skipped. If the value
159 /// is greater than zero then the stream ended early.
160 Future<int> skip(int count) {
161 if (count <= 0) throw new RangeError.range(count, 0, null, "count");
162 if (!_isClosed) {
163 Completer completer = new Completer<int>();
164 _addAction(new _SkipAction(completer, count));
165 return completer.future;
166 }
167 throw _failClosed();
168 }
169
170 /// Requests the next [count] data events as a list.
171 ///
172 /// The [count] value must be greater than zero.
173 ///
174 /// Equivalent to calling [next] `count` times and
175 /// storing the data values in a list.
176 ///
177 /// If an error occurs before `count` data events has
178 /// been collected, the returned future completes with
179 /// that error instead.
180 ///
181 /// If the stream closes before `count` data events,
182 /// the returned future completes with the list
183 /// of data collected so far. That is, the returned
184 /// list may have fewer than [count] elements.
185 Future<List<T>> take(int count) {
186 if (count <= 0) throw new RangeError.range(count, 0, null, "count");
187 if (!_isClosed) {
188 Completer completer = new Completer<List<T>>();
189 _addAction(new _TakeAction(completer, count));
190 return completer.future;
191 }
192 throw _failClosed();
193 }
194
195 /// Release the underlying stream subscription.
196 ///
197 /// The close operation waits until all previously requested
198 /// events have been processed, then it cancels the subscription
199 /// providing the events.
200 ///
201 /// The returned future completes with the result of calling
202 /// `cancel`.
203 ///
204 /// After calling `close`, no further events can be requested.
205 /// None of [next], [rest], [skip], [take] or [close] may be
206 /// called again.
207 Future close() {
208 if (!_isClosed) {
209 _state |= _CLOSED;
210 if (!_isListening) {
211 assert(_requestQueue.isEmpty);
212 if (!_isDone) _setDone();
213 return new Future.value();
214 }
215 Completer completer = new Completer();
216 _addAction(new _CloseAction(completer));
217 return completer.future;
218 }
219 throw _failClosed();
220 }
221
222 /// Reused error message.
223 Error _failClosed() {
224 String cause =
225 ((_state & _CLOSED_REST) == _CLOSED_REST) ? "rest" : "close";
226 return new StateError("Already closed by a call to $cause");
227 }
228
229 /// Called when requesting an event when the requeust queue is empty.
230 /// The underlying subscription is paused in that case, except the very
231 /// first time when the subscription needs to be created.
232 void _listen() {
233 if (_isListening) {
234 _subscription.resume();
235 } else if (!_isDone) {
236 _subscription =
237 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
238 }
239 }
240
241 /// Pauses the underlying subscription.
242 /// Called when the request queue is empty and the subscription isn't closed.
243 void _pause() {
244 assert(_isListening);
245 _subscription.pause();
246 }
247
248 // Callbacks receiving the events of the source stream.
249 void _onData(T data) {
250 assert(_requestQueue.isNotEmpty);
251 _EventAction action = _nextAction;
252 if (action.data(data)) {
253 _completeAction();
254 }
255 }
256
257 void _onError(error, StackTrace stack) {
258 assert(_requestQueue.isNotEmpty);
259 _EventAction action = _nextAction;
260 action.error(error, stack);
261 _completeAction();
262 }
263
264 void _onDone() {
265 _setDone();
266 _flushQueue();
267 }
268
269 // Request queue management.
270
271 /// Get the next action in the queue, but don't remove it.
272 _EventAction get _nextAction {
273 assert(_requestQueue.isNotEmpty);
274 return _requestQueue.first;
275 }
276
277 /// Add a new request to the queue.
278 void _addAction(_EventAction action) {
279 if (_isDone) {
280 action.done();
281 return;
282 }
283 if (_requestQueue.isEmpty) {
284 _listen();
285 }
286 _requestQueue.add(action);
287 _checkClose();
288 }
289
290 /// Remove all requests and call their `done` event.
291 ///
292 /// Used when the source stream dries up.
293 void _flushQueue() {
294 while (_requestQueue.isNotEmpty) {
295 _requestQueue.removeFirst().done();
296 }
297 }
298
299 /// Remove a completed action from the queue.
300 ///
301 /// An action is complete when its `data` or `error` handler
302 /// returns true. For actions that use multiple
303 void _completeAction() {
304 _requestQueue.removeFirst();
305 if (_requestQueue.isEmpty) {
306 _pause();
307 } else {
308 _checkClose();
309 }
310 }
311
312 /// Check whether the only remaining action in the queue is a close action.
313 ///
314 /// If so, pass the subscription to the action and let it take over.
315 void _checkClose() {
316 // Close-actions are executed immediately when they become the
317 // next (and last) event in the queue.
318 // When _isClosed and the queue is not empty, the last element
319 // of the queue is the close action.
320 if (_isClosed && _requestQueue.length == 1) {
321 _EventCloseAction action = _requestQueue.removeFirst();
322 StreamSubscription subscription = _subscription;
323 _setDone();
324 action.close(subscription);
325 }
326 }
327 }
328
329
330 /// Action to take when a requested event arrives.
331 abstract class _EventAction {
332 bool data(data);
333 void error(error, StackTrace stack);
334 void done();
335 }
336
337 /// Action to take when closing the [StreamEvents] class.
338 abstract class _EventCloseAction implements _EventAction {
339 void close(StreamSubscription subscription);
340 }
341
342
343 /// Action completing a [StreamEvents.next] request.
344 class _NextAction implements _EventAction {
345 Completer completer;
346 _NextAction(this.completer);
347
348 bool data(data) {
349 completer.complete(data);
350 return true;
351 }
352
353 void error(error, StackTrace stack) {
354 completer.completeError(error, stack);
355 }
356
357 void done() {
358 completer.completeError(new StateError("no elements"));
359 }
360 }
361
362 /// Action completing a [StreamEvents.skip] request.
363 class _SkipAction implements _EventAction {
364 Completer completer;
365 int count;
366 _SkipAction(this.completer, this.count);
367
368 bool data(data) {
369 count--;
370 if (count > 0) return false;
371 completer.complete(count);
372 return true;
373 }
374
375 void error(error, StackTrace stack) {
376 completer.completeError(error, stack);
377 }
378
379 void done() {
380 completer.complete(count);
381 }
382 }
383
384 /// Action completing a [StreamEvents.take] request.
385 class _TakeAction<T> implements _EventAction {
386 final Completer completer;
387 final int count;
388 List list = <T>[];
389 _TakeAction(this.completer, this.count);
390
391 bool data(data) {
392 list.add(data);
393 if (list.length < count) return false;
394 completer.complete(list);
395 return true;
396 }
397
398 void error(error, StackTrace stack) {
399 completer.completeError(error, stack);
400 }
401
402 void done() {
403 completer.complete(list);
404 }
405 }
406
407 /// Action completing a [StreamEvents.close] request.
408 class _CloseAction implements _EventCloseAction {
409 final Completer completer;
410
411 _CloseAction(this.completer);
412
413 bool data(data) {
414 throw new UnsupportedError("event");
415 }
416
417 void error(e, StackTrace stack) {
418 throw new UnsupportedError("event");
419 }
420
421 void done() {
422 completer.complete();
423 }
424
425 void close(StreamSubscription subscription) {
426 completer.complete(subscription.cancel());
427 }
428 }
429
430 /// Action completing a [StreamEvents.rest] request.
431 class _RestAction<T> implements _EventCloseAction {
432 final StreamCompleter delayStream;
433 _RestAction(this.delayStream);
434
435 bool data(data) {
436 throw new UnsupportedError("event");
437 }
438
439 void error(e, StackTrace stack) {
440 throw new UnsupportedError("event");
441 }
442
443 void done() {
444 delayStream.setEmpty();
445 }
446
447 void close(StreamSubscription subscription) {
448 delayStream.setSourceStream(new SubscriptionStream<T>(subscription));
449 }
450 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698