| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 part of dart.async; | |
| 6 | |
| 7 /** Runs user code and takes actions depending on success or failure. */ | |
| 8 _runUserCode(userCode(), | |
| 9 onSuccess(value), | |
| 10 onError(error, StackTrace stackTrace)) { | |
| 11 try { | |
| 12 onSuccess(userCode()); | |
| 13 } catch (e, s) { | |
| 14 AsyncError replacement = Zone.current.errorCallback(e, s); | |
| 15 if (replacement == null) { | |
| 16 onError(e, s); | |
| 17 } else { | |
| 18 var error = _nonNullError(replacement.error); | |
| 19 var stackTrace = replacement.stackTrace; | |
| 20 onError(error, stackTrace); | |
| 21 } | |
| 22 } | |
| 23 } | |
| 24 | |
| 25 /** Helper function to cancel a subscription and wait for the potential future, | |
| 26 before completing with an error. */ | |
| 27 void _cancelAndError(StreamSubscription subscription, | |
| 28 _Future future, | |
| 29 error, | |
| 30 StackTrace stackTrace) { | |
| 31 var cancelFuture = subscription.cancel(); | |
| 32 if (cancelFuture is Future) { | |
| 33 cancelFuture.whenComplete(() => future._completeError(error, stackTrace)); | |
| 34 } else { | |
| 35 future._completeError(error, stackTrace); | |
| 36 } | |
| 37 } | |
| 38 | |
| 39 void _cancelAndErrorWithReplacement(StreamSubscription subscription, | |
| 40 _Future future, | |
| 41 error, StackTrace stackTrace) { | |
| 42 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | |
| 43 if (replacement != null) { | |
| 44 error = _nonNullError(replacement.error); | |
| 45 stackTrace = replacement.stackTrace; | |
| 46 } | |
| 47 _cancelAndError(subscription, future, error, stackTrace); | |
| 48 } | |
| 49 | |
| 50 typedef void _ErrorCallback(error, StackTrace stackTrace); | |
| 51 | |
| 52 /** Helper function to make an onError argument to [_runUserCode]. */ | |
| 53 _ErrorCallback _cancelAndErrorClosure( | |
| 54 StreamSubscription subscription, _Future future) { | |
| 55 return (error, StackTrace stackTrace) { | |
| 56 _cancelAndError(subscription, future, error, stackTrace); | |
| 57 }; | |
| 58 } | |
| 59 | |
| 60 /** Helper function to cancel a subscription and wait for the potential future, | |
| 61 before completing with a value. */ | |
| 62 void _cancelAndValue(StreamSubscription subscription, _Future future, value) { | |
| 63 var cancelFuture = subscription.cancel(); | |
| 64 if (cancelFuture is Future) { | |
| 65 cancelFuture.whenComplete(() => future._complete(value)); | |
| 66 } else { | |
| 67 future._complete(value); | |
| 68 } | |
| 69 } | |
| 70 | |
| 71 | |
| 72 /** | |
| 73 * A [Stream] that forwards subscriptions to another stream. | |
| 74 * | |
| 75 * This stream implements [Stream], but forwards all subscriptions | |
| 76 * to an underlying stream, and wraps the returned subscription to | |
| 77 * modify the events on the way. | |
| 78 * | |
| 79 * This class is intended for internal use only. | |
| 80 */ | |
| 81 abstract class _ForwardingStream<S, T> extends Stream<T> { | |
| 82 final Stream<S> _source; | |
| 83 | |
| 84 _ForwardingStream(this._source); | |
| 85 | |
| 86 bool get isBroadcast => _source.isBroadcast; | |
| 87 | |
| 88 StreamSubscription<T> listen(void onData(T value), | |
| 89 { Function onError, | |
| 90 void onDone(), | |
| 91 bool cancelOnError }) { | |
| 92 cancelOnError = identical(true, cancelOnError); | |
| 93 return _createSubscription(onData, onError, onDone, cancelOnError); | |
| 94 } | |
| 95 | |
| 96 StreamSubscription<T> _createSubscription( | |
| 97 void onData(T data), | |
| 98 Function onError, | |
| 99 void onDone(), | |
| 100 bool cancelOnError) { | |
| 101 return new _ForwardingStreamSubscription<S, T>( | |
| 102 this, onData, onError, onDone, cancelOnError); | |
| 103 } | |
| 104 | |
| 105 // Override the following methods in subclasses to change the behavior. | |
| 106 | |
| 107 void _handleData(S data, _EventSink<T> sink) { | |
| 108 sink._add(data as Object /*=T*/); | |
| 109 } | |
| 110 | |
| 111 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { | |
| 112 sink._addError(error, stackTrace); | |
| 113 } | |
| 114 | |
| 115 void _handleDone(_EventSink<T> sink) { | |
| 116 sink._close(); | |
| 117 } | |
| 118 } | |
| 119 | |
| 120 /** | |
| 121 * Abstract superclass for subscriptions that forward to other subscriptions. | |
| 122 */ | |
| 123 class _ForwardingStreamSubscription<S, T> | |
| 124 extends _BufferingStreamSubscription<T> { | |
| 125 final _ForwardingStream<S, T> _stream; | |
| 126 | |
| 127 StreamSubscription<S> _subscription; | |
| 128 | |
| 129 _ForwardingStreamSubscription(this._stream, void onData(T data), | |
| 130 Function onError, void onDone(), | |
| 131 bool cancelOnError) | |
| 132 : super(onData, onError, onDone, cancelOnError) { | |
| 133 _subscription = _stream._source.listen(_handleData, | |
| 134 onError: _handleError, | |
| 135 onDone: _handleDone); | |
| 136 } | |
| 137 | |
| 138 // _StreamSink interface. | |
| 139 // Transformers sending more than one event have no way to know if the stream | |
| 140 // is canceled or closed after the first, so we just ignore remaining events. | |
| 141 | |
| 142 void _add(T data) { | |
| 143 if (_isClosed) return; | |
| 144 super._add(data); | |
| 145 } | |
| 146 | |
| 147 void _addError(Object error, StackTrace stackTrace) { | |
| 148 if (_isClosed) return; | |
| 149 super._addError(error, stackTrace); | |
| 150 } | |
| 151 | |
| 152 // StreamSubscription callbacks. | |
| 153 | |
| 154 void _onPause() { | |
| 155 if (_subscription == null) return; | |
| 156 _subscription.pause(); | |
| 157 } | |
| 158 | |
| 159 void _onResume() { | |
| 160 if (_subscription == null) return; | |
| 161 _subscription.resume(); | |
| 162 } | |
| 163 | |
| 164 Future _onCancel() { | |
| 165 if (_subscription != null) { | |
| 166 StreamSubscription subscription = _subscription; | |
| 167 _subscription = null; | |
| 168 return subscription.cancel(); | |
| 169 } | |
| 170 return null; | |
| 171 } | |
| 172 | |
| 173 // Methods used as listener on source subscription. | |
| 174 | |
| 175 void _handleData(S data) { | |
| 176 _stream._handleData(data, this); | |
| 177 } | |
| 178 | |
| 179 void _handleError(error, StackTrace stackTrace) { | |
| 180 _stream._handleError(error, stackTrace, this); | |
| 181 } | |
| 182 | |
| 183 void _handleDone() { | |
| 184 _stream._handleDone(this); | |
| 185 } | |
| 186 } | |
| 187 | |
| 188 // ------------------------------------------------------------------- | |
| 189 // Stream transformers used by the default Stream implementation. | |
| 190 // ------------------------------------------------------------------- | |
| 191 | |
| 192 typedef bool _Predicate<T>(T value); | |
| 193 | |
| 194 void _addErrorWithReplacement(_EventSink sink, error, stackTrace) { | |
| 195 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | |
| 196 if (replacement != null) { | |
| 197 error = _nonNullError(replacement.error); | |
| 198 stackTrace = replacement.stackTrace; | |
| 199 } | |
| 200 sink._addError(error, stackTrace); | |
| 201 } | |
| 202 | |
| 203 | |
| 204 class _WhereStream<T> extends _ForwardingStream<T, T> { | |
| 205 final _Predicate<T> _test; | |
| 206 | |
| 207 _WhereStream(Stream<T> source, bool test(T value)) | |
| 208 : _test = test, super(source); | |
| 209 | |
| 210 void _handleData(T inputEvent, _EventSink<T> sink) { | |
| 211 bool satisfies; | |
| 212 try { | |
| 213 satisfies = _test(inputEvent); | |
| 214 } catch (e, s) { | |
| 215 _addErrorWithReplacement(sink, e, s); | |
| 216 return; | |
| 217 } | |
| 218 if (satisfies) { | |
| 219 sink._add(inputEvent); | |
| 220 } | |
| 221 } | |
| 222 } | |
| 223 | |
| 224 | |
| 225 typedef T _Transformation<S, T>(S value); | |
| 226 | |
| 227 /** | |
| 228 * A stream pipe that converts data events before passing them on. | |
| 229 */ | |
| 230 class _MapStream<S, T> extends _ForwardingStream<S, T> { | |
| 231 final _Transformation<S, T> _transform; | |
| 232 | |
| 233 _MapStream(Stream<S> source, T transform(S event)) | |
| 234 : this._transform = transform, super(source); | |
| 235 | |
| 236 void _handleData(S inputEvent, _EventSink<T> sink) { | |
| 237 T outputEvent; | |
| 238 try { | |
| 239 outputEvent = _transform(inputEvent); | |
| 240 } catch (e, s) { | |
| 241 _addErrorWithReplacement(sink, e, s); | |
| 242 return; | |
| 243 } | |
| 244 sink._add(outputEvent); | |
| 245 } | |
| 246 } | |
| 247 | |
| 248 /** | |
| 249 * A stream pipe that converts data events before passing them on. | |
| 250 */ | |
| 251 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { | |
| 252 final _Transformation<S, Iterable<T>> _expand; | |
| 253 | |
| 254 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) | |
| 255 : this._expand = expand, super(source); | |
| 256 | |
| 257 void _handleData(S inputEvent, _EventSink<T> sink) { | |
| 258 try { | |
| 259 for (T value in _expand(inputEvent)) { | |
| 260 sink._add(value); | |
| 261 } | |
| 262 } catch (e, s) { | |
| 263 // If either _expand or iterating the generated iterator throws, | |
| 264 // we abort the iteration. | |
| 265 _addErrorWithReplacement(sink, e, s); | |
| 266 } | |
| 267 } | |
| 268 } | |
| 269 | |
| 270 | |
| 271 typedef bool _ErrorTest(error); | |
| 272 | |
| 273 /** | |
| 274 * A stream pipe that converts or disposes error events | |
| 275 * before passing them on. | |
| 276 */ | |
| 277 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | |
| 278 final Function _transform; | |
| 279 final _ErrorTest _test; | |
| 280 | |
| 281 _HandleErrorStream(Stream<T> source, | |
| 282 Function onError, | |
| 283 bool test(error)) | |
| 284 : this._transform = onError, this._test = test, super(source); | |
| 285 | |
| 286 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { | |
| 287 bool matches = true; | |
| 288 if (_test != null) { | |
| 289 try { | |
| 290 matches = _test(error); | |
| 291 } catch (e, s) { | |
| 292 _addErrorWithReplacement(sink, e, s); | |
| 293 return; | |
| 294 } | |
| 295 } | |
| 296 if (matches) { | |
| 297 try { | |
| 298 _invokeErrorHandler(_transform, error, stackTrace); | |
| 299 } catch (e, s) { | |
| 300 if (identical(e, error)) { | |
| 301 sink._addError(error, stackTrace); | |
| 302 } else { | |
| 303 _addErrorWithReplacement(sink, e, s); | |
| 304 } | |
| 305 return; | |
| 306 } | |
| 307 } else { | |
| 308 sink._addError(error, stackTrace); | |
| 309 } | |
| 310 } | |
| 311 } | |
| 312 | |
| 313 | |
| 314 class _TakeStream<T> extends _ForwardingStream<T, T> { | |
| 315 final int _count; | |
| 316 | |
| 317 _TakeStream(Stream<T> source, int count) | |
| 318 : this._count = count, super(source) { | |
| 319 // This test is done early to avoid handling an async error | |
| 320 // in the _handleData method. | |
| 321 if (count is! int) throw new ArgumentError(count); | |
| 322 } | |
| 323 | |
| 324 StreamSubscription<T> _createSubscription( | |
| 325 void onData(T data), | |
| 326 Function onError, | |
| 327 void onDone(), | |
| 328 bool cancelOnError) { | |
| 329 return new _StateStreamSubscription<T>( | |
| 330 this, onData, onError, onDone, cancelOnError, _count); | |
| 331 } | |
| 332 | |
| 333 void _handleData(T inputEvent, _EventSink<T> sink) { | |
| 334 _StateStreamSubscription<T> subscription = sink; | |
| 335 int count = subscription._count; | |
| 336 if (count > 0) { | |
| 337 sink._add(inputEvent); | |
| 338 count -= 1; | |
| 339 subscription._count = count; | |
| 340 if (count == 0) { | |
| 341 // Closing also unsubscribes all subscribers, which unsubscribes | |
| 342 // this from source. | |
| 343 sink._close(); | |
| 344 } | |
| 345 } | |
| 346 } | |
| 347 } | |
| 348 | |
| 349 /** | |
| 350 * A [_ForwardingStreamSubscription] with one extra state field. | |
| 351 * | |
| 352 * Use by several different classes, some storing an integer, others a bool. | |
| 353 */ | |
| 354 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { | |
| 355 // Raw state field. Typed access provided by getters and setters below. | |
| 356 var _sharedState; | |
| 357 | |
| 358 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), | |
| 359 Function onError, void onDone(), | |
| 360 bool cancelOnError, this._sharedState) | |
| 361 : super(stream, onData, onError, onDone, cancelOnError); | |
| 362 | |
| 363 bool get _flag => _sharedState; | |
| 364 void set _flag(bool flag) { _sharedState = flag; } | |
| 365 int get _count => _sharedState; | |
| 366 void set _count(int count) { _sharedState = count; } | |
| 367 } | |
| 368 | |
| 369 | |
| 370 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { | |
| 371 final _Predicate<T> _test; | |
| 372 | |
| 373 _TakeWhileStream(Stream<T> source, bool test(T value)) | |
| 374 : this._test = test, super(source); | |
| 375 | |
| 376 void _handleData(T inputEvent, _EventSink<T> sink) { | |
| 377 bool satisfies; | |
| 378 try { | |
| 379 satisfies = _test(inputEvent); | |
| 380 } catch (e, s) { | |
| 381 _addErrorWithReplacement(sink, e, s); | |
| 382 // The test didn't say true. Didn't say false either, but we stop anyway. | |
| 383 sink._close(); | |
| 384 return; | |
| 385 } | |
| 386 if (satisfies) { | |
| 387 sink._add(inputEvent); | |
| 388 } else { | |
| 389 sink._close(); | |
| 390 } | |
| 391 } | |
| 392 } | |
| 393 | |
| 394 class _SkipStream<T> extends _ForwardingStream<T, T> { | |
| 395 final int _count; | |
| 396 | |
| 397 _SkipStream(Stream<T> source, int count) | |
| 398 : this._count = count, super(source) { | |
| 399 // This test is done early to avoid handling an async error | |
| 400 // in the _handleData method. | |
| 401 if (count is! int || count < 0) throw new ArgumentError(count); | |
| 402 } | |
| 403 | |
| 404 StreamSubscription<T> _createSubscription( | |
| 405 void onData(T data), | |
| 406 Function onError, | |
| 407 void onDone(), | |
| 408 bool cancelOnError) { | |
| 409 return new _StateStreamSubscription<T>( | |
| 410 this, onData, onError, onDone, cancelOnError, _count); | |
| 411 } | |
| 412 | |
| 413 void _handleData(T inputEvent, _EventSink<T> sink) { | |
| 414 _StateStreamSubscription<T> subscription = sink; | |
| 415 int count = subscription._count; | |
| 416 if (count > 0) { | |
| 417 subscription._count = count - 1; | |
| 418 return; | |
| 419 } | |
| 420 sink._add(inputEvent); | |
| 421 } | |
| 422 } | |
| 423 | |
| 424 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { | |
| 425 final _Predicate<T> _test; | |
| 426 | |
| 427 _SkipWhileStream(Stream<T> source, bool test(T value)) | |
| 428 : this._test = test, super(source); | |
| 429 | |
| 430 StreamSubscription<T> _createSubscription( | |
| 431 void onData(T data), | |
| 432 Function onError, | |
| 433 void onDone(), | |
| 434 bool cancelOnError) { | |
| 435 return new _StateStreamSubscription<T>( | |
| 436 this, onData, onError, onDone, cancelOnError, false); | |
| 437 } | |
| 438 | |
| 439 void _handleData(T inputEvent, _EventSink<T> sink) { | |
| 440 _StateStreamSubscription<T> subscription = sink; | |
| 441 bool hasFailed = subscription._flag; | |
| 442 if (hasFailed) { | |
| 443 sink._add(inputEvent); | |
| 444 return; | |
| 445 } | |
| 446 bool satisfies; | |
| 447 try { | |
| 448 satisfies = _test(inputEvent); | |
| 449 } catch (e, s) { | |
| 450 _addErrorWithReplacement(sink, e, s); | |
| 451 // A failure to return a boolean is considered "not matching". | |
| 452 subscription._flag = true; | |
| 453 return; | |
| 454 } | |
| 455 if (!satisfies) { | |
| 456 subscription._flag = true; | |
| 457 sink._add(inputEvent); | |
| 458 } | |
| 459 } | |
| 460 } | |
| 461 | |
| 462 typedef bool _Equality<T>(T a, T b); | |
| 463 | |
| 464 class _DistinctStream<T> extends _ForwardingStream<T, T> { | |
| 465 static var _SENTINEL = new Object(); | |
| 466 | |
| 467 _Equality<T> _equals; | |
| 468 var _previous = _SENTINEL; | |
| 469 | |
| 470 _DistinctStream(Stream<T> source, bool equals(T a, T b)) | |
| 471 : _equals = equals, super(source); | |
| 472 | |
| 473 void _handleData(T inputEvent, _EventSink<T> sink) { | |
| 474 if (identical(_previous, _SENTINEL)) { | |
| 475 _previous = inputEvent; | |
| 476 return sink._add(inputEvent); | |
| 477 } else { | |
| 478 bool isEqual; | |
| 479 try { | |
| 480 if (_equals == null) { | |
| 481 isEqual = (_previous == inputEvent); | |
| 482 } else { | |
| 483 isEqual = _equals(_previous as Object /*=T*/, inputEvent); | |
| 484 } | |
| 485 } catch (e, s) { | |
| 486 _addErrorWithReplacement(sink, e, s); | |
| 487 return null; | |
| 488 } | |
| 489 if (!isEqual) { | |
| 490 sink._add(inputEvent); | |
| 491 _previous = inputEvent; | |
| 492 } | |
| 493 } | |
| 494 } | |
| 495 } | |
| OLD | NEW |