Created: 5 years, 5 months ago by nweiz
Modified: 5 years, 4 months ago
Reviewers: Lasse Reichstein Nielsen
CC: reviews_dartlang.org
Base URL: git@github.com:dart-lang/async.git@master
Target Ref: refs/heads/master
Visibility: Public.
Description: Add StreamQueue.fork and ForkableStream.
StreamQueue.fork is a very useful operation for creating complex and composable
user-defined operations over stream queues. It allows arbitrary lookahead to be
performed without modifying the semantics of the original stream, providing for
higher-order operations like "check for this sequence of values or, if they
don't exist, this other distinct sequence".
Committed: https://github.com/dart-lang/async/commit/312d39641225b64b275e57d167b57a87b335654a
Patch Set 1 #
Total comments: 24
Messages
Total messages: 11 (1 generated)
nweiz@google.com changed reviewers: + lrn@google.com
https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dart
File lib/src/forkable_stream.dart (right):

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:23: /// then canceled.
It seems it will be cancelled immediately when the primary stream is canceled?

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:24: class ForkableStream<T> extends StreamView<T> {
AFAICS this differs from just making it a broadcast stream only in the pause behavior - if all subscriptions are paused, then the original is paused.

How about just making it a special broadcast stream that forwards pauses, instead of adding the extra fork method. That way you don't need to extend the interface, and it will "just work" in any setting that accepts a broadcast stream.

You can still cancel the original if at any point after the first subscription, all subscriptions are canceled, and the original can still be a single-sub stream.

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:34: bool _isClosed = false;
We usually use "isClosed" for when "close" has been called on a controller/the done event has arrived. Use "isCanceled"?

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:71: throw new StateError("Can't fork a closed or canceled stream.");
Alternatively just return an empty stream (new Stream<T>.empty()).
That's a stream containing the same events as the rest of the primary stream (none).
(I don't really consider "done" an event as much as a way of telling someone that there are no more events).

This also has the advantage that code won't have to guess whether the forkable stream is closed or not. You should generally not throw a StateError for a state that the caller isn't able to check first, and the caller doesn't know whether the onDone event just arrived one microtask ago.

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:109: if (primary) _isClosed = true;
So if the primary stream is canceled, you can't fork any more, but existing forks keep running and getting new events until they are all canceled or done?

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:111: if (controller.isClosed) return null;
A controller is only closed by the "onDone" handler.
That means that if any controller is closed, all controllers are, right?
(If that's an invariant you are using, consider documenting it - is it equivalent to _controllers.isEmpty?).

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:139: for (var controller in _controllers.toSet()) {
I would use `toList()` here. It should have a lower overhead than creating a new set. Maybe even `toList(growable: false)`.

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:158: controller.close();
You may have a race condition here.
The _isClosed is only set to true when you close the primary subscription, but that subscription's onDone handler may create a new fork which goes into _controllers after the toSet - so that subscription is not closed, and then it's removed by the .clear() call below while still being active.

The primary stream controller is the first one in the (linked) set, so it will be the first one to be called, but it's still a little risky.
Maybe set _isClosed to true before the loop to prevent onDone handlers from forking.

https://codereview.chromium.org/1241723003/diff/1/lib/src/stream_queue.dart
File lib/src/stream_queue.dart (right):

https://codereview.chromium.org/1241723003/diff/1/lib/src/stream_queue.dart#n...
lib/src/stream_queue.dart:110: : new ForkableStream(source);
Would it be possible to make the fork method without it costing when you don't use it?
Maybe after calling fork, all events are forwarded to a different controller as well as being put in the event queue, but if you don't fork, you won't create the forkable stream.

Also, if you do "rest" as the first thing, I think we return the original stream, which would now be a ForkableStream.
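The `toList()` suggestion for forkable_stream.dart:139 comes down to iterating over a snapshot, so that handlers may add or remove entries while the loop runs. A minimal sketch of that idea follows; the set of strings is a hypothetical stand-in for `_controllers`, which in the reviewed code holds stream controllers.

```dart
void main() {
  // Hypothetical stand-in for the _controllers set in the reviewed code.
  final controllers = <String>{'primary', 'fork-1', 'fork-2'};

  // Iterating `controllers` directly while removing from it would throw a
  // ConcurrentModificationError. A fixed-length list snapshot avoids that
  // and is cheaper to build than a second set.
  for (final name in controllers.toList(growable: false)) {
    if (name != 'primary') controllers.remove(name);
  }

  print(controllers); // {primary}
}
```

Note that entries added during the loop are not part of the snapshot, which is the situation the comment on line 158 is concerned about.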
Message was sent while issue was closed.
Committed patchset #1 (id:1) manually as 312d39641225b64b275e57d167b57a87b335654a (presubmit successful).
Message was sent while issue was closed.
I accidentally landed this CL instead of uploading it for further review. Oops! I'll make another CL with further changes as necessary.

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dart
File lib/src/forkable_stream.dart (right):

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:23: /// then canceled.
On 2015/07/15 20:07:11, Lasse Reichstein Nielsen wrote:
> It seems it will be cancelled immediately when the primary stream is canceled?

Only if [_controllers] is empty, that is, there are no forks, so the primary stream is the only branch.

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:24: class ForkableStream<T> extends StreamView<T> {
On 2015/07/15 20:07:11, Lasse Reichstein Nielsen wrote:
> AFAICS this differs from just making it a broadcast stream only in the pause
> behavior - if all subscriptions are paused, then the original is paused.
>
> How about just making it a special broadcast stream that forwards pauses,
> instead of adding the extra fork method. That way you don't need to extend the
> interface, and it will "just work" in any setting that accepts a broadcast
> stream.
>
> You can still cancel the original if at any point after the first subscription,
> all subscriptions are canceled, and the original can still be a single-sub
> stream.

I think the idea of a forkable stream is clearer than a broadcast stream that has special pause behavior *and* cancels the underlying stream when it has no listeners. It seems like that particular combination of behavior would be difficult to justify as an intuitive package, whereas "forking" is a concept that people are pretty familiar with from various contexts.

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:34: bool _isClosed = false;
On 2015/07/15 20:07:12, Lasse Reichstein Nielsen wrote:
> We usually use "isClosed" for when "close" has been called on a controller/the
> done event has arrived. Use "isCanceled"?

Done. I was trying to use it in the same sense as in StreamQueue.

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:71: throw new StateError("Can't fork a closed or canceled stream.");
On 2015/07/15 20:07:11, Lasse Reichstein Nielsen wrote:
> Alternatively just return an empty stream (new Stream<T>.empty()).
> That's a stream containing the same events as the rest of the primary stream
> (none).
> (I don't really consider "done" an event as much as a way of telling someone
> that there are no more events).
>
> This also has the advantage that code won't have to guess whether the forkable
> stream is closed or not. You should generally not throw a StateError for a state
> that the caller isn't able to check first, and the caller doesn't know whether
> the onDone event just arrived one microtask ago.

Done.

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:109: if (primary) _isClosed = true;
On 2015/07/15 20:07:11, Lasse Reichstein Nielsen wrote:
> So if the primary stream is canceled, you can't fork any more, but existing
> forks keep running and getting new events until they are all canceled or done?

That's right, although you can still fork if you wrapped one of the existing forks in its own ForkableStream, which is what StreamQueue does.

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:111: if (controller.isClosed) return null;
On 2015/07/15 20:07:11, Lasse Reichstein Nielsen wrote:
> A controller is only closed by the "onDone" handler.
> That means that if any controller is closed, all controllers are, right?
> (If that's an invariant you are using, consider documenting it - is it
> equivalent to _controllers.isEmpty?).

Not quite: while dispatching _onDone, it's possible for a handler to cancel a subscription, which will get us here before all controllers have been closed and before _controllers.isEmpty is true.

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:139: for (var controller in _controllers.toSet()) {
On 2015/07/15 20:07:12, Lasse Reichstein Nielsen wrote:
> I would use `toList()` here. It should have a lower overhead than creating a new
> set. Maybe even `toList(growable: false)`.

Done.

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:158: controller.close();
On 2015/07/15 20:07:11, Lasse Reichstein Nielsen wrote:
> You may have a race condition here.
> The _isClosed is only set to true when you close the primary subscription, but
> that subscription's onDone handler may create a new fork which goes into
> _controllers after the toSet - so that subscription is not closed, and then it's
> removed by the .clear() call below while still being active.
>
> The primary stream controller is the first one in the (linked) set, so it will
> be the first one to be called, but it's still a little risky.

Because the controller is first and the controller's onCancel is fired before controller.stream fires onDone, this works out okay; there's even a test for it.

> Maybe set _isClosed to true before the loop to prevent onDone handlers from
> forking.

Done.

https://codereview.chromium.org/1241723003/diff/1/lib/src/stream_queue.dart
File lib/src/stream_queue.dart (right):

https://codereview.chromium.org/1241723003/diff/1/lib/src/stream_queue.dart#n...
lib/src/stream_queue.dart:110: : new ForkableStream(source);
On 2015/07/15 20:10:51, Lasse Reichstein Nielsen wrote:
> Would it be possible to make the fork method without it costing when you don't
> use it?
> Maybe after calling fork, all events are forwarded to a different controller as
> well as being put in the event queue, but if you don't fork, you won't create
> the forkable stream.
>
> Also, if you do "rest" as the first thing, I think we return the original
> stream, which would now be a ForkableStream.

I don't think this is possible without drastically complicating the implementation. The reason I created ForkableStream in the first place was because the complexity overhead of trying to manage a bunch of fork information in StreamQueue, including pausing and closing the underlying stream at the right time, became unmanageable. Hopefully forwarding via a single synchronous StreamController isn't that much overhead, though, since that's a common pattern across most higher-order stream manipulation.
Message was sent while issue was closed.
https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dart
File lib/src/forkable_stream.dart (right):

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:62: Stream<T> fork() => _fork(primary: false);
I don't think fork belongs on Stream.

A stream is a passive object that you can ask for a subscription; the subscription is the active part and the one that has the position that the fork will also continue from.
A single-subscription stream does not know where its subscription is, so forking the stream should not retain the subscription's position.
The fork should, if anything, be on the subscription.

I admit we do obscure the distinction between stream and subscription by having broadcast streams that act like they do have state, since subscribing later only gets later events, and single-subscription streams are indistinguishable from their subscription because there is a one-to-one correspondence.

https://codereview.chromium.org/1241723003/diff/1/lib/src/stream_queue.dart
File lib/src/stream_queue.dart (right):

https://codereview.chromium.org/1241723003/diff/1/lib/src/stream_queue.dart#n...
lib/src/stream_queue.dart:110: : new ForkableStream(source);
I still think this could/should be handled at a different level than wrapping the original stream.

For example you could "fork" the subscription when needed, into two subscriptions providing the same values, and with individual buffering and shared pause if all are paused. Then you could replace the original subscription with one fork and convert the other fork to a stream using SubscriptionStream.

Or have a ForkableSubscription wrapper that you apply the first time you fork the stream queue, then you can do further forks on that.
That way you would still only pay if you use it.

See example: https://codereview.chromium.org/1238503004/

When the stream is active, it's actually the subscription object that's the active part; the Stream object can be discarded.

https://codereview.chromium.org/1241723003/diff/1/lib/src/stream_queue.dart#n...
lib/src/stream_queue.dart:230: StreamQueue<T> fork() {
I have another problem with fork: It may cause buffering.

The StreamQueue is designed so that it reads events until it can complete a request. It never buffers events that it doesn't need.

When you fork, you have two independent consumers of the same events, so unless they are in perfect sync, one of them will have to buffer events while it's paused - you add events to a paused completer because you have nowhere else to put them.
So if you fork and don't use the forked queue, it will accumulate events forever.
I don't like it when that can happen due to an innocuous-looking API function - it should be something you opt in to deliberately, not by accident.

For that reason, I'd prefer to not have fork at all.
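The buffering concern can be seen with plain `dart:async` controllers. The sketch below is not the reviewed ForkableStream; it assumes two hypothetical branches, `active` and `idle`, fed by hand from one source, and shows that the branch nobody listens to keeps every event until it is finally consumed.

```dart
import 'dart:async';

Future<void> main() async {
  final source = Stream<int>.fromIterable([1, 2, 3]);

  // Two hypothetical branches fed from the same source.
  final active = StreamController<int>();
  final idle = StreamController<int>();

  // Only the active branch is consumed while events are arriving.
  final activeEvents = active.stream.toList();

  await for (final event in source) {
    active.add(event);
    idle.add(event); // No listener yet, so the controller holds the event.
  }

  // Deliberately not awaited: the future returned by close() on `idle`
  // only completes once something listens to it.
  active.close();
  idle.close();

  print(await activeEvents); // [1, 2, 3]

  // Every event was retained for the branch that was never used.
  print(await idle.stream.toList()); // [1, 2, 3]
}
```

With an unbounded source, the second controller would grow for as long as the first one keeps reading.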
Message was sent while issue was closed.
So, in general, I'm not convinced. Maybe revert it for now, and we can look at it again after vacations?
Message was sent while issue was closed.
I've reverted the change, but I do think this is very important for the reasons outlined below.

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dart
File lib/src/forkable_stream.dart (right):

https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
lib/src/forkable_stream.dart:62: Stream<T> fork() => _fork(primary: false);
On 2015/07/16 14:01:23, Lasse Reichstein Nielsen wrote:
> I don't think fork belongs on Stream.
>
> A stream is a passive object that you can ask for a subscription, the
> subscription is the active part and the one that has the position that the fork
> will also continue from.
> A single subscription stream does not know where its subscription is, so forking
> the stream should not retain the subscription's position.
> The fork should, if anything, be on the subscription.
>
>
> I admit we do obscure the distinction between stream and subscription by having
> broadcast streams that act like they do have state, since subscribing later only
> gets later events, and single subscription streams are indistinguishable from
> their subscription because there is a one-to-one correspondence.

I disagree. It's important that the fork can be created *before* the stream has been listened to, it's important that the fork itself doesn't start in a listened state, and it's important that the fork can be passed to code expecting a Stream. None of this works if only the subscription is forkable.

I understand your uneasiness with considering the state of the subscription tightly bound to the state of the stream, but I really do think it's the best way to go. This correspondence already exists implicitly; even single-subscription streams preserve state about whether [listen] is called. It's not a huge leap to have them also preserve state about whether events have been emitted.

https://codereview.chromium.org/1241723003/diff/1/lib/src/stream_queue.dart
File lib/src/stream_queue.dart (right):

https://codereview.chromium.org/1241723003/diff/1/lib/src/stream_queue.dart#n...
lib/src/stream_queue.dart:110: : new ForkableStream(source);
On 2015/07/16 14:01:23, Lasse Reichstein Nielsen wrote:
> I still think this could/should be handled at a different level than wrapping
> the original stream.
>
> For example you could "fork" the subscription when needed, into two
> subscriptions providing the same values, and with individual buffering and
> shared pause if all are paused. Then you could replace the original subscription
> with one fork and convert the other fork to a stream using SubscriptionStream.
>
> Or have a ForkableSubscription wrapper that you apply the first time you fork
> the stream queue, then you can do further forks on that.
> That way you would still only pay if you use it.
>
> See example: https://codereview.chromium.org/1238503004/
>
> When the stream is active, it's actually the subscription object that's the
> active part, the Stream object can be discarded.

In your example, just forking a stream causes it to listen, which is a fundamental problem with dealing with subscriptions: they only exist when a stream has been listened to. The underlying stream should only be listened to once someone has actually requested a value from it; that shouldn't change based on how many forks exist.

https://codereview.chromium.org/1241723003/diff/1/lib/src/stream_queue.dart#n...
lib/src/stream_queue.dart:230: StreamQueue<T> fork() {
On 2015/07/16 14:01:23, Lasse Reichstein Nielsen wrote:
> I have another problem with fork: It may cause buffering.
>
> The StreamQueue is designed so that it reads events until it can complete a
> request. It never buffers events that it doesn't need.
>
> When you fork, you have two independent consumers of the same events, so unless
> they are in perfect sync, one of them will have to buffer events while it's
> paused - you add events to a paused completer because you have nowhere else to
> put them.
> So if you fork and don't use the forked queue, it will accumulate events
> forever.
> I don't like it when that can happen due to an innocuously looking API function
> - it should be something you opt in to deliberately, not by accident.
>
> For that reason, I'd prefer to not have fork at all.

I can't emphasize enough how important forking is for creating higher-order operations over stream queues, or how important those higher-order operations are for testing streams. Right now we have *no* standard facilities for making assertions against streams other than converting them to lists, which obviously doesn't work for things like a process's standard output. The closest thing we have (what pub uses) is scheduled_test's ScheduledStream and stream matchers, which are essentially the same idea as StreamQueue. These actually work really well, and there's one thing at the heart that makes that possible: forking. Without fork(), we couldn't write matchers with the flexibility necessary to handle the heterogeneous output that crops up in real-world applications.

I also think it's pretty clear when buffering will happen: users expect there to be buffering somewhere when there's an unlistened single-subscription stream, and the same goes for a StreamQueue. It's also pretty likely that even a single stream queue will have some buffering when it's paused, and there's even less visibility into that. Ultimately, and I say this as one of the heaviest users of Dart streams in practice, I don't think that buffering is a huge concern for users as long as it's reasonably straightforward to track down where it's happening and fix it if it becomes a problem.
Message was sent while issue was closed.
On 2015/07/17 20:30:16, nweiz wrote:
> I've reverted the change, but I do think this is very important for the reasons
> outlined below.
>
> https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dart
> File lib/src/forkable_stream.dart (right):
>
> https://codereview.chromium.org/1241723003/diff/1/lib/src/forkable_stream.dar...
> lib/src/forkable_stream.dart:62: Stream<T> fork() => _fork(primary: false);
> On 2015/07/16 14:01:23, Lasse Reichstein Nielsen wrote:
> >
> > I admit we do obscure the distinction between stream and subscription by having
> > broadcast streams that act like they do have state, since subscribing later only
> > gets later events, and single subscription streams are indistinguishable from
> > their subscription because there is a one-to-one correspondence.
>
> I disagree. It's important that the fork can be created *before* the stream has
> been listened to, it's important that the fork itself doesn't start in a
> listened state, and it's important that the fork can be passed to code expecting
> a Stream. None of this works if only the subscription is forkable.

That's a forkable stream in general, not its use in StreamQueue.

I can accept a ForkableStream. I would prefer a different design (a stream factory that can create forks of a source stream rather than making the stream itself forkable).
Passing a Stream to something, and then deep down in recursive calls checking to see if that Stream is a ForkableStream, is not pretty.
Having the class StreamForker { Stream fork(); void close(); } would allow you to provide a source of streams to code expecting that, and they can then pass pure streams to other code.
You can always create a new StreamForker from a Stream, and the StreamForker may recognize the stream as already being a fork and merge itself with the original forker.

> I understand your uneasiness with considering the state of the subscription
> tightly bound to the state of the stream, but I really do think it's the best
> way to go. This correspondence already exists implicitly; even
> single-subscription streams preserve state about whether [listen] is called.
> It's not a huge leap to have them also preserve state about whether events have
> been emitted.

I agree that I might be quite alone with that particular view on Streams/Subscriptions. It is also a consistent view to identify the two, the way things have been designed.

>
> https://codereview.chromium.org/1241723003/diff/1/lib/src/stream_queue.dart
> File lib/src/stream_queue.dart (right):
>
> https://codereview.chromium.org/1241723003/diff/1/lib/src/stream_queue.dart#n...
> lib/src/stream_queue.dart:110: : new ForkableStream(source);
> On 2015/07/16 14:01:23, Lasse Reichstein Nielsen wrote:
...
> >
> > See example: https://codereview.chromium.org/1238503004/
> >
> > When the stream is active, it's actually the subscription object that's the
> > active part, the Stream object can be discarded.
>
> In your example, just forking a stream causes it to listen, which is a
> fundamental problem with dealing with subscriptions: they only exist when a
> stream has been listened to. The underlying stream should only be listened to
> once someone has actually requested a value from it; that shouldn't change based
> on how many forks exist.

That's because I'm only looking at the use in StreamQueue. When you make a request on a StreamQueue, you do listen to the stream.
Maybe the problem is that "fork" looks like a request. When you use it as the first request, it doesn't act as a request as much as a clone operation on the stream queue.

In any case, for stream-queue this can be fixed by using both the ForkableStream and ForkableSubscription approaches, depending on whether the original stream has been listened to or not.
That would only add overhead if you actually use "fork", and not at all if you don't.

For library code, unnecessary overhead should not be accepted, even if it's "small", because the user can't fix the code if it turns out to be a bottleneck. That's different from "application code" where you are writing something that you are going to run yourself - you can go back and rewrite parts after profiling. Libraries don't have that luxury, and should aim to not have inefficiencies if they can be avoided.

I'd still prefer to not have the fork operation on StreamQueue. I'd at least prefer to delegate it to a ForkableStreamQueue.

How about fork returning StreamQueues based on the original, but not based on a forkable stream? It's the buffering behavior of the forkable stream that I object to (even if it's perfectly reasonable for some direct uses of Stream, it doesn't fit with StreamQueue to keep events after all the queues have seen them).
So, implement forking on StreamQueue directly, with the fork and the original both getting all events as long as one of them is unpaused, but each can discard the event after it has been processed, and when both have discarded it, the event can be GCed.

>
> https://codereview.chromium.org/1241723003/diff/1/lib/src/stream_queue.dart#n...
> lib/src/stream_queue.dart:230: StreamQueue<T> fork() {
> On 2015/07/16 14:01:23, Lasse Reichstein Nielsen wrote:
> > I have another problem with fork: It may cause buffering.
...
> >
> > For that reason, I'd prefer to not have fork at all.
>
> I can't emphasize enough how important forking is for creating higher-order
> operations over stream queues, or how important those higher-order operations
> are for testing streams. Right now we have *no* standard facilities for making
> assertions against streams other than converting them to lists, which obviously
> doesn't work for things like a process's standard output.

Why does it not work? (I.e., not obvious to me).
If it's because you need to see stdout before the process is done, then I can see the problem. It could be solved by a Queue<Result>, but I can see that we don't have a direct way to get that (resultsLib.captureStream(stream).forEach(queue.add)).
I might prefer to not keep it all in memory if it's large, but the forkable stream will do that anyway.

The StreamQueue was designed as a tool to allow inspecting a stream's events individually, specifically for testing. If you need to run two tests on the same stream, I can see the use of "fork".

> The closest thing we
> have (what pub uses) is scheduled_test's ScheduledStream and stream matchers,
> which are essentially the same idea as StreamQueue. These actually work really
> well, and there's one thing at the heart that makes that possible: forking.
> Without fork(), we couldn't write matchers with the flexibility necessary to
> handle the heterogeneous output that crops up in real-world applications.
>
> I also think it's pretty clear when buffering will happen: users expect there to
> be buffering somewhere when there's an unlistened single-subscription stream,

They should not assume buffering, and any buffering of an unlistened stream is likely to be a bug or leak. Until you listen, nothing should happen. I really, really regret allowing adding to a StreamController before its stream has been listened to.
Buffering should not be implicit because it can take arbitrary amounts of memory. If a program wants to buffer, it can do so (a Queue<Result> is a great buffer of events).

Pausing a stream subscription can cause buffering, but only if the source does not honor the pause; a well-behaved source should not cause any unnecessary buffering.

So, I'm still not sold on having a "fork" operation that implicitly stores all events indefinitely (until *all* forks have completed; if just one of them pauses just before the done event, all events since the fork happened are kept alive).
I'm not against doing that, but it should not be hidden in an innocuous-looking "fork" method. That's too expensive an operation to add to an otherwise lightweight class.

> and the same goes for a StreamQueue. It's also pretty likely that even a single
> stream queue will have some buffering when it's paused,

It should not. A stream created using async* should pause when it is ready to create the next event, and wait until it's resumed.
A use of StreamController that ignores the onPause callback may cause buffering, but that's just not good design.

> and there's even less visibility into that.

Not really, no. You have one stream and it's paused. It may be buffering (but shouldn't be) and it will start again immediately when you resume, and when it's done, there won't be anything buffered.
The forkable stream keeps all the events alive because another fork may be created later. There is no centralized way to control it; you have to cancel each individual stream by itself (what happens if a stream is created by forking and never listened to - will it too keep the events alive?).

> Ultimately, and I say this as one of the heaviest users of
> Dart streams in practice, I don't think that buffering is a huge concern for
> users as long as it's reasonably straightforward to track down where it's
> happening and fix it if it becomes a problem.

The problem with library code is that a user can't fix it. Even if they have the code, there is no way most programmers could (or would) change the code in practice. That's why library code should have minimal overhead everywhere.
Even if fork is on StreamQueue, it should not have any cost if you don't use it. Even if it's only a single extra controller, it does matter, and if the user finds that that extra controller is his bottleneck, the only solution is to stop using the library.

So, you can add ForkableStream which preserves all events, but I would prefer not to use it in StreamQueue.
I'd rather build the fork operation directly for that.
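One way to read the StreamForker suggestion is sketched below. This is a hypothetical illustration of the two-method shape (`fork()` and `close()`), not code from the CL: pause forwarding and merging with an existing forker are left out, every name other than `fork` and `close` is made up, and unlistened forks still buffer, so it does not address the buffering objection.

```dart
import 'dart:async';

/// Hypothetical sketch of a factory that hands out plain streams of one
/// source's events. Pause forwarding is intentionally omitted.
class StreamForker<T> {
  StreamForker(this._source);

  final Stream<T> _source;
  final Set<StreamController<T>> _controllers = {};
  StreamSubscription<T>? _subscription;
  bool _isClosed = false;

  /// Returns a new single-subscription stream of the source's events, or an
  /// empty stream once the forker has been closed.
  Stream<T> fork() {
    if (_isClosed) return Stream<T>.empty();
    late final StreamController<T> controller;
    controller = StreamController<T>(
      onListen: _listenToSource,
      onCancel: () {
        _controllers.remove(controller);
      },
    );
    _controllers.add(controller);
    return controller.stream;
  }

  // The source is only listened to once some fork is listened to.
  void _listenToSource() {
    if (_isClosed) return;
    _subscription ??= _source.listen(
      (event) {
        for (final controller in _controllers) {
          controller.add(event);
        }
      },
      onError: (Object error, StackTrace stackTrace) {
        for (final controller in _controllers) {
          controller.addError(error, stackTrace);
        }
      },
      onDone: close,
    );
  }

  /// Stops listening to the source and ends every outstanding fork.
  void close() {
    if (_isClosed) return;
    _isClosed = true;
    _subscription?.cancel();
    for (final controller in _controllers) {
      controller.close();
    }
    _controllers.clear();
  }
}

Future<void> main() async {
  final forker = StreamForker<int>(Stream.fromIterable([1, 2, 3]));
  final first = forker.fork();
  final second = forker.fork();
  print(await Future.wait([first.toList(), second.toList()]));
  // [[1, 2, 3], [1, 2, 3]]
}
```

Code that receives `first` or `second` sees an ordinary Stream and never needs to check for a forkable subtype, which is the property the suggestion is after.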
Message was sent while issue was closed.
I've been trying to see if I can find a way to make a forkable stream queue without overhead for, and without complicating, the non-forked use case. So far it hasn't worked out very well.

I will maintain that forking a stream is not something that belongs on the stream. There is nothing wrong with taking a stream and explicitly creating more streams from its events, but having it implicit in the stream (by having a "fork" on the stream) is hiding the complexity. The Stream has one job - deliver events. Putting extra functionality onto the stream is muddying that and creating a new type of stream that nobody else knows about anyway.

Same for StreamQueue - it's intended as a better StreamIterator, and it provides the events of a single stream.

I believe that forking is useful when testing, but I don't see a way of making it a simple and efficient basic operation, so perhaps we should instead make something specialized for testing instead of trying to fit testing functionality into existing classes.

All in all, I don't think forkable stream or stream-queue is a good enough idea and/or choice of design that it is worth the complexity that it introduces.