Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(93)

Unified Diff: lib/src/stream_events.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: more docs and tests. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/src/stream_events.dart
diff --git a/lib/src/stream_events.dart b/lib/src/stream_events.dart
new file mode 100644
index 0000000000000000000000000000000000000000..848f6318511c9ecff5cf4de8e0b38a815536316f
--- /dev/null
+++ b/lib/src/stream_events.dart
@@ -0,0 +1,490 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.streams.stream_events;
+
+import 'dart:async';
+import 'dart:collection';
+
+import "subscription_stream.dart";
+import "stream_completer.dart";
+
+/// An asynchronous pull-based interface for accessing stream events.
+///
+/// Wraps a stream and makes individual events available on request.
+///
+/// The individual requests are served in the order they are requested.
nweiz 2015/06/12 01:24:25 Explain how this is different from a [StreamIterat
Lasse Reichstein Nielsen 2015/06/17 11:08:29 Done.
+///
+/// Example:
+///
+/// var events = new StreamEvents<String>(someStreamOfLines);
+/// var first = await events.next;
+/// while (first.startsWith('#')) {
Søren Gjesse 2015/06/11 07:57:06 Maybe add an isDone check in this sample.
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Hmm. Not good for the flow, but I guess I can try
+/// // Skip comments.
+/// first = await events.next;
+/// }
+///
+/// if (first.startsWith(MAGIC_MARKER)) {
+/// var headerCount =
+/// first.parseInt(first.substring(MAGIC_MARKER.length + 1));
+/// handleMessage(headers: await events.take(headerCount),
+/// body: events.rest);
+/// return;
+/// }
+/// // Error handling.
+///
+class StreamEvents<T> {
nweiz 2015/06/12 01:24:26 Plural class names always seem weird to me. What d
Lasse Reichstein Nielsen 2015/06/12 13:04:22 It's not a stream, so "PullStream" won't work. The
nweiz 2015/06/16 01:05:23 Bob suggested "StreamPump", using the water analog
Lasse Reichstein Nielsen 2015/06/16 13:05:45 I'm not a great fan of analogies in nameing - it o
nweiz 2015/06/16 22:34:11 All names in programming are analogies at some lev
Lasse Reichstein Nielsen 2015/06/17 11:08:29 I'm trying out StreamQueue now. It is slightly irk
+ /// In the initial state, the stream has not been listened to yet.
nweiz 2015/06/12 01:24:25 Nit: The first paragraph of a doc comment should b
+ /// It will be listened to when the first event is requested.
+ /// The `stateData` field holds the stream and the request queue is empty.
+ static const int _INITIAL = 0;
nweiz 2015/06/12 01:24:25 Nit: since these fields aren't right up next to on
Lasse Reichstein Nielsen 2015/06/12 13:04:22 In my editor it still works very well to show the
+
+ /// Listening on the stream.
+ /// If the request queue is empty and the subscription isn't done,
+ /// the subscription is paused.
+ /// The `stateData` field holds the subscription.
+ static const int _LISTENING = 1;
+
+ /// The stream has completed.
+ /// The `stateData` field is `null` and the request queue is empty.
+ static const int _DONE = 2;
+
Søren Gjesse 2015/06/11 07:57:06 Maybe put an additional comment that 4 is reserved
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Will do.
+ /// Flag set when [close] is called.
+ /// The `StreamEvents` is closed and no further events can be requested.
+ /// The last request in the queue, if any,
+ /// is completed and is witing to clean up.
nweiz 2015/06/12 01:24:24 "witing" -> "waiting"
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Done.
+ static const int _CLOSED = 8;
+
+ /// Flag set when [rest] is called.
+ /// Only used for error reporting, otherwise equivalent to [_CLOSED].
+ static const int _CLOSED_REST = 12;
+
+ /// Current state.
+ ///
+ /// Use getters below to check if the state is [_isListening] or [_isDone],
+ /// and whether the stream events object [_isClosed].
+ int _state = _INITIAL;
+
+ /// Value depending on state. Use getters below to get the value and assert
+ /// the expected state.
+ var _stateData;
+
+ /// Queue of pending requests while state is [_LISTENING].
+ /// Access through methods below to ensure consistency.
+ Queue<_EventRequest> _requestQueue = new Queue();
nweiz 2015/06/12 01:24:25 Make this final. Personally, I prefer to omit the
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Made final. I prefer type annotations on all field
+
+ StreamEvents(Stream source) : _stateData = source;
+
+ bool get _isListening => (_state & _LISTENING) != 0;
+ bool get _isClosed => (_state & _CLOSED) != 0;
+ bool get _isDone => (_state & _DONE) != 0;
+
+ /// Whether the underlying stream is spent.
nweiz 2015/06/12 01:24:24 "spent" -> "done" Just to use consistent terminol
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
+ ///
+ /// This may return true before all events have been delivered.
+ /// Requesting a new event when [isDone] returns true,
+ /// for example using [next], will always fail.
+ bool get isDone => _isDone;
nweiz 2015/06/12 01:24:25 What's the use case for exposing this to the user?
Lasse Reichstein Nielsen 2015/06/12 13:04:23 It might be a little speculative because it doesn'
nweiz 2015/06/16 01:05:23 The version of this I wrote for ScheduledTest has
Lasse Reichstein Nielsen 2015/06/16 13:05:45 It doesn't solve the problem of what to do with th
nweiz 2015/06/16 22:34:11 It's unlikely that someone will call [hasNext] fol
+
+ /// Return the stream subscription while state is listening.
nweiz 2015/06/12 01:24:25 Nit: use present tense (e.g. "Returns" rather than
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
+ StreamSubscription get _subscription {
+ assert(_isListening);
+ return _stateData;
+ }
+
+ /// Return the source stream while state is initial.
+ Stream get _sourceStream {
+ assert(!_isListening);
+ assert(!_isDone);
+ return _stateData;
+ }
+
+ // Set the subscription and transition to listening state.
+ void set _subscription(StreamSubscription subscription) {
+ assert(!_isListening);
+ assert(!_isDone);
+ _stateData = subscription;
+ _state |= _LISTENING;
+ }
+
+ void _setDone() {
+ assert(!_isDone);
+ _state = (_state & _CLOSED_REST) | _DONE;
+ _stateData = null;
+ }
+
+ /// Request the next (yet unrequested) event from the stream.
nweiz 2015/06/12 01:24:26 Clarify in this documentation that it's valid to h
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Done.
+ ///
+ /// When the requested event arrives, the returned future is completed with
+ /// the event. This is independent of whether the event is a data event or
+ /// an error event.
nweiz 2015/06/12 01:24:26 I'd write this as: "If the event is a value event,
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
+ ///
+ /// If the stream closed before an event arrives, the future is completed
nweiz 2015/06/12 01:24:24 "closed" -> "closes", "is completed" -> "completes
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
+ /// with a [StateError].
+ Future<T> get next {
+ if (!_isClosed) {
+ Completer completer = new Completer<T>();
+ _addRequest(new _NextRequest(completer));
+ return completer.future;
+ }
+ throw _failClosed();
+ }
+
+ /// Request a stream of all the remaning events of the source stream.
nweiz 2015/06/12 01:24:25 "Request" -> "Returns"
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Done.
+ ///
+ /// All requested [next], [skip] or [take] operations are completed
+ /// first, and then any remaining events are provided as events of
+ /// the returned stream.
+ ///
+ /// Using `rest` closes the stream events object. After getting the
+ /// `rest` it is no longer allowed to request other events, like
nweiz 2015/06/12 01:24:25 "it is no longer allowed to" -> "the caller may no
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
+ /// after calling [close].
+ Stream<T> get rest {
+ if (!_isClosed) {
nweiz 2015/06/12 01:24:25 Nit: Short-circuit if it's closed to save on inden
Lasse Reichstein Nielsen 2015/06/12 13:04:22 ACK. This is not going to be in any inner loops, s
+ _state |= _CLOSED_REST;
+ if (_isListening) {
+ // We have an active subscription that we want to take over.
+ var delayStream = new StreamCompleter<T>();
+ _addRequest(new _RestRequest<T>(delayStream, this));
+ return delayStream.stream;
+ }
+ assert(_requestQueue.isEmpty);
+ if (isDone) {
+ // TODO(lrn): Add Stream.empty() constructor.
+ return new Stream<T>.fromIterable(const []);
+ }
+ // We have never listened the source stream, so just return that directly.
nweiz 2015/06/12 01:24:26 "listened to"
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
+ Stream result = _sourceStream;
+ _setDone();
+ return result;
+ }
+ throw _failClosed();
+ }
+
+ /// Requests to skip the next [count] *data* events.
nweiz 2015/06/12 01:24:24 "Requests to skip" -> "Skips"
+ ///
+ /// The [count] value must be non-negative.
+ ///
+ /// When successful, this is equivalent to using [take]
+ /// and ignoring the result.
+ ///
+ /// If an error occurs before `count` data events has
nweiz 2015/06/12 01:24:25 "has" -> "have"
+ /// been skipped, the returned future completes with
+ /// that error instead.
+ ///
+ /// If the stream closes before `count` data events,
+ /// the remaining unskipped event count is returned.
+ /// If the returned future completes with the integer `0`,
+ /// then all events were succssfully skipped. If the value
+ /// is greater than zero then the stream ended early.
+ Future<int> skip(int count) {
+ if (count < 0) throw new RangeError.range(count, 0, null, "count");
+ if (!_isClosed) {
+ Completer completer = new Completer<int>();
+ _addRequest(new _SkipRequest(completer, count));
+ return completer.future;
+ }
+ throw _failClosed();
+ }
+
+ /// Requests the next [count] data events as a list.
+ ///
+ /// The [count] value must be non-negative.
nweiz 2015/06/12 01:24:25 "The [count] value" -> "[count]"
Lasse Reichstein Nielsen 2015/06/12 13:04:21 -> "The [count]". Otherwise it would start the sen
+ ///
+ /// Equivalent to calling [next] `count` times and
+ /// storing the data values in a list.
+ ///
+ /// If an error occurs before `count` data events has
+ /// been collected, the returned future completes with
+ /// that error instead.
+ ///
+ /// If the stream closes before `count` data events,
+ /// the returned future completes with the list
+ /// of data collected so far. That is, the returned
+ /// list may have fewer than [count] elements.
+ Future<List<T>> take(int count) {
+ if (count < 0) throw new RangeError.range(count, 0, null, "count");
+ if (!_isClosed) {
+ Completer completer = new Completer<List<T>>();
+ _addRequest(new _TakeRequest(completer, count));
+ return completer.future;
+ }
+ throw _failClosed();
+ }
+
+ /// Release the underlying stream subscription.
nweiz 2015/06/12 01:24:26 "Release" -> "Cancels"
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
+ ///
+ /// The close operation waits until all previously requested
+ /// events have been processed, then it cancels the subscription
+ /// providing the events.
+ ///
+ /// The returned future completes with the result of calling
+ /// `cancel`.
+ ///
+ /// After calling `close`, no further events can be requested.
+ /// None of [next], [rest], [skip], [take] or [close] may be
+ /// called again.
+ Future close() {
nweiz 2015/06/12 01:24:24 Consider calling this [cancel], to match [StreamSu
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Good idea!
+ if (!_isClosed) {
+ _state |= _CLOSED;
+ if (!_isListening) {
+ assert(_requestQueue.isEmpty);
nweiz 2015/06/12 01:24:24 Add a comment explaining why the request queue wil
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
+ if (!_isDone) _setDone();
+ return new Future.value();
+ }
+ Completer completer = new Completer();
+ _addRequest(new _CloseRequest(completer, this));
+ return completer.future;
+ }
+ throw _failClosed();
nweiz 2015/06/12 01:24:26 Everything in the core libraries that's closable a
Lasse Reichstein Nielsen 2015/06/12 13:04:22 True. One reason why I didn't allow that was that
+ }
+
+ /// Reused error message.
nweiz 2015/06/12 01:24:24 Expand on this.
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Done.
+ Error _failClosed() {
+ String cause =
+ ((_state & _CLOSED_REST) == _CLOSED_REST) ? "rest" : "close";
+ return new StateError("Already closed by a call to $cause");
+ }
+
+ /// Called when requesting an event when the requeust queue is empty.
+ /// The underlying subscription is paused in that case, except the very
+ /// first time when the subscription needs to be created.
+ void _listen() {
nweiz 2015/06/12 01:24:24 Methods like this and [_pause] that are only calle
Lasse Reichstein Nielsen 2015/06/12 13:04:23 Done.
+ if (_isListening) {
+ _subscription.resume();
+ } else if (!_isDone) {
+ _subscription =
+ _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
+ }
+ }
+
+ /// Pauses the underlying subscription.
+ /// Called when the request queue is empty and the subscription isn't closed.
+ void _pause() {
+ assert(_isListening);
+ _subscription.pause();
+ }
+
+ // Callbacks receiving the events of the source stream.
nweiz 2015/06/12 01:24:25 Nit: If this is referring to a group of methods, a
Lasse Reichstein Nielsen 2015/06/12 13:04:21 Done.
+ void _onData(T data) {
+ assert(_requestQueue.isNotEmpty);
+ _EventRequest action = _nextAction;
+ action.add(data);
+ _checkCompleted();
+ }
+
+ void _onError(error, StackTrace stack) {
+ assert(_requestQueue.isNotEmpty);
+ _EventRequest action = _nextAction;
+ action.addError(error, stack);
+ _checkCompleted();
+ }
+
+ void _onDone() {
+ _setDone();
+ _closeAllRequests();
+ }
+
+ // Request queue management.
+
+ /// Get the next action in the queue, but don't remove it.
+ _EventRequest get _nextAction {
+ assert(_requestQueue.isNotEmpty);
nweiz 2015/06/12 01:24:24 This assertion seems redundant, since the line bel
Lasse Reichstein Nielsen 2015/06/12 13:04:22 True.
+ return _requestQueue.first;
+ }
+
+ /// Add a new request to the queue.
+ void _addRequest(_EventRequest action) {
+ if (_isDone) {
+ action.close();
+ return;
+ }
+ if (_requestQueue.isEmpty) {
+ if (action.isComplete) {
nweiz 2015/06/12 01:24:24 Explain under what circumstances an action that wa
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Done.
+ // Skip listening and complete this immediately.
+ action.close();
+ return;
+ }
+ _listen();
+ }
+ _requestQueue.add(action);
+ }
+
+ /// Remove all requests and call their `done` event.
+ ///
+ /// Used when the source stream dries up.
+ void _closeAllRequests() {
+ assert(_isDone);
+ while (_requestQueue.isNotEmpty) {
+ _requestQueue.removeFirst().close();
+ }
+ }
+
+ /// Check whether the next actions in the queue are complete.
+ ///
+ /// If so, remove them and call their `complete` method.
+ void _checkCompleted() {
+ // Close-actions are executed immediately when they become the
+ // next (and last) event in the queue.
+ // When _isClosed and the queue is not empty, the last element
+ // of the queue is the close action.
+ while (_requestQueue.isNotEmpty) {
+ if (!_requestQueue.first.isComplete) {
+ return;
+ }
+ _requestQueue.removeFirst().close();
+ }
+ assert(_requestQueue.isEmpty);
+ if (!_isDone) _pause();
+ }
+
+ /// Extracts the subscription and makes the events object unusable.
+ ///
+ /// Can only be used by the very last request.
+ StreamSubscription _dispose() {
+ assert(_isClosed);
+ assert(_isListening);
+ assert(_requestQueue.isEmpty);
+ StreamSubscription subscription = _subscription;
+ _setDone();
+ return subscription;
+ }
+}
+
nweiz 2015/06/12 01:24:25 Nit: extra newline.
+
+/// Action to take when a requested event arrives.
+abstract class _EventRequest implements EventSink {
nweiz 2015/06/12 01:24:25 It's kind of weird that the documentation for thes
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Agree, I switched to "request" halfway through, an
+ bool get isComplete;
Søren Gjesse 2015/06/11 07:57:06 Add close method here as well I think the 'close'
nweiz 2015/06/12 01:24:25 This should also have [add] and [addError] abstrac
Lasse Reichstein Nielsen 2015/06/12 13:04:22 It's inherited from EventSink. I picked EventSink
+}
+
+/// Action completing a [StreamEvents.next] request.
+class _NextRequest implements _EventRequest {
+ Completer completer;
nweiz 2015/06/12 01:24:25 Even though it doesn't do anything at the language
+ _NextRequest(this.completer);
nweiz 2015/06/12 01:24:24 Rather than passing in a completer, consider makin
Lasse Reichstein Nielsen 2015/06/12 13:04:23 Good idea. Will do. The type parameter needs to go
+
+ void add(data) {
+ completer.complete(data);
+ completer = null;
nweiz 2015/06/12 01:24:24 Does nulling this out buy you anything? It seems l
Lasse Reichstein Nielsen 2015/06/12 13:04:23 We have an isCompleted getter? Whoa! I'm too used
+ }
+
+ void addError(error, [StackTrace stack]) {
+ completer.completeError(error, stack);
+ completer = null;
+ }
+
+ void close() {
+ if (!isComplete) {
+ completer.completeError(new StateError("no elements"));
nweiz 2015/06/12 01:24:25 If you do continue to check against null, null out
Lasse Reichstein Nielsen 2015/06/12 13:04:22 Technically not necessary since "close" is always
+ }
+ }
+
+ bool get isComplete => completer == null;
+}
+
+/// Action completing a [StreamEvents.skip] request.
+class _SkipRequest implements _EventRequest {
+ final Completer completer;
+ int count;
nweiz 2015/06/12 01:24:24 Document this. Also consider calling it something
Lasse Reichstein Nielsen 2015/06/15 15:46:23 Done.
+ _SkipRequest(this.completer, this.count);
+
+ void add(data) {
+ count--;
+ }
+
+ void addError(error, [StackTrace stack]) {
+ completer.completeError(error, stack);
+ count = 0;
+ }
+
+ void close() {
+ if (!completer.isCompleted) {
+ completer.complete(count);
+ count = 0;
+ }
+ }
+
+ bool get isComplete {
+ return count == 0;
+ }
+}
+
+/// Action completing a [StreamEvents.take] request.
+class _TakeRequest<T> implements _EventRequest {
+ final Completer completer;
+ final List list = <T>[];
+ int count;
+ _TakeRequest(this.completer, this.count);
+
+ void add(data) {
+ list.add(data);
+ count--;
+ }
+
+ void addError(error, [StackTrace stack]) {
+ completer.completeError(error, stack);
+ count = 0;
+ }
+
+ void close() {
+ if (!completer.isCompleted) {
+ completer.complete(list);
+ }
+ }
+
+ bool get isComplete => count == 0;
+}
+
+/// Action completing a [StreamEvents.close] request.
+class _CloseRequest implements _EventRequest {
+ final Completer completer;
+ StreamEvents events;
+
+ _CloseRequest(this.completer, this.events);
+
+ void add(data) {
+ throw new UnsupportedError("event");
+ }
+
+ void addError(error, [StackTrace stack]) {
+ throw new UnsupportedError("event");
nweiz 2015/06/12 01:24:24 Shouldn't this be "error"?
Lasse Reichstein Nielsen 2015/06/12 13:04:22 No, we don't support receiving events, that's the
+ }
+
+ void close() {
+ if (events._isListening) {
+ completer.complete(events._dispose().cancel());
+ } else {
+ completer.complete();
+ }
+ }
+
+ bool get isComplete => true;
+}
+
+/// Action completing a [StreamEvents.rest] request.
+class _RestRequest<T> implements _EventRequest {
+ final StreamCompleter delayStream;
+ final StreamEvents events;
+ _RestRequest(this.delayStream, this.events);
+
+ void add(data) {
+ throw new UnsupportedError("event");
+ }
+
+ void addError(error, [StackTrace stack]) {
+ throw new UnsupportedError("event");
nweiz 2015/06/12 01:24:26 Ditto.
Lasse Reichstein Nielsen 2015/06/15 15:46:23 Done.
+ }
+
+ void close() {
+ if (events._isListening) {
+ StreamSubscription subscription = events._dispose();
+ delayStream.setSourceStream(new SubscriptionStream<T>(subscription));
+ if (subscription.isPaused) subscription.resume();
+ } else {
+ assert(events._isDone);
+ delayStream.setEmpty();
+ }
+ }
+
+ bool get isComplete => true;
+}

Powered by Google App Engine
This is Rietveld 408576698