| Index: packages/async/lib/src/stream_subscription_transformer.dart
|
| diff --git a/packages/async/lib/src/stream_subscription_transformer.dart b/packages/async/lib/src/stream_subscription_transformer.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..1443b183203adda77f3bd44c7baf1cc6a7e955ef
|
| --- /dev/null
|
| +++ b/packages/async/lib/src/stream_subscription_transformer.dart
|
| @@ -0,0 +1,106 @@
|
| +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +import 'dart:async';
|
| +
|
| +import 'async_memoizer.dart';
|
| +
|
| +typedef Future _AsyncHandler<T>(StreamSubscription<T> inner);
|
| +
|
| +typedef void _VoidHandler<T>(StreamSubscription<T> inner);
|
| +
|
| +/// Creates a [StreamTransformer] that modifies the behavior of subscriptions to
|
| +/// a stream.
|
| +///
|
| +/// When [StreamSubscription.cancel], [StreamSubscription.pause], or
|
| +/// [StreamSubscription.resume] is called, the corresponding handler is invoked.
|
| +/// By default, handlers just forward to the underlying subscription.
|
| +///
|
| +/// Guarantees that none of the [StreamSubscription] callbacks and none of the
|
| +/// callbacks passed to `subscriptionTransformer()` will be invoked once the
|
| +/// transformed [StreamSubscription] has been canceled and `handleCancel()` has
|
| +/// run. The [handlePause] and [handleResume] are invoked regardless of whether
|
| +/// the subscription is paused already or not.
|
| +///
|
| +/// In order to preserve [StreamSubscription] guarantees, **all callbacks must
|
| +/// synchronously call the corresponding method** on the inner
|
| +/// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause]
|
| +/// must call `pause()`, and [handleResume] must call `resume()`.
|
| +StreamTransformer<T, T> subscriptionTransformer<T>(
|
| + {Future handleCancel(StreamSubscription<T> inner),
|
| + void handlePause(StreamSubscription<T> inner),
|
| + void handleResume(StreamSubscription<T> inner)}) {
|
| + return new StreamTransformer((stream, cancelOnError) {
|
| + return new _TransformedSubscription(
|
| + stream.listen(null, cancelOnError: cancelOnError),
|
| + handleCancel ?? (inner) => inner.cancel(),
|
| + handlePause ??
|
| + (inner) {
|
| + inner.pause();
|
| + },
|
| + handleResume ??
|
| + (inner) {
|
| + inner.resume();
|
| + });
|
| + });
|
| +}
|
| +
|
| +/// A [StreamSubscription] wrapper that calls callbacks for subscription
|
| +/// methods.
|
| +class _TransformedSubscription<T> implements StreamSubscription<T> {
|
| + /// The wrapped subscription.
|
| + StreamSubscription<T> _inner;
|
| +
|
| + /// The callback to run when [cancel] is called.
|
| + final _AsyncHandler<T> _handleCancel;
|
| +
|
| + /// The callback to run when [pause] is called.
|
| + final _VoidHandler<T> _handlePause;
|
| +
|
| + /// The callback to run when [resume] is called.
|
| + final _VoidHandler<T> _handleResume;
|
| +
|
| + bool get isPaused => _inner?.isPaused ?? false;
|
| +
|
| + _TransformedSubscription(
|
| + this._inner, this._handleCancel, this._handlePause, this._handleResume);
|
| +
|
| + void onData(void handleData(T data)) {
|
| + _inner?.onData(handleData);
|
| + }
|
| +
|
| + void onError(Function handleError) {
|
| + _inner?.onError(handleError);
|
| + }
|
| +
|
| + void onDone(void handleDone()) {
|
| + _inner?.onDone(handleDone);
|
| + }
|
| +
|
| + Future cancel() => _cancelMemoizer.runOnce(() {
|
| + var inner = _inner;
|
| + _inner.onData(null);
|
| + _inner.onDone(null);
|
| +
|
| + // Setting onError to null will cause errors to be top-leveled.
|
| + _inner.onError((_, __) {});
|
| + _inner = null;
|
| + return _handleCancel(inner);
|
| + });
|
| + final _cancelMemoizer = new AsyncMemoizer();
|
| +
|
| + void pause([Future resumeFuture]) {
|
| + if (_cancelMemoizer.hasRun) return;
|
| + if (resumeFuture != null) resumeFuture.whenComplete(resume);
|
| + _handlePause(_inner);
|
| + }
|
| +
|
| + void resume() {
|
| + if (_cancelMemoizer.hasRun) return;
|
| + _handleResume(_inner);
|
| + }
|
| +
|
| + Future<E> asFuture<E>([E futureValue]) =>
|
| + _inner?.asFuture(futureValue) ?? new Completer<E>().future;
|
| +}
|
|
|