Chromium Code Reviews| Index: lib/src/stream_sink_transformer/handler_transformer.dart |
| diff --git a/lib/src/stream_sink_transformer/handler_transformer.dart b/lib/src/stream_sink_transformer/handler_transformer.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..b1c6668061f389796a77b3aab3404e4a9d3b9801 |
| --- /dev/null |
| +++ b/lib/src/stream_sink_transformer/handler_transformer.dart |
| @@ -0,0 +1,98 @@ |
| +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library async.stream_sink_transformer.handler_transformer; |
| + |
| +import 'dart:async'; |
| + |
| +import '../stream_sink_transformer.dart'; |
| +import '../delegate/stream_sink.dart'; |
| + |
| +/// The type of the callback for handling data events. |
| +typedef void HandleData<S, T>(S data, EventSink<T> sink); |
| + |
| +/// The type of the callback for handling error events. |
| +typedef void HandleError<T>( |
| + Object error, StackTrace stackTrace, EventSink<T> sink); |
| + |
| +/// The type of the callback for handling done events. |
| +typedef void HandleDone<T>(EventSink<T> sink); |
| + |
| +/// A [StreamSinkTransformer] that delegates events to the given handlers. |
| +class HandlerTransformer<S, T> implements StreamSinkTransformer<S, T> { |
| + /// The handler for data events. |
| + final HandleData<S, T> _handleData; |
| + |
| + /// The handler for error events. |
| + final HandleError<T> _handleError; |
| + |
| + /// The handler for done events. |
| + final HandleDone<T> _handleDone; |
| + |
| + HandlerTransformer( |
| + this._handleData, this._handleError, this._handleDone); |
| + |
| + StreamSink<S> bind(StreamSink<T> sink) => new _HandlerSink<S, T>(this, sink); |
| +} |
| + |
| +/// A sink created by [HandlerTransformer]. |
| +class _HandlerSink<S, T> implements StreamSink<S> { |
| + /// The transformer that created this sink. |
| + final HandlerTransformer<S, T> _transformer; |
| + |
| + /// The original sink that's being transformed. |
| + final StreamSink<T> _inner; |
| + |
| + /// The wrapper for [_inner] whose [StreamSink.close] method can't emit |
| + /// errors. |
| + final StreamSink<T> _safeCloseInner; |
| + |
| + Future get done => _inner.done; |
| + |
| + _HandlerSink(this._transformer, StreamSink<T> inner) |
| + : _inner = inner, |
| + _safeCloseInner = new _SafeCloseSink<T>(inner); |
| + |
| + void add(S event) { |
| + if (_transformer._handleData == null) { |
| + _inner.add(event as T); |
|
Lasse Reichstein Nielsen
2016/01/07 07:37:02
This does a type check, even in production mode.
I
nweiz
2016/01/07 21:32:26
Done.
|
| + } else { |
| + _transformer._handleData(event, _safeCloseInner); |
| + } |
| + } |
| + |
| + void addError(error, [StackTrace stackTrace]) { |
| + if (_transformer._handleError == null) { |
| + _inner.addError(error, stackTrace); |
| + } else { |
| + _transformer._handleError(error, stackTrace, _safeCloseInner); |
| + } |
| + } |
| + |
| + Future addStream(Stream<S> stream) { |
| + return _inner.addStream(stream.transform( |
| + new StreamTransformer<S, T>.fromHandlers( |
| + handleData: _transformer._handleData, |
| + handleError: _transformer._handleError, |
| + handleDone: (sink) => sink.close()))); |
|
Lasse Reichstein Nielsen
2016/01/07 07:37:02
Consider having a static/top-level helper function
nweiz
2016/01/07 21:32:26
It's easy to mechanically tell that this doesn't a
Lasse Reichstein Nielsen
2016/01/08 07:23:06
The implementation will not avoid creating a new f
nweiz
2016/01/11 21:06:14
Done.
|
| + } |
| + |
| + Future close() { |
| + if (_transformer._handleDone == null) return _inner.close(); |
| + |
| + _transformer._handleDone(_safeCloseInner); |
| + return _inner.done; |
| + } |
| +} |
| + |
| +/// A wrapper for [StreamSink]s that swallows any errors returned by [close]. |
| +/// |
| +/// [HandlerTransformer] passes this to its handlers to ensure that when they |
| +/// call [close], they don't leave any dangling [Future]s behind that might emit |
| +/// unhandleable errors. |
| +class _SafeCloseSink<T> extends DelegatingStreamSink<T> { |
| + _SafeCloseSink(StreamSink<T> inner) : super(inner); |
| + |
| + Future close() => super.close().catchError((_) {}); |
| +} |