Index: lib/src/stream_events.dart |
diff --git a/lib/src/stream_events.dart b/lib/src/stream_events.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..7f6bcd6904b41c302666706f5b954c31f41a1d0e |
--- /dev/null |
+++ b/lib/src/stream_events.dart |
@@ -0,0 +1,629 @@ |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library async.stream_events; |
+ |
+import 'dart:async'; |
+import 'dart:collection'; |
+ |
+import "subscription_stream.dart"; |
+import "stream_completer.dart"; |
+ |
+/// An asynchronous pull-based interface for accessing stream events. |
+/// |
+/// Wraps a stream and makes individual events available on request. |
+/// |
+/// You can request (and reserve) one or more events from the stream, |
+/// and after all previous requestes have been fulfilled, stream events |
+/// go towards fulfilling your request. |
+/// |
+/// For example, if you ask for [next] two times, the returned futures |
+/// will be completed by the next two unreserved events from the stream. |
+/// |
+/// The stream subscription is paused when there are no active |
+/// requests. |
+/// |
+/// Some streams, including broadcast streams, will buffer |
+/// events while paused, so waiting too long between requests may |
+/// cause memory bloat somewhere else. |
+/// |
+/// The individual requests are served in the order they are requested, |
+/// and the stream subscription is paused when there are no active requests. |
+/// |
+/// This is similar to, but more convenient than, a [StreamIterator]. |
+/// A `StreamIterator` requires you to manually check when a new event is |
+/// available and you can only access the value of that event until you |
+/// check for the next one. A `StreamEvents` allows you to request, for example, |
+/// three events at a time, either individually, as a group using [take] |
+/// or [skip], or in any combination. |
+/// |
+/// You can also ask to have the [rest] of the stream provided as |
+/// a new stream. This allows, for example, taking the first event |
+/// out of a stream and continue using the rest of the stream as a stream. |
+/// |
+/// Example: |
+/// |
+/// var events = new StreamEvents<String>(someStreamOfLines); |
+/// var first = await events.next; |
+/// while (first.startsWith('#')) { |
+/// // Skip comments. |
+/// first = await events.next; |
+/// } |
+/// |
+/// if (first.startsWith(MAGIC_MARKER)) { |
+/// var headerCount = |
+/// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); |
+/// handleMessage(headers: await events.take(headerCount), |
+/// body: events.rest); |
+/// return; |
+/// } |
+/// // Error handling. |
+/// |
+/// When you need no further events the `StreamEvents` should be closed |
+/// using [cancel]. This releases the underlying stream subscription. |
+/// |
+/// The underlying stream subscription is paused when there |
+/// are no requeusts. Some subscriptions, including those of broadcast streams, |
+/// will still buffer events while paused. Creating a `StreamEvents` from |
+/// such a stream and stopping to request events, will cause memory to fill up |
+/// unnecessarily. |
+class StreamEvents<T> { |
+ /// The initial state, where the stream has not been listened to yet. |
+ /// |
+ /// It will be listened to when the first event is requested. |
+ /// The `stateData` field holds the stream and the request queue is empty. |
+ static const int _stateInitial = 0; |
+ |
+ /// Listening on a stream that hasn't completed yet. |
+ /// |
+ /// If the request queue is empty, the subscription is paused. |
+ /// The `stateData` field holds the active subscription. |
+ static const int _stateListening = 1; |
+ |
+ /// The stream has completed. |
+ /// |
+ /// The `stateData` field is `null` and the request queue is empty. |
+ static const int _stateDone = 2; |
+ |
+ /// Flag set when [cancel] is called. |
+ /// The `StreamEvents` is closed and no further events can be requested. |
+ static const int _stateClosed = 4; |
+ |
+ /// Flag combined with [_stateClosed] to say it was closed using [rest]. |
+ /// Only used for error reporting purposes. |
+ static const int _restFlag = 8; |
+ |
+ /// Flag set when [rest] is called. |
+ /// Only used for error reporting, otherwise equivalent to [_stateClosed]. |
+ static const int _stateClosedRest = _stateClosed | _restFlag; |
+ |
+ /// Current state. |
+ /// |
+ /// Use getters below to check if the state is [_isListening] or [_isDone], |
+ /// and whether the stream events object [_isClosed]. |
+ int _state = _stateInitial; |
+ |
+ /// Value depending on state. Use getters below to get the value and assert |
+ /// the expected state. |
+ var _stateData; |
+ |
+ /// Queue of pending requests while state is [_stateListening]. |
+ /// Access through methods below to ensure consistency. |
+ final Queue<_EventRequest> _requestQueue = new Queue(); |
+ |
+ StreamEvents(Stream source) : _stateData = source; |
+ |
+ /// Whether we are currently listening on a stream subscription. |
+ bool get _isListening => (_state & _stateListening) != 0; |
+ |
+ /// Whether the underlying stream is done. |
+ /// |
+ /// This may return true before all events have been delivered. |
+ /// Requesting a new event when [_isDone] returns true, |
+ /// for example using [next], will always fail. |
+ bool get _isDone => (_state & _stateDone) != 0; |
+ |
+ /// Whether the stream events has been closed. |
+ /// |
+ /// While closed, no further requests can be made. |
+ bool get _isClosed => (_state & _stateClosed) != 0; |
+ |
+ /// Returns the stream subscription while in a listening state. |
+ StreamSubscription get _subscription { |
+ assert(_isListening); |
+ return _stateData; |
+ } |
+ |
+ /// Returns the source stream while in the initial state. |
+ Stream get _sourceStream { |
+ assert(!_isListening); |
+ assert(!_isDone); |
+ return _stateData; |
+ } |
+ |
+ /// Sets the subscription and transitions to listening state. |
+ void _setListening(StreamSubscription subscription) { |
+ assert(!_isListening); |
+ assert(!_isDone); |
+ _stateData = subscription; |
+ _state |= _stateListening; |
+ } |
+ |
+ /// Transitions to the done state. |
+ /// |
+ /// The stream is done, and no further events will arrive. |
+ void _setDone() { |
+ assert(!_isDone); |
+ _state = (_state & _stateClosedRest) | _stateDone; |
+ _stateData = null; |
+ } |
+ |
+ /// Request the next (yet unrequested) event from the stream. |
+ /// |
+ /// When the requested event arrives, the returned future is completed with |
+ /// the event. |
+ /// If the event is a data event, the returned future completes |
+ /// with its value. |
+ /// If the event is an error event, the returned future completes with |
+ /// its error and stack trace. |
+ /// If the stream closes before an event arrives, the returned future |
+ /// completes with a [StateError]. |
+ /// |
+ /// It's possible to have several pending [next] calls (or other requests), |
+ /// and they will be completed in the order they were requested, by the |
+ /// first events that were not used by previous requeusts. |
+ Future<T> get next { |
+ if (!_isClosed) { |
+ _NextRequest nextRequest = new _NextRequest<T>(); |
+ _addRequest(nextRequest); |
+ return nextRequest.future; |
+ } |
+ throw _failClosed(); |
+ } |
+ |
+ /// Returns a stream of all the remaning events of the source stream. |
+ /// |
+ /// All requested [next], [skip] or [take] operations are completed |
+ /// first, and then any remaining events are provided as events of |
+ /// the returned stream. |
+ /// |
+ /// Using `rest` closes the stream events object. After getting the |
+ /// `rest` the caller may no longer request other events, like |
+ /// after calling [cancel]. |
+ Stream<T> get rest { |
+ if (_isClosed) { |
+ throw _failClosed(); |
+ } |
+ _state |= _stateClosedRest; |
+ if (_isListening) { |
+ // We have an active subscription that we want to take over. |
+ var request = new _RestRequest<T>(this); |
+ _addRequest(request); |
+ return request.stream; |
+ } |
+ assert(_requestQueue.isEmpty); |
+ if (_isDone) { |
+ // TODO(lrn): Add Stream.empty() constructor. |
+ return new Stream<T>.fromIterable(const []); |
+ } |
+ // We have never listened to the source stream, |
+ // so just return that directly. |
+ Stream result = _sourceStream; |
+ _setDone(); |
+ return result; |
+ } |
+ |
+ /// Skips the next [count] *data* events. |
+ /// |
+ /// The [count] must be non-negative. |
+ /// |
+ /// When successful, this is equivalent to using [take] |
+ /// and ignoring the result. |
+ /// |
+ /// If an error occurs before `count` data events have been skipped, |
+ /// the returned future completes with that error instead. |
+ /// |
+ /// If the stream closes before `count` data events, |
+ /// the remaining unskipped event count is returned. |
+ /// If the returned future completes with the integer `0`, |
+ /// then all events were succssfully skipped. If the value |
+ /// is greater than zero then the stream ended early. |
+ Future<int> skip(int count) { |
+ if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
+ if (!_isClosed) { |
+ var request = new _SkipRequest(count); |
+ _addRequest(request); |
+ return request.future; |
+ } |
+ throw _failClosed(); |
+ } |
+ |
+ /// Requests the next [count] data events as a list. |
+ /// |
+ /// The [count] must be non-negative. |
+ /// |
+ /// Equivalent to calling [next] `count` times and |
+ /// storing the data values in a list. |
+ /// |
+ /// If an error occurs before `count` data events has |
+ /// been collected, the returned future completes with |
+ /// that error instead. |
+ /// |
+ /// If the stream closes before `count` data events, |
+ /// the returned future completes with the list |
+ /// of data collected so far. That is, the returned |
+ /// list may have fewer than [count] elements. |
+ Future<List<T>> take(int count) { |
+ if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
+ if (!_isClosed) { |
+ var request = new _TakeRequest<T>(count); |
+ _addRequest(request); |
+ return request.future; |
+ } |
+ throw _failClosed(); |
+ } |
+ |
+ /// Cancels the underlying stream subscription. |
+ /// |
+ /// The cancel operation waits until all previously requested |
+ /// events have been processed, then it cancels the subscription |
+ /// providing the events. |
+ /// |
+ /// The returned future completes with the result of calling |
+ /// `cancel`. |
+ /// |
+ /// After calling `cancel`, no further events can be requested. |
+ /// None of [next], [rest], [skip], [take] or [cancel] may be |
+ /// called again. |
+ Future cancel() { |
+ if (!_isClosed) { |
+ _state |= _stateClosed; |
+ if (!_isListening) { |
+ // The request queue is only non-empty while we are listening. |
+ assert(_requestQueue.isEmpty); |
+ if (!_isDone) _setDone(); |
+ return new Future.value(); |
+ } |
+ var request = new _CancelRequest(this); |
+ _addRequest(request); |
+ return request.future; |
+ } |
+ throw _failClosed(); |
+ } |
+ |
+ /// Returns an error for when a request is made after cancel. |
+ /// |
+ /// Returns a [StateError] with a message saying that either |
+ /// [cancel] or [rest] have already been called. |
+ Error _failClosed() { |
+ String cause = |
+ ((_state & _stateClosedRest) == _stateClosedRest) ? "rest" : "cancel"; |
+ return new StateError("Already cancelled by a call to $cause"); |
+ } |
+ |
+ // Callbacks receiving the events of the source stream. |
+ |
+ void _onData(T data) { |
+ assert(_requestQueue.isNotEmpty); |
+ _EventRequest request = _nextRequest; |
+ request.add(data); |
+ _checkCompleted(); |
+ } |
+ |
+ void _onError(error, StackTrace stack) { |
+ assert(_requestQueue.isNotEmpty); |
+ _EventRequest request = _nextRequest; |
+ request.addError(error, stack); |
+ _checkCompleted(); |
+ } |
+ |
+ void _onDone() { |
+ _setDone(); |
+ _closeAllRequests(); |
+ } |
+ |
+ // Request queue management. |
+ |
+ /// Returns the next request in the queue, but don't remove it. |
+ _EventRequest get _nextRequest { |
+ return _requestQueue.first; |
+ } |
+ |
+ /// Add a new request to the queue. |
+ void _addRequest(_EventRequest request) { |
+ if (_isDone) { |
+ request.close(); |
+ return; |
+ } |
+ if (_requestQueue.isEmpty) { |
+ if (request.isComplete) { |
+ // Some requests are complete without receiving any events. |
+ // This includes [cancel] and [rest] requests, as well as |
+ // [take] and [skip] events with zero count. |
+ |
+ // We can skip listening and jsut complete the request immediately. |
+ request.close(); |
+ return; |
+ } |
+ // Continue listening on the source stream. |
+ // The source stream is paused while the requeust queue is empty, |
+ // except at the beginning when it hasn't been listened to at all. |
+ if (_isListening) { |
+ _subscription.resume(); |
+ } else if (!_isDone) { |
+ _setListening( |
+ _sourceStream.listen(_onData, onError: _onError, onDone: _onDone)); |
+ } |
+ } |
+ _requestQueue.add(request); |
+ } |
+ |
+ /// Remove all requests and close them. |
+ /// |
+ /// Used when the source stream is done. |
+ void _closeAllRequests() { |
+ assert(_isDone); |
+ while (_requestQueue.isNotEmpty) { |
+ _requestQueue.removeFirst().close(); |
+ } |
+ } |
+ |
+ /// Check whether the next requests in the queue are complete. |
+ /// |
+ /// If so, remove them and call their `close` method. |
+ void _checkCompleted() { |
+ // Close-actions are executed immediately when they become the |
+ // next (and last) event in the queue. |
+ // When _isClosed and the queue is not empty, the last element |
+ // of the queue is the close action. |
+ while (_requestQueue.isNotEmpty) { |
+ if (!_requestQueue.first.isComplete) { |
+ return; |
+ } |
+ _requestQueue.removeFirst().close(); |
+ } |
+ assert(_requestQueue.isEmpty); |
+ if (!_isDone) { |
+ // Pause the underlying subscription. |
+ // Won't get here without adding a request, so we must be listening |
+ // already. |
+ assert(_isListening); |
+ _subscription.pause(); |
+ } |
+ } |
+ |
+ /// Extracts the subscription and makes the events object unusable. |
+ /// |
+ /// Can only be used by the very last request. |
+ StreamSubscription _dispose() { |
+ assert(_isClosed); |
+ assert(_isListening); |
+ assert(_requestQueue.isEmpty); |
+ StreamSubscription subscription = _subscription; |
+ _setDone(); |
+ return subscription; |
+ } |
+} |
+ |
+/// Request object that receives events when they arrive, until fulfilled. |
+/// |
+/// Each request that cannot be fulfilled immediately is represented by |
+/// an `_EventRequest` object in the request queue. |
+/// |
+/// Events from the source stream are sent to the first request in the |
+/// queue until it reports itself as [isComplete]. |
+/// |
+/// When the first request in the queue `isComplete`, either when becoming |
+/// the first request or after receiving an event, its [close] methods is |
+/// called. |
+/// |
+/// The [close] method is also called immediately when the source stream |
+/// is done. |
+abstract class _EventRequest implements EventSink { |
+ /// Handle a data event. |
+ void add(data); |
+ |
+ /// Handle an error event. |
+ void addError(error, [StackTrace stackTrace]); |
+ |
+ /// Complete the request. |
+ /// |
+ /// This may be called either when [isComplete] returns true, |
+ /// or if the source stream is done. |
+ void close(); |
+ |
+ /// Whether the request considers itself fulfilled. |
+ /// |
+ /// This is checked whenever a request becomes the first request |
+ /// in the request queue, and after it receives an event. |
+ /// |
+ /// When a request is complete, its [close] method is called and |
+ /// it's removed from the request queue. |
+ bool get isComplete; |
+} |
+ |
+/// Request for a [StreamEvents.next] call. |
+/// |
+/// Completes the returned future when receiving the first event, |
+/// and is then complete. |
+class _NextRequest<T> implements _EventRequest { |
+ /// Completer for the future returned by [StreamEvents.next]. |
+ /// |
+ /// Set to `null` when it is completed, to mark it as already complete. |
+ final Completer _completer; |
+ |
+ _NextRequest() : _completer = new Completer<T>(); |
+ |
+ Future<T> get future => _completer.future; |
+ |
+ void add(data) { |
+ _completer.complete(data); |
+ } |
+ |
+ void addError(error, [StackTrace stack]) { |
+ _completer.completeError(error, stack); |
+ } |
+ |
+ void close() { |
+ if (!_completer.isCompleted) { |
+ _completer.completeError(new StateError("no elements")); |
+ } |
+ } |
+ |
+ bool get isComplete => _completer.isCompleted; |
+} |
+ |
+/// Request for a [StreamEvents.skip] call. |
+class _SkipRequest implements _EventRequest { |
+ /// Completer for the future returned by the skip call. |
+ final Completer _completer = new Completer<int>(); |
+ |
+ /// Number of remaining events to skip. |
+ /// |
+ /// The request [isComplete] when the values reaches zero. |
+ /// |
+ /// Decremented when an event is seen. |
+ /// Set to zero when an error is seen since errors abort the skip request. |
+ int _eventsToSkip; |
+ |
+ _SkipRequest(this._eventsToSkip); |
+ |
+ /// The future completed when the correct number of events have been skipped. |
+ Future get future => _completer.future; |
+ |
+ void add(data) { |
+ assert(_eventsToSkip > 0); |
+ _eventsToSkip--; |
+ } |
+ |
+ void addError(error, [StackTrace stackTrace]) { |
+ _eventsToSkip = 0; |
+ _completer.completeError(error, stackTrace); |
+ } |
+ |
+ void close() { |
+ if (!_completer.isCompleted) { |
+ _completer.complete(_eventsToSkip); |
+ } |
+ } |
+ |
+ bool get isComplete => _eventsToSkip == 0; |
+} |
+ |
+/// Request for a [StreamEvents.take] call. |
+class _TakeRequest<T> implements _EventRequest { |
+ /// Completer for the future returned by the take call. |
+ final Completer _completer; |
+ |
+ /// List collecting events until enough have been seen. |
+ final List _list = <T>[]; |
+ |
+ /// Number of events to capture. |
+ /// |
+ /// The request [isComplete] when the length of [_list] reaches |
+ /// this value. |
+ final int _eventsToTake; |
+ |
+ _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); |
+ |
+ /// The future completed when the correct number of events have been captured. |
+ Future get future => _completer.future; |
+ |
+ void add(data) { |
+ _list.add(data); |
+ } |
+ |
+ void addError(error, [StackTrace stack]) { |
+ _completer.completeError(error, stack); |
+ } |
+ |
+ void close() { |
+ if (!_completer.isCompleted) { |
+ _completer.complete(_list); |
+ } |
+ } |
+ |
+ bool get isComplete => |
+ _list.length == _eventsToTake || _completer.isCompleted; |
+} |
+ |
+/// Request for a [StreamEvents.cancel] call. |
+/// |
+/// The request is always complete, it just waits in the request queue |
+/// until all previous events are fulfilled, then it cancels the stream events |
+/// subscription. |
+class _CancelRequest implements _EventRequest { |
+ /// Completer for the future returned by the `cancel` call. |
+ final Completer _completer = new Completer(); |
+ |
+ /// The [StreamEvents] object that has this request queued. |
+ /// |
+ /// When the event is completed, it needs to cancel the active subscription |
+ /// of the `StreamEvents` object, if any. |
+ final StreamEvents _events; |
+ |
+ _CancelRequest(this._events); |
+ |
+ /// The future completed when the cancel request is completed. |
+ Future get future => _completer.future; |
+ |
+ void add(data) { |
+ assert(false); // Unreachable. |
+ } |
+ |
+ void addError(error, [StackTrace stack]) { |
+ assert(false); // Unreachable. |
+ } |
+ |
+ void close() { |
+ if (_events._isListening) { |
+ _completer.complete(_events._dispose().cancel()); |
+ } else { |
+ _completer.complete(); |
+ } |
+ } |
+ |
+ bool get isComplete => true; |
+} |
+ |
+/// Request for a [StreamEvents.rest] call. |
+/// |
+/// The request is always complete, it just waits in the request queue |
+/// until all previous events are fulfilled, then it takes over the |
+/// stream events subscription and creates a stream from it. |
+class _RestRequest<T> implements _EventRequest { |
+ /// Completer for the stream returned by the `rest` call. |
+ final StreamCompleter _completer; |
+ |
+ /// The [StreamEvents] object that has this request queued. |
+ /// |
+ /// When the event is completed, it needs to cancel the active subscription |
+ /// of the `StreamEvents` object, if any. |
+ final StreamEvents _events; |
+ _RestRequest(this._events) : _completer = new StreamCompleter<T>(); |
+ |
+ /// The future which will contain the remaining events of [_events]. |
+ Stream<T> get stream => _completer.stream; |
+ |
+ void add(data) { |
+ assert(false); // Unreachable. |
+ } |
+ |
+ void addError(error, [StackTrace stack]) { |
+ assert(false); // Unreachable. |
+ } |
+ |
+ void close() { |
+ if (_events._isListening) { |
+ StreamSubscription subscription = _events._dispose(); |
+ _completer.setSourceStream(new SubscriptionStream<T>(subscription)); |
+ if (subscription.isPaused) subscription.resume(); |
+ } else { |
+ assert(_events._isDone); |
+ _completer.setEmpty(); |
+ } |
+ } |
+ |
+ bool get isComplete => true; |
+} |