Chromium Code Reviews| Index: lib/src/stream_completer.dart |
| diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..9d743a183043383c46bdcd144f7663b36c343ae1 |
| --- /dev/null |
| +++ b/lib/src/stream_completer.dart |
| @@ -0,0 +1,275 @@ |
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
|
nweiz
2015/06/16 01:05:23
As part of moving a bunch of existing async utilit
Lasse Reichstein Nielsen
2015/06/16 13:05:45
True. And if I was just using a controller this co
nweiz
2015/06/16 22:34:11
Here's a (rough untested) sketch of what it would
Lasse Reichstein Nielsen
2015/06/17 11:08:29
And, to make it even more obvious that this is a g
nweiz
2015/06/17 22:58:45
Oh dang, I thought I published that CL yesterday,
|
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library async.streams.stream_completer; |
| + |
| +import 'dart:async'; |
| + |
| +/// A [stream] where the contents aren't known at creation time. |
| +/// |
| +/// It is generally recommended that you never create a `Future<Stream>` |
| +/// because you can just use directly create a stream that doesn't do anything |
|
nweiz
2015/06/12 01:24:23
"use directly" -> "directly"
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Done.
|
| +/// until it's ready to do so. |
| +/// This class can be used to create such a stream. |
| +/// |
| +/// The [stream] is a normal stream that you can listen to immediately, |
| +/// but until [setSourceStream] is called, the stream won't produce |
| +/// any events. |
| +/// |
| +/// The same effect can be achieved by using a [StreamController] |
| +/// and adding the stream using `addStream` when both |
| +/// the controller's stream is listened to and the source stream is ready. |
| +/// This class attempts to shortcut some of the overhead when possible. |
| +/// For example, if the [stream] is only listened to |
| +/// after the source stream has been set, |
| +/// the listen is performed directly on the source stream. |
| +class StreamCompleter<T> { |
|
Søren Gjesse
2015/06/11 07:57:06
Maybe add a constructor with an optional onListen
nweiz
2015/06/12 01:24:23
Consider adding a constructor so that the user can
Lasse Reichstein Nielsen
2015/06/12 13:04:19
I think I'd use a completely different class for a
Lasse Reichstein Nielsen
2015/06/12 13:04:20
You are thinking the case where you don't even wan
|
| + final Stream<T> stream = new _PromiseStream<T>(); |
|
nweiz
2015/06/12 01:24:22
Nit: document this
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
|
| + |
|
nweiz
2015/06/12 01:24:23
Suggestion: add "static Stream fromFuture(Future<S
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Good idea. Done.
|
| + /// Set a stream as the source of events for the [StreamCompleter]. |
| + /// |
| + /// There is no guarantee that the stream will ever be listened to. |
|
nweiz
2015/06/12 01:24:23
It would be nice to go further and fully explain h
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Done.
|
| + void setSourceStream(Stream<T> stream) { |
| + _PromiseStream promiseStream = this.stream; |
| + promiseStream._linkStream(stream); |
| + } |
| + |
| + /// As setting an empty stream using [setSourceStream]. |
| + void setEmpty() { |
|
nweiz
2015/06/12 01:24:23
Document that this is mutually exclusive with [set
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Done.
|
| + // TODO(lrn): Optimize this to not actually create the empty stream. |
| + _PromiseStream promiseStream = this.stream; |
| + promiseStream._linkStream(new Stream.fromIterable(const [])); |
| + } |
| +} |
| + |
| +/// Stream that acts as a source stream it is (eventually) linked with. |
| +/// |
| +/// The linked source stream can be set after a user has started listening on |
| +/// this stream. No events occur before the source stream is provided. |
| +/// |
| +/// If a user listens before events are available, the state of the |
| +/// subscription is maintained, and the subscription is then linked |
| +/// to the source stream when that becomes available. |
| +class _PromiseStream<T> extends Stream<T> { |
|
nweiz
2015/06/12 01:24:23
Even though it's not public, it would be nice to h
Lasse Reichstein Nielsen
2015/06/12 13:04:19
True. I renamed the public class at some point, so
|
| + static const int _UNINITIALIZED = 0; |
| + static const int _STREAM_SET = 1; |
| + static const int _LISTENED = 2; |
|
nweiz
2015/06/12 01:24:22
Document the semantics of each of these variables.
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done
|
| + |
| + int _state = _UNINITIALIZED; |
|
nweiz
2015/06/12 01:24:23
Document that this is a bitfield. At first glance,
Lasse Reichstein Nielsen
2015/06/12 13:04:20
This is no longer used as a bit field. I have cons
|
| + var _streamOrSubscription; |
|
nweiz
2015/06/12 01:24:22
This really seems like it should be two separate f
Lasse Reichstein Nielsen
2015/06/12 13:04:20
I think the way I have (now) encapsulated it shoul
nweiz
2015/06/16 01:05:23
It's definitely better, but it's still a lot of ex
Lasse Reichstein Nielsen
2015/06/16 13:05:44
True. I think I'll try rewriting it without the st
|
| + |
| + void _linkStream(Stream stream) { |
|
nweiz
2015/06/12 01:24:23
It would be really nice to have all these private
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Done.
|
| + if (_state == _UNINITIALIZED) { |
| + _streamOrSubscription = stream; |
| + _state |= _STREAM_SET; |
| + } else if (_state == _LISTENED) { |
| + _PromiseSubscription promiseSubscription = _streamOrSubscription; |
| + promiseSubscription._linkStream(stream); |
| + _state |= _STREAM_SET; |
| + } else { |
| + throw new StateError("Stream already linked."); |
| + } |
| + } |
| + |
| + StreamSubscription listen(void onData(T event), |
| + {Function onError, |
| + void onDone(), |
| + bool cancelOnError}) { |
| + int state = _state; |
|
nweiz
2015/06/12 01:24:22
Nit: don't type-annotate local variables. There ar
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Yeah, about that.
I try to use more "var", but I
nweiz
2015/06/16 01:05:22
As Bob likes to say, code is for people to read it
Lasse Reichstein Nielsen
2015/06/16 13:05:45
True.
It's mainly for myself. I like to see the t
|
| + _state = state | _LISTENED; |
| + if (state == _UNINITIALIZED) { |
|
Søren Gjesse
2015/06/11 07:57:05
Why not just check (_state & _STREAM_SET) == 0? Th
Lasse Reichstein Nielsen
2015/06/12 13:04:19
I still need to read the state before setting it,
|
| + _PromiseSubscription subscription = |
| + new _PromiseSubscription<T>(true == cancelOnError); |
|
nweiz
2015/06/12 01:24:23
"true == cancelOnError" is really hard to read. It
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Now forwards the parameter all the way to the fina
|
| + subscription.onData(onData); |
| + subscription.onError(onError); |
| + subscription.onDone(onDone); |
| + _streamOrSubscription = subscription; |
| + return subscription; |
| + } |
| + if (state == _STREAM_SET) { |
| + Stream stream = _streamOrSubscription; |
| + return stream.listen(onData, onError: onError, onDone: onDone, |
| + cancelOnError: cancelOnError); |
| + } |
| + print(state); |
|
Søren Gjesse
2015/06/11 07:57:06
Debug print.
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Acknowledged.
|
| + throw new StateError("Stream has already been listened to."); |
| + } |
| +} |
| + |
| +/// Subscription for a [_PromiseStream] that is listened to but has no data. |
|
nweiz
2015/06/12 01:24:23
"has no data" is a little misleading, since this w
Lasse Reichstein Nielsen
2015/06/12 13:04:20
I have reworded it. It's a little constrained by w
|
| +/// |
| +/// Maintains the state of a stream subscription that hasn't received any |
| +/// events until events are available, then it starts forwarding to another |
|
nweiz
2015/06/12 01:24:22
"events are available" has the same issue as above
Lasse Reichstein Nielsen
2015/06/12 13:04:20
reworded.
|
| +/// subscription. |
| +class _PromiseSubscription<T> implements StreamSubscription<T> { |
| + // State values |
| + // The subscription is in one of three distinct states: |
| + // - Initial (remembers whether it's cancelOnError and paused). |
| + // - Cancelled (before being linked). |
| + // - Linked (before being cancelled). |
| + static const int _INIT = 0; |
| + static const int _INIT_CANCEL_ON_ERROR = 1; |
| + static const int _CANCELLED = 2; // Exclusive. |
| + static const int _LINKED = 3; // Exclusive |
| + static const int _PAUSE = 4; // Used with _INIT or _INIT_CANCEL_ON_ERROR. |
|
nweiz
2015/06/12 01:24:23
Similarly to the above, it would be a lot clearer
Lasse Reichstein Nielsen
2015/06/12 13:04:20
I have moved the entire state into a separate obje
|
| + |
| + /// State represents the status of the subscription until the |
| + /// real subscription becomes available. |
| + /// |
| + /// While `_state` is not `_LINKED` or `_CANCELED`, `_stateData` contains |
| + /// a list of length three with the data, error and done handlers that |
| + /// have been set. |
| + /// |
| + /// While `_state` is [_LINKED], [_stateData] contains the real |
| + /// stream subscription. |
| + /// |
| + /// When `_state` is `_CANCELED`, `_stateData` is cleared since the |
| + /// event handlers won't be needed anyway. |
| + int _state; |
| + var _stateData = new List(3); |
|
Søren Gjesse
2015/06/11 07:57:05
Why use a list with three members instead of three
nweiz
2015/06/12 01:24:23
+1
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Space! Saving of!
I started writing it as a real o
|
| + |
| + _PromiseSubscription(bool cancelOnError) |
| + : _state = (cancelOnError ? _INIT_CANCEL_ON_ERROR : _INIT); |
| + |
| + bool get _isLinked => _state == _LINKED; |
|
Søren Gjesse
2015/06/11 07:57:06
If you relay want to use a bit-field why not ave g
Lasse Reichstein Nielsen
2015/06/12 13:04:21
It's not always a bit field.
It's really a combo:
|
| + bool get _isCancelled => _state == _CANCELLED; |
| + bool get _isInitial => (_state & (_PAUSE - 1)) <= _INIT_CANCEL_ON_ERROR; |
| + |
| + void _linkStream(Stream stream) { |
| + if (_isLinked) { |
| + throw new StateError("Already linked to a stream."); |
| + } |
| + if (_isCancelled) { |
|
Søren Gjesse
2015/06/11 07:57:06
Shouldn't we listen on and cancel the stream in th
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Probably not.
For one thing, it's unnecessary ove
nweiz
2015/06/16 01:05:23
I agree with Søren that the default should be to s
Lasse Reichstein Nielsen
2015/06/16 13:05:45
Ok. The only problem with that is that listen+canc
|
| + return; |
| + } |
| + bool cancelOnError = (_state & _INIT_CANCEL_ON_ERROR) != 0; |
| + StreamSubscription subscription = |
| + stream.listen(null, cancelOnError: cancelOnError); |
| + List handlers = _stateData; |
|
Søren Gjesse
2015/06/11 07:57:05
Why this 'cast' - documentation?
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Because I like it. It's not obvious what the type
|
| + subscription.onData(handlers[0]); |
| + subscription.onError(handlers[1]); |
| + subscription.onDone(handlers[2]); |
| + int state = _state; |
| + _subscription = subscription; |
| + while (state >= _PAUSE) { |
| + subscription.pause(); |
| + state -= _PAUSE; |
|
nweiz
2015/06/12 01:24:22
I have no idea what's going on here :-/. If you're
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Rewritten completely (twice), and there is no fanc
|
| + } |
| + } |
| + |
| + List get _handlers { |
| + assert(_isInitial); |
| + return _stateData; |
| + } |
| + |
| + StreamSubscription get _subscription { |
| + assert(_isLinked); |
| + return _stateData; |
| + } |
| + |
| + // Sets state to linked. |
| + void set _subscription(StreamSubscription subscription) { |
|
nweiz
2015/06/12 01:24:22
Since this is only called once, I think it would b
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Now a function named setListened that does the sta
|
| + assert(_isInitial); |
| + _stateData = subscription; |
| + _state = _LINKED; |
| + } |
| + |
| + void onData(void handleData(T data)) { |
| + if (_isLinked) { |
| + _subscription.onData(handleData); |
| + } else { |
| + assert(_isInitial); |
| + _handlers[0] = handleData; |
| + } |
| + } |
| + |
| + void onError(void handleError(error, StackTrace stackTrace)) { |
| + if (_isLinked) { |
| + _subscription.onError(handleError); |
| + } else { |
| + assert(_isInitial); |
| + _handlers[1] = handleError; |
| + } |
| + } |
| + |
| + void onDone(void handleDone()) { |
| + if (_isLinked) { |
| + _subscription.onDone(handleDone); |
| + } else { |
| + assert(_isInitial); |
| + _handlers[2] = handleDone; |
| + } |
| + } |
| + |
| + void pause([Future resumeFuture]) { |
| + if (_isLinked) { |
| + _subscription.pause(resumeFuture); |
| + } else if (!_isCancelled) { |
|
Søren Gjesse
2015/06/11 07:57:05
assert that it is not already paused (or use |).
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Pauses are cumulative, it can be paused more than
|
| + _state += _PAUSE; |
| + if (resumeFuture != null) { |
| + resumeFuture.whenComplete(this.resume); |
| + } |
| + } |
| + } |
| + |
| + void resume() { |
| + if (_isLinked) { |
| + _subscription.resume(); |
| + } else if (_state >= _PAUSE) { |
|
Søren Gjesse
2015/06/11 07:57:05
Just check the bit instead of using >=.
Lasse Reichstein Nielsen
2015/06/12 13:04:20
It can be paused more than once, so the bit will o
|
| + _state -= _PAUSE; |
| + } |
| + } |
| + |
| + Future cancel() { |
|
Søren Gjesse
2015/06/11 07:57:06
What is the semantics of calling cancel several ti
Lasse Reichstein Nielsen
2015/06/12 13:04:20
I'm not sure it's specified, but that's what we us
|
| + if (_isLinked) { |
| + return _subscription.cancel(); |
| + } else { |
| + _stateData = null; |
| + _state = _CANCELLED; |
| + return new Future.value(); |
| + } |
| + } |
| + |
| + Future asFuture([futureValue]) { |
| + if (_isLinked) { |
| + return _subscription.asFuture(futureValue); |
| + } |
| + Completer completer = new Completer(); |
| + if (!_isCancelled) { |
|
nweiz
2015/06/12 01:24:23
Nit: Short-circuit if it is cancelled to save some
Lasse Reichstein Nielsen
2015/06/12 13:04:20
Rewritten.
|
| + // Asking for a future of a cancelled subscription gives a future |
| + // which never completes. |
| + _handlers[1] = _cancelBeforeError(completer.completeError); |
| + if (futureValue == null) { |
| + _handlers[2] = completer.complete; |
| + } else { |
| + _handlers[2] = () { completer.complete(futureValue); }; |
|
nweiz
2015/06/12 01:24:23
Nit: Either use "=>" or make this multiple lines.
Lasse Reichstein Nielsen
2015/06/12 13:04:21
I skipped the special casing of null completely (a
|
| + } |
| + } |
| + return completer.future; |
| + } |
| + |
| + bool get isPaused { |
| + if (_isLinked) { |
| + return _subscription.isPaused; |
| + } |
| + return _state >= _PAUSE; |
| + } |
| + |
| + /// Helper function used by [asFuture]. |
| + /// |
| + /// Returns an error handler which cancels the stream when it receives an |
| + /// error. |
| + Function _cancelBeforeError(Function handleError) { |
| + return (e, s) { |
|
nweiz
2015/06/12 01:24:23
Nit: use full words for variables.
Lasse Reichstein Nielsen
2015/06/12 13:04:21
Done.
|
| + cancel(); |
| + if (handleError is _BinaryCallback) { |
| + handleError(e, s); |
| + } else { |
| + handleError(e); |
| + } |
| + }; |
| + } |
| +} |
| + |
| +typedef _BinaryCallback(a, b); |
|
Søren Gjesse
2015/06/11 07:57:06
We don't have this typedef publid in and dart: lib
Lasse Reichstein Nielsen
2015/06/12 13:04:19
We do: ZoneBinaryCallback in dart:async.
Also not
|