Chromium Code Reviews| Index: lib/src/stream_events.dart |
| diff --git a/lib/src/stream_events.dart b/lib/src/stream_events.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..848f6318511c9ecff5cf4de8e0b38a815536316f |
| --- /dev/null |
| +++ b/lib/src/stream_events.dart |
| @@ -0,0 +1,490 @@ |
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library async.streams.stream_events; |
| + |
| +import 'dart:async'; |
| +import 'dart:collection'; |
| + |
| +import "subscription_stream.dart"; |
| +import "stream_completer.dart"; |
| + |
| +/// An asynchronous pull-based interface for accessing stream events. |
| +/// |
| +/// Wraps a stream and makes individual events available on request. |
| +/// |
| +/// The individual requests are served in the order they are requested. |
|
nweiz
2015/06/12 01:24:25
Explain how this is different from a [StreamIterat
Lasse Reichstein Nielsen
2015/06/17 11:08:29
Done.
|
| +/// |
| +/// Example: |
| +/// |
| +/// var events = new StreamEvents<String>(someStreamOfLines); |
| +/// var first = await events.next; |
| +/// while (first.startsWith('#')) { |
|
Søren Gjesse
2015/06/11 07:57:06
Maybe add an isDone check in this sample.
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Hmm. Not good for the flow, but I guess I can try
|
| +/// // Skip comments. |
| +/// first = await events.next; |
| +/// } |
| +/// |
| +/// if (first.startsWith(MAGIC_MARKER)) { |
| +/// var headerCount = |
| +/// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); |
| +/// handleMessage(headers: await events.take(headerCount), |
| +/// body: events.rest); |
| +/// return; |
| +/// } |
| +/// // Error handling. |
| +/// |
| +class StreamEvents<T> { |
|
nweiz
2015/06/12 01:24:26
Plural class names always seem weird to me. What d
Lasse Reichstein Nielsen
2015/06/12 13:04:22
It's not a stream, so "PullStream" won't work.
The
nweiz
2015/06/16 01:05:23
Bob suggested "StreamPump", using the water analog
Lasse Reichstein Nielsen
2015/06/16 13:05:45
I'm not a great fan of analogies in nameing - it o
nweiz
2015/06/16 22:34:11
All names in programming are analogies at some lev
Lasse Reichstein Nielsen
2015/06/17 11:08:29
I'm trying out StreamQueue now. It is slightly irk
|
| + /// In the initial state, the stream has not been listened to yet. |
|
nweiz
2015/06/12 01:24:25
Nit: The first paragraph of a doc comment should b
|
| + /// It will be listened to when the first event is requested. |
| + /// The `stateData` field holds the stream and the request queue is empty. |
| + static const int _INITIAL = 0; |
|
nweiz
2015/06/12 01:24:25
Nit: since these fields aren't right up next to on
Lasse Reichstein Nielsen
2015/06/12 13:04:22
In my editor it still works very well to show the
|
| + |
| + /// Listening on the stream. |
| + /// If the request queue is empty and the subscription isn't done, |
| + /// the subscription is paused. |
| + /// The `stateData` field holds the subscription. |
| + static const int _LISTENING = 1; |
| + |
| + /// The stream has completed. |
| + /// The `stateData` field is `null` and the request queue is empty. |
| + static const int _DONE = 2; |
| + |
|
Søren Gjesse
2015/06/11 07:57:06
Maybe put an additional comment that 4 is reserved
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Will do.
|
| + /// Flag set when [close] is called. |
| + /// The `StreamEvents` is closed and no further events can be requested. |
| + /// The last request in the queue, if any, |
| + /// is completed and is witing to clean up. |
|
nweiz
2015/06/12 01:24:24
"witing" -> "waiting"
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
|
| + static const int _CLOSED = 8; |
| + |
| + /// Flag set when [rest] is called. |
| + /// Only used for error reporting, otherwise equivalent to [_CLOSED]. |
| + static const int _CLOSED_REST = 12; |
| + |
| + /// Current state. |
| + /// |
| + /// Use getters below to check if the state is [_isListening] or [_isDone], |
| + /// and whether the stream events object [_isClosed]. |
| + int _state = _INITIAL; |
| + |
| + /// Value depending on state. Use getters below to get the value and assert |
| + /// the expected state. |
| + var _stateData; |
| + |
| + /// Queue of pending requests while state is [_LISTENING]. |
| + /// Access through methods below to ensure consistency. |
| + Queue<_EventRequest> _requestQueue = new Queue(); |
|
nweiz
2015/06/12 01:24:25
Make this final.
Personally, I prefer to omit the
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Made final.
I prefer type annotations on all field
|
| + |
| + StreamEvents(Stream source) : _stateData = source; |
| + |
| + bool get _isListening => (_state & _LISTENING) != 0; |
| + bool get _isClosed => (_state & _CLOSED) != 0; |
| + bool get _isDone => (_state & _DONE) != 0; |
| + |
| + /// Whether the underlying stream is spent. |
|
nweiz
2015/06/12 01:24:24
"spent" -> "done"
Just to use consistent terminol
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
|
| + /// |
| + /// This may return true before all events have been delivered. |
| + /// Requesting a new event when [isDone] returns true, |
| + /// for example using [next], will always fail. |
| + bool get isDone => _isDone; |
|
nweiz
2015/06/12 01:24:25
What's the use case for exposing this to the user?
Lasse Reichstein Nielsen
2015/06/12 13:04:23
It might be a little speculative because it doesn'
nweiz
2015/06/16 01:05:23
The version of this I wrote for ScheduledTest has
Lasse Reichstein Nielsen
2015/06/16 13:05:45
It doesn't solve the problem of what to do with th
nweiz
2015/06/16 22:34:11
It's unlikely that someone will call [hasNext] fol
|
| + |
| + /// Return the stream subscription while state is listening. |
|
nweiz
2015/06/12 01:24:25
Nit: use present tense (e.g. "Returns" rather than
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
|
| + StreamSubscription get _subscription { |
| + assert(_isListening); |
| + return _stateData; |
| + } |
| + |
| + /// Return the source stream while state is initial. |
| + Stream get _sourceStream { |
| + assert(!_isListening); |
| + assert(!_isDone); |
| + return _stateData; |
| + } |
| + |
| + // Set the subscription and transition to listening state. |
| + void set _subscription(StreamSubscription subscription) { |
| + assert(!_isListening); |
| + assert(!_isDone); |
| + _stateData = subscription; |
| + _state |= _LISTENING; |
| + } |
| + |
| + void _setDone() { |
| + assert(!_isDone); |
| + _state = (_state & _CLOSED_REST) | _DONE; |
| + _stateData = null; |
| + } |
| + |
| + /// Request the next (yet unrequested) event from the stream. |
|
nweiz
2015/06/12 01:24:26
Clarify in this documentation that it's valid to h
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
|
| + /// |
| + /// When the requested event arrives, the returned future is completed with |
| + /// the event. This is independent of whether the event is a data event or |
| + /// an error event. |
|
nweiz
2015/06/12 01:24:26
I'd write this as: "If the event is a value event,
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
|
| + /// |
| + /// If the stream closed before an event arrives, the future is completed |
|
nweiz
2015/06/12 01:24:24
"closed" -> "closes", "is completed" -> "completes
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
|
| + /// with a [StateError]. |
| + Future<T> get next { |
| + if (!_isClosed) { |
| + Completer completer = new Completer<T>(); |
| + _addRequest(new _NextRequest(completer)); |
| + return completer.future; |
| + } |
| + throw _failClosed(); |
| + } |
| + |
| + /// Request a stream of all the remaning events of the source stream. |
|
nweiz
2015/06/12 01:24:25
"Request" -> "Returns"
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
|
| + /// |
| + /// All requested [next], [skip] or [take] operations are completed |
| + /// first, and then any remaining events are provided as events of |
| + /// the returned stream. |
| + /// |
| + /// Using `rest` closes the stream events object. After getting the |
| + /// `rest` it is no longer allowed to request other events, like |
|
nweiz
2015/06/12 01:24:25
"it is no longer allowed to" -> "the caller may no
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
|
| + /// after calling [close]. |
| + Stream<T> get rest { |
| + if (!_isClosed) { |
|
nweiz
2015/06/12 01:24:25
Nit: Short-circuit if it's closed to save on inden
Lasse Reichstein Nielsen
2015/06/12 13:04:22
ACK.
This is not going to be in any inner loops, s
|
| + _state |= _CLOSED_REST; |
| + if (_isListening) { |
| + // We have an active subscription that we want to take over. |
| + var delayStream = new StreamCompleter<T>(); |
| + _addRequest(new _RestRequest<T>(delayStream, this)); |
| + return delayStream.stream; |
| + } |
| + assert(_requestQueue.isEmpty); |
| + if (isDone) { |
| + // TODO(lrn): Add Stream.empty() constructor. |
| + return new Stream<T>.fromIterable(const []); |
| + } |
| + // We have never listened the source stream, so just return that directly. |
|
nweiz
2015/06/12 01:24:26
"listened to"
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
|
| + Stream result = _sourceStream; |
| + _setDone(); |
| + return result; |
| + } |
| + throw _failClosed(); |
| + } |
| + |
| + /// Requests to skip the next [count] *data* events. |
|
nweiz
2015/06/12 01:24:24
"Requests to skip" -> "Skips"
|
| + /// |
| + /// The [count] value must be non-negative. |
| + /// |
| + /// When successful, this is equivalent to using [take] |
| + /// and ignoring the result. |
| + /// |
| + /// If an error occurs before `count` data events has |
|
nweiz
2015/06/12 01:24:25
"has" -> "have"
|
| + /// been skipped, the returned future completes with |
| + /// that error instead. |
| + /// |
| + /// If the stream closes before `count` data events, |
| + /// the remaining unskipped event count is returned. |
| + /// If the returned future completes with the integer `0`, |
| + /// then all events were succssfully skipped. If the value |
| + /// is greater than zero then the stream ended early. |
| + Future<int> skip(int count) { |
| + if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
| + if (!_isClosed) { |
| + Completer completer = new Completer<int>(); |
| + _addRequest(new _SkipRequest(completer, count)); |
| + return completer.future; |
| + } |
| + throw _failClosed(); |
| + } |
| + |
| + /// Requests the next [count] data events as a list. |
| + /// |
| + /// The [count] value must be non-negative. |
|
nweiz
2015/06/12 01:24:25
"The [count] value" -> "[count]"
Lasse Reichstein Nielsen
2015/06/12 13:04:21
-> "The [count]".
Otherwise it would start the sen
|
| + /// |
| + /// Equivalent to calling [next] `count` times and |
| + /// storing the data values in a list. |
| + /// |
| + /// If an error occurs before `count` data events has |
| + /// been collected, the returned future completes with |
| + /// that error instead. |
| + /// |
| + /// If the stream closes before `count` data events, |
| + /// the returned future completes with the list |
| + /// of data collected so far. That is, the returned |
| + /// list may have fewer than [count] elements. |
| + Future<List<T>> take(int count) { |
| + if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
| + if (!_isClosed) { |
| + Completer completer = new Completer<List<T>>(); |
| + _addRequest(new _TakeRequest(completer, count)); |
| + return completer.future; |
| + } |
| + throw _failClosed(); |
| + } |
| + |
| + /// Release the underlying stream subscription. |
|
nweiz
2015/06/12 01:24:26
"Release" -> "Cancels"
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
|
| + /// |
| + /// The close operation waits until all previously requested |
| + /// events have been processed, then it cancels the subscription |
| + /// providing the events. |
| + /// |
| + /// The returned future completes with the result of calling |
| + /// `cancel`. |
| + /// |
| + /// After calling `close`, no further events can be requested. |
| + /// None of [next], [rest], [skip], [take] or [close] may be |
| + /// called again. |
| + Future close() { |
|
nweiz
2015/06/12 01:24:24
Consider calling this [cancel], to match [StreamSu
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Good idea!
|
| + if (!_isClosed) { |
| + _state |= _CLOSED; |
| + if (!_isListening) { |
| + assert(_requestQueue.isEmpty); |
|
nweiz
2015/06/12 01:24:24
Add a comment explaining why the request queue wil
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
|
| + if (!_isDone) _setDone(); |
| + return new Future.value(); |
| + } |
| + Completer completer = new Completer(); |
| + _addRequest(new _CloseRequest(completer, this)); |
| + return completer.future; |
| + } |
| + throw _failClosed(); |
|
nweiz
2015/06/12 01:24:26
Everything in the core libraries that's closable a
Lasse Reichstein Nielsen
2015/06/12 13:04:22
True.
One reason why I didn't allow that was that
|
| + } |
| + |
| + /// Reused error message. |
|
nweiz
2015/06/12 01:24:24
Expand on this.
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
|
| + Error _failClosed() { |
| + String cause = |
| + ((_state & _CLOSED_REST) == _CLOSED_REST) ? "rest" : "close"; |
| + return new StateError("Already closed by a call to $cause"); |
| + } |
| + |
| + /// Called when requesting an event when the requeust queue is empty. |
| + /// The underlying subscription is paused in that case, except the very |
| + /// first time when the subscription needs to be created. |
| + void _listen() { |
|
nweiz
2015/06/12 01:24:24
Methods like this and [_pause] that are only calle
Lasse Reichstein Nielsen
2015/06/12 13:04:23
Done.
|
| + if (_isListening) { |
| + _subscription.resume(); |
| + } else if (!_isDone) { |
| + _subscription = |
| + _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); |
| + } |
| + } |
| + |
| + /// Pauses the underlying subscription. |
| + /// Called when the request queue is empty and the subscription isn't closed. |
| + void _pause() { |
| + assert(_isListening); |
| + _subscription.pause(); |
| + } |
| + |
| + // Callbacks receiving the events of the source stream. |
|
nweiz
2015/06/12 01:24:25
Nit: If this is referring to a group of methods, a
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
|
| + void _onData(T data) { |
| + assert(_requestQueue.isNotEmpty); |
| + _EventRequest action = _nextAction; |
| + action.add(data); |
| + _checkCompleted(); |
| + } |
| + |
| + void _onError(error, StackTrace stack) { |
| + assert(_requestQueue.isNotEmpty); |
| + _EventRequest action = _nextAction; |
| + action.addError(error, stack); |
| + _checkCompleted(); |
| + } |
| + |
| + void _onDone() { |
| + _setDone(); |
| + _closeAllRequests(); |
| + } |
| + |
| + // Request queue management. |
| + |
| + /// Get the next action in the queue, but don't remove it. |
| + _EventRequest get _nextAction { |
| + assert(_requestQueue.isNotEmpty); |
|
nweiz
2015/06/12 01:24:24
This assertion seems redundant, since the line bel
Lasse Reichstein Nielsen
2015/06/12 13:04:22
True.
|
| + return _requestQueue.first; |
| + } |
| + |
| + /// Add a new request to the queue. |
| + void _addRequest(_EventRequest action) { |
| + if (_isDone) { |
| + action.close(); |
| + return; |
| + } |
| + if (_requestQueue.isEmpty) { |
| + if (action.isComplete) { |
|
nweiz
2015/06/12 01:24:24
Explain under what circumstances an action that wa
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Done.
|
| + // Skip listening and complete this immediately. |
| + action.close(); |
| + return; |
| + } |
| + _listen(); |
| + } |
| + _requestQueue.add(action); |
| + } |
| + |
| + /// Remove all requests and call their `done` event. |
| + /// |
| + /// Used when the source stream dries up. |
| + void _closeAllRequests() { |
| + assert(_isDone); |
| + while (_requestQueue.isNotEmpty) { |
| + _requestQueue.removeFirst().close(); |
| + } |
| + } |
| + |
| + /// Check whether the next actions in the queue are complete. |
| + /// |
| + /// If so, remove them and call their `complete` method. |
| + void _checkCompleted() { |
| + // Close-actions are executed immediately when they become the |
| + // next (and last) event in the queue. |
| + // When _isClosed and the queue is not empty, the last element |
| + // of the queue is the close action. |
| + while (_requestQueue.isNotEmpty) { |
| + if (!_requestQueue.first.isComplete) { |
| + return; |
| + } |
| + _requestQueue.removeFirst().close(); |
| + } |
| + assert(_requestQueue.isEmpty); |
| + if (!_isDone) _pause(); |
| + } |
| + |
| + /// Extracts the subscription and makes the events object unusable. |
| + /// |
| + /// Can only be used by the very last request. |
| + StreamSubscription _dispose() { |
| + assert(_isClosed); |
| + assert(_isListening); |
| + assert(_requestQueue.isEmpty); |
| + StreamSubscription subscription = _subscription; |
| + _setDone(); |
| + return subscription; |
| + } |
| +} |
| + |
|
nweiz
2015/06/12 01:24:25
Nit: extra newline.
|
| + |
| +/// Action to take when a requested event arrives. |
| +abstract class _EventRequest implements EventSink { |
|
nweiz
2015/06/12 01:24:25
It's kind of weird that the documentation for thes
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Agree, I switched to "request" halfway through, an
|
| + bool get isComplete; |
|
Søren Gjesse
2015/06/11 07:57:06
Add close method here as well
I think the 'close'
nweiz
2015/06/12 01:24:25
This should also have [add] and [addError] abstrac
Lasse Reichstein Nielsen
2015/06/12 13:04:22
It's inherited from EventSink.
I picked EventSink
|
| +} |
| + |
| +/// Action completing a [StreamEvents.next] request. |
| +class _NextRequest implements _EventRequest { |
| + Completer completer; |
|
nweiz
2015/06/12 01:24:25
Even though it doesn't do anything at the language
|
| + _NextRequest(this.completer); |
|
nweiz
2015/06/12 01:24:24
Rather than passing in a completer, consider makin
Lasse Reichstein Nielsen
2015/06/12 13:04:23
Good idea. Will do.
The type parameter needs to go
|
| + |
| + void add(data) { |
| + completer.complete(data); |
| + completer = null; |
|
nweiz
2015/06/12 01:24:24
Does nulling this out buy you anything? It seems l
Lasse Reichstein Nielsen
2015/06/12 13:04:23
We have an isCompleted getter? Whoa!
I'm too used
|
| + } |
| + |
| + void addError(error, [StackTrace stack]) { |
| + completer.completeError(error, stack); |
| + completer = null; |
| + } |
| + |
| + void close() { |
| + if (!isComplete) { |
| + completer.completeError(new StateError("no elements")); |
|
nweiz
2015/06/12 01:24:25
If you do continue to check against null, null out
Lasse Reichstein Nielsen
2015/06/12 13:04:22
Technically not necessary since "close" is always
|
| + } |
| + } |
| + |
| + bool get isComplete => completer == null; |
| +} |
| + |
| +/// Action completing a [StreamEvents.skip] request. |
| +class _SkipRequest implements _EventRequest { |
| + final Completer completer; |
| + int count; |
|
nweiz
2015/06/12 01:24:24
Document this. Also consider calling it something
Lasse Reichstein Nielsen
2015/06/15 15:46:23
Done.
|
| + _SkipRequest(this.completer, this.count); |
| + |
| + void add(data) { |
| + count--; |
| + } |
| + |
| + void addError(error, [StackTrace stack]) { |
| + completer.completeError(error, stack); |
| + count = 0; |
| + } |
| + |
| + void close() { |
| + if (!completer.isCompleted) { |
| + completer.complete(count); |
| + count = 0; |
| + } |
| + } |
| + |
| + bool get isComplete { |
| + return count == 0; |
| + } |
| +} |
| + |
| +/// Action completing a [StreamEvents.take] request. |
| +class _TakeRequest<T> implements _EventRequest { |
| + final Completer completer; |
| + final List list = <T>[]; |
| + int count; |
| + _TakeRequest(this.completer, this.count); |
| + |
| + void add(data) { |
| + list.add(data); |
| + count--; |
| + } |
| + |
| + void addError(error, [StackTrace stack]) { |
| + completer.completeError(error, stack); |
| + count = 0; |
| + } |
| + |
| + void close() { |
| + if (!completer.isCompleted) { |
| + completer.complete(list); |
| + } |
| + } |
| + |
| + bool get isComplete => count == 0; |
| +} |
| + |
| +/// Action completing a [StreamEvents.close] request. |
| +class _CloseRequest implements _EventRequest { |
| + final Completer completer; |
| + StreamEvents events; |
| + |
| + _CloseRequest(this.completer, this.events); |
| + |
| + void add(data) { |
| + throw new UnsupportedError("event"); |
| + } |
| + |
| + void addError(error, [StackTrace stack]) { |
| + throw new UnsupportedError("event"); |
|
nweiz
2015/06/12 01:24:24
Shouldn't this be "error"?
Lasse Reichstein Nielsen
2015/06/12 13:04:22
No, we don't support receiving events, that's the
|
| + } |
| + |
| + void close() { |
| + if (events._isListening) { |
| + completer.complete(events._dispose().cancel()); |
| + } else { |
| + completer.complete(); |
| + } |
| + } |
| + |
| + bool get isComplete => true; |
| +} |
| + |
| +/// Action completing a [StreamEvents.rest] request. |
| +class _RestRequest<T> implements _EventRequest { |
| + final StreamCompleter delayStream; |
| + final StreamEvents events; |
| + _RestRequest(this.delayStream, this.events); |
| + |
| + void add(data) { |
| + throw new UnsupportedError("event"); |
| + } |
| + |
| + void addError(error, [StackTrace stack]) { |
| + throw new UnsupportedError("event"); |
|
nweiz
2015/06/12 01:24:26
Ditto.
Lasse Reichstein Nielsen
2015/06/15 15:46:23
Done.
|
| + } |
| + |
| + void close() { |
| + if (events._isListening) { |
| + StreamSubscription subscription = events._dispose(); |
| + delayStream.setSourceStream(new SubscriptionStream<T>(subscription)); |
| + if (subscription.isPaused) subscription.resume(); |
| + } else { |
| + assert(events._isDone); |
| + delayStream.setEmpty(); |
| + } |
| + } |
| + |
| + bool get isComplete => true; |
| +} |