Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(31)

Side by Side Diff: lib/src/stream_sink_transformer/handler_transformer.dart

Issue 1566603002: Add a StreamSinkTransformer class. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Code review changes Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.stream_sink_transformer.handler_transformer;
6
7 import 'dart:async';
8
9 import '../stream_sink_transformer.dart';
10 import '../delegate/stream_sink.dart';
11
12 /// The type of the callback for handling data events.
13 typedef void HandleData<S, T>(S data, EventSink<T> sink);
14
15 /// The type of the callback for handling error events.
16 typedef void HandleError<T>(
17 Object error, StackTrace stackTrace, EventSink<T> sink);
18
19 /// The type of the callback for handling done events.
20 typedef void HandleDone<T>(EventSink<T> sink);
21
22 /// A [StreamSinkTransformer] that delegates events to the given handlers.
23 class HandlerTransformer<S, T> implements StreamSinkTransformer<S, T> {
24 /// The handler for data events.
25 final HandleData<S, T> _handleData;
26
27 /// The handler for error events.
28 final HandleError<T> _handleError;
29
30 /// The handler for done events.
31 final HandleDone<T> _handleDone;
32
33 HandlerTransformer(
34 this._handleData, this._handleError, this._handleDone);
35
36 StreamSink<S> bind(StreamSink<T> sink) => new _HandlerSink<S, T>(this, sink);
37 }
38
39 /// A sink created by [HandlerTransformer].
40 class _HandlerSink<S, T> implements StreamSink<S> {
41 /// The transformer that created this sink.
42 final HandlerTransformer<S, T> _transformer;
43
44 /// The original sink that's being transformed.
45 final StreamSink<T> _inner;
46
47 /// The wrapper for [_inner] whose [StreamSink.close] method can't emit
48 /// errors.
49 final StreamSink<T> _safeCloseInner;
50
51 Future get done => _inner.done;
52
53 _HandlerSink(this._transformer, StreamSink<T> inner)
54 : _inner = inner,
55 _safeCloseInner = new _SafeCloseSink<T>(inner);
56
57 void add(S event) {
58 if (_transformer._handleData == null) {
59 // [event] is an S and [_inner.add] takes a T. This style of conversion
60 // will throw an error in checked mode if [_inner] is actually a
61 // [StreamSink<T>], but will work if [_inner] isn't reified and won't add
62 // an extra check in unchecked mode.
63 _inner.add(event as dynamic);
64 } else {
65 _transformer._handleData(event, _safeCloseInner);
66 }
67 }
68
69 void addError(error, [StackTrace stackTrace]) {
70 if (_transformer._handleError == null) {
71 _inner.addError(error, stackTrace);
72 } else {
73 _transformer._handleError(error, stackTrace, _safeCloseInner);
74 }
75 }
76
77 Future addStream(Stream<S> stream) {
78 return _inner.addStream(stream.transform(
79 new StreamTransformer<S, T>.fromHandlers(
80 handleData: _transformer._handleData,
81 handleError: _transformer._handleError,
82 handleDone: _closeSink)));
83 }
84
85 Future close() {
86 if (_transformer._handleDone == null) return _inner.close();
87
88 _transformer._handleDone(_safeCloseInner);
89 return _inner.done;
90 }
91 }
92
93 /// A wrapper for [StreamSink]s that swallows any errors returned by [close].
94 ///
95 /// [HandlerTransformer] passes this to its handlers to ensure that when they
96 /// call [close], they don't leave any dangling [Future]s behind that might emit
97 /// unhandleable errors.
98 class _SafeCloseSink<T> extends DelegatingStreamSink<T> {
99 _SafeCloseSink(StreamSink<T> inner) : super(inner);
100
101 Future close() => super.close().catchError((_) {});
102 }
103
104 /// A function to pass as a [StreamTransformer]'s `handleDone` callback.
105 void _closeSink(EventSink sink) {
106 sink.close();
107 }
OLDNEW
« no previous file with comments | « lib/src/stream_sink_transformer.dart ('k') | lib/src/stream_sink_transformer/stream_transformer_wrapper.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698