|
|
Created:
5 years, 6 months ago by Lasse Reichstein Nielsen Modified:
5 years, 5 months ago CC:
reviews_dartlang.org, ahe, floitsch Base URL:
https://github.com/dart-lang/async@master Target Ref:
refs/heads/master Visibility:
Public. |
DescriptionAdd new features to package:async.
- `DelegatingStreamSubscription`: Wrap a subscription and maybe override some methods.
- `SubscriptionStream`: Rewrap a stream subscription as a single-subscription stream.
- `StreamCompleter`: Utility to help when avoiding returning a `Future<Stream>`.
- `StreamEvents`: A pull interface for reading the events of a stream.
R=nweiz@google.com, sgjesse@google.com
Committed: https://github.com/dart-lang/async/commit/dc58de5505ed02563370f03f7f17377f2bfc9e98
Patch Set 1 #
Total comments: 4
Patch Set 2 : Allow skip/take with 0 as argument. #Patch Set 3 : more docs and tests. #
Total comments: 239
Patch Set 4 : Partially address comments, doesn't run yet. #Patch Set 5 : Address remaining comments. #
Total comments: 24
Patch Set 6 : Rewrote StreamCompleter. #Patch Set 7 : Switch StreamEvents to StreamQueue, completely new implementation that can support hasNext. #Patch Set 8 : Add hasNext operation. #Patch Set 9 : Remove the mutable delegating subscription again. #Patch Set 10 : Add all.dart to test. Apparently people like that. #
Total comments: 117
Patch Set 11 : Address comments. #
Total comments: 10
Patch Set 12 : Address comments. #
Messages
Total messages: 25 (3 generated)
lrn@google.com changed reviewers: + sgjesse@google.com
Thank you, Lasse! I look forward to when this becomes available, but I dare not review this. I always get confused when it comes to async. https://codereview.chromium.org/1149563010/diff/1/lib/src/stream_events.dart File lib/src/stream_events.dart (right): https://codereview.chromium.org/1149563010/diff/1/lib/src/stream_events.dart#... lib/src/stream_events.dart:16: class StreamEvents<T> { It took me quite some time to realize this is the feature we discussed earlier this week. Perhaps we can find a more descriptive name, for example, PullStream? The part that's confusing, IMO, is "Events". Too generic. https://codereview.chromium.org/1149563010/diff/1/test/subscription_stream_te... File test/subscription_stream_test.dart (right): https://codereview.chromium.org/1149563010/diff/1/test/subscription_stream_te... test/subscription_stream_test.dart:10: import "package:test/test.dart"; This most likely makes this a really slow test to run on dart2js. Also, it introduces a weird cycle in the dependencies of testing dart:async: what if package:test uses these features? How dow you know if package:test or dart:async is broken?
+floitsch for naming suggestions. https://codereview.chromium.org/1149563010/diff/1/lib/src/stream_events.dart File lib/src/stream_events.dart (right): https://codereview.chromium.org/1149563010/diff/1/lib/src/stream_events.dart#... lib/src/stream_events.dart:16: class StreamEvents<T> { This is a little hard to name well. PullStream won't work because it's not a Stream. StreamPuller is .... just bad. I want a noun phrase that captures what this is about, and it is a way to access stream events. I don't want any "-er" suffix (Provider, Handler, Manager). So, suggestions are welcome, but it's not easy (or at least I haven't found the easy answer yet :). https://codereview.chromium.org/1149563010/diff/1/test/subscription_stream_te... File test/subscription_stream_test.dart (right): https://codereview.chromium.org/1149563010/diff/1/test/subscription_stream_te... test/subscription_stream_test.dart:10: import "package:test/test.dart"; On 2015/06/08 16:55:16, ahe wrote: > This most likely makes this a really slow test to run on dart2js. If the test package is slow on dart2js, then it should probably be optimized for that. It is being used for almost every package anyway. > Also, it introduces a weird cycle in the dependencies of testing dart:async: > what if package:test uses these features? How dow you know if package:test or > dart:async is broken? This is package:async, not dart:async. Still, if package:test uses package:async, then we have a problem, but that too is true for every other package. In practice I don't think we'll manage to write two independent bugs that manages to just cancel each other out when testing this package and package:test, and not be detected in some other way.
LGTM Realy nice async abstractions. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... File lib/src/stream_completer.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:27: class StreamCompleter<T> { Maybe add a constructor with an optional onListen callback taking the StreamCompleter as an argument. That callback can then call setSourceStream when the stream is listened on. Use case: The resulting stream is from some async request where the response completes with something that has/is a stream (e.g. HTTP or other RPC request). https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:81: if (state == _UNINITIALIZED) { Why not just check (_state & _STREAM_SET) == 0? Then you don't need the temp variable state. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:95: print(state); Debug print. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:130: var _stateData = new List(3); Why use a list with three members instead of three named member variables? https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:135: bool get _isLinked => _state == _LINKED; If you relay want to use a bit-field why not ave getters and settes for all bits? https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:143: if (_isCancelled) { Shouldn't we listen on and cancel the stream in this case? https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:149: List handlers = _stateData; Why this 'cast' - documentation? https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:208: } else if (!_isCancelled) { assert that it is not already paused (or use |). https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:219: } else if (_state >= _PAUSE) { Just check the bit instead of using >=. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:224: Future cancel() { What is the semantics of calling cancel several times? Should it return the same future? https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:275: typedef _BinaryCallback(a, b); We don't have this typedef publid in and dart: library? https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.dart File lib/src/stream_events.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:23: /// while (first.startsWith('#')) { Maybe add an isDone check in this sample. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:52: Maybe put an additional comment that 4 is reserved bu _ClOSE_REST below. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:358: bool get isComplete; Add close method here as well I think the 'close' method should have a different name, e.g. 'done'. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... File lib/src/subscription_stream.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:51: /// events after the first error, including done events. I find that the requirement to know the cancel on error setting of the subscription is somewhat subtle - and required a long explanation. As discussed off-line we could make a change in dart:async to make the cancel on error state visible on a StreamSubscription. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:165: typedef _BinaryCallback(e, s); See comment in other file. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... File test/stream_events_test.dart (right): https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:12: main() { Add check of isDone. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:24: var r = await Future.wait(new Iterable.generate(4, (n) => e.next)); n -> _ https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:232: var tests = [nextTest, skipTest, takeTest]; assert(tests.length * tests.length + 20 == 200) or pass 'tests.length * tests.length + 20' as an argument to createLongStream. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:261: () async{ Space after async.
nweiz@google.com changed reviewers: + nweiz@google.com
First of all: this is really exciting stuff. I'm glad to see this package getting love after languishing for so long, and all the utilities you're working on are things I've personally run into the need for. Not having to re-invent the wheel all the time will be great. It's clear from your style that you're used to writing very performance-critical code that few people will look at directly. That's perfect for the core libraries, but the package world has some different constraints. Packages are much less likely to be on the critical path for a large amount of code, so very tight optimizations pay off a lot less. On the other hand, they're much *more* likely to be read and contributed to by users, so very readable code pays off a lot more. This is the thrust of a lot of my comments on the CL: moving away from optimal low-level code and towards higher-level code that's a little slower but also easier to follow. That will help make this package easier for other people (including me) to follow and thus more likely for them to contribute, or even just to feel like they understand the tools they're using. https://codereview.chromium.org/1149563010/diff/40001/lib/src/delegates.dart File lib/src/delegates.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/delegates.dart#... lib/src/delegates.dart:5: library async.streams.delegates; Library annotations should match the path to the library (here and elsewhere). Also, I'd call this file "delegating_stream_subscription". It would be nice to keep this one-class-per-file, but even if not, the name should reflect the current contents rather than the future contents. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... File lib/src/stream_completer.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:12: /// because you can just use directly create a stream that doesn't do anything "use directly" -> "directly" https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:27: class StreamCompleter<T> { Consider adding a constructor so that the user can specify whether this will return a broadcast stream or not. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:28: final Stream<T> stream = new _PromiseStream<T>(); Nit: document this https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:29: Suggestion: add "static Stream fromFuture(Future<Stream> future)". I've written that function several times across several projects, and it would be nice to have it available here. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:32: /// There is no guarantee that the stream will ever be listened to. It would be nice to go further and fully explain how all operations—listening, pausing, canceling—on the returned stream affect the source stream. Also, document what happens if multiple source streams are set. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:39: void setEmpty() { Document that this is mutually exclusive with [setSourceStream]. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:54: class _PromiseStream<T> extends Stream<T> { Even though it's not public, it would be nice to have this name match up with "StreamCompleter" somehow. Maybe "_CompleterStream"? https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:57: static const int _LISTENED = 2; Document the semantics of each of these variables. Nit: Constants should no longer be all-caps per the style guide. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:59: int _state = _UNINITIALIZED; Document that this is a bitfield. At first glance, it looks like it just has a simple enum value. Actually, it would probably be clearest if [streamSet] and [listened] were both boolean fields, and [uninitialized] was defined as "bool get uninitialized => !streamSet && !listened". https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:60: var _streamOrSubscription; This really seems like it should be two separate fields. I get that they'll never be set at the same time, but merging them together sacrifices readability for a performance gain that's probably extremely marginal. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:62: void _linkStream(Stream stream) { It would be really nice to have all these private functions documented. Since this is in a package, more people (including me!) are likely to want to look through the implementation and figure out what's going on. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:79: int state = _state; Nit: don't type-annotate local variables. There are a couple more scattered about that aren't necessary for type coercion. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:83: new _PromiseSubscription<T>(true == cancelOnError); "true == cancelOnError" is really hard to read. It would be a lot clearer if you just did "if (cancelOnError == null) cancelOnError = true" at the top of the function to match the convention for default values. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:100: /// Subscription for a [_PromiseStream] that is listened to but has no data. "has no data" is a little misleading, since this won't be used if the stream has been completed but the underlying stream hasn't emitted any data. Maybe "hasn't yet completed to an actual stream"? https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:103: /// events until events are available, then it starts forwarding to another "events are available" has the same issue as above. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:115: static const int _PAUSE = 4; // Used with _INIT or _INIT_CANCEL_ON_ERROR. Similarly to the above, it would be a lot clearer if [pause] and [cancelOnError] were separate fields. Bitwise boolean logic is really hard to understand, and I can't imagine the performance difference will matter in practice. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:130: var _stateData = new List(3); On 2015/06/11 07:57:05, Søren Gjesse wrote: > Why use a list with three members instead of three named member variables? +1 https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:157: state -= _PAUSE; I have no idea what's going on here :-/. If you're going to do complex bitwise stuff like this, please document *heavily* why it works. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:172: void set _subscription(StreamSubscription subscription) { Since this is only called once, I think it would be clearer to inline it. Right now when reading [_linkStream] it looks like it's just assigning to an instance variable; it's not clear that [_state] gets modified too. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:239: if (!_isCancelled) { Nit: Short-circuit if it is cancelled to save some indentation. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:246: _handlers[2] = () { completer.complete(futureValue); }; Nit: Either use "=>" or make this multiple lines. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:264: return (e, s) { Nit: use full words for variables. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.dart File lib/src/stream_events.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:17: /// The individual requests are served in the order they are requested. Explain how this is different from a [StreamIterator]. Also, document that the underlying stream is paused as long as there are no pending requests. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:37: class StreamEvents<T> { Plural class names always seem weird to me. What do you think about calling this something like "PullStream" instead? https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:38: /// In the initial state, the stream has not been listened to yet. Nit: The first paragraph of a doc comment should be a one-sentence description, per https://www.dartlang.org/articles/doc-comment-guidelines/. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:41: static const int _INITIAL = 0; Nit: since these fields aren't right up next to one another, I don't think there's much value in adding a bunch of extra whitespace to get them aligned. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:56: /// is completed and is witing to clean up. "witing" -> "waiting" https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:75: Queue<_EventRequest> _requestQueue = new Queue(); Make this final. Personally, I prefer to omit the type annotation for private variables with initializers, since it's clear from the value what the type is. I'd write this: final _requestQueue = new Queue<_EventRequest>(); https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:83: /// Whether the underlying stream is spent. "spent" -> "done" Just to use consistent terminology for the same thing. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:88: bool get isDone => _isDone; What's the use case for exposing this to the user? It seems like it violates the encapsulation of the inner stream. Also, consider adding a [hasNext] getter that (asynchronously) returns whether a [next] can be called without getting a [StateError]. This will make it easier for users to safely iterate through the stream. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:90: /// Return the stream subscription while state is listening. Nit: use present tense (e.g. "Returns" rather than "Return") when describing methods. It makes it much easier to avoid confusing tense disagreements. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:117: /// Request the next (yet unrequested) event from the stream. Clarify in this documentation that it's valid to have multiple pending calls to [next] at once (this is an important difference from [StreamIterator]). https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:121: /// an error event. I'd write this as: "If the event is a value event, the returned future completes with its value. If it's an error, the returned future throws that error." https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:123: /// If the stream closed before an event arrives, the future is completed "closed" -> "closes", "is completed" -> "completes" https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:134: /// Request a stream of all the remaning events of the source stream. "Request" -> "Returns" https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:141: /// `rest` it is no longer allowed to request other events, like "it is no longer allowed to" -> "the caller may no longer https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:144: if (!_isClosed) { Nit: Short-circuit if it's closed to save on indentation. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:157: // We have never listened the source stream, so just return that directly. "listened to" https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:165: /// Requests to skip the next [count] *data* events. "Requests to skip" -> "Skips" https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:172: /// If an error occurs before `count` data events has "has" -> "have" https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:193: /// The [count] value must be non-negative. "The [count] value" -> "[count]" https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:216: /// Release the underlying stream subscription. "Release" -> "Cancels" https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:228: Future close() { Consider calling this [cancel], to match [StreamSubscription] and [StreamIterator]. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:232: assert(_requestQueue.isEmpty); Add a comment explaining why the request queue will be empty here. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:240: throw _failClosed(); Everything in the core libraries that's closable allows [close] to be called multiple times; it seems like this should be similar. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:243: /// Reused error message. Expand on this. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:253: void _listen() { Methods like this and [_pause] that are only called from one place and have a lot of description to explain the context seem like they might be clearer if they were just inlined. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:269: // Callbacks receiving the events of the source stream. Nit: If this is referring to a group of methods, add a blank line after it so that it doesn't look like it's just attached to [_onData]. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:293: assert(_requestQueue.isNotEmpty); This assertion seems redundant, since the line below will throw if the queue is empty anyway. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:304: if (action.isComplete) { Explain under what circumstances an action that was just added could already be complete. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:354: Nit: extra newline. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:357: abstract class _EventRequest implements EventSink { It's kind of weird that the documentation for these describes them as "actions" but the class names are all "request". It would be nice to settle on consistent terminology. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:358: bool get isComplete; This should also have [add] and [addError] abstract methods. Also, document this. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:363: Completer completer; Even though it doesn't do anything at the language level, I really like making members of private classes that aren't used by other classes private. It helps communicate to the reader that this isn't something that will be used externally. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:364: _NextRequest(this.completer); Rather than passing in a completer, consider making it an instance field and exposing its future to the caller. For example: Future get future => _completer.future; final _completer = new Completer(); then from the caller: var request = new _NextRequest(); _addRequest(request); return request.future; This just helps keep the code a little more cleanly layered. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:368: completer = null; Does nulling this out buy you anything? It seems like you could just check [completer.isCompleted] instead of checking against null. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:378: completer.completeError(new StateError("no elements")); If you do continue to check against null, null out completer here. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:388: int count; Document this. Also consider calling it something more semantic, like "eventsToSkip". https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:450: throw new UnsupportedError("event"); Shouldn't this be "error"? https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:475: throw new UnsupportedError("event"); Ditto. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... File lib/src/subscription_stream.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:32: /// If the original `isCancelOnError` value for the subscription is known, "isCancelOnError" -> "cancelOnError" https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:51: /// events after the first error, including done events. On 2015/06/11 07:57:07, Søren Gjesse wrote: > I find that the requirement to know the cancel on error setting of the > subscription is somewhat subtle - and required a long explanation. > > As discussed off-line we could make a change in dart:async to make the cancel on > error state visible on a StreamSubscription. I also found this pretty hard to follow. If [StreamSubscription] doesn't get updated, I'd consider writing out the behavior more explicitly, like this: [cancelOnError] doesn't need to match the original subscription's value; however, if [cancelOnError] is `false` and the original subscription's value is `true`, this stream will never emit a done event. On the other hand, if [cancelOnError] is true, this stream will emit a done event after the first error regardless of the original subscription's value. Since an updated StreamSubscription won't hit stable for a while, and when it does this class will be in a weird state, it's also worth considering just removing the [isCancelOnError] parameter entirely and saying that cancel-on-error subscriptions aren't supported yet. I doubt that will cause much of a problem in practice. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:52: SubscriptionStream(StreamSubscription subscription, Shouldn't this be a StreamSubscription<T>? https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:53: {bool isCancelOnError: false}) I'd call this "cancelOnError" to match the parameter to [listen], especially since there's no way to enforce that it actually matches [subscription]. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:75: if (subscription is StreamSubscription<T>) { I don't understand why [subscription] isn't typed as a StreamSubscription<T>, but in my experience "is" checks against generics are extremely unreliable and should be avoided. If the user passed in a subscription without a reified generic, it's perfectly fine to return the same. Generic reification gets lost all the time anyway. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:96: class _CancelOnErrorSubscriptionWrapper<T> Document this. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:110: Function _cancelBeforeError(Function handleError) { Why not just inline this? https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:111: return (e, s) { Use full words for variables. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:122: /// Subscription wrapper that assumes wrapped subscription is cancel-on-error. "assumes wrapped" -> "assumes the wrapped" https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:131: var _onDone; Nit: swap this with the previous blank line. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:145: Function _doneAfterError(Function errorHandler) { This also seems like it would be clearer if it were inlined. https://codereview.chromium.org/1149563010/diff/40001/pubspec.yaml File pubspec.yaml (right): https://codereview.chromium.org/1149563010/diff/40001/pubspec.yaml#newcode2 pubspec.yaml:2: version: 1.1.1 1.2.0-dev https://codereview.chromium.org/1149563010/diff/40001/pubspec.yaml#newcode7 pubspec.yaml:7: test: ">=0.12.0 <0.14.0" The upper bound should be 0.13.0 (or just merge with master). https://codereview.chromium.org/1149563010/diff/40001/test/stream_completer_t... File test/stream_completer_test.dart (right): https://codereview.chromium.org/1149563010/diff/40001/test/stream_completer_t... test/stream_completer_test.dart:5: library async.stream_completer_test; Tests don't need library annotations. https://codereview.chromium.org/1149563010/diff/40001/test/stream_completer_t... test/stream_completer_test.dart:13: test("early link", () async { Rather than "early" and "late" consider spelling out exactly at what point in the lifecycle a link is being made. The test API is designed explicitly to make it possible to use full sentences or sentence fragments; for example, "a source stream is set before the completer is listened to." https://codereview.chromium.org/1149563010/diff/40001/test/stream_completer_t... test/stream_completer_test.dart:14: var c = new StreamCompleter(); Use full words for variables. https://codereview.chromium.org/1149563010/diff/40001/test/stream_completer_t... test/stream_completer_test.dart:15: var s = c.stream; Pulling this out into a variable makes this harder to follow, especially when the variable is declared so far away from where it's used. https://codereview.chromium.org/1149563010/diff/40001/test/stream_completer_t... test/stream_completer_test.dart:18: await s.listen(r.add).asFuture(); Consider just: expect(s.toList(), completion(equals([1, 2, 3, 4]))); Since you're not writing tests for the Stream base class, it's fine to assume that [Stream.toList] will translate reasonably into a call to [listen]. Same goes elsewhere in this file. https://codereview.chromium.org/1149563010/diff/40001/test/stream_completer_t... test/stream_completer_test.dart:27: await sleep(250); Sleeping is a recipe for tests that are both slow and flaky. If you need to ensure that all microtask-level events have finished firing, just use "await new Future.delayed(Duration.ZERO)". If there are non-microtask-level events in this code, that's a problem anyway. https://codereview.chromium.org/1149563010/diff/40001/test/stream_completer_t... test/stream_completer_test.dart:75: } There's a bunch of publicly-visible behavior that isn't tested for: * Pausing/canceling the stream after the completer has fired but before the stream is complete. * [setEmpty]. * Ensuring that [listen] isn't called on the underlying stream until the stream completer is listened to. * Ensuring that [cancelOnError] is forwarded properly, both when the completer stream is listened to before and after it's linked. * Handling errors, both before and after linking. * Linking a stream more than once. * Listening to a completer stream more than once. * Setting onData/onError/onDone on the completer subscription, both before and after it's linked. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... File test/stream_events_test.dart (right): https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:19: expect(e.next, throws); throwsStateError. In general, check that the exceptions have the types you expect throughout this file (or in the case of thrown strings, that they're the strings you expect). https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:22: test("request mutiple", () async { "mutiple" -> "multiple" https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:24: var r = await Future.wait(new Iterable.generate(4, (n) => e.next)); On 2015/06/11 07:57:07, Søren Gjesse wrote: > n -> _ Explicitly writing out the [e.next] calls would be more readable here, in my opinion. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:26: await e.close(); Is there any reason to explicitly close the stream here? It and all its subscribers should get garbage-collected when it goes out of scope. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:37: }); Test that the underlying subscription is paused when there are no [next] calls pending. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:40: test("simple-2", () async { This name isn't very descriptive. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:51: expect(() => e.skip("not a number"), throws); I don't think it's necessary to test type errors. That's the language's job to enforce, and it would be a huge pain to test everywhere. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:205: }); Also test forwarding pause/cancel to the underlying subscription. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:217: }); Ensure that it cancels the underlying subscription. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:242: completion(new List.generate(20, (i) => counter + i))); It's hard for me to follow what's going on here... can you add some comments? https://codereview.chromium.org/1149563010/diff/40001/test/subscription_strea... File test/subscription_stream_test.dart (right): https://codereview.chromium.org/1149563010/diff/40001/test/subscription_strea... test/subscription_stream_test.dart:34: }); Also test that pausing and canceling forwards properly, and that listening multiple times throws. https://codereview.chromium.org/1149563010/diff/40001/test/subscription_strea... test/subscription_stream_test.dart:36: group("cancelOnError", () { The behavior when the [cancelOnError] values don't match is also part of the public API—at least when the SubscriptionStream's is true but the underlying stream's is false. Also test that setting [onDone] works properly when both streams have [cancelOnError] true.
These are excellent comments, and it'll take some while going through them, so here are my comments so far (and no CL upload since it's not currently running :). https://codereview.chromium.org/1149563010/diff/40001/lib/src/delegates.dart File lib/src/delegates.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/delegates.dart#... lib/src/delegates.dart:5: library async.streams.delegates; On 2015/06/12 01:24:22, nweiz wrote: > Library annotations should match the path to the library (here and elsewhere). Without the "src" I assume. I originally expected to have a streams library exposing this, but dropped it without changing the library name, so this is a bug. Not that library names matter the slightest bit ;). > Also, I'd call this file "delegating_stream_subscription". It would be nice to > keep this one-class-per-file, but even if not, the name should reflect the > current contents rather than the future contents. That's fine. If there is no particular reason to keep two classees in the same library, they might as well each get their own. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... File lib/src/stream_completer.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:12: /// because you can just use directly create a stream that doesn't do anything On 2015/06/12 01:24:23, nweiz wrote: > "use directly" -> "directly" Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:27: class StreamCompleter<T> { You are thinking the case where you don't even want to create the stream if it won't be used. Makes sense, will consider. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:27: class StreamCompleter<T> { I think I'd use a completely different class for a broadcast stream, that would be BroadcastStreamCompleter. That's necessary because a broadcast stream can have multiple simultaneous subscriptions, and the stream here doesn't handle that. I'll change the documentation to say "single-subscription". https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:28: final Stream<T> stream = new _PromiseStream<T>(); On 2015/06/12 01:24:22, nweiz wrote: > Nit: document this Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:29: Good idea. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:32: /// There is no guarantee that the stream will ever be listened to. On 2015/06/12 01:24:23, nweiz wrote: > It would be nice to go further and fully explain how all operations—listening, > pausing, canceling—on the returned stream affect the source stream. Also, > document what happens if multiple source streams are set. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:39: void setEmpty() { On 2015/06/12 01:24:23, nweiz wrote: > Document that this is mutually exclusive with [setSourceStream]. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:54: class _PromiseStream<T> extends Stream<T> { True. I renamed the public class at some point, so the private names no longer make sense. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:57: static const int _LISTENED = 2; On 2015/06/12 01:24:22, nweiz wrote: > Document the semantics of each of these variables. Done > Nit: Constants should no longer be all-caps per the style guide. I have rewritten this file quite a lot. It actually works with lower-case constants when I use them as state names. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:59: int _state = _UNINITIALIZED; This is no longer used as a bit field. I have constants for all four state combinations, and use those instead of the individual bits. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:60: var _streamOrSubscription; I think the way I have (now) encapsulated it should make it readable. You should never read this field directly, and instead use a _stream or _subscription getter that tests that you are in a compatible state. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:62: void _linkStream(Stream stream) { On 2015/06/12 01:24:23, nweiz wrote: > It would be really nice to have all these private functions documented. Since > this is in a package, more people (including me!) are likely to want to look > through the implementation and figure out what's going on. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:79: int state = _state; Yeah, about that. I try to use more "var", but I generally still prefer to use the type if it's at all necessary (if the type isn't obvious from either the name or the right-hand side, or if I call any function on the object that isn't on object, so I can get a warning if it's not correct). For the state, it's now unnecessary since I don't do integer operations on them. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:81: if (state == _UNINITIALIZED) { I still need to read the state before setting it, to check that the stream hasn't already been listened to. It won't buy anything in practice. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:83: new _PromiseSubscription<T>(true == cancelOnError); Now forwards the parameter all the way to the final listen, so I never need to coerce it to bool. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:95: print(state); On 2015/06/11 07:57:06, Søren Gjesse wrote: > Debug print. Acknowledged. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:100: /// Subscription for a [_PromiseStream] that is listened to but has no data. I have reworded it. It's a little constrained by wanting to fit the core description on one line, but I think it works. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:103: /// events until events are available, then it starts forwarding to another reworded. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:115: static const int _PAUSE = 4; // Used with _INIT or _INIT_CANCEL_ON_ERROR. I have moved the entire state into a separate object so this class is just delegating to another stream subscription in all cases, and then when you link a stream the delegate is replaced. The other class does not have this complex state. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:130: var _stateData = new List(3); Space! Saving of! I started writing it as a real object instead, with onData, onError and onDone fields. Then I realized that cancelOnError should also be on that object. And I might as well put the pause state there too, and then I had a completely different (and better) solution. So, thanks :) https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:135: bool get _isLinked => _state == _LINKED; It's not always a bit field. It's really a combo: _CANCELLED and _LINKED are just states. The remaining case is a cancel-on-error flag and a pause count. Anyway, it's gone now :) https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:143: if (_isCancelled) { Probably not. For one thing, it's unnecessary overhead. Why start doing something potentially expensive if you *know* you are not going to need it. Also, calling cancel may give you a future with an error, and we don't have any place to report that error. You already got a completed future from the original cancel call. There might be cases where we do want it, so maybe it should be a flag somewhere to always subscribe, even if canceled. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:149: List handlers = _stateData; Because I like it. It's not obvious what the type is from either name or right-hand-side. And I get warnings if I use it badly below. I like warnings more than hints. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:157: state -= _PAUSE; Rewritten completely (twice), and there is no fancy stuff left. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:172: void set _subscription(StreamSubscription subscription) { Now a function named setListened that does the state transition from _inital to _listenerOnly; https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:208: } else if (!_isCancelled) { Pauses are cumulative, it can be paused more than once. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:219: } else if (_state >= _PAUSE) { It can be paused more than once, so the bit will only be set for an odd number of pauses. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:224: Future cancel() { I'm not sure it's specified, but that's what we usually do. At least it should return equivalent futures - in this case it's pre-completed futures with a null value. (I actually have a change for the Future.value constructor so it returns the same future every time you call it with null as argument - it happens often enough that it's worth optimizing, and we already have a constant null future in the code). https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:239: if (!_isCancelled) { Rewritten. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:246: _handlers[2] = () { completer.complete(futureValue); }; I skipped the special casing of null completely (and uses multiple lines now - I never user => with a void result). https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:264: return (e, s) { On 2015/06/12 01:24:23, nweiz wrote: > Nit: use full words for variables. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:275: typedef _BinaryCallback(a, b); We do: ZoneBinaryCallback in dart:async. Also not using it any more :) https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.dart File lib/src/stream_events.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:23: /// while (first.startsWith('#')) { Hmm. Not good for the flow, but I guess I can try something. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:37: class StreamEvents<T> { It's not a stream, so "PullStream" won't work. There are other constraints, but something that mentions the events of a stream, and doesn't end in a nounified verb (no "Puller", "Manager", "Handler", etc.). I haven't found a good alternative. I also don't have a problem with plural names :) The usage of this will be: var events = new StreamEvents(someStream); var first = await events.next; I think it works. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:41: static const int _INITIAL = 0; In my editor it still works very well to show the layout. Comments are muted in color so the structure shows as a clear grid of yellow boilerplate, white names and purple values, all nicely aligned. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:52: Will do. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:56: /// is completed and is witing to clean up. On 2015/06/12 01:24:24, nweiz wrote: > "witing" -> "waiting" Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:75: Queue<_EventRequest> _requestQueue = new Queue(); Made final. I prefer type annotations on all fields. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:83: /// Whether the underlying stream is spent. On 2015/06/12 01:24:24, nweiz wrote: > "spent" -> "done" > > Just to use consistent terminology for the same thing. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:88: bool get isDone => _isDone; On 2015/06/12 01:24:25, nweiz wrote: > What's the use case for exposing this to the user? It seems like it violates the > encapsulation of the inner stream. It might be a little speculative because it doesn't actually guarantee that there will be a next data/error event - the next event might be the done event. Let's remove it - it looks like it allows you to check if .next will work, which it doesn't. > Also, consider adding a [hasNext] getter that (asynchronously) returns whether a > [next] can be called without getting a [StateError]. This will make it easier > for users to safely iterate through the stream. I don't think that would work with this abstraction. We don't know if there is a next event until that event arrives. We are not buffering events, so we need to put the event somewhere when it arrives. Maybe have a more complex nextOrElse function: Future nextOrElse(orElse()); which returns the next event as it arrives, and completes with the value of orElse() if the streams is done before that. What "next" does is to make that an error, but we could allow the user to define what happens in the done case, so they can recognize it. The default could be returning null or throwing the same state error as "next". https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:90: /// Return the stream subscription while state is listening. On 2015/06/12 01:24:25, nweiz wrote: > Nit: use present tense (e.g. "Returns" rather than "Return") when describing > methods. It makes it much easier to avoid confusing tense disagreements. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:117: /// Request the next (yet unrequested) event from the stream. On 2015/06/12 01:24:26, nweiz wrote: > Clarify in this documentation that it's valid to have multiple pending calls to > [next] at once (this is an important difference from [StreamIterator]). Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:121: /// an error event. On 2015/06/12 01:24:26, nweiz wrote: > I'd write this as: "If the event is a value event, the returned future completes > with its value. If it's an error, the returned future throws that error." Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:123: /// If the stream closed before an event arrives, the future is completed On 2015/06/12 01:24:24, nweiz wrote: > "closed" -> "closes", "is completed" -> "completes" Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:134: /// Request a stream of all the remaning events of the source stream. On 2015/06/12 01:24:25, nweiz wrote: > "Request" -> "Returns" Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:141: /// `rest` it is no longer allowed to request other events, like On 2015/06/12 01:24:25, nweiz wrote: > "it is no longer allowed to" -> "the caller may no longer Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:144: if (!_isClosed) { ACK. This is not going to be in any inner loops, so one unpredicted call is unlikely to be a problem. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:157: // We have never listened the source stream, so just return that directly. On 2015/06/12 01:24:26, nweiz wrote: > "listened to" Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:193: /// The [count] value must be non-negative. -> "The [count]". Otherwise it would start the sentence with a lower-case character. That would be bad! https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:216: /// Release the underlying stream subscription. On 2015/06/12 01:24:26, nweiz wrote: > "Release" -> "Cancels" Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:228: Future close() { Good idea! https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:232: assert(_requestQueue.isEmpty); On 2015/06/12 01:24:24, nweiz wrote: > Add a comment explaining why the request queue will be empty here. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:240: throw _failClosed(); True. One reason why I didn't allow that was that calling "cancel" after calling "rest" would not cancel anything. That's not necessarily a problem - we have the state and knows that 'rest' or 'cancel' has been called already, so we can just do nothing. I'll see if I can change it so (now) cancel can be called more than once. Heck, we can even allow you to call rest more than once (getting an empty stream each time after the first - but that might not be that iteresting. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:243: /// Reused error message. On 2015/06/12 01:24:24, nweiz wrote: > Expand on this. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:253: void _listen() { On 2015/06/12 01:24:24, nweiz wrote: > Methods like this and [_pause] that are only called from one place and have a > lot of description to explain the context seem like they might be clearer if > they were just inlined. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:269: // Callbacks receiving the events of the source stream. On 2015/06/12 01:24:25, nweiz wrote: > Nit: If this is referring to a group of methods, add a blank line after it so > that it doesn't look like it's just attached to [_onData]. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:293: assert(_requestQueue.isNotEmpty); True. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:304: if (action.isComplete) { On 2015/06/12 01:24:24, nweiz wrote: > Explain under what circumstances an action that was just added could already be > complete. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:357: abstract class _EventRequest implements EventSink { Agree, I switched to "request" halfway through, and haven't fixed all references. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:358: bool get isComplete; It's inherited from EventSink. I picked EventSink as the interface because I consider making a public _addRequest so you can create custom event collectors. It might not make a difference because they will have to be wrapped anyway, but it's simple to use the standard names for now. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:364: _NextRequest(this.completer); Good idea. Will do. The type parameter needs to go on the request class too then, but that's ok. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:368: completer = null; We have an isCompleted getter? Whoa! I'm too used to work directly on _Future objects :) https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:378: completer.completeError(new StateError("no elements")); Technically not necessary since "close" is always the last method called on a request object - but better for readability. Anyway, rewritten to not use null as a signal. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:450: throw new UnsupportedError("event"); No, we don't support receiving events, that's the problem. Maybe it should really just be "assert(false);" because the functions are/should be unreachable. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... File lib/src/subscription_stream.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:51: /// events after the first error, including done events. It's slightly annoying. The alternative is to not have the parameter, and then not get a done event if the original subscription cancelled on error. But I hear: More text. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:165: typedef _BinaryCallback(e, s); On 2015/06/11 07:57:07, Søren Gjesse wrote: > See comment in other file. Acknowledged. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... File test/stream_events_test.dart (right): https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:12: main() { On 2015/06/11 07:57:07, Søren Gjesse wrote: > Add check of isDone. Acknowledged. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:24: var r = await Future.wait(new Iterable.generate(4, (n) => e.next)); On 2015/06/11 07:57:07, Søren Gjesse wrote: > n -> _ Acknowledged. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:261: () async{ On 2015/06/11 07:57:07, Søren Gjesse wrote: > Space after async. Done.
PTAL. I really, really, REALLY need a better name for MutableDelegatingStreamSubscriptionController. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.dart File lib/src/stream_events.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:388: int count; On 2015/06/12 01:24:24, nweiz wrote: > Document this. Also consider calling it something more semantic, like > "eventsToSkip". Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:475: throw new UnsupportedError("event"); On 2015/06/12 01:24:26, nweiz wrote: > Ditto. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... File lib/src/subscription_stream.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:32: /// If the original `isCancelOnError` value for the subscription is known, I considered that, but it looks too much like a request that something should be cancelOnError, where it really is telling whether `subscription` already is cancelOnError. Not perfect, but I want it to be different from the `cancelOnError` of `listen`. Maybe sourceCancelsOnError ? https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:51: /// events after the first error, including done events. I adapted the paragraph, it's more readable than the four cases I had before. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:52: SubscriptionStream(StreamSubscription subscription, Like Iterable.from, the argument is more permissive than the result. This allows you to pass a StreamSubscription<number> to a SubscriptionStream<int> if you know that the subscription only produces integers. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:53: {bool isCancelOnError: false}) And I want it to not match because it's not really requesting cancelOnError, but rather describing whether subscription is (assumed to be) cancelOnError. Alternatives welcome, but I'd prefer not to use the same name with two subtly different meanings. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:75: if (subscription is StreamSubscription<T>) { The problem is that we promise to return a StreamSubscription<T>, and we (deliberately) accept a StreamSubscription so we can accept both sub- and super-classes of T. So, what I'm testing here is really whether subscription is assignable to StreamSubscription<T> so that the user won't get an ugly surprise when I return it. The alternative is to just drop this and always wrap in a DelegatingStreamSubscription. That's probably both safer and clearer, so let's just do that. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:96: class _CancelOnErrorSubscriptionWrapper<T> On 2015/06/12 01:24:26, nweiz wrote: > Document this. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:110: Function _cancelBeforeError(Function handleError) { I generally like to put functionality in separate functions with meaningful names - but the original function is so short that the call is really adding more noise than signal. So, done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:111: return (e, s) { On 2015/06/12 01:24:26, nweiz wrote: > Use full words for variables. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:122: /// Subscription wrapper that assumes wrapped subscription is cancel-on-error. But then it doesn't fit on one line?!? :) Rewritten. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:131: var _onDone; On 2015/06/12 01:24:26, nweiz wrote: > Nit: swap this with the previous blank line. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:145: Function _doneAfterError(Function errorHandler) { Done. https://codereview.chromium.org/1149563010/diff/40001/pubspec.yaml File pubspec.yaml (right): https://codereview.chromium.org/1149563010/diff/40001/pubspec.yaml#newcode7 pubspec.yaml:7: test: ">=0.12.0 <0.14.0" On 2015/06/12 01:24:27, nweiz wrote: > The upper bound should be 0.13.0 (or just merge with master). Done. https://codereview.chromium.org/1149563010/diff/40001/test/stream_completer_t... File test/stream_completer_test.dart (right): https://codereview.chromium.org/1149563010/diff/40001/test/stream_completer_t... test/stream_completer_test.dart:13: test("early link", () async { On 2015/06/12 01:24:27, nweiz wrote: > Rather than "early" and "late" consider spelling out exactly at what point in > the lifecycle a link is being made. The test API is designed explicitly to make > it possible to use full sentences or sentence fragments; for example, "a source > stream is set before the completer is listened to." Done. https://codereview.chromium.org/1149563010/diff/40001/test/stream_completer_t... test/stream_completer_test.dart:14: var c = new StreamCompleter(); On 2015/06/12 01:24:27, nweiz wrote: > Use full words for variables. Done. https://codereview.chromium.org/1149563010/diff/40001/test/stream_completer_t... test/stream_completer_test.dart:15: var s = c.stream; On 2015/06/12 01:24:27, nweiz wrote: > Pulling this out into a variable makes this harder to follow, especially when > the variable is declared so far away from where it's used. Done. https://codereview.chromium.org/1149563010/diff/40001/test/stream_completer_t... test/stream_completer_test.dart:18: await s.listen(r.add).asFuture(); On 2015/06/12 01:24:27, nweiz wrote: > Consider just: > > expect(s.toList(), completion(equals([1, 2, 3, 4]))); > > Since you're not writing tests for the Stream base class, it's fine to assume > that [Stream.toList] will translate reasonably into a call to [listen]. > > Same goes elsewhere in this file. Done. https://codereview.chromium.org/1149563010/diff/40001/test/stream_completer_t... test/stream_completer_test.dart:27: await sleep(250); Switched to flushing microtasks (aka timer 0 future). https://codereview.chromium.org/1149563010/diff/40001/test/stream_completer_t... test/stream_completer_test.dart:75: } Good points. Many more tests added. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... File test/stream_events_test.dart (right): https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:12: main() { On 2015/06/12 13:04:23, Lasse Reichstein Nielsen wrote: > On 2015/06/11 07:57:07, Søren Gjesse wrote: > > Add check of isDone. > > Acknowledged. Done. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:19: expect(e.next, throws); On 2015/06/12 01:24:27, nweiz wrote: > throwsStateError. In general, check that the exceptions have the types you > expect throughout this file (or in the case of thrown strings, that they're the > strings you expect). Done. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:22: test("request mutiple", () async { On 2015/06/12 01:24:28, nweiz wrote: > "mutiple" -> "multiple" Done. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:24: var r = await Future.wait(new Iterable.generate(4, (n) => e.next)); It's also shorter. Until I start using longer names for "e". https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:26: await e.close(); True. It's good style, and I'd like to encourage it, but in this case it's probably not necessary. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:37: }); On 2015/06/12 01:24:28, nweiz wrote: > Test that the underlying subscription is paused when there are no [next] calls > pending. Done. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:40: test("simple-2", () async { On 2015/06/12 01:24:27, nweiz wrote: > This name isn't very descriptive. Done. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:51: expect(() => e.skip("not a number"), throws); On 2015/06/12 01:24:27, nweiz wrote: > I don't think it's necessary to test type errors. That's the language's job to > enforce, and it would be a huge pain to test everywhere. Done. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:205: }); On 2015/06/12 01:24:27, nweiz wrote: > Also test forwarding pause/cancel to the underlying subscription. Done. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:217: }); On 2015/06/12 01:24:27, nweiz wrote: > Ensure that it cancels the underlying subscription. Done. https://codereview.chromium.org/1149563010/diff/40001/test/stream_events_test... test/stream_events_test.dart:242: completion(new List.generate(20, (i) => counter + i))); On 2015/06/12 01:24:27, nweiz wrote: > It's hard for me to follow what's going on here... can you add some comments? Done. https://codereview.chromium.org/1149563010/diff/40001/test/subscription_strea... File test/subscription_stream_test.dart (right): https://codereview.chromium.org/1149563010/diff/40001/test/subscription_strea... test/subscription_stream_test.dart:34: }); On 2015/06/12 01:24:28, nweiz wrote: > Also test that pausing and canceling forwards properly, and that listening > multiple times throws. Done. https://codereview.chromium.org/1149563010/diff/40001/test/subscription_strea... test/subscription_stream_test.dart:36: group("cancelOnError", () { The onDone w/ both cancelOnErrors true is handled by the yes-yes case. It sets onDone, and it's never called. I should test the controller is actually canceled at the end.
I haven't quite finished responding yet, but I wanted to get this out before I left for the day. https://codereview.chromium.org/1149563010/diff/40001/lib/src/delegates.dart File lib/src/delegates.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/delegates.dart#... lib/src/delegates.dart:5: library async.streams.delegates; On 2015/06/12 13:04:19, Lasse Reichstein Nielsen wrote: > On 2015/06/12 01:24:22, nweiz wrote: > > Library annotations should match the path to the library (here and elsewhere). > > Without the "src" I assume. > I originally expected to have a streams library exposing this, but dropped it > without changing the library name, so this is a bug. > > Not that library names matter the slightest bit ;). They matter exactly inasmuch as tools will complain if they conflict, which is just enough to make it necessary to ensure their uniqueness :-/. > > Also, I'd call this file "delegating_stream_subscription". It would be nice to > > keep this one-class-per-file, but even if not, the name should reflect the > > current contents rather than the future contents. > > That's fine. If there is no particular reason to keep two classees in the same > library, they might as well each get their own. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... File lib/src/stream_completer.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:1: // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file As part of moving a bunch of existing async utilities to this package, I realized I had a StreamGroup class that takes a bunch of streams and merges their outputs. Its functionality is a superset of this and could be used to implement it very straightforwardly. It would probably be a little less efficient due to more indirection, but it would cut down on the code size a lot. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:60: var _streamOrSubscription; On 2015/06/12 13:04:20, Lasse Reichstein Nielsen wrote: > I think the way I have (now) encapsulated it should make it readable. > You should never read this field directly, and instead use a _stream or > _subscription getter that tests that you are in a compatible state. It's definitely better, but it's still a lot of extra machinery for a very small benefit. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:79: int state = _state; On 2015/06/12 13:04:21, Lasse Reichstein Nielsen wrote: > Yeah, about that. > I try to use more "var", but I generally still prefer to use the type if it's at > all necessary (if the type isn't obvious from either the name or the right-hand > side, or if I call any function on the object that isn't on object, so I can get > a warning if it's not correct). As Bob likes to say, code is for people to read it; if people can figure out what's going on and tools can't, the tools should be fixed, not the code itself. Of course, this could be (and has been) debated endlessly, but luckily we have a style guide to make the final decision for us. And it's pretty unambiguous: except in exceptional circumstances, local variables shouldn't be type-annotated. > For the state, it's now unnecessary since I don't do integer operations on them. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:143: if (_isCancelled) { On 2015/06/12 13:04:21, Lasse Reichstein Nielsen wrote: > Probably not. > > For one thing, it's unnecessary overhead. Why start doing something potentially > expensive if you *know* you are not going to need it. > > Also, calling cancel may give you a future with an error, and we don't have any > place to report that error. You already got a completed future from the original > cancel call. > > There might be cases where we do want it, so maybe it should be a flag somewhere > to always subscribe, even if canceled. I agree with Søren that the default should be to subscribe and then immediately cancel; the completer's pattern of interaction with the stream it's linked to should have as little dependency on ordering as possible. If we're going to make it configurable, I think it would be better for the default to be easier to reason about but slightly less efficient, and allow users to opt in to the more difficult but potentially faster version. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.dart File lib/src/stream_events.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:37: class StreamEvents<T> { On 2015/06/12 13:04:22, Lasse Reichstein Nielsen wrote: > It's not a stream, so "PullStream" won't work. > There are other constraints, but something that mentions the events of a stream, > and doesn't end in a nounified verb (no "Puller", "Manager", "Handler", etc.). > I haven't found a good alternative. > > I also don't have a problem with plural names :) > > The usage of this will be: > > var events = new StreamEvents(someStream); > var first = await events.next; > > I think it works. Bob suggested "StreamPump", using the water analogy: where a stream flows on its own accord, you explicitly pull water out of a pump one unit at a time. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:88: bool get isDone => _isDone; On 2015/06/12 13:04:23, Lasse Reichstein Nielsen wrote: > On 2015/06/12 01:24:25, nweiz wrote: > > What's the use case for exposing this to the user? It seems like it violates > the > > encapsulation of the inner stream. > > It might be a little speculative because it doesn't actually guarantee that > there will be a next data/error event - the next event might be the done event. > > Let's remove it - it looks like it allows you to check if .next will work, which > it doesn't. > > > > Also, consider adding a [hasNext] getter that (asynchronously) returns whether > a > > [next] can be called without getting a [StateError]. This will make it easier > > for users to safely iterate through the stream. > > I don't think that would work with this abstraction. > We don't know if there is a next event until that event arrives. We are not > buffering events, so we need to put the event somewhere when it arrives. The version of this I wrote for ScheduledTest has [hasNext], and it works well. It is the case that it has to wait for an actual event to arrive, but that's the case anyway when you're calling [next], and it's really nice to have a way to do so while guaranteeing that it won't throw. It also enables the following pattern: while (... && await events.hasNext) { values.add(await events.next); } > Maybe have a more complex nextOrElse function: > Future nextOrElse(orElse()); > which returns the next event as it arrives, and completes with the value of > orElse() if the streams is done before that. What "next" does is to make that an > error, but we could allow the user to define what happens in the done case, so > they can recognize it. > The default could be returning null or throwing the same state error as "next". This would also be useful, but I don't think it obviates the need for [hasNext]. Being able to do while loops like the one above is really nice, as is being able to check that the object is in a valid state before passing it to something else that actually calls [next]. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... File lib/src/subscription_stream.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:51: /// events after the first error, including done events. On 2015/06/15 15:46:24, Lasse Reichstein Nielsen wrote: > I adapted the paragraph, it's more readable than the four cases I had before. The more I think about this, the more I think the right answer here is to just omit the parameter entirely. The correct solution in the long term is clearly to define and use [StreamSubscription.cancelOnError], at which point any explicit configuration here will need to be deprecated and eventually removed. An easy way to avoid that painful process would be to just not support it in the first place. Do we think anyone will want to wrap cancel-on-error stream subscriptions in the near future in practice? If not, I think we should just say "they're not supported for now" and plan to fix it for 1.12. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:110: Function _cancelBeforeError(Function handleError) { On 2015/06/15 15:46:24, Lasse Reichstein Nielsen wrote: > I generally like to put functionality in separate functions with meaningful > names - but the original function is so short that the call is really adding > more noise than signal. So, done. That's interesting... I tend to prefer keeping them inline unless there's a strong semantic distinction between the functionality of the pieces, to make it read more like straight-line code. https://codereview.chromium.org/1149563010/diff/80001/lib/async.dart File lib/async.dart (right): https://codereview.chromium.org/1149563010/diff/80001/lib/async.dart#newcode13 lib/async.dart:13: export "src/stream_events.dart"; Move this stuff above stream_zip to keep the exports alphabetical. https://codereview.chromium.org/1149563010/diff/80001/lib/src/delegating_stre... File lib/src/delegating_stream_subscription.dart (right): https://codereview.chromium.org/1149563010/diff/80001/lib/src/delegating_stre... lib/src/delegating_stream_subscription.dart:70: class MutableDelegatingStreamSubscriptionController<T> { I think it's quite unlikely that people will want to use this outside this package. It would be nice to keep it package-private so we don't commit to maintaining it as part of the public API. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... File lib/src/stream_completer.dart (right): https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:36: final Stream<T> stream = new _CompleterStream<T>(); I should have mentioned this in the last pass (sorry!), but: is it worth all the complexity here to avoid just making this a stream controller and manually forwarding events to it? That would substantially cut down on the size of the code here, make it look more like user-authored async code, and avoid duplicating a lot of stuff that's already in dart:async. I'm sure it wouldn't be quite as efficient, but I doubt it would be enough to matter. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:49: onError: (e, s) { "e, s" -> "_" https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:75: /// is paused as many times, and then all events and callbacks are forwarded It's kind of strange that this forwards the same *number* of pauses to the source stream. StreamController folds all pauses on its subscription into a single [onPaused] event, and it feels like that's what should happen here. At least this shouldn't guarantee the number of pause calls that are passed to the subscription. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:109: /// Flag marking that the stream has been listened to. Nit: separate these with newlines. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:143: /// but the stream has not been listened to yet (state is [_streamOnly]); ";" -> "." https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:153: /// but the source stream has not been set to yet (state is [_listenerOnly]); ";" -> "." https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:175: void _setListened(MutableDelegatingStreamSubscriptionController subscription) { Long line. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:219: controller.sourceSubscription = Why are you assigning this rather than passing it in as a parameter? https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:228: if (_state == _streamOnly) { Nit: "else if", for compactness and to match other places in the code. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:264: /// This both marks the subscription as canceled and allows returning Nit: newline above this. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:302: var cancelFuture = new Future.value(); Why return a future here? cancel() is allowed to return null right?
Didn't get to do any fixes today, but good comments! https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... File lib/src/stream_completer.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:1: // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file True. And if I was just using a controller this could be much simpler: class StreamCompleter { var _controller; bool _listened = false; var _stream; StreamCompleter() { _controller = new StreamController(sync: true, onListen: _listen); } Stream get stream => _controller.stream; void _listen() { _listened = true; if (_stream != null) { _stream.pipe(_controller); } } void setSourceStream(Stream source) { if (_stream != null) throw new StateError(""); _stream = source; if (_listened) { if (_controller.hasListener) { source.pipe(_controller); } else { // Has already been listened to and canceled again. source.listen(null).cancel().catchError((_){}); } } } } Maybe I'm trying too hard to avoid overhead? Nah! Impossible! https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:60: var _streamOrSubscription; True. I think I'll try rewriting it without the state. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:79: int state = _state; True. It's mainly for myself. I like to see the type of a variable declaration without having to look at the right-hand side. Good naming goes a long way towards fixing that - then you can just look a the variable, not even the declaration. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:143: if (_isCancelled) { Ok. The only problem with that is that listen+cancel may return a future with an error that we have to swallow. That won't happen if we don't listen at all. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.dart File lib/src/stream_events.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:37: class StreamEvents<T> { I'm not a great fan of analogies in nameing - it only makes sense if you think the same way as the person doing the naming. Something more objective would be better. It introduces a new concept that the user need to understand. What is a "Pump"? Do we have other pumps? Should we? Maybe StreamEventQueue? Except it's not a Queue in the collection sense. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:88: bool get isDone => _isDone; It doesn't solve the problem of what to do with the event value when it arrives if there is no later event waiting for it. That requires buffering the event (value or error) until the next request is made. If the next request is "rest", then it needs to push the event back on the subscription before creating the SubscriptionStream. I guess I could do a wrapper class for that, wrapping a subscription with a list of prior events to provide before the real events start. Still complicated, but I can see the wish for a `hasNext`. https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... File lib/src/subscription_stream.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/subscription_st... lib/src/subscription_stream.dart:51: /// events after the first error, including done events. On 2015/06/16 01:05:23, nweiz wrote: > The more I think about this, the more I think the right answer here is to just > omit the parameter entirely. I tend to agree. It just creates ugliness all the way down. If you use a subscription that doesn't provide a done event, that's the stream you get. > The correct solution in the long term is clearly to > define and use [StreamSubscription.cancelOnError], at which point any explicit > configuration here will need to be deprecated and eventually removed. An easy > way to avoid that painful process would be to just not support it in the first > place. That would be 2.0 at the earliest. > > Do we think anyone will want to wrap cancel-on-error stream subscriptions in the > near future in practice? If not, I think we should just say "they're not > supported for now" and plan to fix it for 1.12. This class is probably one you use because you planned for it, so it shouldn't be hard to not use cancelOnError.
https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... File lib/src/stream_completer.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:1: // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file On 2015/06/16 13:05:45, Lasse Reichstein Nielsen wrote: > True. And if I was just using a controller this could be much simpler: > > class StreamCompleter { > var _controller; > bool _listened = false; > var _stream; > StreamCompleter() { > _controller = new StreamController(sync: true, > onListen: _listen); > } > Stream get stream => _controller.stream; > void _listen() { > _listened = true; > if (_stream != null) { > _stream.pipe(_controller); > } > } > void setSourceStream(Stream source) { > if (_stream != null) throw new StateError(""); > _stream = source; > if (_listened) { > if (_controller.hasListener) { > source.pipe(_controller); > } else { > // Has already been listened to and canceled again. > source.listen(null).cancel().catchError((_){}); > } > } > } > } > > Maybe I'm trying too hard to avoid overhead? Nah! Impossible! Here's a (rough untested) sketch of what it would look like with StreamGroup: class StreamCompleter { final _group = new StreamGroup(); Stream get stream => _group.stream; Future setSourceStream(Stream source) { var future = _group.add(source); _group.close(); return future; } } https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.dart File lib/src/stream_events.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:37: class StreamEvents<T> { On 2015/06/16 13:05:45, Lasse Reichstein Nielsen wrote: > I'm not a great fan of analogies in nameing - it only makes sense if you think > the same way as the person doing the naming. Something more objective would be > better. > It introduces a new concept that the user need to understand. What is a "Pump"? > Do we have other pumps? Should we? All names in programming are analogies at some level, but I take your point: using existing vocabulary is better. > Maybe StreamEventQueue? Except it's not a Queue in the collection sense. That's true, but despite being overloaded "queue" is a well-known term in concurrent programming that fits this concept pretty well. I'm down with it, although I'd call it "StreamQueue" to be more concise. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:88: bool get isDone => _isDone; On 2015/06/16 13:05:45, Lasse Reichstein Nielsen wrote: > It doesn't solve the problem of what to do with the event value when it arrives > if there is no later event waiting for it. > That requires buffering the event (value or error) until the next request is > made. If the next request is "rest", then it needs to push the event back on the > subscription before creating the SubscriptionStream. > > I guess I could do a wrapper class for that, wrapping a subscription with a list > of prior events to provide before the real events start. Still complicated, but > I can see the wish for a `hasNext`. It's unlikely that someone will call [hasNext] followed immediately by [rest]—at that point why not just call [rest] on its own and check for an early completion? So you can probably afford to make that case do a bunch of extra wrapping to keep it simple. For example: var controller = new StreamController(sync: true); controller.add(_bufferedEvent); controller.addStream(new SubscriptionStream(_subscription)); return controller.stream;
https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... File lib/src/stream_completer.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:1: // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file And, to make it even more obvious that this is a good idea, I actually do want a StreamGroup instead of a StreamCompleter for implementing hasNext, because then I can just add a stream of the initial event, and then the remaining stream. The specialized StreamCompleter can still optimize the one case where you set the stream before listening, where you can forward the listen directly to the source stream without an intermediate controller. So, please do add StreamGroup! https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.dart File lib/src/stream_events.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:17: /// The individual requests are served in the order they are requested. On 2015/06/12 01:24:25, nweiz wrote: > Explain how this is different from a [StreamIterator]. Also, document that the > underlying stream is paused as long as there are no pending requests. Done. https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_events.d... lib/src/stream_events.dart:37: class StreamEvents<T> { I'm trying out StreamQueue now. It is slightly irksome because it sounds like a queue of streams. StreamEventQueue would sound like a queue of stream events (which is closer to the truth) and "event queue" is a concept on its own which helps distance it from the collection Queue. Let's see if I can get used to StreamQueue.
https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... File lib/src/stream_completer.dart (right): https://codereview.chromium.org/1149563010/diff/40001/lib/src/stream_complete... lib/src/stream_completer.dart:1: // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file On 2015/06/17 11:08:29, Lasse Reichstein Nielsen wrote: > And, to make it even more obvious that this is a good idea, I actually do want a > StreamGroup instead of a StreamCompleter for implementing hasNext, because then > I can just add a stream of the initial event, and then the remaining stream. > > The specialized StreamCompleter can still optimize the one case where you set > the stream before listening, where you can forward the listen directly to the > source stream without an intermediate controller. > > So, please do add StreamGroup! > Oh dang, I thought I published that CL yesterday, but I forgot. Here it is: https://codereview.chromium.org/1178793006/
https://codereview.chromium.org/1149563010/diff/80001/lib/src/delegating_stre... File lib/src/delegating_stream_subscription.dart (right): https://codereview.chromium.org/1149563010/diff/80001/lib/src/delegating_stre... lib/src/delegating_stream_subscription.dart:70: class MutableDelegatingStreamSubscriptionController<T> { I'm not using it any more, so I'll just remove it. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... File lib/src/stream_completer.dart (right): https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:36: final Stream<T> stream = new _CompleterStream<T>(); Ack. Done. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:49: onError: (e, s) { On 2015/06/16 01:05:23, nweiz wrote: > "e, s" -> "_" Done. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:75: /// is paused as many times, and then all events and callbacks are forwarded On 2015/06/16 01:05:23, nweiz wrote: > It's kind of strange that this forwards the same *number* of pauses to the > source stream. StreamController folds all pauses on its subscription into a > single [onPaused] event, and it feels like that's what should happen here. If you pause three times, you need to resume three times to continue listening. The controller will only be called once, it doesn't need to know how many times the subscription is paused, just that it shouldn't push any events on it. Here the subscription is a proxy for another, later, subscription, so it should record the actual number of pauses and ensure that the real subscription requires the same number of resumes. (Rewritten completely anyway, so no longer an issue). > At > least this shouldn't guarantee the number of pause calls that are passed to the > subscription. We have to. But we shouldn't put it in the public documentation - it should just work behind the scenes. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:109: /// Flag marking that the stream has been listened to. Gone. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:153: /// but the source stream has not been set to yet (state is [_listenerOnly]); On 2015/06/16 01:05:23, nweiz wrote: > ";" -> "." Acknowledged. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:175: void _setListened(MutableDelegatingStreamSubscriptionController subscription) { Class really needs a better name. I think I'll remove the class again, now that I don't use it. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:219: controller.sourceSubscription = Because of the cyclic dependency, the state object refers to the controller so it can forward delayed calls back through it. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:228: if (_state == _streamOnly) { Then I think I should also put the final throw in an "else" block. I thought that would make the analyzer complain that my non-void function didn't end with a return or throw, but apparently that's not a problem. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:264: /// This both marks the subscription as canceled and allows returning On 2015/06/16 01:05:23, nweiz wrote: > Nit: newline above this. Acknowledged. https://codereview.chromium.org/1149563010/diff/80001/lib/src/stream_complete... lib/src/stream_completer.dart:302: var cancelFuture = new Future.value(); Yes, sadly. That's a consequence of backwards compatibility - we added the future after cancel had already existed for a while with a void return type, so we had to accept null. It's annoying to have to check, so I try to not return null, in the naïve hope that we can eventually get rid of nulls. Ooooh, and non-null-by-default might actually get us there!
PTAL again. All bit-fields are gone now (*sniff*).
This is looking great :). https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_complet... File lib/src/stream_completer.dart (right): https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_complet... lib/src/stream_completer.dart:120: cancelOnError: cancelOnError); As written, if a completer is completed with a broadcast stream and the user calls [listen] multiple times on the result stream, that will throw a StateError if the first listen is before the stream is completed and succeed if it's after. The easiest way to work around this is probably to only directly call [listen] if the source stream is non-broadcast, so that [StreamCompleter.stream] is always single-subscription no matter what. Also, that would be a good thing to document. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.dart File lib/src/stream_queue.dart (right): https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:19: /// and after all previous requestes have been fulfilled, stream events "requestes" -> "requests" https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:23: /// will be completed by the next two unreserved events from the stream. "unreserved" -> "unrequested"? Just to keep the terminology consistent https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:33: /// and the stream subscription is paused when there are no active requests. Both of these are already mentioned above, so this sentence is probably unnecessary. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:44: /// out of a stream and continue using the rest of the stream as a stream. "continue using" -> "continuing to use" https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:70: /// such a stream and stopping to request events, will cause memory to fill up "stopping" -> "ceasing", get rid of the comma https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:71: /// unnecessarily. This paragraph also seems redundant with the sixth paragraph above. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:75: // the current event queue when it becomes active. "it" -> "the request" (to clarify whether you're referring to that or the event queue) https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:76: // If it returns true, it's done and will be removed from the request queue. "it" -> "the request" again. The next "it" is fine, though https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:111: /// Access through methods below to ensure consistency. Nit: add a newline above https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:114: /// Create a `StreamQueue` of the events of source. "source" -> "[source]" https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:120: /// before the first request. Consider allowing "prefetch: -1" to mean "never pause the underlying stream". https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:121: StreamQueue(Stream source, {int prefetch: 0}) Consider making [prefetch] default to null and assigning it to 0 in the body. For non-boolean optional/named parameters, always defaulting to null makes it easier for wrapping functions to clearly say "use the default value" without encoding their own default. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:135: _HasNextRequest hasNextRequest = new _HasNextRequest(); Nit: "var" (also below) https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:155: /// first events that were not used by previous requeusts. "used" -> "consumed" (slightly more correct for requests like [hasNext] that technically look at an event but don't consume it) https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:171: /// Using `rest` closes the stream events object. After getting the "the stream events object" -> "this stream queue" https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:284: /// Add a new request to the queue. "Add" -> "Adds" https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:298: Nit: extra newline https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:303: /// Resumes subscription on [_sourceStream], or create it if necessary. "create" -> "creates" https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:315: /// Remove all requests and close them. "Remove" -> "Removes", "close" -> "closes" https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:325: if (!request.addEvents(_eventQueue)) { Isn't this guaranteed to return false? [request.addEvents] was presumably already called with the same queue when the last event was added, and it must have returned false then or it would have been removed from the request queue. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:347: /// Extracts the subscription and makes the events object unusable. "events object" -> "stream queue" https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:352: StreamSubscription subscription = _subscription; Nit: "var" https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:377: /// should only access the queue from the start, e.g., using [removeFirst]. "access the queue from the start" -> "remove events from the start of the queue" (alow below) I'm assuming it's legal to look ahead in the queue without removing events, as in your example above of consuming events if the fifth one matches. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:379: /// Returns `true` if if the request is completed, or `false` if it needs "if if" -> "if" https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:385: /// queue, and if it returns `false`, it's called again every time an event "an event" -> "a new event" https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:386: /// becomes available. "and finally when the stream closes" https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:398: /// If the requests kept events in the queue after an [addEvents] call, "requests" -> "request" https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:410: /// Set to `null` when it is completed, to mark it as already complete. This is no longer accurate. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:413: _NextRequest() : _completer = new Completer<T>(); Nit: assign _completer in the declaration https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:424: _completer.completeError(new StateError("no elements")); "no" -> "No" Also include a stack trace here so the user can tell this came from a StreamQueue. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:555: _RestRequest(this._streamQueue) : _completer = new StreamCompleter<T>(); Nit: newline above. Also move [_completer]'s initialization to its declaration. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:557: /// The future which will contain the remaining events of [_streamQueue]. "future" -> "stream" https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:570: Stream getRestStream() { Why is this a local method (as opposed to a private instance method)? It doesn't close over any local state. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:572: return new Stream<T>.empty(); If you're going to use this here, you'll need to upgrade the SDK constraint in the pubspec to exclude SDKs that don't support it. What's probably a better course of action, though, is to avoid it entirely for now until [Stream.empty] has been released in a stable version. If you release this package now using [Stream.empty], other packages won't be able to use the new APIs until SDKs including [Stream.empty] are widely-available. This would especially suck for me, since I want [FutureGroup] for the test package. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:590: StreamController controller = new StreamController<T>(); Nit: "var" https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:591: for (var event in events) event.addTo(controller); I like this style of loop, but unfortunately it's against the style guide: https://www.dartlang.org/articles/style-guide/#do-use-curly-braces-for-all-fl... https://codereview.chromium.org/1149563010/diff/180001/lib/src/subscription_s... File lib/src/subscription_stream.dart (right): https://codereview.chromium.org/1149563010/diff/180001/lib/src/subscription_s... lib/src/subscription_stream.dart:47: {bool isCancelOnError: false}) I thought you were getting rid of this? https://codereview.chromium.org/1149563010/diff/180001/test/all.dart File test/all.dart (right): https://codereview.chromium.org/1149563010/diff/180001/test/all.dart#newcode1 test/all.dart:1: // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file Now that this is using the new test runner, this isn't necessary. "pub run test" will run all the tests. https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... File test/stream_completer_test.dart (right): https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:19: Future done = completer.stream.toList(); Nit: "var" https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:33: test("listen and pause before linking stream", () async { Also test that this pause triggers "onPause" in the source stream. https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:70: var controller = new StreamController(onListen: () { listened = true; }, It's not actually in the style guide right now, but I asked Bob and Kathy and the intention is that "() { ... }" isn't allowed (since there should always be a newline after a statement). I think Kathy's going to update the style guide to reflect that. https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:74: (v) { "v" -> "value" https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:123: test("source stream isn't listened to until completer stream is.", () async { Nit: no period https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:125: bool listened = false; Nit: "var" https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:129: () async { controller.close(); } (); // In later microtask. I think "scheduleMicrotask" is a little more explicit here. https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:265: test("Listening more than once after setting stream", () async { Nit: "listening" (also below) https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:344: Future flushMicrotasks() => new Future.delayed(Duration.ZERO); Consider moving this (and maybe [unreachable] and [UnusuableStream] as well) into a test/utils.dart file. https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... File test/stream_queue_test.dart (right): https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:16: expect(controller.hasListener, isFalse); I didn't realize these getters existed! Consider using these instead of setting variables from the callbacks in some of the tests for StreamCompleter. I'll switch over my StreamGroup tests as well. https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:32: expect(controller.hasListener, isFalse); A few blank lines would go a long way in making this code easier to read. https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:57: expect(events.next, throws); Check the error's value. It would be awkward if the error-handling code had a bug and this were catching an ArgumentError or something. (Also below) https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:145: sequence(expectedValue, sequenceIndex) => (v) { "v" -> "value" https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:155: await Future.wait([skip1, skip2, skip3, skip4]); Won't the previous await wait for all the futures to complete anyway? https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:171: expect(events.take(0), completion([])); I tend to prefer "isEmpty" to "equals([])". It produces slightly nicer output when it fails. https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:301: }); Another test that could use some blank lines. https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:338: test("true false at end", () async { "true false" -> "false" https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:570: } Also test passing in "prefetch". https://codereview.chromium.org/1149563010/diff/180001/test/subscription_stre... File test/subscription_stream_test.dart (right): https://codereview.chromium.org/1149563010/diff/180001/test/subscription_stre... test/subscription_stream_test.dart:22: var c = new Completer(); "c" -> "completer" https://codereview.chromium.org/1149563010/diff/180001/test/subscription_stre... test/subscription_stream_test.dart:29: Stream<int> ss = await c.future; "ss" -> "stream" or "subscriptionStream"
Finally got back to this CL. PTAL! https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_complet... File lib/src/stream_completer.dart (right): https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_complet... lib/src/stream_completer.dart:120: cancelOnError: cancelOnError); On 2015/06/18 23:44:25, nweiz wrote: > As written, if a completer is completed with a broadcast stream and the user > calls [listen] multiple times on the result stream, that will throw a StateError > if the first listen is before the stream is completed and succeed if it's after. > > The easiest way to work around this is probably to only directly call [listen] > if the source stream is non-broadcast, so that [StreamCompleter.stream] is > always single-subscription no matter what. Also, that would be a good thing to > document. Acknowledged. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.dart File lib/src/stream_queue.dart (right): https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:19: /// and after all previous requestes have been fulfilled, stream events On 2015/06/18 23:44:26, nweiz wrote: > "requestes" -> "requests" Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:23: /// will be completed by the next two unreserved events from the stream. On 2015/06/18 23:44:25, nweiz wrote: > "unreserved" -> "unrequested"? Just to keep the terminology consistent Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:33: /// and the stream subscription is paused when there are no active requests. On 2015/06/18 23:44:27, nweiz wrote: > Both of these are already mentioned above, so this sentence is probably > unnecessary. Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:44: /// out of a stream and continue using the rest of the stream as a stream. On 2015/06/18 23:44:27, nweiz wrote: > "continue using" -> "continuing to use" Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:71: /// unnecessarily. On 2015/06/18 23:44:26, nweiz wrote: > This paragraph also seems redundant with the sixth paragraph above. Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:75: // the current event queue when it becomes active. On 2015/06/18 23:44:27, nweiz wrote: > "it" -> "the request" (to clarify whether you're referring to that or the event > queue) Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:76: // If it returns true, it's done and will be removed from the request queue. On 2015/06/18 23:44:26, nweiz wrote: > "it" -> "the request" again. The next "it" is fine, though Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:111: /// Access through methods below to ensure consistency. On 2015/06/18 23:44:26, nweiz wrote: > Nit: add a newline above Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:114: /// Create a `StreamQueue` of the events of source. On 2015/06/18 23:44:26, nweiz wrote: > "source" -> "[source]" Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:120: /// before the first request. I don't like -1 being magical, the alternative would be to use null, which is no less magical. And not pausing would be a great default for broadcast streams anyway. Maybe having the prefetch is silly. I can make a BufferStream class (and I plan to do so) that allows you to let the source stream keep a buffer until it pauses. That's better than putting it into all the classes that consume streams. So, I'll remove it for now. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:121: StreamQueue(Stream source, {int prefetch: 0}) True. I'll just remove it for now (effectively fixing the value to 0). https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:135: _HasNextRequest hasNextRequest = new _HasNextRequest(); On 2015/06/18 23:44:26, nweiz wrote: > Nit: "var" (also below) Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:155: /// first events that were not used by previous requeusts. On 2015/06/18 23:44:26, nweiz wrote: > "used" -> "consumed" (slightly more correct for requests like [hasNext] that > technically look at an event but don't consume it) Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:171: /// Using `rest` closes the stream events object. After getting the On 2015/06/18 23:44:26, nweiz wrote: > "the stream events object" -> "this stream queue" Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:284: /// Add a new request to the queue. On 2015/06/18 23:44:26, nweiz wrote: > "Add" -> "Adds" Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:298: On 2015/06/18 23:44:26, nweiz wrote: > Nit: extra newline Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:303: /// Resumes subscription on [_sourceStream], or create it if necessary. On 2015/06/18 23:44:26, nweiz wrote: > "create" -> "creates" Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:315: /// Remove all requests and close them. On 2015/06/18 23:44:26, nweiz wrote: > "Remove" -> "Removes", "close" -> "closes" Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:325: if (!request.addEvents(_eventQueue)) { It's only guaranteed to return false for the first request in the queue. Later requests in this loop may return true. I found doing it this way simpler than duplicating the core functionality in the close call. This way the addEvents method is always called at least once, and the close method is only called when the addEvents method has returned false on the current events, so it doesn't have to check those again. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:347: /// Extracts the subscription and makes the events object unusable. On 2015/06/18 23:44:26, nweiz wrote: > "events object" -> "stream queue" Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:352: StreamSubscription subscription = _subscription; On 2015/06/18 23:44:25, nweiz wrote: > Nit: "var" Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:377: /// should only access the queue from the start, e.g., using [removeFirst]. On 2015/06/18 23:44:27, nweiz wrote: > "access the queue from the start" -> "remove events from the start of the queue" > (alow below) > > I'm assuming it's legal to look ahead in the queue without removing events, as > in your example above of consuming events if the fifth one matches. Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:379: /// Returns `true` if if the request is completed, or `false` if it needs On 2015/06/18 23:44:27, nweiz wrote: > "if if" -> "if" Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:385: /// queue, and if it returns `false`, it's called again every time an event On 2015/06/18 23:44:25, nweiz wrote: > "an event" -> "a new event" Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:386: /// becomes available. On 2015/06/18 23:44:27, nweiz wrote: > "and finally when the stream closes" Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:398: /// If the requests kept events in the queue after an [addEvents] call, On 2015/06/18 23:44:26, nweiz wrote: > "requests" -> "request" Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:413: _NextRequest() : _completer = new Completer<T>(); On 2015/06/18 23:44:26, nweiz wrote: > Nit: assign _completer in the declaration Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:424: _completer.completeError(new StateError("no elements")); On 2015/06/18 23:44:27, nweiz wrote: > "no" -> "No" > > Also include a stack trace here so the user can tell this came from a > StreamQueue. Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:555: _RestRequest(this._streamQueue) : _completer = new StreamCompleter<T>(); On 2015/06/18 23:44:27, nweiz wrote: > Nit: newline above. > > Also move [_completer]'s initialization to its declaration. Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:557: /// The future which will contain the remaining events of [_streamQueue]. On 2015/06/18 23:44:26, nweiz wrote: > "future" -> "stream" Done. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:570: Stream getRestStream() { It's only used here, so I didn't see a need to give it a larger scope. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:572: return new Stream<T>.empty(); Ack. I SOOO wish 1.11 would be released soon. All the goodies I added because I needed them - and then I can't use them anyway. :( https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:591: for (var event in events) event.addTo(controller); Do they want me to write events.forEach((event) => event.addTo(controller)) ? Because that's how they get me to write that. https://codereview.chromium.org/1149563010/diff/180001/lib/src/subscription_s... File lib/src/subscription_stream.dart (right): https://codereview.chromium.org/1149563010/diff/180001/lib/src/subscription_s... lib/src/subscription_stream.dart:47: {bool isCancelOnError: false}) Ack, I was. Got lost in the other changes, I guess. https://codereview.chromium.org/1149563010/diff/180001/test/all.dart File test/all.dart (right): https://codereview.chromium.org/1149563010/diff/180001/test/all.dart#newcode1 test/all.dart:1: // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file Cool. I thought it was some github testing thing that needed it, but I'm all for removing redundant repetitions. https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... File test/stream_completer_test.dart (right): https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:19: Future done = completer.stream.toList(); I have actually found full typing to catch bugs in my tests. But ok :) https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:33: test("listen and pause before linking stream", () async { On 2015/06/18 23:44:27, nweiz wrote: > Also test that this pause triggers "onPause" in the source stream. Done. https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:70: var controller = new StreamController(onListen: () { listened = true; }, That's silly. I'll rewrite it to () => listened = true which is being less precise (it suggests that the return value is significant) just to get around the style guide. So, mark me up as opposed to that ruling. https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:74: (v) { On 2015/06/18 23:44:27, nweiz wrote: > "v" -> "value" Done. https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:123: test("source stream isn't listened to until completer stream is.", () async { On 2015/06/18 23:44:27, nweiz wrote: > Nit: no period Done. https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:125: bool listened = false; On 2015/06/18 23:44:27, nweiz wrote: > Nit: "var" Done. https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:129: () async { controller.close(); } (); // In later microtask. And it has exactly the same length. Scary. https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:265: test("Listening more than once after setting stream", () async { On 2015/06/18 23:44:27, nweiz wrote: > Nit: "listening" (also below) Done. https://codereview.chromium.org/1149563010/diff/180001/test/stream_completer_... test/stream_completer_test.dart:344: Future flushMicrotasks() => new Future.delayed(Duration.ZERO); On 2015/06/18 23:44:27, nweiz wrote: > Consider moving this (and maybe [unreachable] and [UnusuableStream] as well) > into a test/utils.dart file. Done. https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... File test/stream_queue_test.dart (right): https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:16: expect(controller.hasListener, isFalse); Will try. It's not always possible to see the difference between never having had a listener and being canceled in a controller, so at least the listened may be necessary. https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:32: expect(controller.hasListener, isFalse); On 2015/06/18 23:44:28, nweiz wrote: > A few blank lines would go a long way in making this code easier to read. Done. https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:57: expect(events.next, throws); On 2015/06/18 23:44:28, nweiz wrote: > Check the error's value. It would be awkward if the error-handling code had a > bug and this were catching an ArgumentError or something. (Also below) Done. https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:145: sequence(expectedValue, sequenceIndex) => (v) { On 2015/06/18 23:44:28, nweiz wrote: > "v" -> "value" Done. https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:155: await Future.wait([skip1, skip2, skip3, skip4]); Yes, this line is redundant. The previous line was probably a rewrite of this line, which should then have been removed. https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:171: expect(events.take(0), completion([])); On 2015/06/18 23:44:28, nweiz wrote: > I tend to prefer "isEmpty" to "equals([])". It produces slightly nicer output > when it fails. Done. https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:301: }); On 2015/06/18 23:44:28, nweiz wrote: > Another test that could use some blank lines. Done. https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:338: test("true false at end", () async { On 2015/06/18 23:44:28, nweiz wrote: > "true false" -> "false" Done. https://codereview.chromium.org/1149563010/diff/180001/test/stream_queue_test... test/stream_queue_test.dart:570: } Prefetch was removed. https://codereview.chromium.org/1149563010/diff/180001/test/subscription_stre... File test/subscription_stream_test.dart (right): https://codereview.chromium.org/1149563010/diff/180001/test/subscription_stre... test/subscription_stream_test.dart:29: Stream<int> ss = await c.future; On 2015/06/18 23:44:28, nweiz wrote: > "ss" -> "stream" or "subscriptionStream" Done.
lgtm https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.dart File lib/src/stream_queue.dart (right): https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:570: Stream getRestStream() { On 2015/06/30 10:34:14, Lasse Reichstein Nielsen wrote: > It's only used here, so I didn't see a need to give it a larger scope. There are a lot of private methods at the top level of classes that are only used in one place. Factoring them out makes the method that uses them smaller and reduces indentation. https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:572: return new Stream<T>.empty(); On 2015/06/30 10:34:13, Lasse Reichstein Nielsen wrote: > Ack. I SOOO wish 1.11 would be released soon. All the goodies I added because I > needed them - and then I can't use them anyway. :( Yeah, this is one of the burdens of developing in package land. https://codereview.chromium.org/1149563010/diff/200001/lib/src/stream_complet... File lib/src/stream_completer.dart (right): https://codereview.chromium.org/1149563010/diff/200001/lib/src/stream_complet... lib/src/stream_completer.dart:54: /// is set. Document that this is always single-subscription, regardless of whether the stream is completed with a single-subscription or a broadcast stream. https://codereview.chromium.org/1149563010/diff/200001/lib/src/stream_complet... lib/src/stream_completer.dart:116: final Completer _doneCompleter = new Completer(); It looks like this is never completed. https://codereview.chromium.org/1149563010/diff/200001/lib/src/stream_complet... lib/src/stream_completer.dart:149: Future _setSourceStream(Stream<T> sourceStream) { It looks like this return value is unused. Should [StreamCompleter.setSourceStream] return it? https://codereview.chromium.org/1149563010/diff/200001/lib/src/stream_queue.dart File lib/src/stream_queue.dart (right): https://codereview.chromium.org/1149563010/diff/200001/lib/src/stream_queue.d... lib/src/stream_queue.dart:68: // If the request returns true, it's complaete and will be removed from the "complaete" -> "complete"
kevmoo@google.com changed reviewers: + kevmoo@google.com
DBC – don't forget to bump the version in pubspec.yaml and add entries to the changelog https://codereview.chromium.org/1149563010/diff/200001/lib/async.dart File lib/async.dart (right): https://codereview.chromium.org/1149563010/diff/200001/lib/async.dart#newcode12 lib/async.dart:12: export "src/delegating_stream_subscription.dart"; sort these
Also: don't forget to mention the new classes in the CHANGELOG.
https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.dart File lib/src/stream_queue.dart (right): https://codereview.chromium.org/1149563010/diff/180001/lib/src/stream_queue.d... lib/src/stream_queue.dart:570: Stream getRestStream() { Agree. Moved to private instance method. https://codereview.chromium.org/1149563010/diff/200001/lib/async.dart File lib/async.dart (right): https://codereview.chromium.org/1149563010/diff/200001/lib/async.dart#newcode12 lib/async.dart:12: export "src/delegating_stream_subscription.dart"; On 2015/07/01 00:12:12, kevmoo wrote: > sort these Done. https://codereview.chromium.org/1149563010/diff/200001/lib/src/stream_complet... File lib/src/stream_completer.dart (right): https://codereview.chromium.org/1149563010/diff/200001/lib/src/stream_complet... lib/src/stream_completer.dart:54: /// is set. On 2015/06/30 23:39:48, nweiz wrote: > Document that this is always single-subscription, regardless of whether the > stream is completed with a single-subscription or a broadcast stream. Done. https://codereview.chromium.org/1149563010/diff/200001/lib/src/stream_complet... lib/src/stream_completer.dart:116: final Completer _doneCompleter = new Completer(); True. It should be completed when the source stream subscription is done or canceled. I don't think it's necessary for anything, so I'll remove it. https://codereview.chromium.org/1149563010/diff/200001/lib/src/stream_complet... lib/src/stream_completer.dart:149: Future _setSourceStream(Stream<T> sourceStream) { I removed the _doneCompleter entirely. If someone needs to know when a stream is done, they should wrap it instead of expecting the user of the stream to notify them. https://codereview.chromium.org/1149563010/diff/200001/lib/src/stream_queue.dart File lib/src/stream_queue.dart (right): https://codereview.chromium.org/1149563010/diff/200001/lib/src/stream_queue.d... lib/src/stream_queue.dart:68: // If the request returns true, it's complaete and will be removed from the On 2015/06/30 23:39:48, nweiz wrote: > "complaete" -> "complete" Done.
LGTM! I especially appreciate the amount of documentation.
Message was sent while issue was closed.
Committed patchset #12 (id:220001) manually as dc58de5505ed02563370f03f7f17377f2bfc9e98 (presubmit successful). |