Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(324)

Side by Side Diff: lib/src/stream_subscription_transformer.dart

Issue 2660333005: Change generic comment syntax to real generic syntax. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 6
7 import 'async_memoizer.dart'; 7 import 'async_memoizer.dart';
8 8
9 typedef Future _AsyncHandler<T>(StreamSubscription<T> inner); 9 typedef Future _AsyncHandler<T>(StreamSubscription<T> inner);
10 10
11 typedef void _VoidHandler<T>(StreamSubscription<T> inner); 11 typedef void _VoidHandler<T>(StreamSubscription<T> inner);
12 12
13 /// Creates a [StreamTransformer] that modifies the behavior of subscriptions to 13 /// Creates a [StreamTransformer] that modifies the behavior of subscriptions to
14 /// a stream. 14 /// a stream.
15 /// 15 ///
16 /// When [StreamSubscription.cancel], [StreamSubscription.pause], or 16 /// When [StreamSubscription.cancel], [StreamSubscription.pause], or
17 /// [StreamSubscription.resume] is called, the corresponding handler is invoked. 17 /// [StreamSubscription.resume] is called, the corresponding handler is invoked.
18 /// By default, handlers just forward to the underlying subscription. 18 /// By default, handlers just forward to the underlying subscription.
19 /// 19 ///
20 /// Guarantees that none of the [StreamSubscription] callbacks and none of the 20 /// Guarantees that none of the [StreamSubscription] callbacks and none of the
21 /// callbacks passed to `subscriptionTransformer()` will be invoked once the 21 /// callbacks passed to `subscriptionTransformer()` will be invoked once the
22 /// transformed [StreamSubscription] has been canceled and `handleCancel()` has 22 /// transformed [StreamSubscription] has been canceled and `handleCancel()` has
23 /// run. The [handlePause] and [handleResume] are invoked regardless of whether 23 /// run. The [handlePause] and [handleResume] are invoked regardless of whether
24 /// the subscription is paused already or not. 24 /// the subscription is paused already or not.
25 /// 25 ///
26 /// In order to preserve [StreamSubscription] guarantees, **all callbacks must 26 /// In order to preserve [StreamSubscription] guarantees, **all callbacks must
27 /// synchronously call the corresponding method** on the inner 27 /// synchronously call the corresponding method** on the inner
28 /// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause] 28 /// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause]
29 /// must call `pause()`, and [handleResume] must call `resume()`. 29 /// must call `pause()`, and [handleResume] must call `resume()`.
30 StreamTransformer/*<T, T>*/ subscriptionTransformer/*<T>*/( 30 StreamTransformer<T, T> subscriptionTransformer<T>(
31 {Future handleCancel(StreamSubscription/*<T>*/ inner), 31 {Future handleCancel(StreamSubscription<T> inner),
32 void handlePause(StreamSubscription/*<T>*/ inner), 32 void handlePause(StreamSubscription<T> inner),
33 void handleResume(StreamSubscription/*<T>*/ inner)}) { 33 void handleResume(StreamSubscription<T> inner)}) {
34 return new StreamTransformer((stream, cancelOnError) { 34 return new StreamTransformer((stream, cancelOnError) {
35 return new _TransformedSubscription( 35 return new _TransformedSubscription(
36 stream.listen(null, cancelOnError: cancelOnError), 36 stream.listen(null, cancelOnError: cancelOnError),
37 handleCancel ?? (inner) => inner.cancel(), 37 handleCancel ?? (inner) => inner.cancel(),
38 handlePause ?? (inner) { 38 handlePause ??
39 inner.pause(); 39 (inner) {
40 }, 40 inner.pause();
41 handleResume ?? (inner) { 41 },
42 inner.resume(); 42 handleResume ??
43 }); 43 (inner) {
44 inner.resume();
45 });
44 }); 46 });
45 } 47 }
46 48
47 /// A [StreamSubscription] wrapper that calls callbacks for subscription 49 /// A [StreamSubscription] wrapper that calls callbacks for subscription
48 /// methods. 50 /// methods.
49 class _TransformedSubscription<T> implements StreamSubscription<T> { 51 class _TransformedSubscription<T> implements StreamSubscription<T> {
50 /// The wrapped subscription. 52 /// The wrapped subscription.
51 StreamSubscription<T> _inner; 53 StreamSubscription<T> _inner;
52 54
53 /// The callback to run when [cancel] is called. 55 /// The callback to run when [cancel] is called.
54 final _AsyncHandler<T> _handleCancel; 56 final _AsyncHandler<T> _handleCancel;
55 57
56 /// The callback to run when [pause] is called. 58 /// The callback to run when [pause] is called.
57 final _VoidHandler<T> _handlePause; 59 final _VoidHandler<T> _handlePause;
58 60
59 /// The callback to run when [resume] is called. 61 /// The callback to run when [resume] is called.
60 final _VoidHandler<T> _handleResume; 62 final _VoidHandler<T> _handleResume;
61 63
62 bool get isPaused => _inner?.isPaused ?? false; 64 bool get isPaused => _inner?.isPaused ?? false;
63 65
64 _TransformedSubscription(this._inner, this._handleCancel, this._handlePause, 66 _TransformedSubscription(
65 this._handleResume); 67 this._inner, this._handleCancel, this._handlePause, this._handleResume);
66 68
67 void onData(void handleData(T data)) { 69 void onData(void handleData(T data)) {
68 _inner?.onData(handleData); 70 _inner?.onData(handleData);
69 } 71 }
70 72
71 void onError(Function handleError) { 73 void onError(Function handleError) {
72 _inner?.onError(handleError); 74 _inner?.onError(handleError);
73 } 75 }
74 76
75 void onDone(void handleDone()) { 77 void onDone(void handleDone()) {
76 _inner?.onDone(handleDone); 78 _inner?.onDone(handleDone);
77 } 79 }
78 80
79 Future cancel() => _cancelMemoizer.runOnce(() { 81 Future cancel() => _cancelMemoizer.runOnce(() {
80 var inner = _inner; 82 var inner = _inner;
81 _inner.onData(null); 83 _inner.onData(null);
82 _inner.onDone(null); 84 _inner.onDone(null);
83 85
84 // Setting onError to null will cause errors to be top-leveled. 86 // Setting onError to null will cause errors to be top-leveled.
85 _inner.onError((_, __) {}); 87 _inner.onError((_, __) {});
86 _inner = null; 88 _inner = null;
87 return _handleCancel(inner); 89 return _handleCancel(inner);
88 }); 90 });
89 final _cancelMemoizer = new AsyncMemoizer(); 91 final _cancelMemoizer = new AsyncMemoizer();
90 92
91 void pause([Future resumeFuture]) { 93 void pause([Future resumeFuture]) {
92 if (_cancelMemoizer.hasRun) return; 94 if (_cancelMemoizer.hasRun) return;
93 if (resumeFuture != null) resumeFuture.whenComplete(resume); 95 if (resumeFuture != null) resumeFuture.whenComplete(resume);
94 _handlePause(_inner); 96 _handlePause(_inner);
95 } 97 }
96 98
97 void resume() { 99 void resume() {
98 if (_cancelMemoizer.hasRun) return; 100 if (_cancelMemoizer.hasRun) return;
99 _handleResume(_inner); 101 _handleResume(_inner);
100 } 102 }
101 103
102 Future/*<E>*/ asFuture/*<E>*/([/*=E*/ futureValue]) => 104 Future<E> asFuture<E>([E futureValue]) =>
103 _inner?.asFuture(futureValue) ?? new Completer/*<E>*/().future; 105 _inner?.asFuture(futureValue) ?? new Completer<E>().future;
104 } 106 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698