Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(306)

Side by Side Diff: lib/src/stream_events.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Address remaining comments. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.stream_events;
6
7 import 'dart:async';
8 import 'dart:collection';
9
10 import "subscription_stream.dart";
11 import "stream_completer.dart";
12
13 /// An asynchronous pull-based interface for accessing stream events.
14 ///
15 /// Wraps a stream and makes individual events available on request.
16 ///
17 /// You can request (and reserve) one or more events from the stream,
18 /// and after all previous requestes have been fulfilled, stream events
19 /// go towards fulfilling your request.
20 ///
21 /// For example, if you ask for [next] two times, the returned futures
22 /// will be completed by the next two unreserved events from the stream.
23 ///
24 /// The stream subscription is paused when there are no active
25 /// requests.
26 ///
27 /// Some streams, including broadcast streams, will buffer
28 /// events while paused, so waiting too long between requests may
29 /// cause memory bloat somewhere else.
30 ///
31 /// The individual requests are served in the order they are requested,
32 /// and the stream subscription is paused when there are no active requests.
33 ///
34 /// This is similar to, but more convenient than, a [StreamIterator].
35 /// A `StreamIterator` requires you to manually check when a new event is
36 /// available and you can only access the value of that event until you
37 /// check for the next one. A `StreamEvents` allows you to request, for example,
38 /// three events at a time, either individually, as a group using [take]
39 /// or [skip], or in any combination.
40 ///
41 /// You can also ask to have the [rest] of the stream provided as
42 /// a new stream. This allows, for example, taking the first event
43 /// out of a stream and continue using the rest of the stream as a stream.
44 ///
45 /// Example:
46 ///
47 /// var events = new StreamEvents<String>(someStreamOfLines);
48 /// var first = await events.next;
49 /// while (first.startsWith('#')) {
50 /// // Skip comments.
51 /// first = await events.next;
52 /// }
53 ///
54 /// if (first.startsWith(MAGIC_MARKER)) {
55 /// var headerCount =
56 /// first.parseInt(first.substring(MAGIC_MARKER.length + 1));
57 /// handleMessage(headers: await events.take(headerCount),
58 /// body: events.rest);
59 /// return;
60 /// }
61 /// // Error handling.
62 ///
63 /// When you need no further events the `StreamEvents` should be closed
64 /// using [cancel]. This releases the underlying stream subscription.
65 ///
66 /// The underlying stream subscription is paused when there
67 /// are no requeusts. Some subscriptions, including those of broadcast streams,
68 /// will still buffer events while paused. Creating a `StreamEvents` from
69 /// such a stream and stopping to request events, will cause memory to fill up
70 /// unnecessarily.
71 class StreamEvents<T> {
72 /// The initial state, where the stream has not been listened to yet.
73 ///
74 /// It will be listened to when the first event is requested.
75 /// The `stateData` field holds the stream and the request queue is empty.
76 static const int _stateInitial = 0;
77
78 /// Listening on a stream that hasn't completed yet.
79 ///
80 /// If the request queue is empty, the subscription is paused.
81 /// The `stateData` field holds the active subscription.
82 static const int _stateListening = 1;
83
84 /// The stream has completed.
85 ///
86 /// The `stateData` field is `null` and the request queue is empty.
87 static const int _stateDone = 2;
88
89 /// Flag set when [cancel] is called.
90 /// The `StreamEvents` is closed and no further events can be requested.
91 static const int _stateClosed = 4;
92
93 /// Flag combined with [_stateClosed] to say it was closed using [rest].
94 /// Only used for error reporting purposes.
95 static const int _restFlag = 8;
96
97 /// Flag set when [rest] is called.
98 /// Only used for error reporting, otherwise equivalent to [_stateClosed].
99 static const int _stateClosedRest = _stateClosed | _restFlag;
100
101 /// Current state.
102 ///
103 /// Use getters below to check if the state is [_isListening] or [_isDone],
104 /// and whether the stream events object [_isClosed].
105 int _state = _stateInitial;
106
107 /// Value depending on state. Use getters below to get the value and assert
108 /// the expected state.
109 var _stateData;
110
111 /// Queue of pending requests while state is [_stateListening].
112 /// Access through methods below to ensure consistency.
113 final Queue<_EventRequest> _requestQueue = new Queue();
114
115 StreamEvents(Stream source) : _stateData = source;
116
117 /// Whether we are currently listening on a stream subscription.
118 bool get _isListening => (_state & _stateListening) != 0;
119
120 /// Whether the underlying stream is done.
121 ///
122 /// This may return true before all events have been delivered.
123 /// Requesting a new event when [_isDone] returns true,
124 /// for example using [next], will always fail.
125 bool get _isDone => (_state & _stateDone) != 0;
126
127 /// Whether the stream events has been closed.
128 ///
129 /// While closed, no further requests can be made.
130 bool get _isClosed => (_state & _stateClosed) != 0;
131
132 /// Returns the stream subscription while in a listening state.
133 StreamSubscription get _subscription {
134 assert(_isListening);
135 return _stateData;
136 }
137
138 /// Returns the source stream while in the initial state.
139 Stream get _sourceStream {
140 assert(!_isListening);
141 assert(!_isDone);
142 return _stateData;
143 }
144
145 /// Sets the subscription and transitions to listening state.
146 void _setListening(StreamSubscription subscription) {
147 assert(!_isListening);
148 assert(!_isDone);
149 _stateData = subscription;
150 _state |= _stateListening;
151 }
152
153 /// Transitions to the done state.
154 ///
155 /// The stream is done, and no further events will arrive.
156 void _setDone() {
157 assert(!_isDone);
158 _state = (_state & _stateClosedRest) | _stateDone;
159 _stateData = null;
160 }
161
162 /// Request the next (yet unrequested) event from the stream.
163 ///
164 /// When the requested event arrives, the returned future is completed with
165 /// the event.
166 /// If the event is a data event, the returned future completes
167 /// with its value.
168 /// If the event is an error event, the returned future completes with
169 /// its error and stack trace.
170 /// If the stream closes before an event arrives, the returned future
171 /// completes with a [StateError].
172 ///
173 /// It's possible to have several pending [next] calls (or other requests),
174 /// and they will be completed in the order they were requested, by the
175 /// first events that were not used by previous requeusts.
176 Future<T> get next {
177 if (!_isClosed) {
178 _NextRequest nextRequest = new _NextRequest<T>();
179 _addRequest(nextRequest);
180 return nextRequest.future;
181 }
182 throw _failClosed();
183 }
184
185 /// Returns a stream of all the remaning events of the source stream.
186 ///
187 /// All requested [next], [skip] or [take] operations are completed
188 /// first, and then any remaining events are provided as events of
189 /// the returned stream.
190 ///
191 /// Using `rest` closes the stream events object. After getting the
192 /// `rest` the caller may no longer request other events, like
193 /// after calling [cancel].
194 Stream<T> get rest {
195 if (_isClosed) {
196 throw _failClosed();
197 }
198 _state |= _stateClosedRest;
199 if (_isListening) {
200 // We have an active subscription that we want to take over.
201 var request = new _RestRequest<T>(this);
202 _addRequest(request);
203 return request.stream;
204 }
205 assert(_requestQueue.isEmpty);
206 if (_isDone) {
207 // TODO(lrn): Add Stream.empty() constructor.
208 return new Stream<T>.fromIterable(const []);
209 }
210 // We have never listened to the source stream,
211 // so just return that directly.
212 Stream result = _sourceStream;
213 _setDone();
214 return result;
215 }
216
217 /// Skips the next [count] *data* events.
218 ///
219 /// The [count] must be non-negative.
220 ///
221 /// When successful, this is equivalent to using [take]
222 /// and ignoring the result.
223 ///
224 /// If an error occurs before `count` data events have been skipped,
225 /// the returned future completes with that error instead.
226 ///
227 /// If the stream closes before `count` data events,
228 /// the remaining unskipped event count is returned.
229 /// If the returned future completes with the integer `0`,
230 /// then all events were succssfully skipped. If the value
231 /// is greater than zero then the stream ended early.
232 Future<int> skip(int count) {
233 if (count < 0) throw new RangeError.range(count, 0, null, "count");
234 if (!_isClosed) {
235 var request = new _SkipRequest(count);
236 _addRequest(request);
237 return request.future;
238 }
239 throw _failClosed();
240 }
241
242 /// Requests the next [count] data events as a list.
243 ///
244 /// The [count] must be non-negative.
245 ///
246 /// Equivalent to calling [next] `count` times and
247 /// storing the data values in a list.
248 ///
249 /// If an error occurs before `count` data events has
250 /// been collected, the returned future completes with
251 /// that error instead.
252 ///
253 /// If the stream closes before `count` data events,
254 /// the returned future completes with the list
255 /// of data collected so far. That is, the returned
256 /// list may have fewer than [count] elements.
257 Future<List<T>> take(int count) {
258 if (count < 0) throw new RangeError.range(count, 0, null, "count");
259 if (!_isClosed) {
260 var request = new _TakeRequest<T>(count);
261 _addRequest(request);
262 return request.future;
263 }
264 throw _failClosed();
265 }
266
267 /// Cancels the underlying stream subscription.
268 ///
269 /// The cancel operation waits until all previously requested
270 /// events have been processed, then it cancels the subscription
271 /// providing the events.
272 ///
273 /// The returned future completes with the result of calling
274 /// `cancel`.
275 ///
276 /// After calling `cancel`, no further events can be requested.
277 /// None of [next], [rest], [skip], [take] or [cancel] may be
278 /// called again.
279 Future cancel() {
280 if (!_isClosed) {
281 _state |= _stateClosed;
282 if (!_isListening) {
283 // The request queue is only non-empty while we are listening.
284 assert(_requestQueue.isEmpty);
285 if (!_isDone) _setDone();
286 return new Future.value();
287 }
288 var request = new _CancelRequest(this);
289 _addRequest(request);
290 return request.future;
291 }
292 throw _failClosed();
293 }
294
295 /// Returns an error for when a request is made after cancel.
296 ///
297 /// Returns a [StateError] with a message saying that either
298 /// [cancel] or [rest] have already been called.
299 Error _failClosed() {
300 String cause =
301 ((_state & _stateClosedRest) == _stateClosedRest) ? "rest" : "cancel";
302 return new StateError("Already cancelled by a call to $cause");
303 }
304
305 // Callbacks receiving the events of the source stream.
306
307 void _onData(T data) {
308 assert(_requestQueue.isNotEmpty);
309 _EventRequest request = _nextRequest;
310 request.add(data);
311 _checkCompleted();
312 }
313
314 void _onError(error, StackTrace stack) {
315 assert(_requestQueue.isNotEmpty);
316 _EventRequest request = _nextRequest;
317 request.addError(error, stack);
318 _checkCompleted();
319 }
320
321 void _onDone() {
322 _setDone();
323 _closeAllRequests();
324 }
325
326 // Request queue management.
327
328 /// Returns the next request in the queue, but don't remove it.
329 _EventRequest get _nextRequest {
330 return _requestQueue.first;
331 }
332
333 /// Add a new request to the queue.
334 void _addRequest(_EventRequest request) {
335 if (_isDone) {
336 request.close();
337 return;
338 }
339 if (_requestQueue.isEmpty) {
340 if (request.isComplete) {
341 // Some requests are complete without receiving any events.
342 // This includes [cancel] and [rest] requests, as well as
343 // [take] and [skip] events with zero count.
344
345 // We can skip listening and jsut complete the request immediately.
346 request.close();
347 return;
348 }
349 // Continue listening on the source stream.
350 // The source stream is paused while the requeust queue is empty,
351 // except at the beginning when it hasn't been listened to at all.
352 if (_isListening) {
353 _subscription.resume();
354 } else if (!_isDone) {
355 _setListening(
356 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone));
357 }
358 }
359 _requestQueue.add(request);
360 }
361
362 /// Remove all requests and close them.
363 ///
364 /// Used when the source stream is done.
365 void _closeAllRequests() {
366 assert(_isDone);
367 while (_requestQueue.isNotEmpty) {
368 _requestQueue.removeFirst().close();
369 }
370 }
371
372 /// Check whether the next requests in the queue are complete.
373 ///
374 /// If so, remove them and call their `close` method.
375 void _checkCompleted() {
376 // Close-actions are executed immediately when they become the
377 // next (and last) event in the queue.
378 // When _isClosed and the queue is not empty, the last element
379 // of the queue is the close action.
380 while (_requestQueue.isNotEmpty) {
381 if (!_requestQueue.first.isComplete) {
382 return;
383 }
384 _requestQueue.removeFirst().close();
385 }
386 assert(_requestQueue.isEmpty);
387 if (!_isDone) {
388 // Pause the underlying subscription.
389 // Won't get here without adding a request, so we must be listening
390 // already.
391 assert(_isListening);
392 _subscription.pause();
393 }
394 }
395
396 /// Extracts the subscription and makes the events object unusable.
397 ///
398 /// Can only be used by the very last request.
399 StreamSubscription _dispose() {
400 assert(_isClosed);
401 assert(_isListening);
402 assert(_requestQueue.isEmpty);
403 StreamSubscription subscription = _subscription;
404 _setDone();
405 return subscription;
406 }
407 }
408
409 /// Request object that receives events when they arrive, until fulfilled.
410 ///
411 /// Each request that cannot be fulfilled immediately is represented by
412 /// an `_EventRequest` object in the request queue.
413 ///
414 /// Events from the source stream are sent to the first request in the
415 /// queue until it reports itself as [isComplete].
416 ///
417 /// When the first request in the queue `isComplete`, either when becoming
418 /// the first request or after receiving an event, its [close] methods is
419 /// called.
420 ///
421 /// The [close] method is also called immediately when the source stream
422 /// is done.
423 abstract class _EventRequest implements EventSink {
424 /// Handle a data event.
425 void add(data);
426
427 /// Handle an error event.
428 void addError(error, [StackTrace stackTrace]);
429
430 /// Complete the request.
431 ///
432 /// This may be called either when [isComplete] returns true,
433 /// or if the source stream is done.
434 void close();
435
436 /// Whether the request considers itself fulfilled.
437 ///
438 /// This is checked whenever a request becomes the first request
439 /// in the request queue, and after it receives an event.
440 ///
441 /// When a request is complete, its [close] method is called and
442 /// it's removed from the request queue.
443 bool get isComplete;
444 }
445
446 /// Request for a [StreamEvents.next] call.
447 ///
448 /// Completes the returned future when receiving the first event,
449 /// and is then complete.
450 class _NextRequest<T> implements _EventRequest {
451 /// Completer for the future returned by [StreamEvents.next].
452 ///
453 /// Set to `null` when it is completed, to mark it as already complete.
454 final Completer _completer;
455
456 _NextRequest() : _completer = new Completer<T>();
457
458 Future<T> get future => _completer.future;
459
460 void add(data) {
461 _completer.complete(data);
462 }
463
464 void addError(error, [StackTrace stack]) {
465 _completer.completeError(error, stack);
466 }
467
468 void close() {
469 if (!_completer.isCompleted) {
470 _completer.completeError(new StateError("no elements"));
471 }
472 }
473
474 bool get isComplete => _completer.isCompleted;
475 }
476
477 /// Request for a [StreamEvents.skip] call.
478 class _SkipRequest implements _EventRequest {
479 /// Completer for the future returned by the skip call.
480 final Completer _completer = new Completer<int>();
481
482 /// Number of remaining events to skip.
483 ///
484 /// The request [isComplete] when the values reaches zero.
485 ///
486 /// Decremented when an event is seen.
487 /// Set to zero when an error is seen since errors abort the skip request.
488 int _eventsToSkip;
489
490 _SkipRequest(this._eventsToSkip);
491
492 /// The future completed when the correct number of events have been skipped.
493 Future get future => _completer.future;
494
495 void add(data) {
496 assert(_eventsToSkip > 0);
497 _eventsToSkip--;
498 }
499
500 void addError(error, [StackTrace stackTrace]) {
501 _eventsToSkip = 0;
502 _completer.completeError(error, stackTrace);
503 }
504
505 void close() {
506 if (!_completer.isCompleted) {
507 _completer.complete(_eventsToSkip);
508 }
509 }
510
511 bool get isComplete => _eventsToSkip == 0;
512 }
513
514 /// Request for a [StreamEvents.take] call.
515 class _TakeRequest<T> implements _EventRequest {
516 /// Completer for the future returned by the take call.
517 final Completer _completer;
518
519 /// List collecting events until enough have been seen.
520 final List _list = <T>[];
521
522 /// Number of events to capture.
523 ///
524 /// The request [isComplete] when the length of [_list] reaches
525 /// this value.
526 final int _eventsToTake;
527
528 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
529
530 /// The future completed when the correct number of events have been captured.
531 Future get future => _completer.future;
532
533 void add(data) {
534 _list.add(data);
535 }
536
537 void addError(error, [StackTrace stack]) {
538 _completer.completeError(error, stack);
539 }
540
541 void close() {
542 if (!_completer.isCompleted) {
543 _completer.complete(_list);
544 }
545 }
546
547 bool get isComplete =>
548 _list.length == _eventsToTake || _completer.isCompleted;
549 }
550
551 /// Request for a [StreamEvents.cancel] call.
552 ///
553 /// The request is always complete, it just waits in the request queue
554 /// until all previous events are fulfilled, then it cancels the stream events
555 /// subscription.
556 class _CancelRequest implements _EventRequest {
557 /// Completer for the future returned by the `cancel` call.
558 final Completer _completer = new Completer();
559
560 /// The [StreamEvents] object that has this request queued.
561 ///
562 /// When the event is completed, it needs to cancel the active subscription
563 /// of the `StreamEvents` object, if any.
564 final StreamEvents _events;
565
566 _CancelRequest(this._events);
567
568 /// The future completed when the cancel request is completed.
569 Future get future => _completer.future;
570
571 void add(data) {
572 assert(false); // Unreachable.
573 }
574
575 void addError(error, [StackTrace stack]) {
576 assert(false); // Unreachable.
577 }
578
579 void close() {
580 if (_events._isListening) {
581 _completer.complete(_events._dispose().cancel());
582 } else {
583 _completer.complete();
584 }
585 }
586
587 bool get isComplete => true;
588 }
589
590 /// Request for a [StreamEvents.rest] call.
591 ///
592 /// The request is always complete, it just waits in the request queue
593 /// until all previous events are fulfilled, then it takes over the
594 /// stream events subscription and creates a stream from it.
595 class _RestRequest<T> implements _EventRequest {
596 /// Completer for the stream returned by the `rest` call.
597 final StreamCompleter _completer;
598
599 /// The [StreamEvents] object that has this request queued.
600 ///
601 /// When the event is completed, it needs to cancel the active subscription
602 /// of the `StreamEvents` object, if any.
603 final StreamEvents _events;
604 _RestRequest(this._events) : _completer = new StreamCompleter<T>();
605
606 /// The future which will contain the remaining events of [_events].
607 Stream<T> get stream => _completer.stream;
608
609 void add(data) {
610 assert(false); // Unreachable.
611 }
612
613 void addError(error, [StackTrace stack]) {
614 assert(false); // Unreachable.
615 }
616
617 void close() {
618 if (_events._isListening) {
619 StreamSubscription subscription = _events._dispose();
620 _completer.setSourceStream(new SubscriptionStream<T>(subscription));
621 if (subscription.isPaused) subscription.resume();
622 } else {
623 assert(_events._isDone);
624 _completer.setEmpty();
625 }
626 }
627
628 bool get isComplete => true;
629 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698