Chromium Code Reviews| Index: lib/src/stream_events.dart |
| diff --git a/lib/src/stream_events.dart b/lib/src/stream_events.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..9cf8776b19f78310bf3a78b15e1c3d0dd72f1fd0 |
| --- /dev/null |
| +++ b/lib/src/stream_events.dart |
| @@ -0,0 +1,450 @@ |
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library async.streams.stream_events; |
| + |
| +import 'dart:async'; |
| +import 'dart:collection'; |
| + |
| +import "subscription_stream.dart"; |
| +import "stream_completer.dart"; |
| + |
| +/// An asynchronous pull-based interface for accessing stream events. |
| +/// |
| +/// Wraps a stream and makes individual events available on request. |
| +class StreamEvents<T> { |
|
ahe
2015/06/08 16:55:16
It took me quite some time to realize this is the
Lasse Reichstein Nielsen
2015/06/09 07:33:53
This is a little hard to name well.
PullStream wo
|
| + /// In the initial state, the stream has not been listened to yet. |
| + /// It will be listened to when the first event is requested. |
| + /// The `stateData` field holds the stream and the request queue is empty. |
| + static const int _INITIAL = 0; |
| + |
| + /// Listening on the stream. |
| + /// If the request queue is empty and the subscription isn't done, |
| + /// the subscription is paused. |
| + /// The `stateData` field holds the subscription. |
| + static const int _LISTENING = 1; |
| + |
| + /// The stream has completed. |
| + /// The `stateData` field is `null` and the request queue is empty. |
| + static const int _DONE = 2; |
| + |
| + /// Flag set when [close] is called. |
| + /// The `StreamEvents` is closed and no further events can be requested. |
| + /// While set, the last elmement of the request queue is an |
| + /// [_EventCloseAction]. |
| + static const int _CLOSED = 8; |
| + |
| + /// Flag set when [rest] is called. |
| + /// Only used for error reporting, otherwise equivalent to [_CLOSED]. |
| + static const int _CLOSED_REST = 12; |
| + |
| + /// Current state. |
| + /// |
| + /// Use getters below to check if the state is [_isListening] or [_isDone], |
| + /// and whether the stream events object [_isClosed]. |
| + int _state = _INITIAL; |
| + |
| + /// Value depending on state. Use getters below to get the value and assert |
| + /// the expected state. |
| + var _stateData; |
| + |
| + /// Queue of pending requests while state is [_LISTENING]. |
| + /// Access through methods below to ensure consistency. |
| + Queue<_EventAction> _requestQueue = new Queue(); |
| + |
| + StreamEvents(Stream source) : _stateData = source; |
| + |
| + bool get _isListening => (_state & _LISTENING) != 0; |
| + bool get _isClosed => (_state & _CLOSED) != 0; |
| + bool get _isDone => (_state & _DONE) != 0; |
| + |
| + /// Whether the underlying stream is spent. |
| + /// |
| + /// This may return true before all events have been delivered. |
| + /// Requesting a new event when [isDone] returns true, |
| + /// for example using [next], will always fail. |
| + bool get isDone => _isDone; |
| + |
| + /// Return the stream subscription while state is listening. |
| + StreamSubscription get _subscription { |
| + assert(_isListening); |
| + return _stateData; |
| + } |
| + |
| + /// Return the source stream while state is initial. |
| + Stream get _sourceStream { |
| + assert(!_isListening); |
| + assert(!_isDone); |
| + return _stateData; |
| + } |
| + |
| + // Set the subscription and transition to listening state. |
| + void set _subscription(StreamSubscription subscription) { |
| + assert(!_isListening); |
| + assert(!_isDone); |
| + _stateData = subscription; |
| + _state |= _LISTENING; |
| + } |
| + |
| + void _setDone() { |
| + assert(!_isDone); |
| + _state = (_state & _CLOSED_REST) | _DONE; |
| + _stateData = null; |
| + } |
| + |
| + /// Request the next (yet unrequested) event from the stream. |
| + /// |
| + /// When the requested event arrives, the returned future is completed with |
| + /// the event. This is independent of whether the event is a data event or |
| + /// an error event. |
| + /// |
| + /// If the stream closed before an event arrives, the future is completed |
| + /// with a [StateError]. |
| + Future<T> get next { |
| + if (!_isClosed) { |
| + Completer completer = new Completer<T>(); |
| + _addAction(new _NextAction(completer)); |
| + return completer.future; |
| + } |
| + throw _failClosed(); |
| + } |
| + |
| + /// Request a stream of all the remaning events of the source stream. |
| + /// |
| + /// All requested [next], [skip] or [take] operations are completed |
| + /// first, and then any remaining events are provided as events of |
| + /// the returned stream. |
| + /// |
| + /// Using `rest` closes the stream events object. After getting the |
| + /// `rest` it is no longer allowed to request other events, like |
| + /// after calling [close]. |
| + Stream<T> get rest { |
| + if (!_isClosed) { |
| + _state |= _CLOSED_REST; |
| + if (_isListening) { |
| + // We have an active subscription that we want to take over. |
| + var delayStream = new StreamCompleter<T>(); |
| + _addAction(new _RestAction<T>(delayStream)); |
| + return delayStream.stream; |
| + } |
| + assert(_requestQueue.isEmpty); |
| + if (isDone) { |
| + // TODO(lrn): Add Stream.empty() constructor. |
| + return new Stream<T>.fromIterable(const []); |
| + } |
| + // We have never listened the source stream, so just return that directly. |
| + Stream result = _sourceStream; |
| + _setDone(); |
| + return result; |
| + } |
| + throw _failClosed(); |
| + } |
| + |
| + /// Requests to skip the next [count] *data* events. |
| + /// |
| + /// The [count] value must be greater than zero. |
| + /// |
| + /// When successful, this is equivalent to using [take] |
| + /// and ignoring the result. |
| + /// |
| + /// If an error occurs before `count` data events has |
| + /// been skipped, the returned future completes with |
| + /// that error instead. |
| + /// |
| + /// If the stream closes before `count` data events, |
| + /// the remaining unskipped event count is returned. |
| + /// If the returned future completes with the integer `0`, |
| + /// then all events were succssfully skipped. If the value |
| + /// is greater than zero then the stream ended early. |
| + Future<int> skip(int count) { |
| + if (count <= 0) throw new RangeError.range(count, 0, null, "count"); |
| + if (!_isClosed) { |
| + Completer completer = new Completer<int>(); |
| + _addAction(new _SkipAction(completer, count)); |
| + return completer.future; |
| + } |
| + throw _failClosed(); |
| + } |
| + |
| + /// Requests the next [count] data events as a list. |
| + /// |
| + /// The [count] value must be greater than zero. |
| + /// |
| + /// Equivalent to calling [next] `count` times and |
| + /// storing the data values in a list. |
| + /// |
| + /// If an error occurs before `count` data events has |
| + /// been collected, the returned future completes with |
| + /// that error instead. |
| + /// |
| + /// If the stream closes before `count` data events, |
| + /// the returned future completes with the list |
| + /// of data collected so far. That is, the returned |
| + /// list may have fewer than [count] elements. |
| + Future<List<T>> take(int count) { |
| + if (count <= 0) throw new RangeError.range(count, 0, null, "count"); |
| + if (!_isClosed) { |
| + Completer completer = new Completer<List<T>>(); |
| + _addAction(new _TakeAction(completer, count)); |
| + return completer.future; |
| + } |
| + throw _failClosed(); |
| + } |
| + |
| + /// Release the underlying stream subscription. |
| + /// |
| + /// The close operation waits until all previously requested |
| + /// events have been processed, then it cancels the subscription |
| + /// providing the events. |
| + /// |
| + /// The returned future completes with the result of calling |
| + /// `cancel`. |
| + /// |
| + /// After calling `close`, no further events can be requested. |
| + /// None of [next], [rest], [skip], [take] or [close] may be |
| + /// called again. |
| + Future close() { |
| + if (!_isClosed) { |
| + _state |= _CLOSED; |
| + if (!_isListening) { |
| + assert(_requestQueue.isEmpty); |
| + if (!_isDone) _setDone(); |
| + return new Future.value(); |
| + } |
| + Completer completer = new Completer(); |
| + _addAction(new _CloseAction(completer)); |
| + return completer.future; |
| + } |
| + throw _failClosed(); |
| + } |
| + |
| + /// Reused error message. |
| + Error _failClosed() { |
| + String cause = |
| + ((_state & _CLOSED_REST) == _CLOSED_REST) ? "rest" : "close"; |
| + return new StateError("Already closed by a call to $cause"); |
| + } |
| + |
| + /// Called when requesting an event when the requeust queue is empty. |
| + /// The underlying subscription is paused in that case, except the very |
| + /// first time when the subscription needs to be created. |
| + void _listen() { |
| + if (_isListening) { |
| + _subscription.resume(); |
| + } else if (!_isDone) { |
| + _subscription = |
| + _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); |
| + } |
| + } |
| + |
| + /// Pauses the underlying subscription. |
| + /// Called when the request queue is empty and the subscription isn't closed. |
| + void _pause() { |
| + assert(_isListening); |
| + _subscription.pause(); |
| + } |
| + |
| + // Callbacks receiving the events of the source stream. |
| + void _onData(T data) { |
| + assert(_requestQueue.isNotEmpty); |
| + _EventAction action = _nextAction; |
| + if (action.data(data)) { |
| + _completeAction(); |
| + } |
| + } |
| + |
| + void _onError(error, StackTrace stack) { |
| + assert(_requestQueue.isNotEmpty); |
| + _EventAction action = _nextAction; |
| + action.error(error, stack); |
| + _completeAction(); |
| + } |
| + |
| + void _onDone() { |
| + _setDone(); |
| + _flushQueue(); |
| + } |
| + |
| + // Request queue management. |
| + |
| + /// Get the next action in the queue, but don't remove it. |
| + _EventAction get _nextAction { |
| + assert(_requestQueue.isNotEmpty); |
| + return _requestQueue.first; |
| + } |
| + |
| + /// Add a new request to the queue. |
| + void _addAction(_EventAction action) { |
| + if (_isDone) { |
| + action.done(); |
| + return; |
| + } |
| + if (_requestQueue.isEmpty) { |
| + _listen(); |
| + } |
| + _requestQueue.add(action); |
| + _checkClose(); |
| + } |
| + |
| + /// Remove all requests and call their `done` event. |
| + /// |
| + /// Used when the source stream dries up. |
| + void _flushQueue() { |
| + while (_requestQueue.isNotEmpty) { |
| + _requestQueue.removeFirst().done(); |
| + } |
| + } |
| + |
| + /// Remove a completed action from the queue. |
| + /// |
| + /// An action is complete when its `data` or `error` handler |
| + /// returns true. For actions that use multiple |
| + void _completeAction() { |
| + _requestQueue.removeFirst(); |
| + if (_requestQueue.isEmpty) { |
| + _pause(); |
| + } else { |
| + _checkClose(); |
| + } |
| + } |
| + |
| + /// Check whether the only remaining action in the queue is a close action. |
| + /// |
| + /// If so, pass the subscription to the action and let it take over. |
| + void _checkClose() { |
| + // Close-actions are executed immediately when they become the |
| + // next (and last) event in the queue. |
| + // When _isClosed and the queue is not empty, the last element |
| + // of the queue is the close action. |
| + if (_isClosed && _requestQueue.length == 1) { |
| + _EventCloseAction action = _requestQueue.removeFirst(); |
| + StreamSubscription subscription = _subscription; |
| + _setDone(); |
| + action.close(subscription); |
| + } |
| + } |
| +} |
| + |
| + |
| +/// Action to take when a requested event arrives. |
| +abstract class _EventAction { |
| + bool data(data); |
| + void error(error, StackTrace stack); |
| + void done(); |
| +} |
| + |
| +/// Action to take when closing the [StreamEvents] class. |
| +abstract class _EventCloseAction implements _EventAction { |
| + void close(StreamSubscription subscription); |
| +} |
| + |
| + |
| +/// Action completing a [StreamEvents.next] request. |
| +class _NextAction implements _EventAction { |
| + Completer completer; |
| + _NextAction(this.completer); |
| + |
| + bool data(data) { |
| + completer.complete(data); |
| + return true; |
| + } |
| + |
| + void error(error, StackTrace stack) { |
| + completer.completeError(error, stack); |
| + } |
| + |
| + void done() { |
| + completer.completeError(new StateError("no elements")); |
| + } |
| +} |
| + |
| +/// Action completing a [StreamEvents.skip] request. |
| +class _SkipAction implements _EventAction { |
| + Completer completer; |
| + int count; |
| + _SkipAction(this.completer, this.count); |
| + |
| + bool data(data) { |
| + count--; |
| + if (count > 0) return false; |
| + completer.complete(count); |
| + return true; |
| + } |
| + |
| + void error(error, StackTrace stack) { |
| + completer.completeError(error, stack); |
| + } |
| + |
| + void done() { |
| + completer.complete(count); |
| + } |
| +} |
| + |
| +/// Action completing a [StreamEvents.take] request. |
| +class _TakeAction<T> implements _EventAction { |
| + final Completer completer; |
| + final int count; |
| + List list = <T>[]; |
| + _TakeAction(this.completer, this.count); |
| + |
| + bool data(data) { |
| + list.add(data); |
| + if (list.length < count) return false; |
| + completer.complete(list); |
| + return true; |
| + } |
| + |
| + void error(error, StackTrace stack) { |
| + completer.completeError(error, stack); |
| + } |
| + |
| + void done() { |
| + completer.complete(list); |
| + } |
| +} |
| + |
| +/// Action completing a [StreamEvents.close] request. |
| +class _CloseAction implements _EventCloseAction { |
| + final Completer completer; |
| + |
| + _CloseAction(this.completer); |
| + |
| + bool data(data) { |
| + throw new UnsupportedError("event"); |
| + } |
| + |
| + void error(e, StackTrace stack) { |
| + throw new UnsupportedError("event"); |
| + } |
| + |
| + void done() { |
| + completer.complete(); |
| + } |
| + |
| + void close(StreamSubscription subscription) { |
| + completer.complete(subscription.cancel()); |
| + } |
| +} |
| + |
| +/// Action completing a [StreamEvents.rest] request. |
| +class _RestAction<T> implements _EventCloseAction { |
| + final StreamCompleter delayStream; |
| + _RestAction(this.delayStream); |
| + |
| + bool data(data) { |
| + throw new UnsupportedError("event"); |
| + } |
| + |
| + void error(e, StackTrace stack) { |
| + throw new UnsupportedError("event"); |
| + } |
| + |
| + void done() { |
| + delayStream.setEmpty(); |
| + } |
| + |
| + void close(StreamSubscription subscription) { |
| + delayStream.setSourceStream(new SubscriptionStream<T>(subscription)); |
| + } |
| +} |