Chromium Code Reviews| Index: sdk/lib/async/stream_transformers.dart |
| diff --git a/sdk/lib/async/stream_transformers.dart b/sdk/lib/async/stream_transformers.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..4f1ad0a66dcbbd1ec83ec397b33e01adc6f39285 |
| --- /dev/null |
| +++ b/sdk/lib/async/stream_transformers.dart |
| @@ -0,0 +1,311 @@ |
| +// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +part of dart.async; |
| + |
| +/** |
| + * Wraps an [_EventSink] so it exposes only the [EventSink] interface. |
| + */ |
| +class _EventSinkAdapter<T> implements EventSink<T> { |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
Just to be pedantic (with myself, since I named th
floitsch
2013/10/10 15:39:57
Renamed to _EventSinkWrapper. Yey for dartEditor's
|
| + _EventSink _sink; |
| + _EventSinkAdapter(this._sink); |
| + |
| + void add(T data) { _sink._add(data); } |
| + void addError(error, [StackTrace stackTrace]) { |
| + _sink._addError(error, stackTrace); |
| + } |
| + void close() { _sink._close(); } |
| +} |
| + |
| +/** |
| + * A StreamSubscription that pipes data through a sink. |
| + * |
| + * The constructor of this class takes a [_SinkMapper] which maps from |
| + * [EventSink] to [EventSink]. The input to the mapper is the output of |
| + * the transformation. The returned sink is the transformation's input. |
| + */ |
| +class _SinkTransformerStreamSubscription<S, T> |
| + extends _BufferingStreamSubscription<T> { |
| + /// The transformer's input sink. |
| + EventSink _transformerSink; |
| + |
| + /// The subscription to the input stream. |
| + StreamSubscription<S> _subscription; |
| + |
| + _SinkTransformerStreamSubscription(Stream<S> source, |
| + _SinkMapper mapper, |
| + void onData(T data), |
| + Function onError, |
| + void onDone(), |
| + bool cancelOnError) |
| + // We set the adapter's target only when the user is allowed to send data. |
| + : super(onData, onError, onDone, cancelOnError) { |
| + _EventSinkAdapter<T> eventSink = new _EventSinkAdapter<T>(this); |
| + _transformerSink = mapper(eventSink); |
| + _subscription = source.listen(_handleData, |
| + onError: _handleError, |
| + onDone: _handleDone); |
| + } |
| + |
| + /** Whether this subscription is still subscribed to its source. */ |
| + bool get _isSubscribed => _subscription != null; |
| + |
| + // _EventSink interface. |
| + |
| + /** |
| + * Adds an event to this subscriptions. |
| + * |
| + * Contrary to normal [_BufferingStreamSubscription]s we may receive |
| + * events when the stream is already closed. Report them as state |
| + * error. |
| + */ |
| + void _add(T data) { |
| + if (_isClosed) { |
| + throw new StateError("Stream is already closed"); |
| + } |
| + super._add(data); |
| + } |
| + |
| + /** |
| + * Adds an error event to this subscriptions. |
| + * |
| + * Contrary to normal [_BufferingStreamSubscription]s we may receive |
| + * events when the stream is already closed. Report them as state |
| + * error. |
| + */ |
| + void _addError(Object error, StackTrace stackTrace) { |
| + if (_isClosed) { |
| + throw new StateError("Stream is already closed"); |
| + } |
| + super._addError(error, stackTrace); |
| + } |
| + |
| + /** |
| + * Adds a close event to this subscriptions. |
| + * |
| + * Contrary to normal [_BufferingStreamSubscription]s we may receive |
| + * events when the stream is already closed. Report them as state |
| + * error. |
| + */ |
| + void _close() { |
| + if (_isClosed) { |
| + throw new StateError("Stream is already closed"); |
| + } |
| + super._close(); |
| + } |
| + |
| + // _BufferingStreamSubscription hooks. |
| + |
| + void _onPause() { |
| + if (_isSubscribed) _subscription.pause(); |
| + } |
| + |
| + void _onResume() { |
| + if (_isSubscribed) _subscription.resume(); |
| + } |
| + |
| + void _onCancel() { |
| + if (_isSubscribed) { |
| + StreamSubscription subscription = _subscription; |
| + _subscription = null; |
| + subscription.cancel(); |
| + } |
| + } |
| + |
| + void _handleData(S data) { |
| + try { |
| + _transformerSink.add(data); |
| + } catch (e, s) { |
| + _addError(_asyncError(e, s), s); |
| + } |
| + } |
| + |
| + void _handleError(error, [stackTrace]) { |
| + try { |
| + _transformerSink.addError(error, stackTrace); |
| + } catch (e, s) { |
| + if (identical(e, error)) { |
| + _addError(error, stackTrace); |
| + } else { |
| + _addError(_asyncError(e, s), s); |
| + } |
| + } |
| + } |
| + |
| + void _handleDone() { |
| + try { |
| + _subscription = null; |
| + _transformerSink.close(); |
| + } catch (e, s) { |
| + _addError(_asyncError(e, s), s); |
| + } |
| + } |
| +} |
| + |
| + |
| +typedef EventSink<S> _SinkMapper<S, T>(EventSink<T> output); |
| + |
| +/** |
| + * A StreamTransformer for Sink-mappers. |
| + * |
| + * A Sink-mapper takes an [EventSink] (its output) and returns another |
| + * EventSink (its input). |
| + * |
| + * Note that this class can be `const`. |
| + */ |
| +class _StreamSinkTransformer<S, T> implements StreamTransformer<S, T> { |
| + final _SinkMapper<S, T> _sinkMapper; |
| + const _StreamSinkTransformer(this._sinkMapper); |
| + |
| + Stream<T> bind(Stream<S> stream) |
| + => new _BoundSinkStream<S, T>(stream, _sinkMapper); |
| +} |
| + |
| +/** |
| + * The result of binding a StreamTransformer for Sink-mappers. |
| + * |
| + * It contains the bound Stream and the sink-mapper. Only when the user starts |
| + * listening to this stream is the sink-mapper invoked. The result is used |
| + * to create a StreamSubscription that transforms events. |
| + */ |
| +class _BoundSinkStream<S, T> extends Stream<T> { |
| + final _SinkMapper<S, T> _sinkMapper; |
| + final Stream<S> _stream; |
| + |
| + _BoundSinkStream(this._stream, this._sinkMapper); |
| + |
| + StreamSubscription<T> listen(void onData(T event), |
| + { Function onError, |
| + void onDone(), |
| + bool cancelOnError }) { |
| + cancelOnError = identical(true, cancelOnError); |
| + return new _SinkTransformerStreamSubscription( |
| + _stream, _sinkMapper, onData, onError, onDone, cancelOnError); |
| + } |
| +} |
| + |
| +/// Data-handler coming from [StreamTransformer.fromHandlers]. |
| +typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
| +/// Error-handler coming from [StreamTransformer.fromHandlers]. |
| +typedef void _TransformErrorHandler<T>( |
| + Object error, StackTrace stackTrace, EventSink<T> sink); |
| +/// Done-handler coming from [StreamTransformer.fromHandlers]. |
| +typedef void _TransformDoneHandler<T>(EventSink<T> sink); |
| + |
| +/** |
| + * Wraps handlers (from [StreamTransformer.fromHandlers]) into an `EventSink`. |
| + * |
| + * This way we can reuse the code from [_StreamSinkTransformer]. |
| + */ |
| +class _HandlerEventSink<S, T> implements EventSink<T> { |
| + final _TransformDataHandler<S, T> _handleData; |
| + final _TransformErrorHandler<T> _handleError; |
| + final _TransformDoneHandler<T> _handleDone; |
| + |
| + /// The output sink where the handlers should send their data into. |
| + final EventSink<T> _sink; |
| + |
| + _HandlerEventSink(this._handleData, this._handleError, this._handleDone, |
| + this._sink); |
| + |
| + void add(S data) => _handleData(data, _sink); |
| + void addError(Object error, [StackTrace stackTrace]) |
| + => _handleError(error, stackTrace, _sink); |
| + void close() => _handleDone(_sink); |
| +} |
| + |
| +/** |
| + * A StreamTransformer that transformers events with the given handlers. |
| + * |
| + * Note that this transformer can only be used once. |
| + */ |
| +class _StreamHandlerTransformer<S, T> extends _StreamSinkTransformer<S, T> { |
| + bool _hasBeenUsed = false; |
| + |
| + _StreamHandlerTransformer({ |
| + void handleData(S data, EventSink<T> sink), |
| + void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
| + void handleDone(EventSink<T> sink)}) |
| + : super((EventSink<T> outputSink) { |
| + if (handleData == null) handleData = _defaultHandleData; |
| + if (handleError == null) handleError = _defaultHandleError; |
| + if (handleDone == null) handleDone = _defaultHandleDone; |
| + return new _HandlerEventSink<S, T>( |
| + handleData, handleError, handleDone, outputSink); |
| + }); |
| + |
| + Stream<T> bind(Stream<S> stream) { |
| + if (_hasBeenUsed) { |
| + throw new StateError("Transformer has already been used."); |
| + } |
| + _hasBeenUsed = true; |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
This was the reason it could only be used once?
C
floitsch
2013/10/10 15:39:57
Done.
|
| + return super.bind(stream); |
| + } |
| + |
| + /** Default data handler forwards all data. */ |
| + static void _defaultHandleData(var data, EventSink sink) { |
| + sink.add(data); |
| + } |
| + |
| + /** Default error handler forwards all errors. */ |
| + static void _defaultHandleError(error, StackTrace stackTrace, |
| + EventSink sink) { |
| + sink.addError(error); |
| + } |
| + |
| + /** Default done handler forwards done. */ |
| + static void _defaultHandleDone(EventSink sink) { |
| + sink.close(); |
| + } |
| +} |
| + |
| +/// A closure mapping a stream and cancelOnError to a StreamSubscription. |
| +typedef StreamSubscription<T> _SubscriptionTransformer<S, T>( |
| + Stream<S> stream, bool cancelOnError); |
| + |
| +/** |
| + * This class is a fully generic [StreamTransformer]. |
| + * |
| + * Instead of implementing three classes: a [StreamTransformer], a [Stream] |
| + * (as the result of a `bind` call) and a [StreamSubscription] (which does the |
| + * actual work), this class only requires a closure that is invoked when the |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
closure -> function.
Using "closure" suggests that
floitsch
2013/10/10 15:39:57
Done.
|
| + * last bit (the subscription) of the transformer-workflow is needed. |
| + * |
| + * The given transformer closure maps from Stream, cancelOnError to a |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
"Stream, cancelOnError" -> "Stream and cancelOnErr
floitsch
2013/10/10 15:39:57
Done.
|
| + * StreamSubscription. As such it can also act on `cancel` events, making it |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
`StreamSubscription`.
floitsch
2013/10/10 15:39:57
Done.
|
| + * fully generic. |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
"fully generic" -> "fully general".
"Generic" soun
floitsch
2013/10/10 15:39:57
Done.
|
| + */ |
| +class _StreamSubscriptionTransformer<S, T> implements StreamTransformer<S, T> { |
| + final _SubscriptionTransformer<S, T> _transformer; |
| + |
| + const _StreamSubscriptionTransformer(this._transformer); |
| + |
| + Stream<T> bind(Stream<S> stream) |
| + => new _BoundSubscriptionStream<S, T>(stream, _transformer); |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
Style guide wants '=>' on the previous line.
floitsch
2013/10/10 15:39:57
Done.
|
| +} |
| + |
| +/** |
| + * The bound version of [_StreamSubscriptionTransformer]. |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
Not obvious what "bound" means. I guess it refers
floitsch
2013/10/10 15:39:57
Done.
|
| + * |
| + * This class is a [Stream] that has already an input stream. The transformer |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
"has already" -> "already has". Maybe drop "alread
floitsch
2013/10/10 15:39:57
Rewritten.
|
| + * closure is, however, only invoked when this stream is listened to. |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
Drop ", however,". It suggests that this is surpri
floitsch
2013/10/10 15:39:57
Rewritten.
|
| + */ |
| +class _BoundSubscriptionStream<S, T> extends Stream<T> { |
| + final _SubscriptionTransformer<S, T> _transformer; |
| + final Stream<S> _stream; |
| + |
| + _BoundSubscriptionStream(this._stream, this._transformer); |
| + |
| + StreamSubscription<T> listen(void onData(T event), |
| + { Function onError, |
| + void onDone(), |
| + bool cancelOnError }) { |
| + cancelOnError = identical(true, cancelOnError); |
| + StreamSubscription<T> result = _transformer(_stream, cancelOnError); |
| + result.onData(onData); |
| + result.onError(onError); |
| + result.onDone(onDone); |
| + return result; |
| + } |
| +} |