OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
9 void _add(T data); | 9 void _add(T data); |
10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
(...skipping 314 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
325 assert(!_isCanceled); | 325 assert(!_isCanceled); |
326 assert(!_isPaused); | 326 assert(!_isPaused); |
327 assert(!_inCallback); | 327 assert(!_inCallback); |
328 bool wasInputPaused = _isInputPaused; | 328 bool wasInputPaused = _isInputPaused; |
329 _state |= _STATE_IN_CALLBACK; | 329 _state |= _STATE_IN_CALLBACK; |
330 _zone.runUnaryGuarded(_onData, data); | 330 _zone.runUnaryGuarded(_onData, data); |
331 _state &= ~_STATE_IN_CALLBACK; | 331 _state &= ~_STATE_IN_CALLBACK; |
332 _checkState(wasInputPaused); | 332 _checkState(wasInputPaused); |
333 } | 333 } |
334 | 334 |
335 void _sendError(var error, StackTrace stackTrace) { | 335 void _sendError(Object error, StackTrace stackTrace) { |
336 assert(!_isCanceled); | 336 assert(!_isCanceled); |
337 assert(!_isPaused); | 337 assert(!_isPaused); |
338 assert(!_inCallback); | 338 assert(!_inCallback); |
339 bool wasInputPaused = _isInputPaused; | 339 bool wasInputPaused = _isInputPaused; |
340 | 340 |
341 void sendError() { | 341 void sendError() { |
342 // If the subscription has been canceled while waiting for the cancel | 342 // If the subscription has been canceled while waiting for the cancel |
343 // future to finish we must not report the error. | 343 // future to finish we must not report the error. |
344 if (_isCanceled && !_waitsForCancel) return; | 344 if (_isCanceled && !_waitsForCancel) return; |
345 _state |= _STATE_IN_CALLBACK; | 345 _state |= _STATE_IN_CALLBACK; |
346 // TODO(floitsch): this dynamic should be 'void'. | 346 // TODO(floitsch): this dynamic should be 'void'. |
347 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { | 347 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { |
348 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError | 348 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = |
349 as Object/*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; | 349 _onError; |
350 _zone.runBinaryGuarded(errorCallback, error, stackTrace); | 350 _zone.runBinaryGuarded(errorCallback, error, stackTrace); |
351 } else { | 351 } else { |
352 _zone.runUnaryGuarded<dynamic, Object>( | 352 _zone.runUnaryGuarded<Object>(_onError, error); |
353 _onError as Object/*=ZoneUnaryCallback<dynamic, Object>*/, error); | |
354 } | 353 } |
355 _state &= ~_STATE_IN_CALLBACK; | 354 _state &= ~_STATE_IN_CALLBACK; |
356 } | 355 } |
357 | 356 |
358 if (_cancelOnError) { | 357 if (_cancelOnError) { |
359 _state |= _STATE_WAIT_FOR_CANCEL; | 358 _state |= _STATE_WAIT_FOR_CANCEL; |
360 _cancel(); | 359 _cancel(); |
361 if (_cancelFuture is Future && | 360 if (_cancelFuture is Future && |
362 !identical(_cancelFuture, Future._nullFuture)) { | 361 !identical(_cancelFuture, Future._nullFuture)) { |
363 _cancelFuture.whenComplete(sendError); | 362 _cancelFuture.whenComplete(sendError); |
(...skipping 678 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1042 | 1041 |
1043 /** An empty broadcast stream, sending a done event as soon as possible. */ | 1042 /** An empty broadcast stream, sending a done event as soon as possible. */ |
1044 class _EmptyStream<T> extends Stream<T> { | 1043 class _EmptyStream<T> extends Stream<T> { |
1045 const _EmptyStream() : super._internal(); | 1044 const _EmptyStream() : super._internal(); |
1046 bool get isBroadcast => true; | 1045 bool get isBroadcast => true; |
1047 StreamSubscription<T> listen(void onData(T data), | 1046 StreamSubscription<T> listen(void onData(T data), |
1048 {Function onError, void onDone(), bool cancelOnError}) { | 1047 {Function onError, void onDone(), bool cancelOnError}) { |
1049 return new _DoneStreamSubscription<T>(onDone); | 1048 return new _DoneStreamSubscription<T>(onDone); |
1050 } | 1049 } |
1051 } | 1050 } |
OLD | NEW |