Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(463)

Side by Side Diff: sdk/lib/async/stream_pipe.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Made tests run (mostly) Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** 7 /**
8 * Utility function to attach a stack trace to an [error] if it doesn't have 8 * Utility function to attach a stack trace to an [error] if it doesn't have
9 * one already. 9 * one already.
10 */ 10 */
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
62 StreamSubscription<T> _createSubscription(void onData(T value), 62 StreamSubscription<T> _createSubscription(void onData(T value),
63 void onError(error), 63 void onError(error),
64 void onDone(), 64 void onDone(),
65 bool cancelOnError) { 65 bool cancelOnError) {
66 return new _ForwardingStreamSubscription<S, T>( 66 return new _ForwardingStreamSubscription<S, T>(
67 this, onData, onError, onDone, cancelOnError); 67 this, onData, onError, onDone, cancelOnError);
68 } 68 }
69 69
70 // Override the following methods in subclasses to change the behavior. 70 // Override the following methods in subclasses to change the behavior.
71 71
72 void _handleData(S data, _EventOutputSink<T> sink) { 72 void _handleData(S data, _EventSink<T> sink) {
73 var outputData = data; 73 var outputData = data;
74 sink._sendData(outputData); 74 sink._add(outputData);
75 } 75 }
76 76
77 void _handleError(error, _EventOutputSink<T> sink) { 77 void _handleError(error, _EventSink<T> sink) {
78 sink._sendError(error); 78 sink._addError(error);
79 } 79 }
80 80
81 void _handleDone(_EventOutputSink<T> sink) { 81 void _handleDone(_EventSink<T> sink) {
82 sink._sendDone(); 82 sink._close();
83 } 83 }
84 } 84 }
85 85
86 /** 86 /**
87 * Common behavior of [StreamSubscription] classes.
88 *
89 * Stores and allows updating of the event handlers of a [StreamSubscription].
90 */
91 abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> {
92 // TODO(ahe): Restore type when feature is implemented in dart2js
93 // checked mode. http://dartbug.com/7733
94 var /* _DataHandler<T> */ _onData;
95 _ErrorHandler _onError;
96 _DoneHandler _onDone;
97
98 _BaseStreamSubscription(this._onData,
99 this._onError,
100 this._onDone) {
101 if (_onData == null) _onData = _nullDataHandler;
102 if (_onError == null) _onError = _nullErrorHandler;
103 if (_onDone == null) _onDone = _nullDoneHandler;
104 }
105
106 // StreamSubscription interface.
107 void onData(void handleData(T event)) {
108 if (handleData == null) handleData = _nullDataHandler;
109 _onData = handleData;
110 }
111
112 void onError(void handleError(error)) {
113 if (handleError == null) handleError = _nullErrorHandler;
114 _onError = handleError;
115 }
116
117 void onDone(void handleDone()) {
118 if (handleDone == null) handleDone = _nullDoneHandler;
119 _onDone = handleDone;
120 }
121
122 void pause([Future resumeSignal]);
123
124 void resume();
125
126 void cancel();
127
128 Future asFuture([var futureValue]) {
129 _FutureImpl<T> result = new _FutureImpl<T>();
130
131 // Overwrite the onDone and onError handlers.
132 onDone(() { result._setValue(futureValue); });
133 onError((error) {
134 cancel();
135 result._setError(error);
136 });
137
138 return result;
139 }
140 }
141
142
143 /**
144 * Abstract superclass for subscriptions that forward to other subscriptions. 87 * Abstract superclass for subscriptions that forward to other subscriptions.
145 */ 88 */
146 class _ForwardingStreamSubscription<S, T> 89 class _ForwardingStreamSubscription<S, T>
147 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { 90 extends _BufferingStreamSubscription<T> {
148 final _ForwardingStream<S, T> _stream; 91 final _ForwardingStream<S, T> _stream;
149 final bool _cancelOnError;
150 92
151 StreamSubscription<S> _subscription; 93 StreamSubscription<S> _subscription;
152 94
153 _ForwardingStreamSubscription(this._stream, 95 _ForwardingStreamSubscription(this._stream,
154 void onData(T data), 96 void onData(T data),
155 void onError(error), 97 void onError(error),
156 void onDone(), 98 void onDone(),
157 this._cancelOnError) 99 bool cancelOnError)
158 : super(onData, onError, onDone) { 100 : super(onData, onError, onDone, cancelOnError, null) {
159 // Don't unsubscribe on incoming error, only if we send an error forwards.
160 _subscription = 101 _subscription =
161 _stream._source.listen(_handleData, 102 _stream._source.listen(_handleData,
162 onError: _handleError, 103 onError: _handleError,
163 onDone: _handleDone); 104 onDone: _handleDone);
164 } 105 }
165 106
166 // StreamSubscription interface. 107 // _StreamSink interface.
108 // Transformers sending more than one event have no way to know if the stream
109 // is cancelled or closed after the first, so we just ignore remaining events.
floitsch 2013/05/22 16:26:29 canceled.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Done.
167 110
168 void pause([Future resumeSignal]) { 111 void _add(T data) {
169 if (_subscription == null) return; 112 if (_isClosed) return;
170 _subscription.pause(resumeSignal); 113 super._add(data);
171 } 114 }
172 115
173 void resume() { 116 void _addError(Object error) {
117 if (_isClosed) return;
118 super._addError(error);
119 }
120
121 // StreamSubscription callbacks.
122
123 void _onPause() {
124 if (_subscription == null) return;
125 _subscription.pause();
126 }
127
128 void _onResume() {
174 if (_subscription == null) return; 129 if (_subscription == null) return;
175 _subscription.resume(); 130 _subscription.resume();
176 } 131 }
177 132
178 bool get isPaused { 133 void _onCancel() {
179 if (_subscription == null) return false;
180 return _subscription.isPaused;
181 }
182
183 void cancel() {
184 if (_subscription != null) { 134 if (_subscription != null) {
185 _subscription.cancel(); 135 StreamSubscription subscription = _subscription;
186 _subscription = null; 136 _subscription = null;
137 subscription.cancel();
187 } 138 }
188 } 139 }
189 140
190 // _EventOutputSink interface. Sends data to this subscription.
191
192 void _sendData(T data) {
193 _onData(data);
194 }
195
196 void _sendError(error) {
197 _onError(error);
198 if (_cancelOnError) {
199 _subscription.cancel();
200 _subscription = null;
201 }
202 }
203
204 void _sendDone() {
205 // If the transformation sends a done signal, we stop the subscription.
206 if (_subscription != null) {
207 _subscription.cancel();
208 _subscription = null;
209 }
210 _onDone();
211 }
212
213 // Methods used as listener on source subscription. 141 // Methods used as listener on source subscription.
214 142
215 // TODO(ahe): Restore type when feature is implemented in dart2js 143 // TODO(ahe): Restore type when feature is implemented in dart2js
216 // checked mode. http://dartbug.com/7733 144 // checked mode. http://dartbug.com/7733
217 void _handleData(/*S*/ data) { 145 void _handleData(/*S*/ data) {
218 _stream._handleData(data, this); 146 _stream._handleData(data, this);
219 } 147 }
220 148
221 void _handleError(error) { 149 void _handleError(error) {
222 _stream._handleError(error, this); 150 _stream._handleError(error, this);
(...skipping 11 matching lines...) Expand all
234 // ------------------------------------------------------------------- 162 // -------------------------------------------------------------------
235 163
236 typedef bool _Predicate<T>(T value); 164 typedef bool _Predicate<T>(T value);
237 165
238 class _WhereStream<T> extends _ForwardingStream<T, T> { 166 class _WhereStream<T> extends _ForwardingStream<T, T> {
239 final _Predicate<T> _test; 167 final _Predicate<T> _test;
240 168
241 _WhereStream(Stream<T> source, bool test(T value)) 169 _WhereStream(Stream<T> source, bool test(T value))
242 : _test = test, super(source); 170 : _test = test, super(source);
243 171
244 void _handleData(T inputEvent, _EventOutputSink<T> sink) { 172 void _handleData(T inputEvent, _EventSink<T> sink) {
245 bool satisfies; 173 bool satisfies;
246 try { 174 try {
247 satisfies = _test(inputEvent); 175 satisfies = _test(inputEvent);
248 } catch (e, s) { 176 } catch (e, s) {
249 sink._sendError(_asyncError(e, s)); 177 sink._addError(_asyncError(e, s));
250 return; 178 return;
251 } 179 }
252 if (satisfies) { 180 if (satisfies) {
253 sink._sendData(inputEvent); 181 sink._add(inputEvent);
254 } 182 }
255 } 183 }
256 } 184 }
257 185
258 186
259 typedef T _Transformation<S, T>(S value); 187 typedef T _Transformation<S, T>(S value);
260 188
261 /** 189 /**
262 * A stream pipe that converts data events before passing them on. 190 * A stream pipe that converts data events before passing them on.
263 */ 191 */
264 class _MapStream<S, T> extends _ForwardingStream<S, T> { 192 class _MapStream<S, T> extends _ForwardingStream<S, T> {
265 final _Transformation _transform; 193 final _Transformation _transform;
266 194
267 _MapStream(Stream<S> source, T transform(S event)) 195 _MapStream(Stream<S> source, T transform(S event))
268 : this._transform = transform, super(source); 196 : this._transform = transform, super(source);
269 197
270 void _handleData(S inputEvent, _EventOutputSink<T> sink) { 198 void _handleData(S inputEvent, _EventSink<T> sink) {
271 T outputEvent; 199 T outputEvent;
272 try { 200 try {
273 outputEvent = _transform(inputEvent); 201 outputEvent = _transform(inputEvent);
274 } catch (e, s) { 202 } catch (e, s) {
275 sink._sendError(_asyncError(e, s)); 203 sink._addError(_asyncError(e, s));
276 return; 204 return;
277 } 205 }
278 sink._sendData(outputEvent); 206 sink._add(outputEvent);
279 } 207 }
280 } 208 }
281 209
282 /** 210 /**
283 * A stream pipe that converts data events before passing them on. 211 * A stream pipe that converts data events before passing them on.
284 */ 212 */
285 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { 213 class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
286 final _Transformation<S, Iterable<T>> _expand; 214 final _Transformation<S, Iterable<T>> _expand;
287 215
288 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) 216 _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
289 : this._expand = expand, super(source); 217 : this._expand = expand, super(source);
290 218
291 void _handleData(S inputEvent, _EventOutputSink<T> sink) { 219 void _handleData(S inputEvent, _EventSink<T> sink) {
292 try { 220 try {
293 for (T value in _expand(inputEvent)) { 221 for (T value in _expand(inputEvent)) {
294 sink._sendData(value); 222 sink._add(value);
295 } 223 }
296 } catch (e, s) { 224 } catch (e, s) {
297 // If either _expand or iterating the generated iterator throws, 225 // If either _expand or iterating the generated iterator throws,
298 // we abort the iteration. 226 // we abort the iteration.
299 sink._sendError(_asyncError(e, s)); 227 sink._addError(_asyncError(e, s));
300 } 228 }
301 } 229 }
302 } 230 }
303 231
304 232
305 typedef void _ErrorTransformation(error); 233 typedef void _ErrorTransformation(error);
306 typedef bool _ErrorTest(error); 234 typedef bool _ErrorTest(error);
307 235
308 /** 236 /**
309 * A stream pipe that converts or disposes error events 237 * A stream pipe that converts or disposes error events
310 * before passing them on. 238 * before passing them on.
311 */ 239 */
312 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { 240 class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
313 final _ErrorTransformation _transform; 241 final _ErrorTransformation _transform;
314 final _ErrorTest _test; 242 final _ErrorTest _test;
315 243
316 _HandleErrorStream(Stream<T> source, 244 _HandleErrorStream(Stream<T> source,
317 void transform(event), 245 void transform(event),
318 bool test(error)) 246 bool test(error))
319 : this._transform = transform, this._test = test, super(source); 247 : this._transform = transform, this._test = test, super(source);
320 248
321 void _handleError(Object error, _EventOutputSink<T> sink) { 249 void _handleError(Object error, _EventSink<T> sink) {
322 bool matches = true; 250 bool matches = true;
323 if (_test != null) { 251 if (_test != null) {
324 try { 252 try {
325 matches = _test(error); 253 matches = _test(error);
326 } catch (e, s) { 254 } catch (e, s) {
327 sink._sendError(_asyncError(e, s)); 255 sink._addError(_asyncError(e, s));
328 return; 256 return;
329 } 257 }
330 } 258 }
331 if (matches) { 259 if (matches) {
332 try { 260 try {
333 _transform(error); 261 _transform(error);
334 } catch (e, s) { 262 } catch (e, s) {
335 sink._sendError(_asyncError(e, s)); 263 sink._addError(_asyncError(e, s));
336 return; 264 return;
337 } 265 }
338 } else { 266 } else {
339 sink._sendError(error); 267 sink._addError(error);
340 } 268 }
341 } 269 }
342 } 270 }
343 271
344 272
345 class _TakeStream<T> extends _ForwardingStream<T, T> { 273 class _TakeStream<T> extends _ForwardingStream<T, T> {
346 int _remaining; 274 int _remaining;
347 275
348 _TakeStream(Stream<T> source, int count) 276 _TakeStream(Stream<T> source, int count)
349 : this._remaining = count, super(source) { 277 : this._remaining = count, super(source) {
350 // This test is done early to avoid handling an async error 278 // This test is done early to avoid handling an async error
351 // in the _handleData method. 279 // in the _handleData method.
352 if (count is! int) throw new ArgumentError(count); 280 if (count is! int) throw new ArgumentError(count);
353 } 281 }
354 282
355 void _handleData(T inputEvent, _EventOutputSink<T> sink) { 283 void _handleData(T inputEvent, _EventSink<T> sink) {
356 if (_remaining > 0) { 284 if (_remaining > 0) {
357 sink._sendData(inputEvent); 285 sink._add(inputEvent);
358 _remaining -= 1; 286 _remaining -= 1;
359 if (_remaining == 0) { 287 if (_remaining == 0) {
360 // Closing also unsubscribes all subscribers, which unsubscribes 288 // Closing also unsubscribes all subscribers, which unsubscribes
361 // this from source. 289 // this from source.
362 sink._sendDone(); 290 sink._close();
363 } 291 }
364 } 292 }
365 } 293 }
366 } 294 }
367 295
368 296
369 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { 297 class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
370 final _Predicate<T> _test; 298 final _Predicate<T> _test;
371 299
372 _TakeWhileStream(Stream<T> source, bool test(T value)) 300 _TakeWhileStream(Stream<T> source, bool test(T value))
373 : this._test = test, super(source); 301 : this._test = test, super(source);
374 302
375 void _handleData(T inputEvent, _EventOutputSink<T> sink) { 303 void _handleData(T inputEvent, _EventSink<T> sink) {
376 bool satisfies; 304 bool satisfies;
377 try { 305 try {
378 satisfies = _test(inputEvent); 306 satisfies = _test(inputEvent);
379 } catch (e, s) { 307 } catch (e, s) {
380 sink._sendError(_asyncError(e, s)); 308 sink._addError(_asyncError(e, s));
381 // The test didn't say true. Didn't say false either, but we stop anyway. 309 // The test didn't say true. Didn't say false either, but we stop anyway.
382 sink._sendDone(); 310 sink._close();
383 return; 311 return;
384 } 312 }
385 if (satisfies) { 313 if (satisfies) {
386 sink._sendData(inputEvent); 314 sink._add(inputEvent);
387 } else { 315 } else {
388 sink._sendDone(); 316 sink._close();
389 } 317 }
390 } 318 }
391 } 319 }
392 320
393 class _SkipStream<T> extends _ForwardingStream<T, T> { 321 class _SkipStream<T> extends _ForwardingStream<T, T> {
394 int _remaining; 322 int _remaining;
395 323
396 _SkipStream(Stream<T> source, int count) 324 _SkipStream(Stream<T> source, int count)
397 : this._remaining = count, super(source) { 325 : this._remaining = count, super(source) {
398 // This test is done early to avoid handling an async error 326 // This test is done early to avoid handling an async error
399 // in the _handleData method. 327 // in the _handleData method.
400 if (count is! int || count < 0) throw new ArgumentError(count); 328 if (count is! int || count < 0) throw new ArgumentError(count);
401 } 329 }
402 330
403 void _handleData(T inputEvent, _EventOutputSink<T> sink) { 331 void _handleData(T inputEvent, _EventSink<T> sink) {
404 if (_remaining > 0) { 332 if (_remaining > 0) {
405 _remaining--; 333 _remaining--;
406 return; 334 return;
407 } 335 }
408 return sink._sendData(inputEvent); 336 return sink._add(inputEvent);
409 } 337 }
410 } 338 }
411 339
412 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { 340 class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
413 final _Predicate<T> _test; 341 final _Predicate<T> _test;
414 bool _hasFailed = false; 342 bool _hasFailed = false;
415 343
416 _SkipWhileStream(Stream<T> source, bool test(T value)) 344 _SkipWhileStream(Stream<T> source, bool test(T value))
417 : this._test = test, super(source); 345 : this._test = test, super(source);
418 346
419 void _handleData(T inputEvent, _EventOutputSink<T> sink) { 347 void _handleData(T inputEvent, _EventSink<T> sink) {
420 if (_hasFailed) { 348 if (_hasFailed) {
421 sink._sendData(inputEvent); 349 sink._add(inputEvent);
422 return; 350 return;
423 } 351 }
424 bool satisfies; 352 bool satisfies;
425 try { 353 try {
426 satisfies = _test(inputEvent); 354 satisfies = _test(inputEvent);
427 } catch (e, s) { 355 } catch (e, s) {
428 sink._sendError(_asyncError(e, s)); 356 sink._addError(_asyncError(e, s));
429 // A failure to return a boolean is considered "not matching". 357 // A failure to return a boolean is considered "not matching".
430 _hasFailed = true; 358 _hasFailed = true;
431 return; 359 return;
432 } 360 }
433 if (!satisfies) { 361 if (!satisfies) {
434 _hasFailed = true; 362 _hasFailed = true;
435 sink._sendData(inputEvent); 363 sink._add(inputEvent);
436 } 364 }
437 } 365 }
438 } 366 }
439 367
440 typedef bool _Equality<T>(T a, T b); 368 typedef bool _Equality<T>(T a, T b);
441 369
442 class _DistinctStream<T> extends _ForwardingStream<T, T> { 370 class _DistinctStream<T> extends _ForwardingStream<T, T> {
443 static var _SENTINEL = new Object(); 371 static var _SENTINEL = new Object();
444 372
445 _Equality<T> _equals; 373 _Equality<T> _equals;
446 var _previous = _SENTINEL; 374 var _previous = _SENTINEL;
447 375
448 _DistinctStream(Stream<T> source, bool equals(T a, T b)) 376 _DistinctStream(Stream<T> source, bool equals(T a, T b))
449 : _equals = equals, super(source); 377 : _equals = equals, super(source);
450 378
451 void _handleData(T inputEvent, _EventOutputSink<T> sink) { 379 void _handleData(T inputEvent, _EventSink<T> sink) {
452 if (identical(_previous, _SENTINEL)) { 380 if (identical(_previous, _SENTINEL)) {
453 _previous = inputEvent; 381 _previous = inputEvent;
454 return sink._sendData(inputEvent); 382 return sink._add(inputEvent);
455 } else { 383 } else {
456 bool isEqual; 384 bool isEqual;
457 try { 385 try {
458 if (_equals == null) { 386 if (_equals == null) {
459 isEqual = (_previous == inputEvent); 387 isEqual = (_previous == inputEvent);
460 } else { 388 } else {
461 isEqual = _equals(_previous, inputEvent); 389 isEqual = _equals(_previous, inputEvent);
462 } 390 }
463 } catch (e, s) { 391 } catch (e, s) {
464 sink._sendError(_asyncError(e, s)); 392 sink._addError(_asyncError(e, s));
465 return null; 393 return null;
466 } 394 }
467 if (!isEqual) { 395 if (!isEqual) {
468 sink._sendData(inputEvent); 396 sink._add(inputEvent);
469 _previous = inputEvent; 397 _previous = inputEvent;
470 } 398 }
471 } 399 }
472 } 400 }
473 } 401 }
474 402
475 // Stream transformations and event transformations. 403 // Stream transformations and event transformations.
476 404
477 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); 405 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
478 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink); 406 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink);
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
527 455
528 void handleError(error, EventSink<T> sink) { 456 void handleError(error, EventSink<T> sink) {
529 _handleError(error, sink); 457 _handleError(error, sink);
530 } 458 }
531 459
532 void handleDone(EventSink<T> sink) { 460 void handleDone(EventSink<T> sink) {
533 _handleDone(sink); 461 _handleDone(sink);
534 } 462 }
535 } 463 }
536 464
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698