Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(323)

Side by Side Diff: sdk/lib/async/stream_pipe.dart

Issue 12610006: Renamed StreamSink to EventSink. Renamed signalError to addError. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Changed inheritance back! Now create StreamSink instead of EventSink where we create them. Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ 7 /** Utility function to create an [AsyncError] if [error] isn't one already. */
8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { 8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) {
9 if (error is AsyncError) return error; 9 if (error is AsyncError) return error;
10 if (cause == null) return new AsyncError(error, stackTrace); 10 if (cause == null) return new AsyncError(error, stackTrace);
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
67 StreamSubscription<T> _createSubscription(void onData(T value), 67 StreamSubscription<T> _createSubscription(void onData(T value),
68 void onError(AsyncError error), 68 void onError(AsyncError error),
69 void onDone(), 69 void onDone(),
70 bool unsubscribeOnError) { 70 bool unsubscribeOnError) {
71 return new _ForwardingStreamSubscription<S, T>( 71 return new _ForwardingStreamSubscription<S, T>(
72 this, onData, onError, onDone, unsubscribeOnError); 72 this, onData, onError, onDone, unsubscribeOnError);
73 } 73 }
74 74
75 // Override the following methods in subclasses to change the behavior. 75 // Override the following methods in subclasses to change the behavior.
76 76
77 void _handleData(S data, _StreamOutputSink<T> sink) { 77 void _handleData(S data, _EventOutputSink<T> sink) {
78 var outputData = data; 78 var outputData = data;
79 sink._sendData(outputData); 79 sink._sendData(outputData);
80 } 80 }
81 81
82 void _handleError(AsyncError error, _StreamOutputSink<T> sink) { 82 void _handleError(AsyncError error, _EventOutputSink<T> sink) {
83 sink._sendError(error); 83 sink._sendError(error);
84 } 84 }
85 85
86 void _handleDone(_StreamOutputSink<T> sink) { 86 void _handleDone(_EventOutputSink<T> sink) {
87 sink._sendDone(); 87 sink._sendDone();
88 } 88 }
89 } 89 }
90 90
91 /** 91 /**
92 * Common behavior of [StreamSubscription] classes. 92 * Common behavior of [StreamSubscription] classes.
93 * 93 *
94 * Stores and allows updating of the event handlers of a [StreamSubscription]. 94 * Stores and allows updating of the event handlers of a [StreamSubscription].
95 */ 95 */
96 abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> { 96 abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> {
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
129 void resume(); 129 void resume();
130 130
131 void cancel(); 131 void cancel();
132 } 132 }
133 133
134 134
135 /** 135 /**
136 * Abstract superclass for subscriptions that forward to other subscriptions. 136 * Abstract superclass for subscriptions that forward to other subscriptions.
137 */ 137 */
138 class _ForwardingStreamSubscription<S, T> 138 class _ForwardingStreamSubscription<S, T>
139 extends _BaseStreamSubscription<T> implements _StreamOutputSink<T> { 139 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> {
140 final _ForwardingStream<S, T> _stream; 140 final _ForwardingStream<S, T> _stream;
141 final bool _unsubscribeOnError; 141 final bool _unsubscribeOnError;
142 142
143 StreamSubscription<S> _subscription; 143 StreamSubscription<S> _subscription;
144 144
145 _ForwardingStreamSubscription(this._stream, 145 _ForwardingStreamSubscription(this._stream,
146 void onData(T data), 146 void onData(T data),
147 void onError(AsyncError error), 147 void onError(AsyncError error),
148 void onDone(), 148 void onDone(),
149 this._unsubscribeOnError) 149 this._unsubscribeOnError)
(...skipping 22 matching lines...) Expand all
172 } 172 }
173 173
174 void cancel() { 174 void cancel() {
175 if (_subscription == null) { 175 if (_subscription == null) {
176 throw new StateError("Subscription has been unsubscribed"); 176 throw new StateError("Subscription has been unsubscribed");
177 } 177 }
178 _subscription.cancel(); 178 _subscription.cancel();
179 _subscription = null; 179 _subscription = null;
180 } 180 }
181 181
182 // _StreamOutputSink interface. Sends data to this subscription. 182 // _EventOutputSink interface. Sends data to this subscription.
183 183
184 void _sendData(T data) { 184 void _sendData(T data) {
185 _onData(data); 185 _onData(data);
186 } 186 }
187 187
188 void _sendError(AsyncError error) { 188 void _sendError(AsyncError error) {
189 _onError(error); 189 _onError(error);
190 if (_unsubscribeOnError) { 190 if (_unsubscribeOnError) {
191 _subscription.cancel(); 191 _subscription.cancel();
192 _subscription = null; 192 _subscription = null;
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
226 // ------------------------------------------------------------------- 226 // -------------------------------------------------------------------
227 227
228 typedef bool _Predicate<T>(T value); 228 typedef bool _Predicate<T>(T value);
229 229
230 class _WhereStream<T> extends _ForwardingStream<T, T> { 230 class _WhereStream<T> extends _ForwardingStream<T, T> {
231 final _Predicate<T> _test; 231 final _Predicate<T> _test;
232 232
233 _WhereStream(Stream<T> source, bool test(T value)) 233 _WhereStream(Stream<T> source, bool test(T value))
234 : _test = test, super(source); 234 : _test = test, super(source);
235 235
236 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { 236 void _handleData(T inputEvent, _EventOutputSink<T> sink) {
237 bool satisfies; 237 bool satisfies;
238 try { 238 try {
239 satisfies = _test(inputEvent); 239 satisfies = _test(inputEvent);
240 } catch (e, s) { 240 } catch (e, s) {
241 sink._sendError(_asyncError(e, s)); 241 sink._sendError(_asyncError(e, s));
242 return; 242 return;
243 } 243 }
244 if (satisfies) { 244 if (satisfies) {
245 sink._sendData(inputEvent); 245 sink._sendData(inputEvent);
246 } 246 }
247 } 247 }
248 } 248 }
249 249
250 250
251 typedef T _Transformation<S, T>(S value); 251 typedef T _Transformation<S, T>(S value);
252 252
253 /** 253 /**
254 * A stream pipe that converts data events before passing them on. 254 * A stream pipe that converts data events before passing them on.
255 */ 255 */
256 class _MapStream<S, T> extends _ForwardingStream<S, T> { 256 class _MapStream<S, T> extends _ForwardingStream<S, T> {
257 final _Transformation _transform; 257 final _Transformation _transform;
258 258
259 _MapStream(Stream<S> source, T transform(S event)) 259 _MapStream(Stream<S> source, T transform(S event))
260 : this._transform = transform, super(source); 260 : this._transform = transform, super(source);
261 261
262 void _handleData(S inputEvent, _StreamOutputSink<T> sink) { 262 void _handleData(S inputEvent, _EventOutputSink<T> sink) {
263 T outputEvent; 263 T outputEvent;
264 try { 264 try {
265 outputEvent = _transform(inputEvent); 265 outputEvent = _transform(inputEvent);
266 } catch (e, s) { 266 } catch (e, s) {
267 sink._sendError(_asyncError(e, s)); 267 sink._sendError(_asyncError(e, s));
268 return; 268 return;
269 } 269 }
270 sink._sendData(outputEvent); 270 sink._sendData(outputEvent);
271 } 271 }
272 } 272 }
273 273
274 /** 274 /**
275 * A stream pipe that converts data events before passing them on. 275 * A stream pipe that converts data events before passing them on.
276 */ 276 */
277 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { 277 class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
278 final _Transformation<S, Iterable<T>> _expand; 278 final _Transformation<S, Iterable<T>> _expand;
279 279
280 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) 280 _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
281 : this._expand = expand, super(source); 281 : this._expand = expand, super(source);
282 282
283 void _handleData(S inputEvent, _StreamOutputSink<T> sink) { 283 void _handleData(S inputEvent, _EventOutputSink<T> sink) {
284 try { 284 try {
285 for (T value in _expand(inputEvent)) { 285 for (T value in _expand(inputEvent)) {
286 sink._sendData(value); 286 sink._sendData(value);
287 } 287 }
288 } catch (e, s) { 288 } catch (e, s) {
289 // If either _expand or iterating the generated iterator throws, 289 // If either _expand or iterating the generated iterator throws,
290 // we abort the iteration. 290 // we abort the iteration.
291 sink._sendError(_asyncError(e, s)); 291 sink._sendError(_asyncError(e, s));
292 } 292 }
293 } 293 }
294 } 294 }
295 295
296 296
297 typedef void _ErrorTransformation(AsyncError error); 297 typedef void _ErrorTransformation(AsyncError error);
298 typedef bool _ErrorTest(error); 298 typedef bool _ErrorTest(error);
299 299
300 /** 300 /**
301 * A stream pipe that converts or disposes error events 301 * A stream pipe that converts or disposes error events
302 * before passing them on. 302 * before passing them on.
303 */ 303 */
304 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { 304 class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
305 final _ErrorTransformation _transform; 305 final _ErrorTransformation _transform;
306 final _ErrorTest _test; 306 final _ErrorTest _test;
307 307
308 _HandleErrorStream(Stream<T> source, 308 _HandleErrorStream(Stream<T> source,
309 void transform(AsyncError event), 309 void transform(AsyncError event),
310 bool test(error)) 310 bool test(error))
311 : this._transform = transform, this._test = test, super(source); 311 : this._transform = transform, this._test = test, super(source);
312 312
313 void _handleError(AsyncError error, _StreamOutputSink<T> sink) { 313 void _handleError(AsyncError error, _EventOutputSink<T> sink) {
314 bool matches = true; 314 bool matches = true;
315 if (_test != null) { 315 if (_test != null) {
316 try { 316 try {
317 matches = _test(error.error); 317 matches = _test(error.error);
318 } catch (e, s) { 318 } catch (e, s) {
319 sink._sendError(_asyncError(e, s, error)); 319 sink._sendError(_asyncError(e, s, error));
320 return; 320 return;
321 } 321 }
322 } 322 }
323 if (matches) { 323 if (matches) {
(...skipping 13 matching lines...) Expand all
337 class _TakeStream<T> extends _ForwardingStream<T, T> { 337 class _TakeStream<T> extends _ForwardingStream<T, T> {
338 int _remaining; 338 int _remaining;
339 339
340 _TakeStream(Stream<T> source, int count) 340 _TakeStream(Stream<T> source, int count)
341 : this._remaining = count, super(source) { 341 : this._remaining = count, super(source) {
342 // This test is done early to avoid handling an async error 342 // This test is done early to avoid handling an async error
343 // in the _handleData method. 343 // in the _handleData method.
344 if (count is! int) throw new ArgumentError(count); 344 if (count is! int) throw new ArgumentError(count);
345 } 345 }
346 346
347 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { 347 void _handleData(T inputEvent, _EventOutputSink<T> sink) {
348 if (_remaining > 0) { 348 if (_remaining > 0) {
349 sink._sendData(inputEvent); 349 sink._sendData(inputEvent);
350 _remaining -= 1; 350 _remaining -= 1;
351 if (_remaining == 0) { 351 if (_remaining == 0) {
352 // Closing also unsubscribes all subscribers, which unsubscribes 352 // Closing also unsubscribes all subscribers, which unsubscribes
353 // this from source. 353 // this from source.
354 sink._sendDone(); 354 sink._sendDone();
355 } 355 }
356 } 356 }
357 } 357 }
358 } 358 }
359 359
360 360
361 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { 361 class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
362 final _Predicate<T> _test; 362 final _Predicate<T> _test;
363 363
364 _TakeWhileStream(Stream<T> source, bool test(T value)) 364 _TakeWhileStream(Stream<T> source, bool test(T value))
365 : this._test = test, super(source); 365 : this._test = test, super(source);
366 366
367 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { 367 void _handleData(T inputEvent, _EventOutputSink<T> sink) {
368 bool satisfies; 368 bool satisfies;
369 try { 369 try {
370 satisfies = _test(inputEvent); 370 satisfies = _test(inputEvent);
371 } catch (e, s) { 371 } catch (e, s) {
372 sink._sendError(_asyncError(e, s)); 372 sink._sendError(_asyncError(e, s));
373 // The test didn't say true. Didn't say false either, but we stop anyway. 373 // The test didn't say true. Didn't say false either, but we stop anyway.
374 sink._sendDone(); 374 sink._sendDone();
375 return; 375 return;
376 } 376 }
377 if (satisfies) { 377 if (satisfies) {
378 sink._sendData(inputEvent); 378 sink._sendData(inputEvent);
379 } else { 379 } else {
380 sink._sendDone(); 380 sink._sendDone();
381 } 381 }
382 } 382 }
383 } 383 }
384 384
385 class _SkipStream<T> extends _ForwardingStream<T, T> { 385 class _SkipStream<T> extends _ForwardingStream<T, T> {
386 int _remaining; 386 int _remaining;
387 387
388 _SkipStream(Stream<T> source, int count) 388 _SkipStream(Stream<T> source, int count)
389 : this._remaining = count, super(source) { 389 : this._remaining = count, super(source) {
390 // This test is done early to avoid handling an async error 390 // This test is done early to avoid handling an async error
391 // in the _handleData method. 391 // in the _handleData method.
392 if (count is! int || count < 0) throw new ArgumentError(count); 392 if (count is! int || count < 0) throw new ArgumentError(count);
393 } 393 }
394 394
395 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { 395 void _handleData(T inputEvent, _EventOutputSink<T> sink) {
396 if (_remaining > 0) { 396 if (_remaining > 0) {
397 _remaining--; 397 _remaining--;
398 return; 398 return;
399 } 399 }
400 return sink._sendData(inputEvent); 400 return sink._sendData(inputEvent);
401 } 401 }
402 } 402 }
403 403
404 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { 404 class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
405 final _Predicate<T> _test; 405 final _Predicate<T> _test;
406 bool _hasFailed = false; 406 bool _hasFailed = false;
407 407
408 _SkipWhileStream(Stream<T> source, bool test(T value)) 408 _SkipWhileStream(Stream<T> source, bool test(T value))
409 : this._test = test, super(source); 409 : this._test = test, super(source);
410 410
411 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { 411 void _handleData(T inputEvent, _EventOutputSink<T> sink) {
412 if (_hasFailed) { 412 if (_hasFailed) {
413 sink._sendData(inputEvent); 413 sink._sendData(inputEvent);
414 } 414 }
415 bool satisfies; 415 bool satisfies;
416 try { 416 try {
417 satisfies = _test(inputEvent); 417 satisfies = _test(inputEvent);
418 } catch (e, s) { 418 } catch (e, s) {
419 sink._sendError(_asyncError(e, s)); 419 sink._sendError(_asyncError(e, s));
420 // A failure to return a boolean is considered "not matching". 420 // A failure to return a boolean is considered "not matching".
421 _hasFailed = true; 421 _hasFailed = true;
(...skipping 10 matching lines...) Expand all
432 432
433 class _DistinctStream<T> extends _ForwardingStream<T, T> { 433 class _DistinctStream<T> extends _ForwardingStream<T, T> {
434 static var _SENTINEL = new Object(); 434 static var _SENTINEL = new Object();
435 435
436 _Equality<T> _equals; 436 _Equality<T> _equals;
437 var _previous = _SENTINEL; 437 var _previous = _SENTINEL;
438 438
439 _DistinctStream(Stream<T> source, bool equals(T a, T b)) 439 _DistinctStream(Stream<T> source, bool equals(T a, T b))
440 : _equals = equals, super(source); 440 : _equals = equals, super(source);
441 441
442 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { 442 void _handleData(T inputEvent, _EventOutputSink<T> sink) {
443 if (identical(_previous, _SENTINEL)) { 443 if (identical(_previous, _SENTINEL)) {
444 _previous = inputEvent; 444 _previous = inputEvent;
445 return sink._sendData(inputEvent); 445 return sink._sendData(inputEvent);
446 } else { 446 } else {
447 bool isEqual; 447 bool isEqual;
448 try { 448 try {
449 if (_equals == null) { 449 if (_equals == null) {
450 isEqual = (_previous == inputEvent); 450 isEqual = (_previous == inputEvent);
451 } else { 451 } else {
452 isEqual = _equals(_previous, inputEvent); 452 isEqual = _equals(_previous, inputEvent);
453 } 453 }
454 } catch (e, s) { 454 } catch (e, s) {
455 sink._sendError(_asyncError(e, s)); 455 sink._sendError(_asyncError(e, s));
456 return null; 456 return null;
457 } 457 }
458 if (!isEqual) { 458 if (!isEqual) {
459 sink._sendData(inputEvent); 459 sink._sendData(inputEvent);
460 _previous = inputEvent; 460 _previous = inputEvent;
461 } 461 }
462 } 462 }
463 } 463 }
464 } 464 }
465 465
466 // Stream transformations and event transformations. 466 // Stream transformations and event transformations.
467 467
468 typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink); 468 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
469 typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink); 469 typedef void _TransformErrorHandler<T>(AsyncError data, EventSink<T> sink);
470 typedef void _TransformDoneHandler<T>(StreamSink<T> sink); 470 typedef void _TransformDoneHandler<T>(EventSink<T> sink);
471 471
472 /** Default data handler forwards all data. */ 472 /** Default data handler forwards all data. */
473 void _defaultHandleData(var data, StreamSink sink) { 473 void _defaultHandleData(var data, EventSink sink) {
474 sink.add(data); 474 sink.add(data);
475 } 475 }
476 476
477 /** Default error handler forwards all errors. */ 477 /** Default error handler forwards all errors. */
478 void _defaultHandleError(AsyncError error, StreamSink sink) { 478 void _defaultHandleError(AsyncError error, EventSink sink) {
479 sink.signalError(error); 479 sink.addError(error);
480 } 480 }
481 481
482 /** Default done handler forwards done. */ 482 /** Default done handler forwards done. */
483 void _defaultHandleDone(StreamSink sink) { 483 void _defaultHandleDone(EventSink sink) {
484 sink.close(); 484 sink.close();
485 } 485 }
486 486
487 487
488 /** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */
489 class _StreamImplSink<T> implements StreamSink<T> {
490 _StreamImpl<T> _target;
491 _StreamImplSink(this._target);
492 void add(T data) { _target._add(data); }
493 void signalError(AsyncError error) { _target._signalError(error); }
494 void close() { _target._close(); }
495 }
496
497 /** 488 /**
498 * A [StreamTransformer] that modifies stream events. 489 * A [StreamTransformer] that modifies stream events.
499 * 490 *
500 * This class is used by [StreamTransformer]'s factory constructor. 491 * This class is used by [StreamTransformer]'s factory constructor.
501 * It is actually an [StreamEventTransformer] where the functions used to 492 * It is actually an [StreamEventTransformer] where the functions used to
502 * modify the events are passed as constructor arguments. 493 * modify the events are passed as constructor arguments.
503 * 494 *
504 * If an argument is omitted, it acts as the default method from 495 * If an argument is omitted, it acts as the default method from
505 * [StreamEventTransformer]. 496 * [StreamEventTransformer].
506 */ 497 */
507 class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> { 498 class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> {
508 // TODO(ahe): Restore type when feature is implemented in dart2js 499 // TODO(ahe): Restore type when feature is implemented in dart2js
509 // checked mode. http://dartbug.com/7733 500 // checked mode. http://dartbug.com/7733
510 final Function /*_TransformDataHandler<S, T>*/ _handleData; 501 final Function /*_TransformDataHandler<S, T>*/ _handleData;
511 final _TransformErrorHandler<T> _handleError; 502 final _TransformErrorHandler<T> _handleError;
512 final _TransformDoneHandler<T> _handleDone; 503 final _TransformDoneHandler<T> _handleDone;
513 504
514 _StreamTransformerImpl(void handleData(S data, StreamSink<T> sink), 505 _StreamTransformerImpl(void handleData(S data, EventSink<T> sink),
515 void handleError(AsyncError data, StreamSink<T> sink), 506 void handleError(AsyncError data, EventSink<T> sink),
516 void handleDone(StreamSink<T> sink)) 507 void handleDone(EventSink<T> sink))
517 : this._handleData = (handleData == null ? _defaultHandleData 508 : this._handleData = (handleData == null ? _defaultHandleData
518 : handleData), 509 : handleData),
519 this._handleError = (handleError == null ? _defaultHandleError 510 this._handleError = (handleError == null ? _defaultHandleError
520 : handleError), 511 : handleError),
521 this._handleDone = (handleDone == null ? _defaultHandleDone 512 this._handleDone = (handleDone == null ? _defaultHandleDone
522 : handleDone); 513 : handleDone);
523 514
524 void handleData(S data, StreamSink<T> sink) { 515 void handleData(S data, EventSink<T> sink) {
525 _handleData(data, sink); 516 _handleData(data, sink);
526 } 517 }
527 518
528 void handleError(AsyncError error, StreamSink<T> sink) { 519 void handleError(AsyncError error, EventSink<T> sink) {
529 _handleError(error, sink); 520 _handleError(error, sink);
530 } 521 }
531 522
532 void handleDone(StreamSink<T> sink) { 523 void handleDone(EventSink<T> sink) {
533 _handleDone(sink); 524 _handleDone(sink);
534 } 525 }
535 } 526 }
536 527
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698