| Index: lib/src/stream_events.dart
|
| diff --git a/lib/src/stream_events.dart b/lib/src/stream_events.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..7f6bcd6904b41c302666706f5b954c31f41a1d0e
|
| --- /dev/null
|
| +++ b/lib/src/stream_events.dart
|
| @@ -0,0 +1,629 @@
|
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +library async.stream_events;
|
| +
|
| +import 'dart:async';
|
| +import 'dart:collection';
|
| +
|
| +import "subscription_stream.dart";
|
| +import "stream_completer.dart";
|
| +
|
| +/// An asynchronous pull-based interface for accessing stream events.
|
| +///
|
| +/// Wraps a stream and makes individual events available on request.
|
| +///
|
| +/// You can request (and reserve) one or more events from the stream,
|
| +/// and after all previous requestes have been fulfilled, stream events
|
| +/// go towards fulfilling your request.
|
| +///
|
| +/// For example, if you ask for [next] two times, the returned futures
|
| +/// will be completed by the next two unreserved events from the stream.
|
| +///
|
| +/// The stream subscription is paused when there are no active
|
| +/// requests.
|
| +///
|
| +/// Some streams, including broadcast streams, will buffer
|
| +/// events while paused, so waiting too long between requests may
|
| +/// cause memory bloat somewhere else.
|
| +///
|
| +/// The individual requests are served in the order they are requested,
|
| +/// and the stream subscription is paused when there are no active requests.
|
| +///
|
| +/// This is similar to, but more convenient than, a [StreamIterator].
|
| +/// A `StreamIterator` requires you to manually check when a new event is
|
| +/// available and you can only access the value of that event until you
|
| +/// check for the next one. A `StreamEvents` allows you to request, for example,
|
| +/// three events at a time, either individually, as a group using [take]
|
| +/// or [skip], or in any combination.
|
| +///
|
| +/// You can also ask to have the [rest] of the stream provided as
|
| +/// a new stream. This allows, for example, taking the first event
|
| +/// out of a stream and continue using the rest of the stream as a stream.
|
| +///
|
| +/// Example:
|
| +///
|
| +/// var events = new StreamEvents<String>(someStreamOfLines);
|
| +/// var first = await events.next;
|
| +/// while (first.startsWith('#')) {
|
| +/// // Skip comments.
|
| +/// first = await events.next;
|
| +/// }
|
| +///
|
| +/// if (first.startsWith(MAGIC_MARKER)) {
|
| +/// var headerCount =
|
| +/// first.parseInt(first.substring(MAGIC_MARKER.length + 1));
|
| +/// handleMessage(headers: await events.take(headerCount),
|
| +/// body: events.rest);
|
| +/// return;
|
| +/// }
|
| +/// // Error handling.
|
| +///
|
| +/// When you need no further events the `StreamEvents` should be closed
|
| +/// using [cancel]. This releases the underlying stream subscription.
|
| +///
|
| +/// The underlying stream subscription is paused when there
|
| +/// are no requeusts. Some subscriptions, including those of broadcast streams,
|
| +/// will still buffer events while paused. Creating a `StreamEvents` from
|
| +/// such a stream and stopping to request events, will cause memory to fill up
|
| +/// unnecessarily.
|
| +class StreamEvents<T> {
|
| + /// The initial state, where the stream has not been listened to yet.
|
| + ///
|
| + /// It will be listened to when the first event is requested.
|
| + /// The `stateData` field holds the stream and the request queue is empty.
|
| + static const int _stateInitial = 0;
|
| +
|
| + /// Listening on a stream that hasn't completed yet.
|
| + ///
|
| + /// If the request queue is empty, the subscription is paused.
|
| + /// The `stateData` field holds the active subscription.
|
| + static const int _stateListening = 1;
|
| +
|
| + /// The stream has completed.
|
| + ///
|
| + /// The `stateData` field is `null` and the request queue is empty.
|
| + static const int _stateDone = 2;
|
| +
|
| + /// Flag set when [cancel] is called.
|
| + /// The `StreamEvents` is closed and no further events can be requested.
|
| + static const int _stateClosed = 4;
|
| +
|
| + /// Flag combined with [_stateClosed] to say it was closed using [rest].
|
| + /// Only used for error reporting purposes.
|
| + static const int _restFlag = 8;
|
| +
|
| + /// Flag set when [rest] is called.
|
| + /// Only used for error reporting, otherwise equivalent to [_stateClosed].
|
| + static const int _stateClosedRest = _stateClosed | _restFlag;
|
| +
|
| + /// Current state.
|
| + ///
|
| + /// Use getters below to check if the state is [_isListening] or [_isDone],
|
| + /// and whether the stream events object [_isClosed].
|
| + int _state = _stateInitial;
|
| +
|
| + /// Value depending on state. Use getters below to get the value and assert
|
| + /// the expected state.
|
| + var _stateData;
|
| +
|
| + /// Queue of pending requests while state is [_stateListening].
|
| + /// Access through methods below to ensure consistency.
|
| + final Queue<_EventRequest> _requestQueue = new Queue();
|
| +
|
| + StreamEvents(Stream source) : _stateData = source;
|
| +
|
| + /// Whether we are currently listening on a stream subscription.
|
| + bool get _isListening => (_state & _stateListening) != 0;
|
| +
|
| + /// Whether the underlying stream is done.
|
| + ///
|
| + /// This may return true before all events have been delivered.
|
| + /// Requesting a new event when [_isDone] returns true,
|
| + /// for example using [next], will always fail.
|
| + bool get _isDone => (_state & _stateDone) != 0;
|
| +
|
| + /// Whether the stream events has been closed.
|
| + ///
|
| + /// While closed, no further requests can be made.
|
| + bool get _isClosed => (_state & _stateClosed) != 0;
|
| +
|
| + /// Returns the stream subscription while in a listening state.
|
| + StreamSubscription get _subscription {
|
| + assert(_isListening);
|
| + return _stateData;
|
| + }
|
| +
|
| + /// Returns the source stream while in the initial state.
|
| + Stream get _sourceStream {
|
| + assert(!_isListening);
|
| + assert(!_isDone);
|
| + return _stateData;
|
| + }
|
| +
|
| + /// Sets the subscription and transitions to listening state.
|
| + void _setListening(StreamSubscription subscription) {
|
| + assert(!_isListening);
|
| + assert(!_isDone);
|
| + _stateData = subscription;
|
| + _state |= _stateListening;
|
| + }
|
| +
|
| + /// Transitions to the done state.
|
| + ///
|
| + /// The stream is done, and no further events will arrive.
|
| + void _setDone() {
|
| + assert(!_isDone);
|
| + _state = (_state & _stateClosedRest) | _stateDone;
|
| + _stateData = null;
|
| + }
|
| +
|
| + /// Request the next (yet unrequested) event from the stream.
|
| + ///
|
| + /// When the requested event arrives, the returned future is completed with
|
| + /// the event.
|
| + /// If the event is a data event, the returned future completes
|
| + /// with its value.
|
| + /// If the event is an error event, the returned future completes with
|
| + /// its error and stack trace.
|
| + /// If the stream closes before an event arrives, the returned future
|
| + /// completes with a [StateError].
|
| + ///
|
| + /// It's possible to have several pending [next] calls (or other requests),
|
| + /// and they will be completed in the order they were requested, by the
|
| + /// first events that were not used by previous requeusts.
|
| + Future<T> get next {
|
| + if (!_isClosed) {
|
| + _NextRequest nextRequest = new _NextRequest<T>();
|
| + _addRequest(nextRequest);
|
| + return nextRequest.future;
|
| + }
|
| + throw _failClosed();
|
| + }
|
| +
|
| + /// Returns a stream of all the remaning events of the source stream.
|
| + ///
|
| + /// All requested [next], [skip] or [take] operations are completed
|
| + /// first, and then any remaining events are provided as events of
|
| + /// the returned stream.
|
| + ///
|
| + /// Using `rest` closes the stream events object. After getting the
|
| + /// `rest` the caller may no longer request other events, like
|
| + /// after calling [cancel].
|
| + Stream<T> get rest {
|
| + if (_isClosed) {
|
| + throw _failClosed();
|
| + }
|
| + _state |= _stateClosedRest;
|
| + if (_isListening) {
|
| + // We have an active subscription that we want to take over.
|
| + var request = new _RestRequest<T>(this);
|
| + _addRequest(request);
|
| + return request.stream;
|
| + }
|
| + assert(_requestQueue.isEmpty);
|
| + if (_isDone) {
|
| + // TODO(lrn): Add Stream.empty() constructor.
|
| + return new Stream<T>.fromIterable(const []);
|
| + }
|
| + // We have never listened to the source stream,
|
| + // so just return that directly.
|
| + Stream result = _sourceStream;
|
| + _setDone();
|
| + return result;
|
| + }
|
| +
|
| + /// Skips the next [count] *data* events.
|
| + ///
|
| + /// The [count] must be non-negative.
|
| + ///
|
| + /// When successful, this is equivalent to using [take]
|
| + /// and ignoring the result.
|
| + ///
|
| + /// If an error occurs before `count` data events have been skipped,
|
| + /// the returned future completes with that error instead.
|
| + ///
|
| + /// If the stream closes before `count` data events,
|
| + /// the remaining unskipped event count is returned.
|
| + /// If the returned future completes with the integer `0`,
|
| + /// then all events were succssfully skipped. If the value
|
| + /// is greater than zero then the stream ended early.
|
| + Future<int> skip(int count) {
|
| + if (count < 0) throw new RangeError.range(count, 0, null, "count");
|
| + if (!_isClosed) {
|
| + var request = new _SkipRequest(count);
|
| + _addRequest(request);
|
| + return request.future;
|
| + }
|
| + throw _failClosed();
|
| + }
|
| +
|
| + /// Requests the next [count] data events as a list.
|
| + ///
|
| + /// The [count] must be non-negative.
|
| + ///
|
| + /// Equivalent to calling [next] `count` times and
|
| + /// storing the data values in a list.
|
| + ///
|
| + /// If an error occurs before `count` data events has
|
| + /// been collected, the returned future completes with
|
| + /// that error instead.
|
| + ///
|
| + /// If the stream closes before `count` data events,
|
| + /// the returned future completes with the list
|
| + /// of data collected so far. That is, the returned
|
| + /// list may have fewer than [count] elements.
|
| + Future<List<T>> take(int count) {
|
| + if (count < 0) throw new RangeError.range(count, 0, null, "count");
|
| + if (!_isClosed) {
|
| + var request = new _TakeRequest<T>(count);
|
| + _addRequest(request);
|
| + return request.future;
|
| + }
|
| + throw _failClosed();
|
| + }
|
| +
|
| + /// Cancels the underlying stream subscription.
|
| + ///
|
| + /// The cancel operation waits until all previously requested
|
| + /// events have been processed, then it cancels the subscription
|
| + /// providing the events.
|
| + ///
|
| + /// The returned future completes with the result of calling
|
| + /// `cancel`.
|
| + ///
|
| + /// After calling `cancel`, no further events can be requested.
|
| + /// None of [next], [rest], [skip], [take] or [cancel] may be
|
| + /// called again.
|
| + Future cancel() {
|
| + if (!_isClosed) {
|
| + _state |= _stateClosed;
|
| + if (!_isListening) {
|
| + // The request queue is only non-empty while we are listening.
|
| + assert(_requestQueue.isEmpty);
|
| + if (!_isDone) _setDone();
|
| + return new Future.value();
|
| + }
|
| + var request = new _CancelRequest(this);
|
| + _addRequest(request);
|
| + return request.future;
|
| + }
|
| + throw _failClosed();
|
| + }
|
| +
|
| + /// Returns an error for when a request is made after cancel.
|
| + ///
|
| + /// Returns a [StateError] with a message saying that either
|
| + /// [cancel] or [rest] have already been called.
|
| + Error _failClosed() {
|
| + String cause =
|
| + ((_state & _stateClosedRest) == _stateClosedRest) ? "rest" : "cancel";
|
| + return new StateError("Already cancelled by a call to $cause");
|
| + }
|
| +
|
| + // Callbacks receiving the events of the source stream.
|
| +
|
| + void _onData(T data) {
|
| + assert(_requestQueue.isNotEmpty);
|
| + _EventRequest request = _nextRequest;
|
| + request.add(data);
|
| + _checkCompleted();
|
| + }
|
| +
|
| + void _onError(error, StackTrace stack) {
|
| + assert(_requestQueue.isNotEmpty);
|
| + _EventRequest request = _nextRequest;
|
| + request.addError(error, stack);
|
| + _checkCompleted();
|
| + }
|
| +
|
| + void _onDone() {
|
| + _setDone();
|
| + _closeAllRequests();
|
| + }
|
| +
|
| + // Request queue management.
|
| +
|
| + /// Returns the next request in the queue, but don't remove it.
|
| + _EventRequest get _nextRequest {
|
| + return _requestQueue.first;
|
| + }
|
| +
|
| + /// Add a new request to the queue.
|
| + void _addRequest(_EventRequest request) {
|
| + if (_isDone) {
|
| + request.close();
|
| + return;
|
| + }
|
| + if (_requestQueue.isEmpty) {
|
| + if (request.isComplete) {
|
| + // Some requests are complete without receiving any events.
|
| + // This includes [cancel] and [rest] requests, as well as
|
| + // [take] and [skip] events with zero count.
|
| +
|
| + // We can skip listening and jsut complete the request immediately.
|
| + request.close();
|
| + return;
|
| + }
|
| + // Continue listening on the source stream.
|
| + // The source stream is paused while the requeust queue is empty,
|
| + // except at the beginning when it hasn't been listened to at all.
|
| + if (_isListening) {
|
| + _subscription.resume();
|
| + } else if (!_isDone) {
|
| + _setListening(
|
| + _sourceStream.listen(_onData, onError: _onError, onDone: _onDone));
|
| + }
|
| + }
|
| + _requestQueue.add(request);
|
| + }
|
| +
|
| + /// Remove all requests and close them.
|
| + ///
|
| + /// Used when the source stream is done.
|
| + void _closeAllRequests() {
|
| + assert(_isDone);
|
| + while (_requestQueue.isNotEmpty) {
|
| + _requestQueue.removeFirst().close();
|
| + }
|
| + }
|
| +
|
| + /// Check whether the next requests in the queue are complete.
|
| + ///
|
| + /// If so, remove them and call their `close` method.
|
| + void _checkCompleted() {
|
| + // Close-actions are executed immediately when they become the
|
| + // next (and last) event in the queue.
|
| + // When _isClosed and the queue is not empty, the last element
|
| + // of the queue is the close action.
|
| + while (_requestQueue.isNotEmpty) {
|
| + if (!_requestQueue.first.isComplete) {
|
| + return;
|
| + }
|
| + _requestQueue.removeFirst().close();
|
| + }
|
| + assert(_requestQueue.isEmpty);
|
| + if (!_isDone) {
|
| + // Pause the underlying subscription.
|
| + // Won't get here without adding a request, so we must be listening
|
| + // already.
|
| + assert(_isListening);
|
| + _subscription.pause();
|
| + }
|
| + }
|
| +
|
| + /// Extracts the subscription and makes the events object unusable.
|
| + ///
|
| + /// Can only be used by the very last request.
|
| + StreamSubscription _dispose() {
|
| + assert(_isClosed);
|
| + assert(_isListening);
|
| + assert(_requestQueue.isEmpty);
|
| + StreamSubscription subscription = _subscription;
|
| + _setDone();
|
| + return subscription;
|
| + }
|
| +}
|
| +
|
| +/// Request object that receives events when they arrive, until fulfilled.
|
| +///
|
| +/// Each request that cannot be fulfilled immediately is represented by
|
| +/// an `_EventRequest` object in the request queue.
|
| +///
|
| +/// Events from the source stream are sent to the first request in the
|
| +/// queue until it reports itself as [isComplete].
|
| +///
|
| +/// When the first request in the queue `isComplete`, either when becoming
|
| +/// the first request or after receiving an event, its [close] methods is
|
| +/// called.
|
| +///
|
| +/// The [close] method is also called immediately when the source stream
|
| +/// is done.
|
| +abstract class _EventRequest implements EventSink {
|
| + /// Handle a data event.
|
| + void add(data);
|
| +
|
| + /// Handle an error event.
|
| + void addError(error, [StackTrace stackTrace]);
|
| +
|
| + /// Complete the request.
|
| + ///
|
| + /// This may be called either when [isComplete] returns true,
|
| + /// or if the source stream is done.
|
| + void close();
|
| +
|
| + /// Whether the request considers itself fulfilled.
|
| + ///
|
| + /// This is checked whenever a request becomes the first request
|
| + /// in the request queue, and after it receives an event.
|
| + ///
|
| + /// When a request is complete, its [close] method is called and
|
| + /// it's removed from the request queue.
|
| + bool get isComplete;
|
| +}
|
| +
|
| +/// Request for a [StreamEvents.next] call.
|
| +///
|
| +/// Completes the returned future when receiving the first event,
|
| +/// and is then complete.
|
| +class _NextRequest<T> implements _EventRequest {
|
| + /// Completer for the future returned by [StreamEvents.next].
|
| + ///
|
| + /// Set to `null` when it is completed, to mark it as already complete.
|
| + final Completer _completer;
|
| +
|
| + _NextRequest() : _completer = new Completer<T>();
|
| +
|
| + Future<T> get future => _completer.future;
|
| +
|
| + void add(data) {
|
| + _completer.complete(data);
|
| + }
|
| +
|
| + void addError(error, [StackTrace stack]) {
|
| + _completer.completeError(error, stack);
|
| + }
|
| +
|
| + void close() {
|
| + if (!_completer.isCompleted) {
|
| + _completer.completeError(new StateError("no elements"));
|
| + }
|
| + }
|
| +
|
| + bool get isComplete => _completer.isCompleted;
|
| +}
|
| +
|
| +/// Request for a [StreamEvents.skip] call.
|
| +class _SkipRequest implements _EventRequest {
|
| + /// Completer for the future returned by the skip call.
|
| + final Completer _completer = new Completer<int>();
|
| +
|
| + /// Number of remaining events to skip.
|
| + ///
|
| + /// The request [isComplete] when the values reaches zero.
|
| + ///
|
| + /// Decremented when an event is seen.
|
| + /// Set to zero when an error is seen since errors abort the skip request.
|
| + int _eventsToSkip;
|
| +
|
| + _SkipRequest(this._eventsToSkip);
|
| +
|
| + /// The future completed when the correct number of events have been skipped.
|
| + Future get future => _completer.future;
|
| +
|
| + void add(data) {
|
| + assert(_eventsToSkip > 0);
|
| + _eventsToSkip--;
|
| + }
|
| +
|
| + void addError(error, [StackTrace stackTrace]) {
|
| + _eventsToSkip = 0;
|
| + _completer.completeError(error, stackTrace);
|
| + }
|
| +
|
| + void close() {
|
| + if (!_completer.isCompleted) {
|
| + _completer.complete(_eventsToSkip);
|
| + }
|
| + }
|
| +
|
| + bool get isComplete => _eventsToSkip == 0;
|
| +}
|
| +
|
| +/// Request for a [StreamEvents.take] call.
|
| +class _TakeRequest<T> implements _EventRequest {
|
| + /// Completer for the future returned by the take call.
|
| + final Completer _completer;
|
| +
|
| + /// List collecting events until enough have been seen.
|
| + final List _list = <T>[];
|
| +
|
| + /// Number of events to capture.
|
| + ///
|
| + /// The request [isComplete] when the length of [_list] reaches
|
| + /// this value.
|
| + final int _eventsToTake;
|
| +
|
| + _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
|
| +
|
| + /// The future completed when the correct number of events have been captured.
|
| + Future get future => _completer.future;
|
| +
|
| + void add(data) {
|
| + _list.add(data);
|
| + }
|
| +
|
| + void addError(error, [StackTrace stack]) {
|
| + _completer.completeError(error, stack);
|
| + }
|
| +
|
| + void close() {
|
| + if (!_completer.isCompleted) {
|
| + _completer.complete(_list);
|
| + }
|
| + }
|
| +
|
| + bool get isComplete =>
|
| + _list.length == _eventsToTake || _completer.isCompleted;
|
| +}
|
| +
|
| +/// Request for a [StreamEvents.cancel] call.
|
| +///
|
| +/// The request is always complete, it just waits in the request queue
|
| +/// until all previous events are fulfilled, then it cancels the stream events
|
| +/// subscription.
|
| +class _CancelRequest implements _EventRequest {
|
| + /// Completer for the future returned by the `cancel` call.
|
| + final Completer _completer = new Completer();
|
| +
|
| + /// The [StreamEvents] object that has this request queued.
|
| + ///
|
| + /// When the event is completed, it needs to cancel the active subscription
|
| + /// of the `StreamEvents` object, if any.
|
| + final StreamEvents _events;
|
| +
|
| + _CancelRequest(this._events);
|
| +
|
| + /// The future completed when the cancel request is completed.
|
| + Future get future => _completer.future;
|
| +
|
| + void add(data) {
|
| + assert(false); // Unreachable.
|
| + }
|
| +
|
| + void addError(error, [StackTrace stack]) {
|
| + assert(false); // Unreachable.
|
| + }
|
| +
|
| + void close() {
|
| + if (_events._isListening) {
|
| + _completer.complete(_events._dispose().cancel());
|
| + } else {
|
| + _completer.complete();
|
| + }
|
| + }
|
| +
|
| + bool get isComplete => true;
|
| +}
|
| +
|
| +/// Request for a [StreamEvents.rest] call.
|
| +///
|
| +/// The request is always complete, it just waits in the request queue
|
| +/// until all previous events are fulfilled, then it takes over the
|
| +/// stream events subscription and creates a stream from it.
|
| +class _RestRequest<T> implements _EventRequest {
|
| + /// Completer for the stream returned by the `rest` call.
|
| + final StreamCompleter _completer;
|
| +
|
| + /// The [StreamEvents] object that has this request queued.
|
| + ///
|
| + /// When the event is completed, it needs to cancel the active subscription
|
| + /// of the `StreamEvents` object, if any.
|
| + final StreamEvents _events;
|
| + _RestRequest(this._events) : _completer = new StreamCompleter<T>();
|
| +
|
| + /// The future which will contain the remaining events of [_events].
|
| + Stream<T> get stream => _completer.stream;
|
| +
|
| + void add(data) {
|
| + assert(false); // Unreachable.
|
| + }
|
| +
|
| + void addError(error, [StackTrace stack]) {
|
| + assert(false); // Unreachable.
|
| + }
|
| +
|
| + void close() {
|
| + if (_events._isListening) {
|
| + StreamSubscription subscription = _events._dispose();
|
| + _completer.setSourceStream(new SubscriptionStream<T>(subscription));
|
| + if (subscription.isPaused) subscription.resume();
|
| + } else {
|
| + assert(_events._isDone);
|
| + _completer.setEmpty();
|
| + }
|
| + }
|
| +
|
| + bool get isComplete => true;
|
| +}
|
|
|