Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(165)

Side by Side Diff: sdk/lib/async/stream_transformers.dart

Issue 2754013002: Format all dart: library files (Closed)
Patch Set: Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** 7 /**
8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. 8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface.
9 */ 9 */
10 class _EventSinkWrapper<T> implements EventSink<T> { 10 class _EventSinkWrapper<T> implements EventSink<T> {
11 _EventSink _sink; 11 _EventSink _sink;
12 _EventSinkWrapper(this._sink); 12 _EventSinkWrapper(this._sink);
13 13
14 void add(T data) { _sink._add(data); } 14 void add(T data) {
15 _sink._add(data);
16 }
17
15 void addError(error, [StackTrace stackTrace]) { 18 void addError(error, [StackTrace stackTrace]) {
16 _sink._addError(error, stackTrace); 19 _sink._addError(error, stackTrace);
17 } 20 }
18 void close() { _sink._close(); } 21
22 void close() {
23 _sink._close();
24 }
19 } 25 }
20 26
21 /** 27 /**
22 * A StreamSubscription that pipes data through a sink. 28 * A StreamSubscription that pipes data through a sink.
23 * 29 *
24 * The constructor of this class takes a [_SinkMapper] which maps from 30 * The constructor of this class takes a [_SinkMapper] which maps from
25 * [EventSink] to [EventSink]. The input to the mapper is the output of 31 * [EventSink] to [EventSink]. The input to the mapper is the output of
26 * the transformation. The returned sink is the transformation's input. 32 * the transformation. The returned sink is the transformation's input.
27 */ 33 */
28 class _SinkTransformerStreamSubscription<S, T> 34 class _SinkTransformerStreamSubscription<S, T>
29 extends _BufferingStreamSubscription<T> { 35 extends _BufferingStreamSubscription<T> {
30 /// The transformer's input sink. 36 /// The transformer's input sink.
31 EventSink<S> _transformerSink; 37 EventSink<S> _transformerSink;
32 38
33 /// The subscription to the input stream. 39 /// The subscription to the input stream.
34 StreamSubscription<S> _subscription; 40 StreamSubscription<S> _subscription;
35 41
36 _SinkTransformerStreamSubscription(Stream<S> source, 42 _SinkTransformerStreamSubscription(Stream<S> source, _SinkMapper<S, T> mapper,
37 _SinkMapper<S, T> mapper, 43 void onData(T data), Function onError, void onDone(), bool cancelOnError)
38 void onData(T data),
39 Function onError,
40 void onDone(),
41 bool cancelOnError)
42 // We set the adapter's target only when the user is allowed to send data. 44 // We set the adapter's target only when the user is allowed to send data.
43 : super(onData, onError, onDone, cancelOnError) { 45 : super(onData, onError, onDone, cancelOnError) {
44 _EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>(this); 46 _EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>(this);
45 _transformerSink = mapper(eventSink); 47 _transformerSink = mapper(eventSink);
46 _subscription = source.listen(_handleData, 48 _subscription =
47 onError: _handleError, 49 source.listen(_handleData, onError: _handleError, onDone: _handleDone);
48 onDone: _handleDone);
49 } 50 }
50 51
51 /** Whether this subscription is still subscribed to its source. */ 52 /** Whether this subscription is still subscribed to its source. */
52 bool get _isSubscribed => _subscription != null; 53 bool get _isSubscribed => _subscription != null;
53 54
54 // _EventSink interface. 55 // _EventSink interface.
55 56
56 /** 57 /**
57 * Adds an event to this subscriptions. 58 * Adds an event to this subscriptions.
58 * 59 *
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
137 void _handleDone() { 138 void _handleDone() {
138 try { 139 try {
139 _subscription = null; 140 _subscription = null;
140 _transformerSink.close(); 141 _transformerSink.close();
141 } catch (e, s) { 142 } catch (e, s) {
142 _addError(e, s); 143 _addError(e, s);
143 } 144 }
144 } 145 }
145 } 146 }
146 147
147
148 typedef EventSink<S> _SinkMapper<S, T>(EventSink<T> output); 148 typedef EventSink<S> _SinkMapper<S, T>(EventSink<T> output);
149 149
150 /** 150 /**
151 * A StreamTransformer for Sink-mappers. 151 * A StreamTransformer for Sink-mappers.
152 * 152 *
153 * A Sink-mapper takes an [EventSink] (its output) and returns another 153 * A Sink-mapper takes an [EventSink] (its output) and returns another
154 * EventSink (its input). 154 * EventSink (its input).
155 * 155 *
156 * Note that this class can be `const`. 156 * Note that this class can be `const`.
157 */ 157 */
158 class _StreamSinkTransformer<S, T> implements StreamTransformer<S, T> { 158 class _StreamSinkTransformer<S, T> implements StreamTransformer<S, T> {
159 final _SinkMapper<S, T> _sinkMapper; 159 final _SinkMapper<S, T> _sinkMapper;
160 const _StreamSinkTransformer(this._sinkMapper); 160 const _StreamSinkTransformer(this._sinkMapper);
161 161
162 Stream<T> bind(Stream<S> stream) 162 Stream<T> bind(Stream<S> stream) =>
163 => new _BoundSinkStream<S, T>(stream, _sinkMapper); 163 new _BoundSinkStream<S, T>(stream, _sinkMapper);
164 } 164 }
165 165
166 /** 166 /**
167 * The result of binding a StreamTransformer for Sink-mappers. 167 * The result of binding a StreamTransformer for Sink-mappers.
168 * 168 *
169 * It contains the bound Stream and the sink-mapper. Only when the user starts 169 * It contains the bound Stream and the sink-mapper. Only when the user starts
170 * listening to this stream is the sink-mapper invoked. The result is used 170 * listening to this stream is the sink-mapper invoked. The result is used
171 * to create a StreamSubscription that transforms events. 171 * to create a StreamSubscription that transforms events.
172 */ 172 */
173 class _BoundSinkStream<S, T> extends Stream<T> { 173 class _BoundSinkStream<S, T> extends Stream<T> {
174 final _SinkMapper<S, T> _sinkMapper; 174 final _SinkMapper<S, T> _sinkMapper;
175 final Stream<S> _stream; 175 final Stream<S> _stream;
176 176
177 bool get isBroadcast => _stream.isBroadcast; 177 bool get isBroadcast => _stream.isBroadcast;
178 178
179 _BoundSinkStream(this._stream, this._sinkMapper); 179 _BoundSinkStream(this._stream, this._sinkMapper);
180 180
181 StreamSubscription<T> listen(void onData(T event), 181 StreamSubscription<T> listen(void onData(T event),
182 { Function onError, 182 {Function onError, void onDone(), bool cancelOnError}) {
183 void onDone(),
184 bool cancelOnError }) {
185 cancelOnError = identical(true, cancelOnError); 183 cancelOnError = identical(true, cancelOnError);
186 StreamSubscription<T> subscription = 184 StreamSubscription<T> subscription =
187 new _SinkTransformerStreamSubscription<S, T>( 185 new _SinkTransformerStreamSubscription<S, T>(
188 _stream, _sinkMapper, onData, onError, onDone, cancelOnError); 186 _stream, _sinkMapper, onData, onError, onDone, cancelOnError);
189 return subscription; 187 return subscription;
190 } 188 }
191 } 189 }
192 190
193 /// Data-handler coming from [StreamTransformer.fromHandlers]. 191 /// Data-handler coming from [StreamTransformer.fromHandlers].
194 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); 192 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
193
195 /// Error-handler coming from [StreamTransformer.fromHandlers]. 194 /// Error-handler coming from [StreamTransformer.fromHandlers].
196 typedef void _TransformErrorHandler<T>( 195 typedef void _TransformErrorHandler<T>(
197 Object error, StackTrace stackTrace, EventSink<T> sink); 196 Object error, StackTrace stackTrace, EventSink<T> sink);
197
198 /// Done-handler coming from [StreamTransformer.fromHandlers]. 198 /// Done-handler coming from [StreamTransformer.fromHandlers].
199 typedef void _TransformDoneHandler<T>(EventSink<T> sink); 199 typedef void _TransformDoneHandler<T>(EventSink<T> sink);
200 200
201 /** 201 /**
202 * Wraps handlers (from [StreamTransformer.fromHandlers]) into an `EventSink`. 202 * Wraps handlers (from [StreamTransformer.fromHandlers]) into an `EventSink`.
203 * 203 *
204 * This way we can reuse the code from [_StreamSinkTransformer]. 204 * This way we can reuse the code from [_StreamSinkTransformer].
205 */ 205 */
206 class _HandlerEventSink<S, T> implements EventSink<S> { 206 class _HandlerEventSink<S, T> implements EventSink<S> {
207 final _TransformDataHandler<S, T> _handleData; 207 final _TransformDataHandler<S, T> _handleData;
208 final _TransformErrorHandler<T> _handleError; 208 final _TransformErrorHandler<T> _handleError;
209 final _TransformDoneHandler<T> _handleDone; 209 final _TransformDoneHandler<T> _handleDone;
210 210
211 /// The output sink where the handlers should send their data into. 211 /// The output sink where the handlers should send their data into.
212 final EventSink<T> _sink; 212 final EventSink<T> _sink;
213 213
214 _HandlerEventSink(this._handleData, this._handleError, this._handleDone, 214 _HandlerEventSink(
215 this._sink); 215 this._handleData, this._handleError, this._handleDone, this._sink);
216 216
217 void add(S data) { _handleData(data, _sink); } 217 void add(S data) {
218 _handleData(data, _sink);
219 }
220
218 void addError(Object error, [StackTrace stackTrace]) { 221 void addError(Object error, [StackTrace stackTrace]) {
219 _handleError(error, stackTrace, _sink); 222 _handleError(error, stackTrace, _sink);
220 } 223 }
221 void close() { _handleDone(_sink); } 224
225 void close() {
226 _handleDone(_sink);
227 }
222 } 228 }
223 229
224 /** 230 /**
225 * A StreamTransformer that transformers events with the given handlers. 231 * A StreamTransformer that transformers events with the given handlers.
226 * 232 *
227 * Note that this transformer can only be used once. 233 * Note that this transformer can only be used once.
228 */ 234 */
229 class _StreamHandlerTransformer<S, T> extends _StreamSinkTransformer<S, T> { 235 class _StreamHandlerTransformer<S, T> extends _StreamSinkTransformer<S, T> {
230 236 _StreamHandlerTransformer(
231 _StreamHandlerTransformer({ 237 {void handleData(S data, EventSink<T> sink),
232 void handleData(S data, EventSink<T> sink),
233 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), 238 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
234 void handleDone(EventSink<T> sink)}) 239 void handleDone(EventSink<T> sink)})
235 : super((EventSink<T> outputSink) { 240 : super((EventSink<T> outputSink) {
236 if (handleData == null) handleData = _defaultHandleData; 241 if (handleData == null) handleData = _defaultHandleData;
237 if (handleError == null) handleError = _defaultHandleError; 242 if (handleError == null) handleError = _defaultHandleError;
238 if (handleDone == null) handleDone = _defaultHandleDone; 243 if (handleDone == null) handleDone = _defaultHandleDone;
239 return new _HandlerEventSink<S, T>( 244 return new _HandlerEventSink<S, T>(
240 handleData, handleError, handleDone, outputSink); 245 handleData, handleError, handleDone, outputSink);
241 }); 246 });
242 247
243 Stream<T> bind(Stream<S> stream) { 248 Stream<T> bind(Stream<S> stream) {
244 return super.bind(stream); 249 return super.bind(stream);
245 } 250 }
246 251
247 /** Default data handler forwards all data. */ 252 /** Default data handler forwards all data. */
248 static void _defaultHandleData(var data, EventSink sink) { 253 static void _defaultHandleData(var data, EventSink sink) {
249 sink.add(data); 254 sink.add(data);
250 } 255 }
251 256
252 /** Default error handler forwards all errors. */ 257 /** Default error handler forwards all errors. */
253 static void _defaultHandleError(error, StackTrace stackTrace, 258 static void _defaultHandleError(
254 EventSink sink) { 259 error, StackTrace stackTrace, EventSink sink) {
255 sink.addError(error, stackTrace); 260 sink.addError(error, stackTrace);
256 } 261 }
257 262
258 /** Default done handler forwards done. */ 263 /** Default done handler forwards done. */
259 static void _defaultHandleDone(EventSink sink) { 264 static void _defaultHandleDone(EventSink sink) {
260 sink.close(); 265 sink.close();
261 } 266 }
262 } 267 }
263 268
264 /// A closure mapping a stream and cancelOnError to a StreamSubscription. 269 /// A closure mapping a stream and cancelOnError to a StreamSubscription.
(...skipping 28 matching lines...) Expand all
293 * the stored [_stream]. Usually the transformer starts listening at this 298 * the stored [_stream]. Usually the transformer starts listening at this
294 * moment. 299 * moment.
295 */ 300 */
296 class _BoundSubscriptionStream<S, T> extends Stream<T> { 301 class _BoundSubscriptionStream<S, T> extends Stream<T> {
297 final _SubscriptionTransformer<S, T> _transformer; 302 final _SubscriptionTransformer<S, T> _transformer;
298 final Stream<S> _stream; 303 final Stream<S> _stream;
299 304
300 _BoundSubscriptionStream(this._stream, this._transformer); 305 _BoundSubscriptionStream(this._stream, this._transformer);
301 306
302 StreamSubscription<T> listen(void onData(T event), 307 StreamSubscription<T> listen(void onData(T event),
303 { Function onError, 308 {Function onError, void onDone(), bool cancelOnError}) {
304 void onDone(),
305 bool cancelOnError }) {
306 cancelOnError = identical(true, cancelOnError); 309 cancelOnError = identical(true, cancelOnError);
307 StreamSubscription<T> result = _transformer(_stream, cancelOnError); 310 StreamSubscription<T> result = _transformer(_stream, cancelOnError);
308 result.onData(onData); 311 result.onData(onData);
309 result.onError(onError); 312 result.onError(onError);
310 result.onDone(onDone); 313 result.onDone(onDone);
311 return result; 314 return result;
312 } 315 }
313 } 316 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698