OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ | 7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ |
8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { | 8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { |
9 if (error is AsyncError) return error; | 9 if (error is AsyncError) return error; |
10 if (cause == null) return new AsyncError(error, stackTrace); | 10 if (cause == null) return new AsyncError(error, stackTrace); |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
44 * to an underlying stream, and wraps the returned subscription to | 44 * to an underlying stream, and wraps the returned subscription to |
45 * modify the events on the way. | 45 * modify the events on the way. |
46 * | 46 * |
47 * This class is intended for internal use only. | 47 * This class is intended for internal use only. |
48 */ | 48 */ |
49 abstract class _ForwardingStream<S, T> extends Stream<T> { | 49 abstract class _ForwardingStream<S, T> extends Stream<T> { |
50 final Stream<S> _source; | 50 final Stream<S> _source; |
51 | 51 |
52 _ForwardingStream(this._source); | 52 _ForwardingStream(this._source); |
53 | 53 |
54 bool get isSingleSubscription => _source.isSingleSubscription; | 54 bool get isBroadcast => _source.isBroadcast; |
| 55 |
| 56 bool asBroadcastStream() => _source.asBroadcastStream; |
55 | 57 |
56 StreamSubscription listen(void onData(T value), | 58 StreamSubscription listen(void onData(T value), |
57 { void onError(AsyncError error), | 59 { void onError(AsyncError error), |
58 void onDone(), | 60 void onDone(), |
59 bool unsubscribeOnError }) { | 61 bool unsubscribeOnError }) { |
60 if (onData == null) onData = _nullDataHandler; | 62 if (onData == null) onData = _nullDataHandler; |
61 if (onError == null) onError = _nullErrorHandler; | 63 if (onError == null) onError = _nullErrorHandler; |
62 if (onDone == null) onDone = _nullDoneHandler; | 64 if (onDone == null) onDone = _nullDoneHandler; |
63 unsubscribeOnError = identical(true, unsubscribeOnError); | 65 unsubscribeOnError = identical(true, unsubscribeOnError); |
64 StreamSubscription subscription = | 66 StreamSubscription subscription = |
(...skipping 435 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
500 /** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */ | 502 /** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */ |
501 class _StreamImplSink<T> implements StreamSink<T> { | 503 class _StreamImplSink<T> implements StreamSink<T> { |
502 _StreamImpl<T> _target; | 504 _StreamImpl<T> _target; |
503 _StreamImplSink(this._target); | 505 _StreamImplSink(this._target); |
504 void add(T data) { _target._add(data); } | 506 void add(T data) { _target._add(data); } |
505 void signalError(AsyncError error) { _target._signalError(error); } | 507 void signalError(AsyncError error) { _target._signalError(error); } |
506 void close() { _target._close(); } | 508 void close() { _target._close(); } |
507 } | 509 } |
508 | 510 |
509 | 511 |
OLD | NEW |