Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1058)

Side by Side Diff: lib/src/stream_sink_transformer/handler_transformer.dart

Issue 1566603002: Add a StreamSinkTransformer class. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Code review changes Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.stream_sink_transformer.handler_transformer;
6
7 import 'dart:async';
8
9 import '../stream_sink_transformer.dart';
10 import '../delegate/stream_sink.dart';
11
12 /// The type of the callback for handling data events.
13 typedef void HandleData<S, T>(S data, EventSink<T> sink);
14
15 /// The type of the callback for handling error events.
16 typedef void HandleError<T>(
17 Object error, StackTrace stackTrace, EventSink<T> sink);
18
19 /// The type of the callback for handling done events.
20 typedef void HandleDone<T>(EventSink<T> sink);
21
22 /// A [StreamSinkTransformer] that delegates events to the given handlers.
23 class HandlerTransformer<S, T> implements StreamSinkTransformer<S, T> {
24 /// The handler for data events.
25 final HandleData<S, T> _handleData;
26
27 /// The handler for error events.
28 final HandleError<T> _handleError;
29
30 /// The handler for done events.
31 final HandleDone<T> _handleDone;
32
33 HandlerTransformer(
34 this._handleData, this._handleError, this._handleDone);
35
36 StreamSink<S> bind(StreamSink<T> sink) => new _HandlerSink<S, T>(this, sink);
37 }
38
39 /// A sink created by [HandlerTransformer].
40 class _HandlerSink<S, T> implements StreamSink<S> {
41 /// The transformer that created this sink.
42 final HandlerTransformer<S, T> _transformer;
43
44 /// The original sink that's being transformed.
45 final StreamSink<T> _inner;
46
47 /// The wrapper for [_inner] whose [StreamSink.close] method can't emit
48 /// errors.
49 final StreamSink<T> _safeCloseInner;
50
51 Future get done => _inner.done;
52
53 _HandlerSink(this._transformer, StreamSink<T> inner)
54 : _inner = inner,
55 _safeCloseInner = new _SafeCloseSink<T>(inner);
56
57 void add(S event) {
58 if (_transformer._handleData == null) {
59 // [event] is an S and [_inner.add] takes a T. This style of conversion
60 // will throw an error in checked mode if [_inner] is actually a
61 // [StreamSink<T>], but will work if [_inner] isn't reified and won't add
62 // an extra branch in unchecked mode.
Lasse Reichstein Nielsen 2016/01/08 07:23:06 branch -> check ?
nweiz 2016/01/11 21:06:14 Done.
63 dynamic eventUntyped = event;
64 _inner.add(eventUntyped);
65 } else {
66 _transformer._handleData(event, _safeCloseInner);
67 }
68 }
69
70 void addError(error, [StackTrace stackTrace]) {
71 if (_transformer._handleError == null) {
72 _inner.addError(error, stackTrace);
73 } else {
74 _transformer._handleError(error, stackTrace, _safeCloseInner);
75 }
76 }
77
78 Future addStream(Stream<S> stream) {
79 return _inner.addStream(stream.transform(
80 new StreamTransformer<S, T>.fromHandlers(
81 handleData: _transformer._handleData,
82 handleError: _transformer._handleError,
83 handleDone: (sink) => sink.close())));
84 }
85
86 Future close() {
87 if (_transformer._handleDone == null) return _inner.close();
88
89 _transformer._handleDone(_safeCloseInner);
90 return _inner.done;
91 }
92 }
93
94 /// A wrapper for [StreamSink]s that swallows any errors returned by [close].
95 ///
96 /// [HandlerTransformer] passes this to its handlers to ensure that when they
97 /// call [close], they don't leave any dangling [Future]s behind that might emit
98 /// unhandleable errors.
99 class _SafeCloseSink<T> extends DelegatingStreamSink<T> {
100 _SafeCloseSink(StreamSink<T> inner) : super(inner);
101
102 Future close() => super.close().catchError((_) {});
103 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698