Created: 5 years, 6 months ago by nweiz
Modified: 5 years, 6 months ago
Reviewers: Lasse Reichstein Nielsen
CC: reviews_dartlang.org
Base URL: git@github.com:dart-lang/async.git@master
Target Ref: refs/heads/master
Visibility: Public.
Description: Add a StreamGroup class for merging streams.
R=lrn@google.com
Committed: https://github.com/dart-lang/async/commit/04a8d2e7379790fe85a0894dbb9ac94b0c2231c4
Patch Set 1 #
Total comments: 46
Patch Set 2 : Code review changes #
Total comments: 9
Patch Set 3 : Code review changes #
Messages
Total messages: 9 (1 generated)
nweiz@google.com changed reviewers: + lrn@google.com
Looks good, except I think there is a problem with single-sub streams in broadcast groups!

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart
File lib/src/stream_group.dart (right):

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:17: /// [stream] will be a single-subscription stream for groups constructed with
I find "groups" confusing - using "a group" or "a `StreamGroup`" might be better.

Maybe reword as:

[stream] is a stream providing the events of the streams added to the [StreamGroup].
If the `StreamGroup` is created using [new StreamGroup], [stream] is a single-subscription stream.
If the `StreamGroup` is created using [StreamGroup.broadcast], [stream] is a broadcast stream. In this case ...

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:55: /// dormant.
Perhaps state that the map is used to look up the subscription of a stream in order to remove a stream from the group.

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:55: /// dormant.
It seems that it will still contain single-sub group members of a broadcast group that have been listened to, even while dormant?

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:95: /// a Future, it will be returned from [add]. Otherwise, [add] returns `null`.
[Future]

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:104: _streams.add(stream);
Here you can add the same stream more than once while dormant, and fix that by using putIfAbsent when activating them.

How about using *only* the map, but having null subscriptions for streams while dormant? That avoids the problems with some streams being in _subscriptions and others in _streams.
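The single-map suggestion in the last comment can be sketched as follows. This is a minimal illustration in current null-safe Dart using only `dart:async`; `MiniGroup`, `memberCount`, and the private helpers are hypothetical names, it is not the `StreamGroup` code under review, and closing and done handling are left out.

```dart
import 'dart:async';

/// Hypothetical, simplified group: one map holds every member stream.
/// A null value means "added, but not currently listened to".
class MiniGroup<T> {
  final _subscriptions = <Stream<T>, StreamSubscription<T>?>{};
  late final _controller =
      StreamController<T>(onListen: _onListen, onCancel: _onCancel);

  Stream<T> get stream => _controller.stream;

  /// Number of distinct member streams, listened to or not.
  int get memberCount => _subscriptions.length;

  void add(Stream<T> stream) {
    // A stream that is already a key is left alone, so duplicates are no-ops.
    _subscriptions.putIfAbsent(
        stream, () => _controller.hasListener ? _listenTo(stream) : null);
  }

  Future<void> remove(Stream<T> stream) async {
    // One lookup covers both dormant (null) and active members.
    await _subscriptions.remove(stream)?.cancel();
  }

  void _onListen() {
    // Assigning to an existing key does not change the key set,
    // so iterating over the keys here is safe.
    for (final stream in _subscriptions.keys) {
      _subscriptions[stream] ??= _listenTo(stream);
    }
  }

  Future<void> _onCancel() async {
    final active =
        _subscriptions.values.whereType<StreamSubscription<T>>().toList();
    _subscriptions.clear();
    await Future.wait(active.map((sub) => sub.cancel()));
  }

  StreamSubscription<T> _listenTo(Stream<T> stream) =>
      stream.listen(_controller.add, onError: _controller.addError);
}

Future<void> main() async {
  final a = StreamController<String>();
  final b = StreamController<String>();
  final group = MiniGroup<String>()
    ..add(a.stream)
    ..add(a.stream) // same key, so this is a no-op
    ..add(b.stream);
  print(group.memberCount); // 2

  await group.remove(b.stream); // dormant member: nothing to cancel
  final events = <String>[];
  final sub = group.stream.listen(events.add);
  a.add('first');
  await Future<void>.delayed(Duration.zero);
  print(events); // [first]
  await sub.cancel();
}
```

With a single map there is no second collection to keep in sync, so `remove` cannot miss a member that happens to be active rather than dormant.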
https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:109: return stream.listen(null).cancel();
The listen call can fail (for a single-sub stream already listened to). I guess that's acceptable.

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:124: /// been emitted through this group will not be, even if they were added to
Maybe drop the ", even if ..." part, I think it only makes the message less crisp. You cancel now => you don't see any more events from that stream.

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:127: /// If [stream]'s subscription was canceled, this returns
was => is

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:131: _streams.remove(stream);
If this is a broadcast group and a single-sub stream is still active, it is in _subscriptions, not _streams.

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:185: /// A callback called when [stream] is canceled.
is canceled -> 's last listener is canceled
?
Broadcast streams for the confuzing!

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:193: _streams.add(stream);
Neat, cancel the broadcast streams, but let the single-subs keep firing into the void :)

Maybe make a comment saying that this is what it does?

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:223: if (_streams.isEmpty) _controller.close();
If you are dormant in a broadcast stream with a single-sub group member, it will still be in _subscriptions, not _streams. You should be able to wait for those events too if you listen again.

So, only close controller if _subscriptions is empty too?
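The remark on line 109, that `listen` can fail for a single-sub stream that already has a listener, can be checked in isolation. The sketch below uses only `dart:async`; `secondListenThrows` is a hypothetical helper name and the code is written in current null-safe Dart rather than the Dart of this patch.

```dart
import 'dart:async';

/// Listens to [stream] twice and reports whether the second call
/// throws a [StateError].
bool secondListenThrows(Stream<Object?> stream) {
  stream.listen(null);
  try {
    stream.listen(null);
    return false;
  } on StateError {
    return true;
  }
}

void main() {
  // A single-subscription stream rejects the second listener.
  print(secondListenThrows(StreamController<int>().stream)); // true
  // A broadcast stream accepts any number of listeners.
  print(secondListenThrows(StreamController<int>.broadcast().stream)); // false
}
```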
https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:232: /// An enum of possible states of a [StreamGroup].
Would a real enum work?
(Not that I particularly like them, but this enum class seems so simple that it just might work).

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart
File test/stream_group_test.dart (right):

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:30: await new Future.delayed(Duration.ZERO);
I like having a helper function:
  flushMicrotasks() => new Future.delayed(Duration.ZERO);
so I can just write
  await flushMicrotasks();
:)

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:35: completion(equals(["first", "second"])));
This expects the events in a particular order - which is safe in this case, but is not really important, it's just an artifact of the internal implementation of the stream controller and the order they were added to the group in (I bet if you swap the group adding order, the results swap too).
Maybe just expect them to equal unordered?

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:116: });
Try this test again after listening to the group and pausing the subscription - it shouldn't be different from not listening until after events were gone.

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:143: test("forwards a cancel futures", () async {
a cancel futures -> a cancel future/cancel futures

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:289: });
Add a test where you:
  add single-sub stream (create controller for the stream)
  listen on it
  add a few events and see that you get them.
  cancel the subscription
  add a few events (to be ignored)
  close the group
  wait
  listen again
  add a few events and see that you get those, and not the ones from while canceled.

I think the last step will fail because you closed the group and it didn't notice the open single-sub subscription.

Also consider a similar test with both single-sub and broadcast streams in the group.

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:290: });
Also test:
  create broadcast group
  add single-sub stream
  add broadcast stream
  listen
  cancel
  remove single-sub stream
  listen
  see that you don't get anything from the single-sub stream

(I think it's broken because the single-sub stream isn't removed.)

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:292: group("regardless of type", () {
Could these tests be run for both broadcast and single-sub groups, to check that it is really independent of the type?

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:403: test("forwards a cancel futures", () async {
a .. futures

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:425: streamGroup.add(controller.stream);
Check that prior events do get through. So
  controller.add("first");
  await new Future.delayed(Duration.ZERO)
  controller.add("second");
  expect(group.remove(controller.stream), isNull);
  ...
  ... completion(equals(["first"]));

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:465: // The subscription to [controller.stream] is canceled synchronously, so
Long line :)

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:654: expect(merged.toList(), completion(equals(["first", "second"])));
Again, maybe only compare unordered since the order is accidental.
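Two of the test suggestions above, the `flushMicrotasks` helper and comparing results without regard to order, can be sketched without a test framework. The code below is an assumption-laden illustration in current null-safe Dart (`Duration.zero` instead of `Duration.ZERO`); `sameElements` and `deliverOutOfOrder` are hypothetical helpers. In a `package:test` suite the `unorderedEquals` matcher would be the usual way to express the same check.

```dart
import 'dart:async';

/// Waits for a zero-length timer. Timers run after the microtask queue
/// has drained, so pending microtasks have completed when this returns.
Future<void> flushMicrotasks() => Future<void>.delayed(Duration.zero);

/// Order-insensitive comparison of two lists of strings.
bool sameElements(List<String> a, List<String> b) {
  if (a.length != b.length) return false;
  final sortedA = [...a]..sort();
  final sortedB = [...b]..sort();
  for (var i = 0; i < sortedA.length; i++) {
    if (sortedA[i] != sortedB[i]) return false;
  }
  return true;
}

/// Feeds two controllers in "reverse" order and collects what arrives.
Future<List<String>> deliverOutOfOrder() async {
  final first = StreamController<String>();
  final second = StreamController<String>();
  final events = <String>[];
  first.stream.listen(events.add);
  second.stream.listen(events.add);
  second.add('second');
  first.add('first');
  await flushMicrotasks();
  return events;
}

Future<void> main() async {
  final events = await deliverOutOfOrder();
  // The arrival order is an implementation detail, so only the
  // contents are compared.
  print(sameElements(events, ['first', 'second'])); // true
}
```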
Code review changes
PTAL

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart
File lib/src/stream_group.dart (right):

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:17: /// [stream] will be a single-subscription stream for groups constructed with
On 2015/06/18 10:29:31, Lasse Reichstein Nielsen wrote:
> I find "groups" confusing - using "a group" or "a `StreamGroup`" might be
> better.
>
> Maybe reword as:
>
> [stream] is a stream providing the events of the streams added to the
> [StreamGroup].
> If the `StreamGroup` is created using [new StreamGroup], [stream] is a
> single-subscription stream.
> If the `StreamGroup` is created using [StreamGroup.broadcast], [stream] is a
> broadcast stream. In this case ...
>

Reworded.

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:55: /// dormant.
On 2015/06/18 10:29:31, Lasse Reichstein Nielsen wrote:
> It seems that it will still contain single-sub group members of a broadcast
> group that have been listened to, even while dormant?

That's true, I'll state that explicitly.

> Perhaps state that the map is used to look up the subscription of a
> stream in order to remove a stream from the group.

Done.

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:95: /// a Future, it will be returned from [add]. Otherwise, [add] returns `null`.
On 2015/06/18 10:29:32, Lasse Reichstein Nielsen wrote:
> [Future]

Done.

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:104: _streams.add(stream);
On 2015/06/18 10:29:31, Lasse Reichstein Nielsen wrote:
> Here you can add the same stream more than once while dormant, and fix that by
> using putIfAbsent when activating them.
>
> How about using *only* the map, but having null subscriptions for streams while
> dormant? That avoids the problems with some streams being in _subscriptions and
> others in _streams.
>

Good idea! This makes a lot of stuff cleaner.

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:109: return stream.listen(null).cancel();
On 2015/06/18 10:29:31, Lasse Reichstein Nielsen wrote:
> The listen call can fail (for a single-sub stream already listened to). I guess
> that's acceptable.

It would fail if the group weren't canceled, too, so I think that's consistent.

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:124: /// been emitted through this group will not be, even if they were added to
On 2015/06/18 10:29:31, Lasse Reichstein Nielsen wrote:
> Maybe drop the ", even if ..." part, I think it only makes the message less
> crisp. You cancel now => you don't see any more events from that stream.

Done.

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:127: /// If [stream]'s subscription was canceled, this returns
On 2015/06/18 10:29:31, Lasse Reichstein Nielsen wrote:
> was => is

Done.

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:131: _streams.remove(stream);
On 2015/06/18 10:29:31, Lasse Reichstein Nielsen wrote:
> If this is a broadcast group and a single-sub stream is still active, it is in
> _subscriptions, not _streams.

Merging _subscriptions and _streams made this problem go away.

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:185: /// A callback called when [stream] is canceled.
On 2015/06/18 10:29:31, Lasse Reichstein Nielsen wrote:
> is canceled -> 's last listener is canceled
> ?
> Broadcast streams for the confuzing!

Done.

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:193: _streams.add(stream);
On 2015/06/18 10:29:31, Lasse Reichstein Nielsen wrote:
> Neat, cancel the broadcast streams, but let the single-subs keep firing into the
> void :)
>
> Maybe make a comment saying that this is what it does?

Done.

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:223: if (_streams.isEmpty) _controller.close();
On 2015/06/18 10:29:31, Lasse Reichstein Nielsen wrote:
> If you are dormant in a broadcast stream with a single-sub group member, it will
> still be in _subscriptions, not _streams. You should be able to wait for those
> events too if you listen again.
>
> So, only close controller if _subscriptions is empty too?

Fixed by merging _subscriptions and _streams.

https://codereview.chromium.org/1178793006/diff/1/lib/src/stream_group.dart#n...
lib/src/stream_group.dart:232: /// An enum of possible states of a [StreamGroup].
On 2015/06/18 10:29:31, Lasse Reichstein Nielsen wrote:
> Would a real enum work?
> (Not that I particularly like them, but this enum class seems so simple that it
> just might work).

Yes, technically, but I tend to avoid those on principle. Even if they work now, in my experience something always comes along later and makes them stop working. At the very least it's nice to be able to do "print(_state)" and have it look nice when debugging.

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart
File test/stream_group_test.dart (right):

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:30: await new Future.delayed(Duration.ZERO);
On 2015/06/18 10:29:32, Lasse Reichstein Nielsen wrote:
> I like having a helper function:
> flushMicrotasks() => new Future.delayed(Duration.ZERO);
> so I can just write
> await flushMicrotasks();
> :)

Done.

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:35: completion(equals(["first", "second"])));
On 2015/06/18 10:29:32, Lasse Reichstein Nielsen wrote:
> This expects the events in a particular order - which is safe in this case, but
> is not really important, it's just an artifact of the internal implementation of
> the stream controller and the order they were added to the group in (I bet if
> you swap the group adding order, the results swap too).
> Maybe just expect them to equal unordered?

Done.

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:116: });
On 2015/06/18 10:29:32, Lasse Reichstein Nielsen wrote:
> Try this test again after listening to the group and pausing the subscription -
> it shouldn't be different from not listening until after events were gone.

I've added the test, but the actual behavior is that the events *are* buffered. Which I think makes sense--based on your documentation for StreamQueue, a subscription to a broadcast stream buffers while paused.

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:143: test("forwards a cancel futures", () async {
On 2015/06/18 10:29:32, Lasse Reichstein Nielsen wrote:
> a cancel futures -> a cancel future/cancel futures

Done.

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:289: });
On 2015/06/18 10:29:32, Lasse Reichstein Nielsen wrote:
> Add a test where you:
> add single-sub stream (create controller for the stream)
> listen on it
> add a few events and see that you get them.
> cancel the subscription
> add a few events (to be ignored)
> close the group
> wait
> listen again
> add a few events and see that you get those, and not the ones from while
> canceled.
>
> I think the last step will fail because you closed the group and it didn't
> notice the open single-sub subscription.
>
> Also consider a similar test with both single-sub and broadcast streams in the
> group.

Done.

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:290: });
On 2015/06/18 10:29:32, Lasse Reichstein Nielsen wrote:
> Also test:
> create broadcast group
> add single-sub stream
> add broadcast stream
> listen
> cancel
> remove single-sub stream
> listen
> see that you don't get anything from the single-sub stream
>
> (I think it's broken because the single-sub stream isn't removed.)
>

Done.

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:292: group("regardless of type", () {
On 2015/06/18 10:29:32, Lasse Reichstein Nielsen wrote:
> Could these tests be run for both broadcast and single-sub groups, to check that
> it is really independent of the type?

Done.

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:403: test("forwards a cancel futures", () async {
On 2015/06/18 10:29:32, Lasse Reichstein Nielsen wrote:
> a .. futures

Done.

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:465: // The subscription to [controller.stream] is canceled synchronously, so
On 2015/06/18 10:29:32, Lasse Reichstein Nielsen wrote:
> Long line :)

Done.

https://codereview.chromium.org/1178793006/diff/1/test/stream_group_test.dart...
test/stream_group_test.dart:654: expect(merged.toList(), completion(equals(["first", "second"])));
On 2015/06/18 10:29:32, Lasse Reichstein Nielsen wrote:
> Again, maybe only compare unordered since the order is accidental.

Done.
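The reply on line 116 rests on the observation that a paused subscription to a broadcast stream buffers events rather than dropping them. A minimal sketch of that behavior, using a plain broadcast `StreamController` from `dart:async` rather than a `StreamGroup`, might look like this; `pausedBroadcastDemo` is a hypothetical name and the code is current null-safe Dart.

```dart
import 'dart:async';

/// Returns two snapshots of what a listener received: one taken while
/// its subscription was paused, one taken after it was resumed.
Future<List<List<String>>> pausedBroadcastDemo() async {
  final controller = StreamController<String>.broadcast();
  final received = <String>[];
  final sub = controller.stream.listen(received.add);

  sub.pause();
  controller.add('first');
  controller.add('second');
  await Future<void>.delayed(Duration.zero);
  final whilePaused = List<String>.of(received);

  // Resuming delivers the events that were buffered for this listener.
  sub.resume();
  await Future<void>.delayed(Duration.zero);
  await sub.cancel();
  return [whilePaused, received];
}

Future<void> main() async {
  final snapshots = await pausedBroadcastDemo();
  print(snapshots[0]); // []
  print(snapshots[1]); // [first, second]
}
```

This differs from the case where the broadcast stream has no listener at all, in which events added to the controller are simply not delivered to anyone.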
Excellent! LGTM!

https://codereview.chromium.org/1178793006/diff/20001/lib/src/stream_group.dart
File lib/src/stream_group.dart (right):

https://codereview.chromium.org/1178793006/diff/20001/lib/src/stream_group.da...
lib/src/stream_group.dart:25: /// may drop events if a listener is added and later removed.** Broadcast
-> may drop events that arrive while [stream] has no listener
?

https://codereview.chromium.org/1178793006/diff/20001/lib/src/stream_group.da...
lib/src/stream_group.dart:100: if (_state == _StreamGroupState.dormant) {
So, if this is a broadcast streamgroup with no listener, and stream is single-sub, then it won't be listened to until the next time the group gets a listener.
I think that's fine - it means that we have single-sub streams in the map both with and without subscriptions, but that shouldn't be a problem.

https://codereview.chromium.org/1178793006/diff/20001/lib/src/stream_group.da...
lib/src/stream_group.dart:137: for (var stream in _subscriptions.keys.toList()) {
You don't need to do toList (or avoid forEach).
Updating the value of an existing key is not a modification that breaks iteration - only changing the key-set does that.

https://codereview.chromium.org/1178793006/diff/20001/lib/src/stream_group.da...
lib/src/stream_group.dart:183: for (var stream in _subscriptions.keys.toList()) {
Ditto here, no need for toList.

https://codereview.chromium.org/1178793006/diff/20001/test/stream_group_test....
File test/stream_group_test.dart (right):

https://codereview.chromium.org/1178793006/diff/20001/test/stream_group_test....
test/stream_group_test.dart:407: await flushMicrotasks();
Maybe check that the single-sub stream is canceled when it's removed.
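The point about `toList` on line 137 can be demonstrated with a plain map. In this sketch (current Dart, `dart:core` only) `fillValues` and `addingKeyThrows` are hypothetical helpers; the map stands in for `_subscriptions`, with `int?` values in place of subscriptions.

```dart
/// Overwrites every value while iterating over the keys directly.
/// No defensive copy of the keys is needed for this.
Map<String, int?> fillValues(Map<String, int?> map) {
  for (final key in map.keys) {
    map[key] = key.length; // existing key: the key set is unchanged
  }
  return map;
}

/// Reports whether adding a key during iteration throws a
/// [ConcurrentModificationError].
bool addingKeyThrows(Map<String, int?> map) {
  try {
    for (final key in map.keys) {
      map['$key!'] = 0; // new key: the key set changes
    }
    return false;
  } on ConcurrentModificationError {
    return true;
  }
}

void main() {
  print(fillValues({'a': null, 'bb': null})); // {a: 1, bb: 2}
  print(addingKeyThrows({'a': null})); // true
}
```

The copy made by `keys.toList()` only matters when the loop body adds or removes keys, which is not the case when a null subscription is replaced by a live one or the other way around.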
Code review changes
Message was sent while issue was closed.
Committed patchset #3 (id:40001) manually as 04a8d2e7379790fe85a0894dbb9ac94b0c2231c4 (presubmit successful).
Message was sent while issue was closed.
https://codereview.chromium.org/1178793006/diff/20001/lib/src/stream_group.dart
File lib/src/stream_group.dart (right):

https://codereview.chromium.org/1178793006/diff/20001/lib/src/stream_group.da...
lib/src/stream_group.dart:25: /// may drop events if a listener is added and later removed.** Broadcast
On 2015/06/19 12:45:13, Lasse Reichstein Nielsen wrote:
> -> may drop events that arrive while [stream] has no listener
> ?

I think that's less accurate, since events *won't* be dropped when it first has no listener, only later on when the last listener is removed.

https://codereview.chromium.org/1178793006/diff/20001/lib/src/stream_group.da...
lib/src/stream_group.dart:137: for (var stream in _subscriptions.keys.toList()) {
On 2015/06/19 12:45:13, Lasse Reichstein Nielsen wrote:
> You don't need to do toList (or avoid forEach).
> Updating the value of an existing key is not a modification that breaks
> iteration - only changing the key-set does that.

Done.

https://codereview.chromium.org/1178793006/diff/20001/lib/src/stream_group.da...
lib/src/stream_group.dart:183: for (var stream in _subscriptions.keys.toList()) {
On 2015/06/19 12:45:13, Lasse Reichstein Nielsen wrote:
> Ditto here, no need for toList.

Done.

https://codereview.chromium.org/1178793006/diff/20001/test/stream_group_test....
File test/stream_group_test.dart (right):

https://codereview.chromium.org/1178793006/diff/20001/test/stream_group_test....
test/stream_group_test.dart:407: await flushMicrotasks();
On 2015/06/19 12:45:13, Lasse Reichstein Nielsen wrote:
> Maybe check that the single-sub stream is canceled when it's removed.

Done.