Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(223)

Side by Side Diff: packages/async/lib/src/stream_subscription_transformer.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « packages/async/lib/src/stream_splitter.dart ('k') | packages/async/lib/src/stream_zip.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 import 'dart:async';
6
7 import 'async_memoizer.dart';
8
9 typedef Future _AsyncHandler<T>(StreamSubscription<T> inner);
10
11 typedef void _VoidHandler<T>(StreamSubscription<T> inner);
12
13 /// Creates a [StreamTransformer] that modifies the behavior of subscriptions to
14 /// a stream.
15 ///
16 /// When [StreamSubscription.cancel], [StreamSubscription.pause], or
17 /// [StreamSubscription.resume] is called, the corresponding handler is invoked.
18 /// By default, handlers just forward to the underlying subscription.
19 ///
20 /// Guarantees that none of the [StreamSubscription] callbacks and none of the
21 /// callbacks passed to `subscriptionTransformer()` will be invoked once the
22 /// transformed [StreamSubscription] has been canceled and `handleCancel()` has
23 /// run. The [handlePause] and [handleResume] are invoked regardless of whether
24 /// the subscription is paused already or not.
25 ///
26 /// In order to preserve [StreamSubscription] guarantees, **all callbacks must
27 /// synchronously call the corresponding method** on the inner
28 /// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause]
29 /// must call `pause()`, and [handleResume] must call `resume()`.
30 StreamTransformer<T, T> subscriptionTransformer<T>(
31 {Future handleCancel(StreamSubscription<T> inner),
32 void handlePause(StreamSubscription<T> inner),
33 void handleResume(StreamSubscription<T> inner)}) {
34 return new StreamTransformer((stream, cancelOnError) {
35 return new _TransformedSubscription(
36 stream.listen(null, cancelOnError: cancelOnError),
37 handleCancel ?? (inner) => inner.cancel(),
38 handlePause ??
39 (inner) {
40 inner.pause();
41 },
42 handleResume ??
43 (inner) {
44 inner.resume();
45 });
46 });
47 }
48
49 /// A [StreamSubscription] wrapper that calls callbacks for subscription
50 /// methods.
51 class _TransformedSubscription<T> implements StreamSubscription<T> {
52 /// The wrapped subscription.
53 StreamSubscription<T> _inner;
54
55 /// The callback to run when [cancel] is called.
56 final _AsyncHandler<T> _handleCancel;
57
58 /// The callback to run when [pause] is called.
59 final _VoidHandler<T> _handlePause;
60
61 /// The callback to run when [resume] is called.
62 final _VoidHandler<T> _handleResume;
63
64 bool get isPaused => _inner?.isPaused ?? false;
65
66 _TransformedSubscription(
67 this._inner, this._handleCancel, this._handlePause, this._handleResume);
68
69 void onData(void handleData(T data)) {
70 _inner?.onData(handleData);
71 }
72
73 void onError(Function handleError) {
74 _inner?.onError(handleError);
75 }
76
77 void onDone(void handleDone()) {
78 _inner?.onDone(handleDone);
79 }
80
81 Future cancel() => _cancelMemoizer.runOnce(() {
82 var inner = _inner;
83 _inner.onData(null);
84 _inner.onDone(null);
85
86 // Setting onError to null will cause errors to be top-leveled.
87 _inner.onError((_, __) {});
88 _inner = null;
89 return _handleCancel(inner);
90 });
91 final _cancelMemoizer = new AsyncMemoizer();
92
93 void pause([Future resumeFuture]) {
94 if (_cancelMemoizer.hasRun) return;
95 if (resumeFuture != null) resumeFuture.whenComplete(resume);
96 _handlePause(_inner);
97 }
98
99 void resume() {
100 if (_cancelMemoizer.hasRun) return;
101 _handleResume(_inner);
102 }
103
104 Future<E> asFuture<E>([E futureValue]) =>
105 _inner?.asFuture(futureValue) ?? new Completer<E>().future;
106 }
OLDNEW
« no previous file with comments | « packages/async/lib/src/stream_splitter.dart ('k') | packages/async/lib/src/stream_zip.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698