Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(211)

Side by Side Diff: lib/src/stream_sink_transformer/handler_transformer.dart

Issue 1566603002: Add a StreamSinkTransformer class. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Update pubspec. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.stream_sink_transformer.handler_transformer;
6
7 import 'dart:async';
8
9 import '../stream_sink_transformer.dart';
10 import '../delegate/stream_sink.dart';
11
12 /// The type of the callback for handling data events.
13 typedef void HandleData<S, T>(S data, EventSink<T> sink);
14
15 /// The type of the callback for handling error events.
16 typedef void HandleError<T>(
17 Object error, StackTrace stackTrace, EventSink<T> sink);
18
19 /// The type of the callback for handling done events.
20 typedef void HandleDone<T>(EventSink<T> sink);
21
22 /// A [StreamSinkTransformer] that delegates events to the given handlers.
23 class HandlerTransformer<S, T> implements StreamSinkTransformer<S, T> {
24 /// The handler for data events.
25 final HandleData<S, T> _handleData;
26
27 /// The handler for error events.
28 final HandleError<T> _handleError;
29
30 /// The handler for done events.
31 final HandleDone<T> _handleDone;
32
33 HandlerTransformer(
34 this._handleData, this._handleError, this._handleDone);
35
36 StreamSink<S> bind(StreamSink<T> sink) => new _HandlerSink<S, T>(this, sink);
37 }
38
39 /// A sink created by [HandlerTransformer].
40 class _HandlerSink<S, T> implements StreamSink<S> {
41 /// The transformer that created this sink.
42 final HandlerTransformer<S, T> _transformer;
43
44 /// The original sink that's being transformed.
45 final StreamSink<T> _inner;
46
47 /// The wrapper for [_inner] whose [StreamSink.close] method can't emit
48 /// errors.
49 final StreamSink<T> _safeCloseInner;
50
51 Future get done => _inner.done;
52
53 _HandlerSink(this._transformer, StreamSink<T> inner)
54 : _inner = inner,
55 _safeCloseInner = new _SafeCloseSink<T>(inner);
56
57 void add(S event) {
58 if (_transformer._handleData == null) {
59 _inner.add(event as T);
Lasse Reichstein Nielsen 2016/01/07 07:37:02 This does a type check, even in production mode. I
nweiz 2016/01/07 21:32:26 Done.
60 } else {
61 _transformer._handleData(event, _safeCloseInner);
62 }
63 }
64
65 void addError(error, [StackTrace stackTrace]) {
66 if (_transformer._handleError == null) {
67 _inner.addError(error, stackTrace);
68 } else {
69 _transformer._handleError(error, stackTrace, _safeCloseInner);
70 }
71 }
72
73 Future addStream(Stream<S> stream) {
74 return _inner.addStream(stream.transform(
75 new StreamTransformer<S, T>.fromHandlers(
76 handleData: _transformer._handleData,
77 handleError: _transformer._handleError,
78 handleDone: (sink) => sink.close())));
Lasse Reichstein Nielsen 2016/01/07 07:37:02 Consider having a static/top-level helper function
nweiz 2016/01/07 21:32:26 It's easy to mechanically tell that this doesn't a
Lasse Reichstein Nielsen 2016/01/08 07:23:06 The implementation will not avoid creating a new f
nweiz 2016/01/11 21:06:14 Done.
79 }
80
81 Future close() {
82 if (_transformer._handleDone == null) return _inner.close();
83
84 _transformer._handleDone(_safeCloseInner);
85 return _inner.done;
86 }
87 }
88
89 /// A wrapper for [StreamSink]s that swallows any errors returned by [close].
90 ///
91 /// [HandlerTransformer] passes this to its handlers to ensure that when they
92 /// call [close], they don't leave any dangling [Future]s behind that might emit
93 /// unhandleable errors.
94 class _SafeCloseSink<T> extends DelegatingStreamSink<T> {
95 _SafeCloseSink(StreamSink<T> inner) : super(inner);
96
97 Future close() => super.close().catchError((_) {});
98 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698