Chromium Code Reviews| Index: lib/src/subscription_stream.dart |
| diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..4458b99b3f561b05328ad53bf9fed335ef226f21 |
| --- /dev/null |
| +++ b/lib/src/subscription_stream.dart |
| @@ -0,0 +1,165 @@ |
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library async.streams.subscription_stream; |
| + |
| +import 'dart:async'; |
| + |
| +import "delegates.dart"; |
| + |
| +/// A [Stream] adapter for a [StreamSubscription]. |
| +/// |
| +/// This class allows as `StreamSubscription` to be treated as a `Stream`. |
| +/// |
| +/// The subscription is paused until the stream is listened to, |
| +/// then it is resumed and the events are passed on to the |
| +/// stream's new subscription. |
| +/// |
| +/// This class assumes that is has control over the original subscription. |
| +/// If other code is accessing the subscription, results may be unpredictable. |
| +class SubscriptionStream<T> extends Stream<T> { |
| + StreamSubscription _source; |
| + final bool _isCancelOnError; |
| + bool get isBroadcastStream => false; |
| + |
| + /// Create a `Stream` from [subscription]. |
| + /// |
| + /// The subscription should not be paused. This class will not resume prior |
| + /// pauses, so being paused is indistinguishable from not providing any |
| + /// events. |
| + /// |
| + /// If the original `isCancelOnError` value for the subscription is known, |
|
nweiz
2015/06/12 01:24:26
"isCancelOnError" -> "cancelOnError"
Lasse Reichstein Nielsen
2015/06/15 15:46:24
I considered that, but it looks too much like a re
|
| + /// it can be passed as the [isCancelOnError] parameter. |
| + /// |
| + /// If the original subscription cancels on an error, this stream's |
| + /// subscription will also do so, whether it's requested in the [listen] call |
| + /// or not. |
| + /// |
| + /// If the `SubscriptionStream` knows this the original subscription's |
| + /// error behavior, it can adapt the new stream's subscription. |
| + /// |
| + /// If the original cancels on an error and the new subscription shouldn't, |
| + /// an extra done event is added after the first error, when the |
| + /// original subscription is being cancelled. |
| + /// |
| + /// If the original doesn't cancel on an error and the new subscripton does, |
| + /// a cancel is added on the first error. |
| + /// |
| + /// The default is to assume the subscription will not stop after an error. |
| + /// If that assumption is wrong, the result is a stream that stops providing |
| + /// events after the first error, including done events. |
|
Søren Gjesse
2015/06/11 07:57:07
I find that the requirement to know the cancel on
nweiz
2015/06/12 01:24:26
I also found this pretty hard to follow. If [Strea
Lasse Reichstein Nielsen
2015/06/12 13:04:23
It's slightly annoying. The alternative is to not
Lasse Reichstein Nielsen
2015/06/15 15:46:24
I adapted the paragraph, it's more readable than t
nweiz
2015/06/16 01:05:23
The more I think about this, the more I think the
Lasse Reichstein Nielsen
2015/06/16 13:05:45
I tend to agree. It just creates ugliness all the
|
| + SubscriptionStream(StreamSubscription subscription, |
|
nweiz
2015/06/12 01:24:26
Shouldn't this be a StreamSubscription<T>?
Lasse Reichstein Nielsen
2015/06/15 15:46:23
Like Iterable.from, the argument is more permissiv
|
| + {bool isCancelOnError: false}) |
|
nweiz
2015/06/12 01:24:26
I'd call this "cancelOnError" to match the paramet
Lasse Reichstein Nielsen
2015/06/15 15:46:24
And I want it to not match because it's not really
|
| + : _source = subscription, |
| + _isCancelOnError = isCancelOnError { |
| + _source.pause(); |
| + // Clear callbacks to avoid keeping them alive unnecessarily. |
| + _source.onData(null); |
| + _source.onError(null); |
| + _source.onDone(null); |
| + } |
| + |
| + StreamSubscription<T> listen(void onData(T event), |
| + {Function onError, |
| + void onDone(), |
| + bool cancelOnError}) { |
| + if (_source == null) { |
| + throw new StateError("Stream has already been listened to."); |
| + } |
| + cancelOnError = (true == cancelOnError); |
| + var subscription = _source; |
| + _source = null; |
| + var result; |
| + if (cancelOnError == _isCancelOnError) { |
| + if (subscription is StreamSubscription<T>) { |
|
nweiz
2015/06/12 01:24:26
I don't understand why [subscription] isn't typed
Lasse Reichstein Nielsen
2015/06/15 15:46:24
The problem is that we promise to return a StreamS
|
| + result = subscription; |
| + } else { |
| + // Type parameters don't match - cast by wrapping. |
| + result = new DelegatingStreamSubscription<T>(subscription); |
| + } |
| + } else if (cancelOnError) { |
| + assert(!_isCancelOnError); |
| + result = new _CancelOnErrorSubscriptionWrapper<T>(subscription); |
| + } else { |
| + assert(!cancelOnError && _isCancelOnError); |
| + result = new _DoneAfterErrorSubscriptionWrapper(subscription); |
| + } |
| + result.onData(onData); |
| + result.onError(onError); |
| + result.onDone(onDone); |
| + subscription.resume(); |
| + return result; |
| + } |
| +} |
| + |
| +class _CancelOnErrorSubscriptionWrapper<T> |
|
nweiz
2015/06/12 01:24:26
Document this.
Lasse Reichstein Nielsen
2015/06/15 15:46:24
Done.
|
| + extends DelegatingStreamSubscription<T> { |
| + _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription) |
| + : super(subscription); |
| + |
| + void onError(Function handleError) { |
| + handleError = _cancelBeforeError(handleError); |
| + super.onError(handleError); |
| + } |
| + |
| + /// Helper function used by [onError]. |
| + /// |
| + /// Returns an error handler which cancels the stream when it receives an |
| + /// error. |
| + Function _cancelBeforeError(Function handleError) { |
|
nweiz
2015/06/12 01:24:26
Why not just inline this?
Lasse Reichstein Nielsen
2015/06/15 15:46:24
I generally like to put functionality in separate
nweiz
2015/06/16 01:05:23
That's interesting... I tend to prefer keeping the
|
| + return (e, s) { |
|
nweiz
2015/06/12 01:24:26
Use full words for variables.
Lasse Reichstein Nielsen
2015/06/15 15:46:24
Done.
|
| + super.cancel(); |
| + if (handleError is _BinaryCallback) { |
| + handleError(e, s); |
| + } else { |
| + handleError(e); |
| + } |
| + }; |
| + } |
| +} |
| + |
| +/// Subscription wrapper that assumes wrapped subscription is cancel-on-error. |
|
nweiz
2015/06/12 01:24:27
"assumes wrapped" -> "assumes the wrapped"
Lasse Reichstein Nielsen
2015/06/15 15:46:24
But then it doesn't fit on one line?!? :)
Rewritte
|
| +/// |
| +/// This adapts a cancel-on-error subscription as non-cancel-on-error, |
| +/// by introducing a done event after the error event that terminated |
| +/// the wrapped subscription. |
| +class _DoneAfterErrorSubscriptionWrapper<T> |
| + extends DelegatingStreamSubscription<T> { |
| + // Store the done handler so it can be called after an error. |
| + |
| + var _onDone; |
|
nweiz
2015/06/12 01:24:26
Nit: swap this with the previous blank line.
Lasse Reichstein Nielsen
2015/06/15 15:46:24
Done.
|
| + _DoneAfterErrorSubscriptionWrapper(StreamSubscription subscription) |
| + : super(subscription); |
| + |
| + void onDone(void handleDone()) { |
| + super.onDone(handleDone); |
| + _onDone = handleDone; |
| + } |
| + |
| + void onError(Function errorHandler) { |
| + errorHandler = _doneAfterError(errorHandler); |
| + super.onError(errorHandler); |
| + } |
| + |
| + Function _doneAfterError(Function errorHandler) { |
|
nweiz
2015/06/12 01:24:26
This also seems like it would be clearer if it wer
Lasse Reichstein Nielsen
2015/06/15 15:46:23
Done.
|
| + return (e, s) { |
| + // Ensure that the subscription is really cancelled |
| + // so we don't get two done events ever - even if the |
| + // class is used incorrectly. |
| + super.cancel(); |
| + scheduleMicrotask(() { |
| + if (_onDone != null) { |
| + _onDone(); |
| + } |
| + }); |
| + if (errorHandler is _BinaryCallback) { |
| + errorHandler(e, s); |
| + } else { |
| + errorHandler(e); |
| + } |
| + }; |
| + } |
| +} |
| + |
| +typedef _BinaryCallback(e, s); |
|
Søren Gjesse
2015/06/11 07:57:07
See comment in other file.
Lasse Reichstein Nielsen
2015/06/12 13:04:23
Acknowledged.
|