Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(274)

Side by Side Diff: sdk/lib/async/stream_pipe.dart

Issue 15989006: Revert until Windows crash is debugged. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/io/http_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** 7 /**
8 * Utility function to attach a stack trace to an [error] if it doesn't have 8 * Utility function to attach a stack trace to an [error] if it doesn't have
9 * one already. 9 * one already.
10 */ 10 */
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
62 StreamSubscription<T> _createSubscription(void onData(T value), 62 StreamSubscription<T> _createSubscription(void onData(T value),
63 void onError(error), 63 void onError(error),
64 void onDone(), 64 void onDone(),
65 bool cancelOnError) { 65 bool cancelOnError) {
66 return new _ForwardingStreamSubscription<S, T>( 66 return new _ForwardingStreamSubscription<S, T>(
67 this, onData, onError, onDone, cancelOnError); 67 this, onData, onError, onDone, cancelOnError);
68 } 68 }
69 69
70 // Override the following methods in subclasses to change the behavior. 70 // Override the following methods in subclasses to change the behavior.
71 71
72 void _handleData(S data, _EventSink<T> sink) { 72 void _handleData(S data, _EventOutputSink<T> sink) {
73 var outputData = data; 73 var outputData = data;
74 sink._add(outputData); 74 sink._sendData(outputData);
75 } 75 }
76 76
77 void _handleError(error, _EventSink<T> sink) { 77 void _handleError(error, _EventOutputSink<T> sink) {
78 sink._addError(error); 78 sink._sendError(error);
79 } 79 }
80 80
81 void _handleDone(_EventSink<T> sink) { 81 void _handleDone(_EventOutputSink<T> sink) {
82 sink._close(); 82 sink._sendDone();
83 } 83 }
84 } 84 }
85 85
86 /** 86 /**
87 * Common behavior of [StreamSubscription] classes.
88 *
89 * Stores and allows updating of the event handlers of a [StreamSubscription].
90 */
91 abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> {
92 // TODO(ahe): Restore type when feature is implemented in dart2js
93 // checked mode. http://dartbug.com/7733
94 var /* _DataHandler<T> */ _onData;
95 _ErrorHandler _onError;
96 _DoneHandler _onDone;
97
98 _BaseStreamSubscription(this._onData,
99 this._onError,
100 this._onDone) {
101 if (_onData == null) _onData = _nullDataHandler;
102 if (_onError == null) _onError = _nullErrorHandler;
103 if (_onDone == null) _onDone = _nullDoneHandler;
104 }
105
106 // StreamSubscription interface.
107 void onData(void handleData(T event)) {
108 if (handleData == null) handleData = _nullDataHandler;
109 _onData = handleData;
110 }
111
112 void onError(void handleError(error)) {
113 if (handleError == null) handleError = _nullErrorHandler;
114 _onError = handleError;
115 }
116
117 void onDone(void handleDone()) {
118 if (handleDone == null) handleDone = _nullDoneHandler;
119 _onDone = handleDone;
120 }
121
122 void pause([Future resumeSignal]);
123
124 void resume();
125
126 void cancel();
127
128 Future asFuture([var futureValue]) {
129 _FutureImpl<T> result = new _FutureImpl<T>();
130
131 // Overwrite the onDone and onError handlers.
132 onDone(() { result._setValue(futureValue); });
133 onError((error) {
134 cancel();
135 result._setError(error);
136 });
137
138 return result;
139 }
140 }
141
142
143 /**
87 * Abstract superclass for subscriptions that forward to other subscriptions. 144 * Abstract superclass for subscriptions that forward to other subscriptions.
88 */ 145 */
89 class _ForwardingStreamSubscription<S, T> 146 class _ForwardingStreamSubscription<S, T>
90 extends _BufferingStreamSubscription<T> { 147 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> {
91 final _ForwardingStream<S, T> _stream; 148 final _ForwardingStream<S, T> _stream;
149 final bool _cancelOnError;
92 150
93 StreamSubscription<S> _subscription; 151 StreamSubscription<S> _subscription;
94 152
95 _ForwardingStreamSubscription(this._stream, 153 _ForwardingStreamSubscription(this._stream,
96 void onData(T data), 154 void onData(T data),
97 void onError(error), 155 void onError(error),
98 void onDone(), 156 void onDone(),
99 bool cancelOnError) 157 this._cancelOnError)
100 : super(onData, onError, onDone, cancelOnError) { 158 : super(onData, onError, onDone) {
159 // Don't unsubscribe on incoming error, only if we send an error forwards.
101 _subscription = 160 _subscription =
102 _stream._source.listen(_handleData, 161 _stream._source.listen(_handleData,
103 onError: _handleError, 162 onError: _handleError,
104 onDone: _handleDone); 163 onDone: _handleDone);
105 } 164 }
106 165
107 // _StreamSink interface. 166 // StreamSubscription interface.
108 // Transformers sending more than one event have no way to know if the stream
109 // is canceled or closed after the first, so we just ignore remaining events.
110 167
111 void _add(T data) { 168 void pause([Future resumeSignal]) {
112 if (_isClosed) return; 169 if (_subscription == null) return;
113 super._add(data); 170 _subscription.pause(resumeSignal);
114 } 171 }
115 172
116 void _addError(Object error) { 173 void resume() {
117 if (_isClosed) return;
118 super._addError(error);
119 }
120
121 // StreamSubscription callbacks.
122
123 void _onPause() {
124 if (_subscription == null) return;
125 _subscription.pause();
126 }
127
128 void _onResume() {
129 if (_subscription == null) return; 174 if (_subscription == null) return;
130 _subscription.resume(); 175 _subscription.resume();
131 } 176 }
132 177
133 void _onCancel() { 178 bool get isPaused {
179 if (_subscription == null) return false;
180 return _subscription.isPaused;
181 }
182
183 void cancel() {
134 if (_subscription != null) { 184 if (_subscription != null) {
135 StreamSubscription subscription = _subscription; 185 _subscription.cancel();
136 _subscription = null; 186 _subscription = null;
137 subscription.cancel();
138 } 187 }
139 } 188 }
140 189
190 // _EventOutputSink interface. Sends data to this subscription.
191
192 void _sendData(T data) {
193 _onData(data);
194 }
195
196 void _sendError(error) {
197 _onError(error);
198 if (_cancelOnError) {
199 _subscription.cancel();
200 _subscription = null;
201 }
202 }
203
204 void _sendDone() {
205 // If the transformation sends a done signal, we stop the subscription.
206 if (_subscription != null) {
207 _subscription.cancel();
208 _subscription = null;
209 }
210 _onDone();
211 }
212
141 // Methods used as listener on source subscription. 213 // Methods used as listener on source subscription.
142 214
143 // TODO(ahe): Restore type when feature is implemented in dart2js 215 // TODO(ahe): Restore type when feature is implemented in dart2js
144 // checked mode. http://dartbug.com/7733 216 // checked mode. http://dartbug.com/7733
145 void _handleData(/*S*/ data) { 217 void _handleData(/*S*/ data) {
146 _stream._handleData(data, this); 218 _stream._handleData(data, this);
147 } 219 }
148 220
149 void _handleError(error) { 221 void _handleError(error) {
150 _stream._handleError(error, this); 222 _stream._handleError(error, this);
(...skipping 11 matching lines...) Expand all
162 // ------------------------------------------------------------------- 234 // -------------------------------------------------------------------
163 235
164 typedef bool _Predicate<T>(T value); 236 typedef bool _Predicate<T>(T value);
165 237
166 class _WhereStream<T> extends _ForwardingStream<T, T> { 238 class _WhereStream<T> extends _ForwardingStream<T, T> {
167 final _Predicate<T> _test; 239 final _Predicate<T> _test;
168 240
169 _WhereStream(Stream<T> source, bool test(T value)) 241 _WhereStream(Stream<T> source, bool test(T value))
170 : _test = test, super(source); 242 : _test = test, super(source);
171 243
172 void _handleData(T inputEvent, _EventSink<T> sink) { 244 void _handleData(T inputEvent, _EventOutputSink<T> sink) {
173 bool satisfies; 245 bool satisfies;
174 try { 246 try {
175 satisfies = _test(inputEvent); 247 satisfies = _test(inputEvent);
176 } catch (e, s) { 248 } catch (e, s) {
177 sink._addError(_asyncError(e, s)); 249 sink._sendError(_asyncError(e, s));
178 return; 250 return;
179 } 251 }
180 if (satisfies) { 252 if (satisfies) {
181 sink._add(inputEvent); 253 sink._sendData(inputEvent);
182 } 254 }
183 } 255 }
184 } 256 }
185 257
186 258
187 typedef T _Transformation<S, T>(S value); 259 typedef T _Transformation<S, T>(S value);
188 260
189 /** 261 /**
190 * A stream pipe that converts data events before passing them on. 262 * A stream pipe that converts data events before passing them on.
191 */ 263 */
192 class _MapStream<S, T> extends _ForwardingStream<S, T> { 264 class _MapStream<S, T> extends _ForwardingStream<S, T> {
193 final _Transformation _transform; 265 final _Transformation _transform;
194 266
195 _MapStream(Stream<S> source, T transform(S event)) 267 _MapStream(Stream<S> source, T transform(S event))
196 : this._transform = transform, super(source); 268 : this._transform = transform, super(source);
197 269
198 void _handleData(S inputEvent, _EventSink<T> sink) { 270 void _handleData(S inputEvent, _EventOutputSink<T> sink) {
199 T outputEvent; 271 T outputEvent;
200 try { 272 try {
201 outputEvent = _transform(inputEvent); 273 outputEvent = _transform(inputEvent);
202 } catch (e, s) { 274 } catch (e, s) {
203 sink._addError(_asyncError(e, s)); 275 sink._sendError(_asyncError(e, s));
204 return; 276 return;
205 } 277 }
206 sink._add(outputEvent); 278 sink._sendData(outputEvent);
207 } 279 }
208 } 280 }
209 281
210 /** 282 /**
211 * A stream pipe that converts data events before passing them on. 283 * A stream pipe that converts data events before passing them on.
212 */ 284 */
213 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { 285 class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
214 final _Transformation<S, Iterable<T>> _expand; 286 final _Transformation<S, Iterable<T>> _expand;
215 287
216 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) 288 _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
217 : this._expand = expand, super(source); 289 : this._expand = expand, super(source);
218 290
219 void _handleData(S inputEvent, _EventSink<T> sink) { 291 void _handleData(S inputEvent, _EventOutputSink<T> sink) {
220 try { 292 try {
221 for (T value in _expand(inputEvent)) { 293 for (T value in _expand(inputEvent)) {
222 sink._add(value); 294 sink._sendData(value);
223 } 295 }
224 } catch (e, s) { 296 } catch (e, s) {
225 // If either _expand or iterating the generated iterator throws, 297 // If either _expand or iterating the generated iterator throws,
226 // we abort the iteration. 298 // we abort the iteration.
227 sink._addError(_asyncError(e, s)); 299 sink._sendError(_asyncError(e, s));
228 } 300 }
229 } 301 }
230 } 302 }
231 303
232 304
233 typedef void _ErrorTransformation(error); 305 typedef void _ErrorTransformation(error);
234 typedef bool _ErrorTest(error); 306 typedef bool _ErrorTest(error);
235 307
236 /** 308 /**
237 * A stream pipe that converts or disposes error events 309 * A stream pipe that converts or disposes error events
238 * before passing them on. 310 * before passing them on.
239 */ 311 */
240 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { 312 class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
241 final _ErrorTransformation _transform; 313 final _ErrorTransformation _transform;
242 final _ErrorTest _test; 314 final _ErrorTest _test;
243 315
244 _HandleErrorStream(Stream<T> source, 316 _HandleErrorStream(Stream<T> source,
245 void transform(event), 317 void transform(event),
246 bool test(error)) 318 bool test(error))
247 : this._transform = transform, this._test = test, super(source); 319 : this._transform = transform, this._test = test, super(source);
248 320
249 void _handleError(Object error, _EventSink<T> sink) { 321 void _handleError(Object error, _EventOutputSink<T> sink) {
250 bool matches = true; 322 bool matches = true;
251 if (_test != null) { 323 if (_test != null) {
252 try { 324 try {
253 matches = _test(error); 325 matches = _test(error);
254 } catch (e, s) { 326 } catch (e, s) {
255 sink._addError(_asyncError(e, s)); 327 sink._sendError(_asyncError(e, s));
256 return; 328 return;
257 } 329 }
258 } 330 }
259 if (matches) { 331 if (matches) {
260 try { 332 try {
261 _transform(error); 333 _transform(error);
262 } catch (e, s) { 334 } catch (e, s) {
263 sink._addError(_asyncError(e, s)); 335 sink._sendError(_asyncError(e, s));
264 return; 336 return;
265 } 337 }
266 } else { 338 } else {
267 sink._addError(error); 339 sink._sendError(error);
268 } 340 }
269 } 341 }
270 } 342 }
271 343
272 344
273 class _TakeStream<T> extends _ForwardingStream<T, T> { 345 class _TakeStream<T> extends _ForwardingStream<T, T> {
274 int _remaining; 346 int _remaining;
275 347
276 _TakeStream(Stream<T> source, int count) 348 _TakeStream(Stream<T> source, int count)
277 : this._remaining = count, super(source) { 349 : this._remaining = count, super(source) {
278 // This test is done early to avoid handling an async error 350 // This test is done early to avoid handling an async error
279 // in the _handleData method. 351 // in the _handleData method.
280 if (count is! int) throw new ArgumentError(count); 352 if (count is! int) throw new ArgumentError(count);
281 } 353 }
282 354
283 void _handleData(T inputEvent, _EventSink<T> sink) { 355 void _handleData(T inputEvent, _EventOutputSink<T> sink) {
284 if (_remaining > 0) { 356 if (_remaining > 0) {
285 sink._add(inputEvent); 357 sink._sendData(inputEvent);
286 _remaining -= 1; 358 _remaining -= 1;
287 if (_remaining == 0) { 359 if (_remaining == 0) {
288 // Closing also unsubscribes all subscribers, which unsubscribes 360 // Closing also unsubscribes all subscribers, which unsubscribes
289 // this from source. 361 // this from source.
290 sink._close(); 362 sink._sendDone();
291 } 363 }
292 } 364 }
293 } 365 }
294 } 366 }
295 367
296 368
297 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { 369 class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
298 final _Predicate<T> _test; 370 final _Predicate<T> _test;
299 371
300 _TakeWhileStream(Stream<T> source, bool test(T value)) 372 _TakeWhileStream(Stream<T> source, bool test(T value))
301 : this._test = test, super(source); 373 : this._test = test, super(source);
302 374
303 void _handleData(T inputEvent, _EventSink<T> sink) { 375 void _handleData(T inputEvent, _EventOutputSink<T> sink) {
304 bool satisfies; 376 bool satisfies;
305 try { 377 try {
306 satisfies = _test(inputEvent); 378 satisfies = _test(inputEvent);
307 } catch (e, s) { 379 } catch (e, s) {
308 sink._addError(_asyncError(e, s)); 380 sink._sendError(_asyncError(e, s));
309 // The test didn't say true. Didn't say false either, but we stop anyway. 381 // The test didn't say true. Didn't say false either, but we stop anyway.
310 sink._close(); 382 sink._sendDone();
311 return; 383 return;
312 } 384 }
313 if (satisfies) { 385 if (satisfies) {
314 sink._add(inputEvent); 386 sink._sendData(inputEvent);
315 } else { 387 } else {
316 sink._close(); 388 sink._sendDone();
317 } 389 }
318 } 390 }
319 } 391 }
320 392
321 class _SkipStream<T> extends _ForwardingStream<T, T> { 393 class _SkipStream<T> extends _ForwardingStream<T, T> {
322 int _remaining; 394 int _remaining;
323 395
324 _SkipStream(Stream<T> source, int count) 396 _SkipStream(Stream<T> source, int count)
325 : this._remaining = count, super(source) { 397 : this._remaining = count, super(source) {
326 // This test is done early to avoid handling an async error 398 // This test is done early to avoid handling an async error
327 // in the _handleData method. 399 // in the _handleData method.
328 if (count is! int || count < 0) throw new ArgumentError(count); 400 if (count is! int || count < 0) throw new ArgumentError(count);
329 } 401 }
330 402
331 void _handleData(T inputEvent, _EventSink<T> sink) { 403 void _handleData(T inputEvent, _EventOutputSink<T> sink) {
332 if (_remaining > 0) { 404 if (_remaining > 0) {
333 _remaining--; 405 _remaining--;
334 return; 406 return;
335 } 407 }
336 return sink._add(inputEvent); 408 return sink._sendData(inputEvent);
337 } 409 }
338 } 410 }
339 411
340 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { 412 class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
341 final _Predicate<T> _test; 413 final _Predicate<T> _test;
342 bool _hasFailed = false; 414 bool _hasFailed = false;
343 415
344 _SkipWhileStream(Stream<T> source, bool test(T value)) 416 _SkipWhileStream(Stream<T> source, bool test(T value))
345 : this._test = test, super(source); 417 : this._test = test, super(source);
346 418
347 void _handleData(T inputEvent, _EventSink<T> sink) { 419 void _handleData(T inputEvent, _EventOutputSink<T> sink) {
348 if (_hasFailed) { 420 if (_hasFailed) {
349 sink._add(inputEvent); 421 sink._sendData(inputEvent);
350 return; 422 return;
351 } 423 }
352 bool satisfies; 424 bool satisfies;
353 try { 425 try {
354 satisfies = _test(inputEvent); 426 satisfies = _test(inputEvent);
355 } catch (e, s) { 427 } catch (e, s) {
356 sink._addError(_asyncError(e, s)); 428 sink._sendError(_asyncError(e, s));
357 // A failure to return a boolean is considered "not matching". 429 // A failure to return a boolean is considered "not matching".
358 _hasFailed = true; 430 _hasFailed = true;
359 return; 431 return;
360 } 432 }
361 if (!satisfies) { 433 if (!satisfies) {
362 _hasFailed = true; 434 _hasFailed = true;
363 sink._add(inputEvent); 435 sink._sendData(inputEvent);
364 } 436 }
365 } 437 }
366 } 438 }
367 439
368 typedef bool _Equality<T>(T a, T b); 440 typedef bool _Equality<T>(T a, T b);
369 441
370 class _DistinctStream<T> extends _ForwardingStream<T, T> { 442 class _DistinctStream<T> extends _ForwardingStream<T, T> {
371 static var _SENTINEL = new Object(); 443 static var _SENTINEL = new Object();
372 444
373 _Equality<T> _equals; 445 _Equality<T> _equals;
374 var _previous = _SENTINEL; 446 var _previous = _SENTINEL;
375 447
376 _DistinctStream(Stream<T> source, bool equals(T a, T b)) 448 _DistinctStream(Stream<T> source, bool equals(T a, T b))
377 : _equals = equals, super(source); 449 : _equals = equals, super(source);
378 450
379 void _handleData(T inputEvent, _EventSink<T> sink) { 451 void _handleData(T inputEvent, _EventOutputSink<T> sink) {
380 if (identical(_previous, _SENTINEL)) { 452 if (identical(_previous, _SENTINEL)) {
381 _previous = inputEvent; 453 _previous = inputEvent;
382 return sink._add(inputEvent); 454 return sink._sendData(inputEvent);
383 } else { 455 } else {
384 bool isEqual; 456 bool isEqual;
385 try { 457 try {
386 if (_equals == null) { 458 if (_equals == null) {
387 isEqual = (_previous == inputEvent); 459 isEqual = (_previous == inputEvent);
388 } else { 460 } else {
389 isEqual = _equals(_previous, inputEvent); 461 isEqual = _equals(_previous, inputEvent);
390 } 462 }
391 } catch (e, s) { 463 } catch (e, s) {
392 sink._addError(_asyncError(e, s)); 464 sink._sendError(_asyncError(e, s));
393 return null; 465 return null;
394 } 466 }
395 if (!isEqual) { 467 if (!isEqual) {
396 sink._add(inputEvent); 468 sink._sendData(inputEvent);
397 _previous = inputEvent; 469 _previous = inputEvent;
398 } 470 }
399 } 471 }
400 } 472 }
401 } 473 }
402 474
403 // Stream transformations and event transformations. 475 // Stream transformations and event transformations.
404 476
405 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); 477 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
406 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink); 478 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink);
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
455 527
456 void handleError(error, EventSink<T> sink) { 528 void handleError(error, EventSink<T> sink) {
457 _handleError(error, sink); 529 _handleError(error, sink);
458 } 530 }
459 531
460 void handleDone(EventSink<T> sink) { 532 void handleDone(EventSink<T> sink) {
461 _handleDone(sink); 533 _handleDone(sink);
462 } 534 }
463 } 535 }
464 536
OLDNEW
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/io/http_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698