Index: lib/src/stream_events.dart |
diff --git a/lib/src/stream_events.dart b/lib/src/stream_events.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..9cf8776b19f78310bf3a78b15e1c3d0dd72f1fd0 |
--- /dev/null |
+++ b/lib/src/stream_events.dart |
@@ -0,0 +1,450 @@ |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library async.streams.stream_events; |
+ |
+import 'dart:async'; |
+import 'dart:collection'; |
+ |
+import "subscription_stream.dart"; |
+import "stream_completer.dart"; |
+ |
+/// An asynchronous pull-based interface for accessing stream events. |
+/// |
+/// Wraps a stream and makes individual events available on request. |
+class StreamEvents<T> { |
ahe
2015/06/08 16:55:16
It took me quite some time to realize this is the
Lasse Reichstein Nielsen
2015/06/09 07:33:53
This is a little hard to name well.
PullStream wo
|
+ /// In the initial state, the stream has not been listened to yet. |
+ /// It will be listened to when the first event is requested. |
+ /// The `stateData` field holds the stream and the request queue is empty. |
+ static const int _INITIAL = 0; |
+ |
+ /// Listening on the stream. |
+ /// If the request queue is empty and the subscription isn't done, |
+ /// the subscription is paused. |
+ /// The `stateData` field holds the subscription. |
+ static const int _LISTENING = 1; |
+ |
+ /// The stream has completed. |
+ /// The `stateData` field is `null` and the request queue is empty. |
+ static const int _DONE = 2; |
+ |
+ /// Flag set when [close] is called. |
+ /// The `StreamEvents` is closed and no further events can be requested. |
+ /// While set, the last elmement of the request queue is an |
+ /// [_EventCloseAction]. |
+ static const int _CLOSED = 8; |
+ |
+ /// Flag set when [rest] is called. |
+ /// Only used for error reporting, otherwise equivalent to [_CLOSED]. |
+ static const int _CLOSED_REST = 12; |
+ |
+ /// Current state. |
+ /// |
+ /// Use getters below to check if the state is [_isListening] or [_isDone], |
+ /// and whether the stream events object [_isClosed]. |
+ int _state = _INITIAL; |
+ |
+ /// Value depending on state. Use getters below to get the value and assert |
+ /// the expected state. |
+ var _stateData; |
+ |
+ /// Queue of pending requests while state is [_LISTENING]. |
+ /// Access through methods below to ensure consistency. |
+ Queue<_EventAction> _requestQueue = new Queue(); |
+ |
+ StreamEvents(Stream source) : _stateData = source; |
+ |
+ bool get _isListening => (_state & _LISTENING) != 0; |
+ bool get _isClosed => (_state & _CLOSED) != 0; |
+ bool get _isDone => (_state & _DONE) != 0; |
+ |
+ /// Whether the underlying stream is spent. |
+ /// |
+ /// This may return true before all events have been delivered. |
+ /// Requesting a new event when [isDone] returns true, |
+ /// for example using [next], will always fail. |
+ bool get isDone => _isDone; |
+ |
+ /// Return the stream subscription while state is listening. |
+ StreamSubscription get _subscription { |
+ assert(_isListening); |
+ return _stateData; |
+ } |
+ |
+ /// Return the source stream while state is initial. |
+ Stream get _sourceStream { |
+ assert(!_isListening); |
+ assert(!_isDone); |
+ return _stateData; |
+ } |
+ |
+ // Set the subscription and transition to listening state. |
+ void set _subscription(StreamSubscription subscription) { |
+ assert(!_isListening); |
+ assert(!_isDone); |
+ _stateData = subscription; |
+ _state |= _LISTENING; |
+ } |
+ |
+ void _setDone() { |
+ assert(!_isDone); |
+ _state = (_state & _CLOSED_REST) | _DONE; |
+ _stateData = null; |
+ } |
+ |
+ /// Request the next (yet unrequested) event from the stream. |
+ /// |
+ /// When the requested event arrives, the returned future is completed with |
+ /// the event. This is independent of whether the event is a data event or |
+ /// an error event. |
+ /// |
+ /// If the stream closed before an event arrives, the future is completed |
+ /// with a [StateError]. |
+ Future<T> get next { |
+ if (!_isClosed) { |
+ Completer completer = new Completer<T>(); |
+ _addAction(new _NextAction(completer)); |
+ return completer.future; |
+ } |
+ throw _failClosed(); |
+ } |
+ |
+ /// Request a stream of all the remaning events of the source stream. |
+ /// |
+ /// All requested [next], [skip] or [take] operations are completed |
+ /// first, and then any remaining events are provided as events of |
+ /// the returned stream. |
+ /// |
+ /// Using `rest` closes the stream events object. After getting the |
+ /// `rest` it is no longer allowed to request other events, like |
+ /// after calling [close]. |
+ Stream<T> get rest { |
+ if (!_isClosed) { |
+ _state |= _CLOSED_REST; |
+ if (_isListening) { |
+ // We have an active subscription that we want to take over. |
+ var delayStream = new StreamCompleter<T>(); |
+ _addAction(new _RestAction<T>(delayStream)); |
+ return delayStream.stream; |
+ } |
+ assert(_requestQueue.isEmpty); |
+ if (isDone) { |
+ // TODO(lrn): Add Stream.empty() constructor. |
+ return new Stream<T>.fromIterable(const []); |
+ } |
+ // We have never listened the source stream, so just return that directly. |
+ Stream result = _sourceStream; |
+ _setDone(); |
+ return result; |
+ } |
+ throw _failClosed(); |
+ } |
+ |
+ /// Requests to skip the next [count] *data* events. |
+ /// |
+ /// The [count] value must be greater than zero. |
+ /// |
+ /// When successful, this is equivalent to using [take] |
+ /// and ignoring the result. |
+ /// |
+ /// If an error occurs before `count` data events has |
+ /// been skipped, the returned future completes with |
+ /// that error instead. |
+ /// |
+ /// If the stream closes before `count` data events, |
+ /// the remaining unskipped event count is returned. |
+ /// If the returned future completes with the integer `0`, |
+ /// then all events were succssfully skipped. If the value |
+ /// is greater than zero then the stream ended early. |
+ Future<int> skip(int count) { |
+ if (count <= 0) throw new RangeError.range(count, 0, null, "count"); |
+ if (!_isClosed) { |
+ Completer completer = new Completer<int>(); |
+ _addAction(new _SkipAction(completer, count)); |
+ return completer.future; |
+ } |
+ throw _failClosed(); |
+ } |
+ |
+ /// Requests the next [count] data events as a list. |
+ /// |
+ /// The [count] value must be greater than zero. |
+ /// |
+ /// Equivalent to calling [next] `count` times and |
+ /// storing the data values in a list. |
+ /// |
+ /// If an error occurs before `count` data events has |
+ /// been collected, the returned future completes with |
+ /// that error instead. |
+ /// |
+ /// If the stream closes before `count` data events, |
+ /// the returned future completes with the list |
+ /// of data collected so far. That is, the returned |
+ /// list may have fewer than [count] elements. |
+ Future<List<T>> take(int count) { |
+ if (count <= 0) throw new RangeError.range(count, 0, null, "count"); |
+ if (!_isClosed) { |
+ Completer completer = new Completer<List<T>>(); |
+ _addAction(new _TakeAction(completer, count)); |
+ return completer.future; |
+ } |
+ throw _failClosed(); |
+ } |
+ |
+ /// Release the underlying stream subscription. |
+ /// |
+ /// The close operation waits until all previously requested |
+ /// events have been processed, then it cancels the subscription |
+ /// providing the events. |
+ /// |
+ /// The returned future completes with the result of calling |
+ /// `cancel`. |
+ /// |
+ /// After calling `close`, no further events can be requested. |
+ /// None of [next], [rest], [skip], [take] or [close] may be |
+ /// called again. |
+ Future close() { |
+ if (!_isClosed) { |
+ _state |= _CLOSED; |
+ if (!_isListening) { |
+ assert(_requestQueue.isEmpty); |
+ if (!_isDone) _setDone(); |
+ return new Future.value(); |
+ } |
+ Completer completer = new Completer(); |
+ _addAction(new _CloseAction(completer)); |
+ return completer.future; |
+ } |
+ throw _failClosed(); |
+ } |
+ |
+ /// Reused error message. |
+ Error _failClosed() { |
+ String cause = |
+ ((_state & _CLOSED_REST) == _CLOSED_REST) ? "rest" : "close"; |
+ return new StateError("Already closed by a call to $cause"); |
+ } |
+ |
+ /// Called when requesting an event when the requeust queue is empty. |
+ /// The underlying subscription is paused in that case, except the very |
+ /// first time when the subscription needs to be created. |
+ void _listen() { |
+ if (_isListening) { |
+ _subscription.resume(); |
+ } else if (!_isDone) { |
+ _subscription = |
+ _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); |
+ } |
+ } |
+ |
+ /// Pauses the underlying subscription. |
+ /// Called when the request queue is empty and the subscription isn't closed. |
+ void _pause() { |
+ assert(_isListening); |
+ _subscription.pause(); |
+ } |
+ |
+ // Callbacks receiving the events of the source stream. |
+ void _onData(T data) { |
+ assert(_requestQueue.isNotEmpty); |
+ _EventAction action = _nextAction; |
+ if (action.data(data)) { |
+ _completeAction(); |
+ } |
+ } |
+ |
+ void _onError(error, StackTrace stack) { |
+ assert(_requestQueue.isNotEmpty); |
+ _EventAction action = _nextAction; |
+ action.error(error, stack); |
+ _completeAction(); |
+ } |
+ |
+ void _onDone() { |
+ _setDone(); |
+ _flushQueue(); |
+ } |
+ |
+ // Request queue management. |
+ |
+ /// Get the next action in the queue, but don't remove it. |
+ _EventAction get _nextAction { |
+ assert(_requestQueue.isNotEmpty); |
+ return _requestQueue.first; |
+ } |
+ |
+ /// Add a new request to the queue. |
+ void _addAction(_EventAction action) { |
+ if (_isDone) { |
+ action.done(); |
+ return; |
+ } |
+ if (_requestQueue.isEmpty) { |
+ _listen(); |
+ } |
+ _requestQueue.add(action); |
+ _checkClose(); |
+ } |
+ |
+ /// Remove all requests and call their `done` event. |
+ /// |
+ /// Used when the source stream dries up. |
+ void _flushQueue() { |
+ while (_requestQueue.isNotEmpty) { |
+ _requestQueue.removeFirst().done(); |
+ } |
+ } |
+ |
+ /// Remove a completed action from the queue. |
+ /// |
+ /// An action is complete when its `data` or `error` handler |
+ /// returns true. For actions that use multiple |
+ void _completeAction() { |
+ _requestQueue.removeFirst(); |
+ if (_requestQueue.isEmpty) { |
+ _pause(); |
+ } else { |
+ _checkClose(); |
+ } |
+ } |
+ |
+ /// Check whether the only remaining action in the queue is a close action. |
+ /// |
+ /// If so, pass the subscription to the action and let it take over. |
+ void _checkClose() { |
+ // Close-actions are executed immediately when they become the |
+ // next (and last) event in the queue. |
+ // When _isClosed and the queue is not empty, the last element |
+ // of the queue is the close action. |
+ if (_isClosed && _requestQueue.length == 1) { |
+ _EventCloseAction action = _requestQueue.removeFirst(); |
+ StreamSubscription subscription = _subscription; |
+ _setDone(); |
+ action.close(subscription); |
+ } |
+ } |
+} |
+ |
+ |
+/// Action to take when a requested event arrives. |
+abstract class _EventAction { |
+ bool data(data); |
+ void error(error, StackTrace stack); |
+ void done(); |
+} |
+ |
+/// Action to take when closing the [StreamEvents] class. |
+abstract class _EventCloseAction implements _EventAction { |
+ void close(StreamSubscription subscription); |
+} |
+ |
+ |
+/// Action completing a [StreamEvents.next] request. |
+class _NextAction implements _EventAction { |
+ Completer completer; |
+ _NextAction(this.completer); |
+ |
+ bool data(data) { |
+ completer.complete(data); |
+ return true; |
+ } |
+ |
+ void error(error, StackTrace stack) { |
+ completer.completeError(error, stack); |
+ } |
+ |
+ void done() { |
+ completer.completeError(new StateError("no elements")); |
+ } |
+} |
+ |
+/// Action completing a [StreamEvents.skip] request. |
+class _SkipAction implements _EventAction { |
+ Completer completer; |
+ int count; |
+ _SkipAction(this.completer, this.count); |
+ |
+ bool data(data) { |
+ count--; |
+ if (count > 0) return false; |
+ completer.complete(count); |
+ return true; |
+ } |
+ |
+ void error(error, StackTrace stack) { |
+ completer.completeError(error, stack); |
+ } |
+ |
+ void done() { |
+ completer.complete(count); |
+ } |
+} |
+ |
+/// Action completing a [StreamEvents.take] request. |
+class _TakeAction<T> implements _EventAction { |
+ final Completer completer; |
+ final int count; |
+ List list = <T>[]; |
+ _TakeAction(this.completer, this.count); |
+ |
+ bool data(data) { |
+ list.add(data); |
+ if (list.length < count) return false; |
+ completer.complete(list); |
+ return true; |
+ } |
+ |
+ void error(error, StackTrace stack) { |
+ completer.completeError(error, stack); |
+ } |
+ |
+ void done() { |
+ completer.complete(list); |
+ } |
+} |
+ |
+/// Action completing a [StreamEvents.close] request. |
+class _CloseAction implements _EventCloseAction { |
+ final Completer completer; |
+ |
+ _CloseAction(this.completer); |
+ |
+ bool data(data) { |
+ throw new UnsupportedError("event"); |
+ } |
+ |
+ void error(e, StackTrace stack) { |
+ throw new UnsupportedError("event"); |
+ } |
+ |
+ void done() { |
+ completer.complete(); |
+ } |
+ |
+ void close(StreamSubscription subscription) { |
+ completer.complete(subscription.cancel()); |
+ } |
+} |
+ |
+/// Action completing a [StreamEvents.rest] request. |
+class _RestAction<T> implements _EventCloseAction { |
+ final StreamCompleter delayStream; |
+ _RestAction(this.delayStream); |
+ |
+ bool data(data) { |
+ throw new UnsupportedError("event"); |
+ } |
+ |
+ void error(e, StackTrace stack) { |
+ throw new UnsupportedError("event"); |
+ } |
+ |
+ void done() { |
+ delayStream.setEmpty(); |
+ } |
+ |
+ void close(StreamSubscription subscription) { |
+ delayStream.setSourceStream(new SubscriptionStream<T>(subscription)); |
+ } |
+} |