| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
| 8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
| 9 void _add(T data); | 9 void _add(T data); |
| 10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
| (...skipping 325 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 336 assert(!_isCanceled); | 336 assert(!_isCanceled); |
| 337 assert(!_isPaused); | 337 assert(!_isPaused); |
| 338 assert(!_inCallback); | 338 assert(!_inCallback); |
| 339 bool wasInputPaused = _isInputPaused; | 339 bool wasInputPaused = _isInputPaused; |
| 340 | 340 |
| 341 void sendError() { | 341 void sendError() { |
| 342 // If the subscription has been canceled while waiting for the cancel | 342 // If the subscription has been canceled while waiting for the cancel |
| 343 // future to finish we must not report the error. | 343 // future to finish we must not report the error. |
| 344 if (_isCanceled && !_waitsForCancel) return; | 344 if (_isCanceled && !_waitsForCancel) return; |
| 345 _state |= _STATE_IN_CALLBACK; | 345 _state |= _STATE_IN_CALLBACK; |
| 346 // TODO(floitsch): this dynamic should be 'void'. |
| 346 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { | 347 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { |
| 347 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError | 348 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError |
| 348 as Object/*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; | 349 as Object/*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; |
| 349 _zone.runBinaryGuarded(errorCallback, error, stackTrace); | 350 _zone.runBinaryGuarded(errorCallback, error, stackTrace); |
| 350 } else { | 351 } else { |
| 351 _zone.runUnaryGuarded<dynamic, dynamic>( | 352 _zone.runUnaryGuarded<dynamic, Object>( |
| 352 _onError as Object/*=ZoneUnaryCallback<dynamic, dynamic>*/, error); | 353 _onError as Object/*=ZoneUnaryCallback<dynamic, Object>*/, error); |
| 353 } | 354 } |
| 354 _state &= ~_STATE_IN_CALLBACK; | 355 _state &= ~_STATE_IN_CALLBACK; |
| 355 } | 356 } |
| 356 | 357 |
| 357 if (_cancelOnError) { | 358 if (_cancelOnError) { |
| 358 _state |= _STATE_WAIT_FOR_CANCEL; | 359 _state |= _STATE_WAIT_FOR_CANCEL; |
| 359 _cancel(); | 360 _cancel(); |
| 360 if (_cancelFuture is Future && | 361 if (_cancelFuture is Future && |
| 361 !identical(_cancelFuture, Future._nullFuture)) { | 362 !identical(_cancelFuture, Future._nullFuture)) { |
| 362 _cancelFuture.whenComplete(sendError); | 363 _cancelFuture.whenComplete(sendError); |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 394 } | 395 } |
| 395 } | 396 } |
| 396 | 397 |
| 397 /** | 398 /** |
| 398 * Call a hook function. | 399 * Call a hook function. |
| 399 * | 400 * |
| 400 * The call is properly wrapped in code to avoid other callbacks | 401 * The call is properly wrapped in code to avoid other callbacks |
| 401 * during the call, and it checks for state changes after the call | 402 * during the call, and it checks for state changes after the call |
| 402 * that should cause further callbacks. | 403 * that should cause further callbacks. |
| 403 */ | 404 */ |
| 404 void _guardCallback(callback) { | 405 void _guardCallback(void callback()) { |
| 405 assert(!_inCallback); | 406 assert(!_inCallback); |
| 406 bool wasInputPaused = _isInputPaused; | 407 bool wasInputPaused = _isInputPaused; |
| 407 _state |= _STATE_IN_CALLBACK; | 408 _state |= _STATE_IN_CALLBACK; |
| 408 callback(); | 409 callback(); |
| 409 _state &= ~_STATE_IN_CALLBACK; | 410 _state &= ~_STATE_IN_CALLBACK; |
| 410 _checkState(wasInputPaused); | 411 _checkState(wasInputPaused); |
| 411 } | 412 } |
| 412 | 413 |
| 413 /** | 414 /** |
| 414 * Check if the input needs to be informed of state changes. | 415 * Check if the input needs to be informed of state changes. |
| (...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 544 } | 545 } |
| 545 } | 546 } |
| 546 | 547 |
| 547 // Internal helpers. | 548 // Internal helpers. |
| 548 | 549 |
| 549 // Types of the different handlers on a stream. Types used to type fields. | 550 // Types of the different handlers on a stream. Types used to type fields. |
| 550 typedef void _DataHandler<T>(T value); | 551 typedef void _DataHandler<T>(T value); |
| 551 typedef void _DoneHandler(); | 552 typedef void _DoneHandler(); |
| 552 | 553 |
| 553 /** Default data handler, does nothing. */ | 554 /** Default data handler, does nothing. */ |
| 554 void _nullDataHandler(var value) {} | 555 void _nullDataHandler(Object value) {} |
| 555 | 556 |
| 556 /** Default error handler, reports the error to the current zone's handler. */ | 557 /** Default error handler, reports the error to the current zone's handler. */ |
| 557 void _nullErrorHandler(error, [StackTrace stackTrace]) { | 558 void _nullErrorHandler(Object error, [StackTrace stackTrace]) { |
| 558 Zone.current.handleUncaughtError(error, stackTrace); | 559 Zone.current.handleUncaughtError(error, stackTrace); |
| 559 } | 560 } |
| 560 | 561 |
| 561 /** Default done handler, does nothing. */ | 562 /** Default done handler, does nothing. */ |
| 562 void _nullDoneHandler() {} | 563 void _nullDoneHandler() {} |
| 563 | 564 |
| 564 /** A delayed event on a buffering stream subscription. */ | 565 /** A delayed event on a buffering stream subscription. */ |
| 565 abstract class _DelayedEvent<T> { | 566 abstract class _DelayedEvent<T> { |
| 566 /** Added as a linked list on the [StreamController]. */ | 567 /** Added as a linked list on the [StreamController]. */ |
| 567 _DelayedEvent next; | 568 _DelayedEvent next; |
| (...skipping 472 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1040 | 1041 |
| 1041 /** An empty broadcast stream, sending a done event as soon as possible. */ | 1042 /** An empty broadcast stream, sending a done event as soon as possible. */ |
| 1042 class _EmptyStream<T> extends Stream<T> { | 1043 class _EmptyStream<T> extends Stream<T> { |
| 1043 const _EmptyStream() : super._internal(); | 1044 const _EmptyStream() : super._internal(); |
| 1044 bool get isBroadcast => true; | 1045 bool get isBroadcast => true; |
| 1045 StreamSubscription<T> listen(void onData(T data), | 1046 StreamSubscription<T> listen(void onData(T data), |
| 1046 {Function onError, void onDone(), bool cancelOnError}) { | 1047 {Function onError, void onDone(), bool cancelOnError}) { |
| 1047 return new _DoneStreamSubscription<T>(onDone); | 1048 return new _DoneStreamSubscription<T>(onDone); |
| 1048 } | 1049 } |
| 1049 } | 1050 } |
| OLD | NEW |