| OLD | NEW |
| (Empty) |
| 1 part of dart.async; | |
| 2 _runUserCode(userCode(), onSuccess(value), onError(error, StackTrace stackTrace
)) { | |
| 3 try { | |
| 4 onSuccess(userCode()); | |
| 5 } | |
| 6 catch (e, s) { | |
| 7 AsyncError replacement = Zone.current.errorCallback(e, s); | |
| 8 if (replacement == null) { | |
| 9 onError(e, s); | |
| 10 } | |
| 11 else { | |
| 12 var error = _nonNullError(replacement.error); | |
| 13 var stackTrace = replacement.stackTrace; | |
| 14 onError(error, stackTrace); | |
| 15 } | |
| 16 } | |
| 17 } | |
| 18 void _cancelAndError(StreamSubscription subscription, _Future future, error, St
ackTrace stackTrace) { | |
| 19 var cancelFuture = subscription.cancel(); | |
| 20 if (cancelFuture is Future) { | |
| 21 cancelFuture.whenComplete(() => future._completeError(error, stackTrace)); | |
| 22 } | |
| 23 else { | |
| 24 future._completeError(error, stackTrace); | |
| 25 } | |
| 26 } | |
| 27 void _cancelAndErrorWithReplacement(StreamSubscription subscription, _Future fu
ture, error, StackTrace stackTrace) { | |
| 28 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | |
| 29 if (replacement != null) { | |
| 30 error = _nonNullError(replacement.error); | |
| 31 stackTrace = replacement.stackTrace; | |
| 32 } | |
| 33 _cancelAndError(subscription, future, error, stackTrace); | |
| 34 } | |
| 35 _cancelAndErrorClosure(StreamSubscription subscription, _Future future) => ((er
ror, StackTrace stackTrace) => _cancelAndError(subscription, future, error, stac
kTrace)); | |
| 36 void _cancelAndValue(StreamSubscription subscription, _Future future, value) { | |
| 37 var cancelFuture = subscription.cancel(); | |
| 38 if (cancelFuture is Future) { | |
| 39 cancelFuture.whenComplete(() => future._complete(value)); | |
| 40 } | |
| 41 else { | |
| 42 future._complete(value); | |
| 43 } | |
| 44 } | |
| 45 abstract class _ForwardingStream<S, T> extends Stream<T> {final Stream<S> _sour
ce; | |
| 46 _ForwardingStream(this._source); | |
| 47 bool get isBroadcast => _source.isBroadcast; | |
| 48 StreamSubscription<T> listen(void onData(T value), { | |
| 49 Function onError, void onDone(), bool cancelOnError} | |
| 50 ) { | |
| 51 cancelOnError = identical(true, cancelOnError); | |
| 52 return _createSubscription(onData, onError, onDone, cancelOnError); | |
| 53 } | |
| 54 StreamSubscription<T> _createSubscription(void onData(T data), Function onError
, void onDone(), bool cancelOnError) { | |
| 55 return new _ForwardingStreamSubscription<S, T>(this, onData, onError, onDone,
cancelOnError); | |
| 56 } | |
| 57 void _handleData(S data, _EventSink<T> sink) { | |
| 58 dynamic outputData = data; | |
| 59 sink._add(DEVC$RT.cast(outputData, dynamic, T, "CompositeCast", """line 104,
column 15 of dart:async/stream_pipe.dart: """, outputData is T, false)); | |
| 60 } | |
| 61 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { | |
| 62 sink._addError(error, stackTrace); | |
| 63 } | |
| 64 void _handleDone(_EventSink<T> sink) { | |
| 65 sink._close(); | |
| 66 } | |
| 67 } | |
| 68 class _ForwardingStreamSubscription<S, T> extends _BufferingStreamSubscription<
T> {final _ForwardingStream<S, T> _stream; | |
| 69 StreamSubscription<S> _subscription; | |
| 70 _ForwardingStreamSubscription(this._stream, void onData(T data), Function onErr
or, void onDone(), bool cancelOnError) : super(onData, onError, onDone, cancelOn
Error) { | |
| 71 _subscription = _stream._source.listen(_handleData, onError: _handleError, onDon
e: _handleDone); | |
| 72 } | |
| 73 void _add(T data) { | |
| 74 if (_isClosed) return; super._add(data); | |
| 75 } | |
| 76 void _addError(Object error, StackTrace stackTrace) { | |
| 77 if (_isClosed) return; super._addError(error, stackTrace); | |
| 78 } | |
| 79 void _onPause() { | |
| 80 if (_subscription == null) return; _subscription.pause(); | |
| 81 } | |
| 82 void _onResume() { | |
| 83 if (_subscription == null) return; _subscription.resume(); | |
| 84 } | |
| 85 Future _onCancel() { | |
| 86 if (_subscription != null) { | |
| 87 StreamSubscription subscription = _subscription; | |
| 88 _subscription = null; | |
| 89 subscription.cancel(); | |
| 90 } | |
| 91 return null; | |
| 92 } | |
| 93 void _handleData(S data) { | |
| 94 _stream._handleData(data, this); | |
| 95 } | |
| 96 void _handleError(error, StackTrace stackTrace) { | |
| 97 _stream._handleError(error, stackTrace, this); | |
| 98 } | |
| 99 void _handleDone() { | |
| 100 _stream._handleDone(this); | |
| 101 } | |
| 102 } | |
| 103 typedef bool _Predicate<T>(T value); | |
| 104 void _addErrorWithReplacement(_EventSink sink, error, stackTrace) { | |
| 105 AsyncError replacement = Zone.current.errorCallback(error, DEVC$RT.cast(stackTra
ce, dynamic, StackTrace, "DynamicCast", """line 191, column 62 of dart:async/str
eam_pipe.dart: """, stackTrace is StackTrace, true)); | |
| 106 if (replacement != null) { | |
| 107 error = _nonNullError(replacement.error); | |
| 108 stackTrace = replacement.stackTrace; | |
| 109 } | |
| 110 sink._addError(error, DEVC$RT.cast(stackTrace, dynamic, StackTrace, "DynamicCas
t", """line 196, column 25 of dart:async/stream_pipe.dart: """, stackTrace is St
ackTrace, true)); | |
| 111 } | |
| 112 class _WhereStream<T> extends _ForwardingStream<T, T> {final _Predicate<T> _tes
t; | |
| 113 _WhereStream(Stream<T> source, bool test(T value)) : _test = test, super(source
); | |
| 114 void _handleData(T inputEvent, _EventSink<T> sink) { | |
| 115 bool satisfies; | |
| 116 try { | |
| 117 satisfies = _test(inputEvent); | |
| 118 } | |
| 119 catch (e, s) { | |
| 120 _addErrorWithReplacement(sink, e, s); | |
| 121 return;} | |
| 122 if (satisfies) { | |
| 123 sink._add(inputEvent); | |
| 124 } | |
| 125 } | |
| 126 } | |
| 127 typedef T _Transformation<S, T>(S value); | |
| 128 class _MapStream<S, T> extends _ForwardingStream<S, T> {final _Transformation _
transform; | |
| 129 _MapStream(Stream<S> source, T transform(S event)) : this._transform = transfor
m, super(source); | |
| 130 void _handleData(S inputEvent, _EventSink<T> sink) { | |
| 131 T outputEvent; | |
| 132 try { | |
| 133 outputEvent = ((__x58) => DEVC$RT.cast(__x58, dynamic, T, "CompositeCast", """li
ne 235, column 21 of dart:async/stream_pipe.dart: """, __x58 is T, false))(_tran
sform(inputEvent)); | |
| 134 } | |
| 135 catch (e, s) { | |
| 136 _addErrorWithReplacement(sink, e, s); | |
| 137 return;} | |
| 138 sink._add(outputEvent); | |
| 139 } | |
| 140 } | |
| 141 class _ExpandStream<S, T> extends _ForwardingStream<S, T> {final _Transformatio
n<S, Iterable<T>> _expand; | |
| 142 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) : this._expand = e
xpand, super(source); | |
| 143 void _handleData(S inputEvent, _EventSink<T> sink) { | |
| 144 try { | |
| 145 for (T value in _expand(inputEvent)) { | |
| 146 sink._add(value); | |
| 147 } | |
| 148 } | |
| 149 catch (e, s) { | |
| 150 _addErrorWithReplacement(sink, e, s); | |
| 151 } | |
| 152 } | |
| 153 } | |
| 154 typedef bool _ErrorTest(error); | |
| 155 class _HandleErrorStream<T> extends _ForwardingStream<T, T> {final Function _tr
ansform; | |
| 156 final _ErrorTest _test; | |
| 157 _HandleErrorStream(Stream<T> source, Function onError, bool test(error)) : this
._transform = onError, this._test = test, super(source); | |
| 158 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { | |
| 159 bool matches = true; | |
| 160 if (_test != null) { | |
| 161 try { | |
| 162 matches = _test(error); | |
| 163 } | |
| 164 catch (e, s) { | |
| 165 _addErrorWithReplacement(sink, e, s); | |
| 166 return;} | |
| 167 } | |
| 168 if (matches) { | |
| 169 try { | |
| 170 _invokeErrorHandler(_transform, error, stackTrace); | |
| 171 } | |
| 172 catch (e, s) { | |
| 173 if (identical(e, error)) { | |
| 174 sink._addError(error, stackTrace); | |
| 175 } | |
| 176 else { | |
| 177 _addErrorWithReplacement(sink, e, s); | |
| 178 } | |
| 179 return;} | |
| 180 } | |
| 181 else { | |
| 182 sink._addError(error, stackTrace); | |
| 183 } | |
| 184 } | |
| 185 } | |
| 186 class _TakeStream<T> extends _ForwardingStream<T, T> {int _remaining; | |
| 187 _TakeStream(Stream<T> source, int count) : this._remaining = count, super(sourc
e) { | |
| 188 if (count is! int) throw new ArgumentError(count); | |
| 189 } | |
| 190 void _handleData(T inputEvent, _EventSink<T> sink) { | |
| 191 if (_remaining > 0) { | |
| 192 sink._add(inputEvent); | |
| 193 _remaining -= 1; | |
| 194 if (_remaining == 0) { | |
| 195 sink._close(); | |
| 196 } | |
| 197 } | |
| 198 } | |
| 199 } | |
| 200 class _TakeWhileStream<T> extends _ForwardingStream<T, T> {final _Predicate<T>
_test; | |
| 201 _TakeWhileStream(Stream<T> source, bool test(T value)) : this._test = test, sup
er(source); | |
| 202 void _handleData(T inputEvent, _EventSink<T> sink) { | |
| 203 bool satisfies; | |
| 204 try { | |
| 205 satisfies = _test(inputEvent); | |
| 206 } | |
| 207 catch (e, s) { | |
| 208 _addErrorWithReplacement(sink, e, s); | |
| 209 sink._close(); | |
| 210 return;} | |
| 211 if (satisfies) { | |
| 212 sink._add(inputEvent); | |
| 213 } | |
| 214 else { | |
| 215 sink._close(); | |
| 216 } | |
| 217 } | |
| 218 } | |
| 219 class _SkipStream<T> extends _ForwardingStream<T, T> {int _remaining; | |
| 220 _SkipStream(Stream<T> source, int count) : this._remaining = count, super(sourc
e) { | |
| 221 if (count is! int || count < 0) throw new ArgumentError(count); | |
| 222 } | |
| 223 void _handleData(T inputEvent, _EventSink<T> sink) { | |
| 224 if (_remaining > 0) { | |
| 225 _remaining--; | |
| 226 return;} | |
| 227 sink._add(inputEvent); | |
| 228 } | |
| 229 } | |
| 230 class _SkipWhileStream<T> extends _ForwardingStream<T, T> {final _Predicate<T>
_test; | |
| 231 bool _hasFailed = false; | |
| 232 _SkipWhileStream(Stream<T> source, bool test(T value)) : this._test = test, sup
er(source); | |
| 233 void _handleData(T inputEvent, _EventSink<T> sink) { | |
| 234 if (_hasFailed) { | |
| 235 sink._add(inputEvent); | |
| 236 return;} | |
| 237 bool satisfies; | |
| 238 try { | |
| 239 satisfies = _test(inputEvent); | |
| 240 } | |
| 241 catch (e, s) { | |
| 242 _addErrorWithReplacement(sink, e, s); | |
| 243 _hasFailed = true; | |
| 244 return;} | |
| 245 if (!satisfies) { | |
| 246 _hasFailed = true; | |
| 247 sink._add(inputEvent); | |
| 248 } | |
| 249 } | |
| 250 } | |
| 251 typedef bool _Equality<T>(T a, T b); | |
| 252 class _DistinctStream<T> extends _ForwardingStream<T, T> {static var _SENTINEL
= new Object(); | |
| 253 _Equality<T> _equals; | |
| 254 var _previous = _SENTINEL; | |
| 255 _DistinctStream(Stream<T> source, bool equals(T a, T b)) : _equals = equals, su
per(source); | |
| 256 void _handleData(T inputEvent, _EventSink<T> sink) { | |
| 257 if (identical(_previous, _SENTINEL)) { | |
| 258 _previous = inputEvent; | |
| 259 return sink._add(inputEvent); | |
| 260 } | |
| 261 else { | |
| 262 bool isEqual; | |
| 263 try { | |
| 264 if (_equals == null) { | |
| 265 isEqual = (_previous == inputEvent); | |
| 266 } | |
| 267 else { | |
| 268 isEqual = _equals(DEVC$RT.cast(_previous, Object, T, "CompositeCast", """line 42
6, column 29 of dart:async/stream_pipe.dart: """, _previous is T, false), inputE
vent); | |
| 269 } | |
| 270 } | |
| 271 catch (e, s) { | |
| 272 _addErrorWithReplacement(sink, e, s); | |
| 273 return null; | |
| 274 } | |
| 275 if (!isEqual) { | |
| 276 sink._add(inputEvent); | |
| 277 _previous = inputEvent; | |
| 278 } | |
| 279 } | |
| 280 } | |
| 281 } | |
| OLD | NEW |