Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1409)

Unified Diff: lib/src/stream_events.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Address remaining comments. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/src/stream_events.dart
diff --git a/lib/src/stream_events.dart b/lib/src/stream_events.dart
new file mode 100644
index 0000000000000000000000000000000000000000..7f6bcd6904b41c302666706f5b954c31f41a1d0e
--- /dev/null
+++ b/lib/src/stream_events.dart
@@ -0,0 +1,629 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_events;
+
+import 'dart:async';
+import 'dart:collection';
+
+import "subscription_stream.dart";
+import "stream_completer.dart";
+
+/// An asynchronous pull-based interface for accessing stream events.
+///
+/// Wraps a stream and makes individual events available on request.
+///
+/// You can request (and reserve) one or more events from the stream,
+/// and after all previous requestes have been fulfilled, stream events
+/// go towards fulfilling your request.
+///
+/// For example, if you ask for [next] two times, the returned futures
+/// will be completed by the next two unreserved events from the stream.
+///
+/// The stream subscription is paused when there are no active
+/// requests.
+///
+/// Some streams, including broadcast streams, will buffer
+/// events while paused, so waiting too long between requests may
+/// cause memory bloat somewhere else.
+///
+/// The individual requests are served in the order they are requested,
+/// and the stream subscription is paused when there are no active requests.
+///
+/// This is similar to, but more convenient than, a [StreamIterator].
+/// A `StreamIterator` requires you to manually check when a new event is
+/// available and you can only access the value of that event until you
+/// check for the next one. A `StreamEvents` allows you to request, for example,
+/// three events at a time, either individually, as a group using [take]
+/// or [skip], or in any combination.
+///
+/// You can also ask to have the [rest] of the stream provided as
+/// a new stream. This allows, for example, taking the first event
+/// out of a stream and continue using the rest of the stream as a stream.
+///
+/// Example:
+///
+/// var events = new StreamEvents<String>(someStreamOfLines);
+/// var first = await events.next;
+/// while (first.startsWith('#')) {
+/// // Skip comments.
+/// first = await events.next;
+/// }
+///
+/// if (first.startsWith(MAGIC_MARKER)) {
+/// var headerCount =
+/// first.parseInt(first.substring(MAGIC_MARKER.length + 1));
+/// handleMessage(headers: await events.take(headerCount),
+/// body: events.rest);
+/// return;
+/// }
+/// // Error handling.
+///
+/// When you need no further events the `StreamEvents` should be closed
+/// using [cancel]. This releases the underlying stream subscription.
+///
+/// The underlying stream subscription is paused when there
+/// are no requeusts. Some subscriptions, including those of broadcast streams,
+/// will still buffer events while paused. Creating a `StreamEvents` from
+/// such a stream and stopping to request events, will cause memory to fill up
+/// unnecessarily.
+class StreamEvents<T> {
+ /// The initial state, where the stream has not been listened to yet.
+ ///
+ /// It will be listened to when the first event is requested.
+ /// The `stateData` field holds the stream and the request queue is empty.
+ static const int _stateInitial = 0;
+
+ /// Listening on a stream that hasn't completed yet.
+ ///
+ /// If the request queue is empty, the subscription is paused.
+ /// The `stateData` field holds the active subscription.
+ static const int _stateListening = 1;
+
+ /// The stream has completed.
+ ///
+ /// The `stateData` field is `null` and the request queue is empty.
+ static const int _stateDone = 2;
+
+ /// Flag set when [cancel] is called.
+ /// The `StreamEvents` is closed and no further events can be requested.
+ static const int _stateClosed = 4;
+
+ /// Flag combined with [_stateClosed] to say it was closed using [rest].
+ /// Only used for error reporting purposes.
+ static const int _restFlag = 8;
+
+ /// Flag set when [rest] is called.
+ /// Only used for error reporting, otherwise equivalent to [_stateClosed].
+ static const int _stateClosedRest = _stateClosed | _restFlag;
+
+ /// Current state.
+ ///
+ /// Use getters below to check if the state is [_isListening] or [_isDone],
+ /// and whether the stream events object [_isClosed].
+ int _state = _stateInitial;
+
+ /// Value depending on state. Use getters below to get the value and assert
+ /// the expected state.
+ var _stateData;
+
+ /// Queue of pending requests while state is [_stateListening].
+ /// Access through methods below to ensure consistency.
+ final Queue<_EventRequest> _requestQueue = new Queue();
+
+ StreamEvents(Stream source) : _stateData = source;
+
+ /// Whether we are currently listening on a stream subscription.
+ bool get _isListening => (_state & _stateListening) != 0;
+
+ /// Whether the underlying stream is done.
+ ///
+ /// This may return true before all events have been delivered.
+ /// Requesting a new event when [_isDone] returns true,
+ /// for example using [next], will always fail.
+ bool get _isDone => (_state & _stateDone) != 0;
+
+ /// Whether the stream events has been closed.
+ ///
+ /// While closed, no further requests can be made.
+ bool get _isClosed => (_state & _stateClosed) != 0;
+
+ /// Returns the stream subscription while in a listening state.
+ StreamSubscription get _subscription {
+ assert(_isListening);
+ return _stateData;
+ }
+
+ /// Returns the source stream while in the initial state.
+ Stream get _sourceStream {
+ assert(!_isListening);
+ assert(!_isDone);
+ return _stateData;
+ }
+
+ /// Sets the subscription and transitions to listening state.
+ void _setListening(StreamSubscription subscription) {
+ assert(!_isListening);
+ assert(!_isDone);
+ _stateData = subscription;
+ _state |= _stateListening;
+ }
+
+ /// Transitions to the done state.
+ ///
+ /// The stream is done, and no further events will arrive.
+ void _setDone() {
+ assert(!_isDone);
+ _state = (_state & _stateClosedRest) | _stateDone;
+ _stateData = null;
+ }
+
+ /// Request the next (yet unrequested) event from the stream.
+ ///
+ /// When the requested event arrives, the returned future is completed with
+ /// the event.
+ /// If the event is a data event, the returned future completes
+ /// with its value.
+ /// If the event is an error event, the returned future completes with
+ /// its error and stack trace.
+ /// If the stream closes before an event arrives, the returned future
+ /// completes with a [StateError].
+ ///
+ /// It's possible to have several pending [next] calls (or other requests),
+ /// and they will be completed in the order they were requested, by the
+ /// first events that were not used by previous requeusts.
+ Future<T> get next {
+ if (!_isClosed) {
+ _NextRequest nextRequest = new _NextRequest<T>();
+ _addRequest(nextRequest);
+ return nextRequest.future;
+ }
+ throw _failClosed();
+ }
+
+ /// Returns a stream of all the remaning events of the source stream.
+ ///
+ /// All requested [next], [skip] or [take] operations are completed
+ /// first, and then any remaining events are provided as events of
+ /// the returned stream.
+ ///
+ /// Using `rest` closes the stream events object. After getting the
+ /// `rest` the caller may no longer request other events, like
+ /// after calling [cancel].
+ Stream<T> get rest {
+ if (_isClosed) {
+ throw _failClosed();
+ }
+ _state |= _stateClosedRest;
+ if (_isListening) {
+ // We have an active subscription that we want to take over.
+ var request = new _RestRequest<T>(this);
+ _addRequest(request);
+ return request.stream;
+ }
+ assert(_requestQueue.isEmpty);
+ if (_isDone) {
+ // TODO(lrn): Add Stream.empty() constructor.
+ return new Stream<T>.fromIterable(const []);
+ }
+ // We have never listened to the source stream,
+ // so just return that directly.
+ Stream result = _sourceStream;
+ _setDone();
+ return result;
+ }
+
+ /// Skips the next [count] *data* events.
+ ///
+ /// The [count] must be non-negative.
+ ///
+ /// When successful, this is equivalent to using [take]
+ /// and ignoring the result.
+ ///
+ /// If an error occurs before `count` data events have been skipped,
+ /// the returned future completes with that error instead.
+ ///
+ /// If the stream closes before `count` data events,
+ /// the remaining unskipped event count is returned.
+ /// If the returned future completes with the integer `0`,
+ /// then all events were succssfully skipped. If the value
+ /// is greater than zero then the stream ended early.
+ Future<int> skip(int count) {
+ if (count < 0) throw new RangeError.range(count, 0, null, "count");
+ if (!_isClosed) {
+ var request = new _SkipRequest(count);
+ _addRequest(request);
+ return request.future;
+ }
+ throw _failClosed();
+ }
+
+ /// Requests the next [count] data events as a list.
+ ///
+ /// The [count] must be non-negative.
+ ///
+ /// Equivalent to calling [next] `count` times and
+ /// storing the data values in a list.
+ ///
+ /// If an error occurs before `count` data events has
+ /// been collected, the returned future completes with
+ /// that error instead.
+ ///
+ /// If the stream closes before `count` data events,
+ /// the returned future completes with the list
+ /// of data collected so far. That is, the returned
+ /// list may have fewer than [count] elements.
+ Future<List<T>> take(int count) {
+ if (count < 0) throw new RangeError.range(count, 0, null, "count");
+ if (!_isClosed) {
+ var request = new _TakeRequest<T>(count);
+ _addRequest(request);
+ return request.future;
+ }
+ throw _failClosed();
+ }
+
+ /// Cancels the underlying stream subscription.
+ ///
+ /// The cancel operation waits until all previously requested
+ /// events have been processed, then it cancels the subscription
+ /// providing the events.
+ ///
+ /// The returned future completes with the result of calling
+ /// `cancel`.
+ ///
+ /// After calling `cancel`, no further events can be requested.
+ /// None of [next], [rest], [skip], [take] or [cancel] may be
+ /// called again.
+ Future cancel() {
+ if (!_isClosed) {
+ _state |= _stateClosed;
+ if (!_isListening) {
+ // The request queue is only non-empty while we are listening.
+ assert(_requestQueue.isEmpty);
+ if (!_isDone) _setDone();
+ return new Future.value();
+ }
+ var request = new _CancelRequest(this);
+ _addRequest(request);
+ return request.future;
+ }
+ throw _failClosed();
+ }
+
+ /// Returns an error for when a request is made after cancel.
+ ///
+ /// Returns a [StateError] with a message saying that either
+ /// [cancel] or [rest] have already been called.
+ Error _failClosed() {
+ String cause =
+ ((_state & _stateClosedRest) == _stateClosedRest) ? "rest" : "cancel";
+ return new StateError("Already cancelled by a call to $cause");
+ }
+
+ // Callbacks receiving the events of the source stream.
+
+ void _onData(T data) {
+ assert(_requestQueue.isNotEmpty);
+ _EventRequest request = _nextRequest;
+ request.add(data);
+ _checkCompleted();
+ }
+
+ void _onError(error, StackTrace stack) {
+ assert(_requestQueue.isNotEmpty);
+ _EventRequest request = _nextRequest;
+ request.addError(error, stack);
+ _checkCompleted();
+ }
+
+ void _onDone() {
+ _setDone();
+ _closeAllRequests();
+ }
+
+ // Request queue management.
+
+ /// Returns the next request in the queue, but don't remove it.
+ _EventRequest get _nextRequest {
+ return _requestQueue.first;
+ }
+
+ /// Add a new request to the queue.
+ void _addRequest(_EventRequest request) {
+ if (_isDone) {
+ request.close();
+ return;
+ }
+ if (_requestQueue.isEmpty) {
+ if (request.isComplete) {
+ // Some requests are complete without receiving any events.
+ // This includes [cancel] and [rest] requests, as well as
+ // [take] and [skip] events with zero count.
+
+ // We can skip listening and jsut complete the request immediately.
+ request.close();
+ return;
+ }
+ // Continue listening on the source stream.
+ // The source stream is paused while the requeust queue is empty,
+ // except at the beginning when it hasn't been listened to at all.
+ if (_isListening) {
+ _subscription.resume();
+ } else if (!_isDone) {
+ _setListening(
+ _sourceStream.listen(_onData, onError: _onError, onDone: _onDone));
+ }
+ }
+ _requestQueue.add(request);
+ }
+
+ /// Remove all requests and close them.
+ ///
+ /// Used when the source stream is done.
+ void _closeAllRequests() {
+ assert(_isDone);
+ while (_requestQueue.isNotEmpty) {
+ _requestQueue.removeFirst().close();
+ }
+ }
+
+ /// Check whether the next requests in the queue are complete.
+ ///
+ /// If so, remove them and call their `close` method.
+ void _checkCompleted() {
+ // Close-actions are executed immediately when they become the
+ // next (and last) event in the queue.
+ // When _isClosed and the queue is not empty, the last element
+ // of the queue is the close action.
+ while (_requestQueue.isNotEmpty) {
+ if (!_requestQueue.first.isComplete) {
+ return;
+ }
+ _requestQueue.removeFirst().close();
+ }
+ assert(_requestQueue.isEmpty);
+ if (!_isDone) {
+ // Pause the underlying subscription.
+ // Won't get here without adding a request, so we must be listening
+ // already.
+ assert(_isListening);
+ _subscription.pause();
+ }
+ }
+
+ /// Extracts the subscription and makes the events object unusable.
+ ///
+ /// Can only be used by the very last request.
+ StreamSubscription _dispose() {
+ assert(_isClosed);
+ assert(_isListening);
+ assert(_requestQueue.isEmpty);
+ StreamSubscription subscription = _subscription;
+ _setDone();
+ return subscription;
+ }
+}
+
+/// Request object that receives events when they arrive, until fulfilled.
+///
+/// Each request that cannot be fulfilled immediately is represented by
+/// an `_EventRequest` object in the request queue.
+///
+/// Events from the source stream are sent to the first request in the
+/// queue until it reports itself as [isComplete].
+///
+/// When the first request in the queue `isComplete`, either when becoming
+/// the first request or after receiving an event, its [close] methods is
+/// called.
+///
+/// The [close] method is also called immediately when the source stream
+/// is done.
+abstract class _EventRequest implements EventSink {
+ /// Handle a data event.
+ void add(data);
+
+ /// Handle an error event.
+ void addError(error, [StackTrace stackTrace]);
+
+ /// Complete the request.
+ ///
+ /// This may be called either when [isComplete] returns true,
+ /// or if the source stream is done.
+ void close();
+
+ /// Whether the request considers itself fulfilled.
+ ///
+ /// This is checked whenever a request becomes the first request
+ /// in the request queue, and after it receives an event.
+ ///
+ /// When a request is complete, its [close] method is called and
+ /// it's removed from the request queue.
+ bool get isComplete;
+}
+
+/// Request for a [StreamEvents.next] call.
+///
+/// Completes the returned future when receiving the first event,
+/// and is then complete.
+class _NextRequest<T> implements _EventRequest {
+ /// Completer for the future returned by [StreamEvents.next].
+ ///
+ /// Set to `null` when it is completed, to mark it as already complete.
+ final Completer _completer;
+
+ _NextRequest() : _completer = new Completer<T>();
+
+ Future<T> get future => _completer.future;
+
+ void add(data) {
+ _completer.complete(data);
+ }
+
+ void addError(error, [StackTrace stack]) {
+ _completer.completeError(error, stack);
+ }
+
+ void close() {
+ if (!_completer.isCompleted) {
+ _completer.completeError(new StateError("no elements"));
+ }
+ }
+
+ bool get isComplete => _completer.isCompleted;
+}
+
+/// Request for a [StreamEvents.skip] call.
+class _SkipRequest implements _EventRequest {
+ /// Completer for the future returned by the skip call.
+ final Completer _completer = new Completer<int>();
+
+ /// Number of remaining events to skip.
+ ///
+ /// The request [isComplete] when the values reaches zero.
+ ///
+ /// Decremented when an event is seen.
+ /// Set to zero when an error is seen since errors abort the skip request.
+ int _eventsToSkip;
+
+ _SkipRequest(this._eventsToSkip);
+
+ /// The future completed when the correct number of events have been skipped.
+ Future get future => _completer.future;
+
+ void add(data) {
+ assert(_eventsToSkip > 0);
+ _eventsToSkip--;
+ }
+
+ void addError(error, [StackTrace stackTrace]) {
+ _eventsToSkip = 0;
+ _completer.completeError(error, stackTrace);
+ }
+
+ void close() {
+ if (!_completer.isCompleted) {
+ _completer.complete(_eventsToSkip);
+ }
+ }
+
+ bool get isComplete => _eventsToSkip == 0;
+}
+
+/// Request for a [StreamEvents.take] call.
+class _TakeRequest<T> implements _EventRequest {
+ /// Completer for the future returned by the take call.
+ final Completer _completer;
+
+ /// List collecting events until enough have been seen.
+ final List _list = <T>[];
+
+ /// Number of events to capture.
+ ///
+ /// The request [isComplete] when the length of [_list] reaches
+ /// this value.
+ final int _eventsToTake;
+
+ _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
+
+ /// The future completed when the correct number of events have been captured.
+ Future get future => _completer.future;
+
+ void add(data) {
+ _list.add(data);
+ }
+
+ void addError(error, [StackTrace stack]) {
+ _completer.completeError(error, stack);
+ }
+
+ void close() {
+ if (!_completer.isCompleted) {
+ _completer.complete(_list);
+ }
+ }
+
+ bool get isComplete =>
+ _list.length == _eventsToTake || _completer.isCompleted;
+}
+
+/// Request for a [StreamEvents.cancel] call.
+///
+/// The request is always complete, it just waits in the request queue
+/// until all previous events are fulfilled, then it cancels the stream events
+/// subscription.
+class _CancelRequest implements _EventRequest {
+ /// Completer for the future returned by the `cancel` call.
+ final Completer _completer = new Completer();
+
+ /// The [StreamEvents] object that has this request queued.
+ ///
+ /// When the event is completed, it needs to cancel the active subscription
+ /// of the `StreamEvents` object, if any.
+ final StreamEvents _events;
+
+ _CancelRequest(this._events);
+
+ /// The future completed when the cancel request is completed.
+ Future get future => _completer.future;
+
+ void add(data) {
+ assert(false); // Unreachable.
+ }
+
+ void addError(error, [StackTrace stack]) {
+ assert(false); // Unreachable.
+ }
+
+ void close() {
+ if (_events._isListening) {
+ _completer.complete(_events._dispose().cancel());
+ } else {
+ _completer.complete();
+ }
+ }
+
+ bool get isComplete => true;
+}
+
+/// Request for a [StreamEvents.rest] call.
+///
+/// The request is always complete, it just waits in the request queue
+/// until all previous events are fulfilled, then it takes over the
+/// stream events subscription and creates a stream from it.
+class _RestRequest<T> implements _EventRequest {
+ /// Completer for the stream returned by the `rest` call.
+ final StreamCompleter _completer;
+
+ /// The [StreamEvents] object that has this request queued.
+ ///
+ /// When the event is completed, it needs to cancel the active subscription
+ /// of the `StreamEvents` object, if any.
+ final StreamEvents _events;
+ _RestRequest(this._events) : _completer = new StreamCompleter<T>();
+
+ /// The future which will contain the remaining events of [_events].
+ Stream<T> get stream => _completer.stream;
+
+ void add(data) {
+ assert(false); // Unreachable.
+ }
+
+ void addError(error, [StackTrace stack]) {
+ assert(false); // Unreachable.
+ }
+
+ void close() {
+ if (_events._isListening) {
+ StreamSubscription subscription = _events._dispose();
+ _completer.setSourceStream(new SubscriptionStream<T>(subscription));
+ if (subscription.isPaused) subscription.resume();
+ } else {
+ assert(_events._isDone);
+ _completer.setEmpty();
+ }
+ }
+
+ bool get isComplete => true;
+}

Powered by Google App Engine
This is Rietveld 408576698