OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ | 7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ |
8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { | 8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { |
9 if (error is AsyncError) return error; | 9 if (error is AsyncError) return error; |
10 if (cause == null) return new AsyncError(error, stackTrace); | 10 if (cause == null) return new AsyncError(error, stackTrace); |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
67 StreamSubscription<T> _createSubscription(void onData(T value), | 67 StreamSubscription<T> _createSubscription(void onData(T value), |
68 void onError(AsyncError error), | 68 void onError(AsyncError error), |
69 void onDone(), | 69 void onDone(), |
70 bool unsubscribeOnError) { | 70 bool unsubscribeOnError) { |
71 return new _ForwardingStreamSubscription<S, T>( | 71 return new _ForwardingStreamSubscription<S, T>( |
72 this, onData, onError, onDone, unsubscribeOnError); | 72 this, onData, onError, onDone, unsubscribeOnError); |
73 } | 73 } |
74 | 74 |
75 // Override the following methods in subclasses to change the behavior. | 75 // Override the following methods in subclasses to change the behavior. |
76 | 76 |
77 void _handleData(S data, _StreamOutputSink<T> sink) { | 77 void _handleData(S data, _EventOutputSink<T> sink) { |
78 var outputData = data; | 78 var outputData = data; |
79 sink._sendData(outputData); | 79 sink._sendData(outputData); |
80 } | 80 } |
81 | 81 |
82 void _handleError(AsyncError error, _StreamOutputSink<T> sink) { | 82 void _handleError(AsyncError error, _EventOutputSink<T> sink) { |
83 sink._sendError(error); | 83 sink._sendError(error); |
84 } | 84 } |
85 | 85 |
86 void _handleDone(_StreamOutputSink<T> sink) { | 86 void _handleDone(_EventOutputSink<T> sink) { |
87 sink._sendDone(); | 87 sink._sendDone(); |
88 } | 88 } |
89 } | 89 } |
90 | 90 |
91 /** | 91 /** |
92 * Common behavior of [StreamSubscription] classes. | 92 * Common behavior of [StreamSubscription] classes. |
93 * | 93 * |
94 * Stores and allows updating of the event handlers of a [StreamSubscription]. | 94 * Stores and allows updating of the event handlers of a [StreamSubscription]. |
95 */ | 95 */ |
96 abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> { | 96 abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> { |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
129 void resume(); | 129 void resume(); |
130 | 130 |
131 void cancel(); | 131 void cancel(); |
132 } | 132 } |
133 | 133 |
134 | 134 |
135 /** | 135 /** |
136 * Abstract superclass for subscriptions that forward to other subscriptions. | 136 * Abstract superclass for subscriptions that forward to other subscriptions. |
137 */ | 137 */ |
138 class _ForwardingStreamSubscription<S, T> | 138 class _ForwardingStreamSubscription<S, T> |
139 extends _BaseStreamSubscription<T> implements _StreamOutputSink<T> { | 139 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { |
140 final _ForwardingStream<S, T> _stream; | 140 final _ForwardingStream<S, T> _stream; |
141 final bool _unsubscribeOnError; | 141 final bool _unsubscribeOnError; |
142 | 142 |
143 StreamSubscription<S> _subscription; | 143 StreamSubscription<S> _subscription; |
144 | 144 |
145 _ForwardingStreamSubscription(this._stream, | 145 _ForwardingStreamSubscription(this._stream, |
146 void onData(T data), | 146 void onData(T data), |
147 void onError(AsyncError error), | 147 void onError(AsyncError error), |
148 void onDone(), | 148 void onDone(), |
149 this._unsubscribeOnError) | 149 this._unsubscribeOnError) |
(...skipping 22 matching lines...) Expand all Loading... |
172 } | 172 } |
173 | 173 |
174 void cancel() { | 174 void cancel() { |
175 if (_subscription == null) { | 175 if (_subscription == null) { |
176 throw new StateError("Subscription has been unsubscribed"); | 176 throw new StateError("Subscription has been unsubscribed"); |
177 } | 177 } |
178 _subscription.cancel(); | 178 _subscription.cancel(); |
179 _subscription = null; | 179 _subscription = null; |
180 } | 180 } |
181 | 181 |
182 // _StreamOutputSink interface. Sends data to this subscription. | 182 // _EventOutputSink interface. Sends data to this subscription. |
183 | 183 |
184 void _sendData(T data) { | 184 void _sendData(T data) { |
185 _onData(data); | 185 _onData(data); |
186 } | 186 } |
187 | 187 |
188 void _sendError(AsyncError error) { | 188 void _sendError(AsyncError error) { |
189 _onError(error); | 189 _onError(error); |
190 if (_unsubscribeOnError) { | 190 if (_unsubscribeOnError) { |
191 _subscription.cancel(); | 191 _subscription.cancel(); |
192 _subscription = null; | 192 _subscription = null; |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
226 // ------------------------------------------------------------------- | 226 // ------------------------------------------------------------------- |
227 | 227 |
228 typedef bool _Predicate<T>(T value); | 228 typedef bool _Predicate<T>(T value); |
229 | 229 |
230 class _WhereStream<T> extends _ForwardingStream<T, T> { | 230 class _WhereStream<T> extends _ForwardingStream<T, T> { |
231 final _Predicate<T> _test; | 231 final _Predicate<T> _test; |
232 | 232 |
233 _WhereStream(Stream<T> source, bool test(T value)) | 233 _WhereStream(Stream<T> source, bool test(T value)) |
234 : _test = test, super(source); | 234 : _test = test, super(source); |
235 | 235 |
236 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { | 236 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
237 bool satisfies; | 237 bool satisfies; |
238 try { | 238 try { |
239 satisfies = _test(inputEvent); | 239 satisfies = _test(inputEvent); |
240 } catch (e, s) { | 240 } catch (e, s) { |
241 sink._sendError(_asyncError(e, s)); | 241 sink._sendError(_asyncError(e, s)); |
242 return; | 242 return; |
243 } | 243 } |
244 if (satisfies) { | 244 if (satisfies) { |
245 sink._sendData(inputEvent); | 245 sink._sendData(inputEvent); |
246 } | 246 } |
247 } | 247 } |
248 } | 248 } |
249 | 249 |
250 | 250 |
251 typedef T _Transformation<S, T>(S value); | 251 typedef T _Transformation<S, T>(S value); |
252 | 252 |
253 /** | 253 /** |
254 * A stream pipe that converts data events before passing them on. | 254 * A stream pipe that converts data events before passing them on. |
255 */ | 255 */ |
256 class _MapStream<S, T> extends _ForwardingStream<S, T> { | 256 class _MapStream<S, T> extends _ForwardingStream<S, T> { |
257 final _Transformation _transform; | 257 final _Transformation _transform; |
258 | 258 |
259 _MapStream(Stream<S> source, T transform(S event)) | 259 _MapStream(Stream<S> source, T transform(S event)) |
260 : this._transform = transform, super(source); | 260 : this._transform = transform, super(source); |
261 | 261 |
262 void _handleData(S inputEvent, _StreamOutputSink<T> sink) { | 262 void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
263 T outputEvent; | 263 T outputEvent; |
264 try { | 264 try { |
265 outputEvent = _transform(inputEvent); | 265 outputEvent = _transform(inputEvent); |
266 } catch (e, s) { | 266 } catch (e, s) { |
267 sink._sendError(_asyncError(e, s)); | 267 sink._sendError(_asyncError(e, s)); |
268 return; | 268 return; |
269 } | 269 } |
270 sink._sendData(outputEvent); | 270 sink._sendData(outputEvent); |
271 } | 271 } |
272 } | 272 } |
273 | 273 |
274 /** | 274 /** |
275 * A stream pipe that converts data events before passing them on. | 275 * A stream pipe that converts data events before passing them on. |
276 */ | 276 */ |
277 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { | 277 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
278 final _Transformation<S, Iterable<T>> _expand; | 278 final _Transformation<S, Iterable<T>> _expand; |
279 | 279 |
280 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) | 280 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
281 : this._expand = expand, super(source); | 281 : this._expand = expand, super(source); |
282 | 282 |
283 void _handleData(S inputEvent, _StreamOutputSink<T> sink) { | 283 void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
284 try { | 284 try { |
285 for (T value in _expand(inputEvent)) { | 285 for (T value in _expand(inputEvent)) { |
286 sink._sendData(value); | 286 sink._sendData(value); |
287 } | 287 } |
288 } catch (e, s) { | 288 } catch (e, s) { |
289 // If either _expand or iterating the generated iterator throws, | 289 // If either _expand or iterating the generated iterator throws, |
290 // we abort the iteration. | 290 // we abort the iteration. |
291 sink._sendError(_asyncError(e, s)); | 291 sink._sendError(_asyncError(e, s)); |
292 } | 292 } |
293 } | 293 } |
294 } | 294 } |
295 | 295 |
296 | 296 |
297 typedef void _ErrorTransformation(AsyncError error); | 297 typedef void _ErrorTransformation(AsyncError error); |
298 typedef bool _ErrorTest(error); | 298 typedef bool _ErrorTest(error); |
299 | 299 |
300 /** | 300 /** |
301 * A stream pipe that converts or disposes error events | 301 * A stream pipe that converts or disposes error events |
302 * before passing them on. | 302 * before passing them on. |
303 */ | 303 */ |
304 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | 304 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
305 final _ErrorTransformation _transform; | 305 final _ErrorTransformation _transform; |
306 final _ErrorTest _test; | 306 final _ErrorTest _test; |
307 | 307 |
308 _HandleErrorStream(Stream<T> source, | 308 _HandleErrorStream(Stream<T> source, |
309 void transform(AsyncError event), | 309 void transform(AsyncError event), |
310 bool test(error)) | 310 bool test(error)) |
311 : this._transform = transform, this._test = test, super(source); | 311 : this._transform = transform, this._test = test, super(source); |
312 | 312 |
313 void _handleError(AsyncError error, _StreamOutputSink<T> sink) { | 313 void _handleError(AsyncError error, _EventOutputSink<T> sink) { |
314 bool matches = true; | 314 bool matches = true; |
315 if (_test != null) { | 315 if (_test != null) { |
316 try { | 316 try { |
317 matches = _test(error.error); | 317 matches = _test(error.error); |
318 } catch (e, s) { | 318 } catch (e, s) { |
319 sink._sendError(_asyncError(e, s, error)); | 319 sink._sendError(_asyncError(e, s, error)); |
320 return; | 320 return; |
321 } | 321 } |
322 } | 322 } |
323 if (matches) { | 323 if (matches) { |
(...skipping 13 matching lines...) Expand all Loading... |
337 class _TakeStream<T> extends _ForwardingStream<T, T> { | 337 class _TakeStream<T> extends _ForwardingStream<T, T> { |
338 int _remaining; | 338 int _remaining; |
339 | 339 |
340 _TakeStream(Stream<T> source, int count) | 340 _TakeStream(Stream<T> source, int count) |
341 : this._remaining = count, super(source) { | 341 : this._remaining = count, super(source) { |
342 // This test is done early to avoid handling an async error | 342 // This test is done early to avoid handling an async error |
343 // in the _handleData method. | 343 // in the _handleData method. |
344 if (count is! int) throw new ArgumentError(count); | 344 if (count is! int) throw new ArgumentError(count); |
345 } | 345 } |
346 | 346 |
347 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { | 347 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
348 if (_remaining > 0) { | 348 if (_remaining > 0) { |
349 sink._sendData(inputEvent); | 349 sink._sendData(inputEvent); |
350 _remaining -= 1; | 350 _remaining -= 1; |
351 if (_remaining == 0) { | 351 if (_remaining == 0) { |
352 // Closing also unsubscribes all subscribers, which unsubscribes | 352 // Closing also unsubscribes all subscribers, which unsubscribes |
353 // this from source. | 353 // this from source. |
354 sink._sendDone(); | 354 sink._sendDone(); |
355 } | 355 } |
356 } | 356 } |
357 } | 357 } |
358 } | 358 } |
359 | 359 |
360 | 360 |
361 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { | 361 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
362 final _Predicate<T> _test; | 362 final _Predicate<T> _test; |
363 | 363 |
364 _TakeWhileStream(Stream<T> source, bool test(T value)) | 364 _TakeWhileStream(Stream<T> source, bool test(T value)) |
365 : this._test = test, super(source); | 365 : this._test = test, super(source); |
366 | 366 |
367 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { | 367 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
368 bool satisfies; | 368 bool satisfies; |
369 try { | 369 try { |
370 satisfies = _test(inputEvent); | 370 satisfies = _test(inputEvent); |
371 } catch (e, s) { | 371 } catch (e, s) { |
372 sink._sendError(_asyncError(e, s)); | 372 sink._sendError(_asyncError(e, s)); |
373 // The test didn't say true. Didn't say false either, but we stop anyway. | 373 // The test didn't say true. Didn't say false either, but we stop anyway. |
374 sink._sendDone(); | 374 sink._sendDone(); |
375 return; | 375 return; |
376 } | 376 } |
377 if (satisfies) { | 377 if (satisfies) { |
378 sink._sendData(inputEvent); | 378 sink._sendData(inputEvent); |
379 } else { | 379 } else { |
380 sink._sendDone(); | 380 sink._sendDone(); |
381 } | 381 } |
382 } | 382 } |
383 } | 383 } |
384 | 384 |
385 class _SkipStream<T> extends _ForwardingStream<T, T> { | 385 class _SkipStream<T> extends _ForwardingStream<T, T> { |
386 int _remaining; | 386 int _remaining; |
387 | 387 |
388 _SkipStream(Stream<T> source, int count) | 388 _SkipStream(Stream<T> source, int count) |
389 : this._remaining = count, super(source) { | 389 : this._remaining = count, super(source) { |
390 // This test is done early to avoid handling an async error | 390 // This test is done early to avoid handling an async error |
391 // in the _handleData method. | 391 // in the _handleData method. |
392 if (count is! int || count < 0) throw new ArgumentError(count); | 392 if (count is! int || count < 0) throw new ArgumentError(count); |
393 } | 393 } |
394 | 394 |
395 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { | 395 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
396 if (_remaining > 0) { | 396 if (_remaining > 0) { |
397 _remaining--; | 397 _remaining--; |
398 return; | 398 return; |
399 } | 399 } |
400 return sink._sendData(inputEvent); | 400 return sink._sendData(inputEvent); |
401 } | 401 } |
402 } | 402 } |
403 | 403 |
404 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { | 404 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
405 final _Predicate<T> _test; | 405 final _Predicate<T> _test; |
406 bool _hasFailed = false; | 406 bool _hasFailed = false; |
407 | 407 |
408 _SkipWhileStream(Stream<T> source, bool test(T value)) | 408 _SkipWhileStream(Stream<T> source, bool test(T value)) |
409 : this._test = test, super(source); | 409 : this._test = test, super(source); |
410 | 410 |
411 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { | 411 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
412 if (_hasFailed) { | 412 if (_hasFailed) { |
413 sink._sendData(inputEvent); | 413 sink._sendData(inputEvent); |
414 } | 414 } |
415 bool satisfies; | 415 bool satisfies; |
416 try { | 416 try { |
417 satisfies = _test(inputEvent); | 417 satisfies = _test(inputEvent); |
418 } catch (e, s) { | 418 } catch (e, s) { |
419 sink._sendError(_asyncError(e, s)); | 419 sink._sendError(_asyncError(e, s)); |
420 // A failure to return a boolean is considered "not matching". | 420 // A failure to return a boolean is considered "not matching". |
421 _hasFailed = true; | 421 _hasFailed = true; |
(...skipping 10 matching lines...) Expand all Loading... |
432 | 432 |
433 class _DistinctStream<T> extends _ForwardingStream<T, T> { | 433 class _DistinctStream<T> extends _ForwardingStream<T, T> { |
434 static var _SENTINEL = new Object(); | 434 static var _SENTINEL = new Object(); |
435 | 435 |
436 _Equality<T> _equals; | 436 _Equality<T> _equals; |
437 var _previous = _SENTINEL; | 437 var _previous = _SENTINEL; |
438 | 438 |
439 _DistinctStream(Stream<T> source, bool equals(T a, T b)) | 439 _DistinctStream(Stream<T> source, bool equals(T a, T b)) |
440 : _equals = equals, super(source); | 440 : _equals = equals, super(source); |
441 | 441 |
442 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { | 442 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
443 if (identical(_previous, _SENTINEL)) { | 443 if (identical(_previous, _SENTINEL)) { |
444 _previous = inputEvent; | 444 _previous = inputEvent; |
445 return sink._sendData(inputEvent); | 445 return sink._sendData(inputEvent); |
446 } else { | 446 } else { |
447 bool isEqual; | 447 bool isEqual; |
448 try { | 448 try { |
449 if (_equals == null) { | 449 if (_equals == null) { |
450 isEqual = (_previous == inputEvent); | 450 isEqual = (_previous == inputEvent); |
451 } else { | 451 } else { |
452 isEqual = _equals(_previous, inputEvent); | 452 isEqual = _equals(_previous, inputEvent); |
453 } | 453 } |
454 } catch (e, s) { | 454 } catch (e, s) { |
455 sink._sendError(_asyncError(e, s)); | 455 sink._sendError(_asyncError(e, s)); |
456 return null; | 456 return null; |
457 } | 457 } |
458 if (!isEqual) { | 458 if (!isEqual) { |
459 sink._sendData(inputEvent); | 459 sink._sendData(inputEvent); |
460 _previous = inputEvent; | 460 _previous = inputEvent; |
461 } | 461 } |
462 } | 462 } |
463 } | 463 } |
464 } | 464 } |
465 | 465 |
466 // Stream transformations and event transformations. | 466 // Stream transformations and event transformations. |
467 | 467 |
468 typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink); | 468 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
469 typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink); | 469 typedef void _TransformErrorHandler<T>(AsyncError data, EventSink<T> sink); |
470 typedef void _TransformDoneHandler<T>(StreamSink<T> sink); | 470 typedef void _TransformDoneHandler<T>(EventSink<T> sink); |
471 | 471 |
472 /** Default data handler forwards all data. */ | 472 /** Default data handler forwards all data. */ |
473 void _defaultHandleData(var data, StreamSink sink) { | 473 void _defaultHandleData(var data, EventSink sink) { |
474 sink.add(data); | 474 sink.add(data); |
475 } | 475 } |
476 | 476 |
477 /** Default error handler forwards all errors. */ | 477 /** Default error handler forwards all errors. */ |
478 void _defaultHandleError(AsyncError error, StreamSink sink) { | 478 void _defaultHandleError(AsyncError error, EventSink sink) { |
479 sink.signalError(error); | 479 sink.addError(error); |
480 } | 480 } |
481 | 481 |
482 /** Default done handler forwards done. */ | 482 /** Default done handler forwards done. */ |
483 void _defaultHandleDone(StreamSink sink) { | 483 void _defaultHandleDone(EventSink sink) { |
484 sink.close(); | 484 sink.close(); |
485 } | 485 } |
486 | 486 |
487 | 487 |
488 /** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */ | |
489 class _StreamImplSink<T> implements StreamSink<T> { | |
490 _StreamImpl<T> _target; | |
491 _StreamImplSink(this._target); | |
492 void add(T data) { _target._add(data); } | |
493 void signalError(AsyncError error) { _target._signalError(error); } | |
494 void close() { _target._close(); } | |
495 } | |
496 | |
497 /** | 488 /** |
498 * A [StreamTransformer] that modifies stream events. | 489 * A [StreamTransformer] that modifies stream events. |
499 * | 490 * |
500 * This class is used by [StreamTransformer]'s factory constructor. | 491 * This class is used by [StreamTransformer]'s factory constructor. |
501 * It is actually an [StreamEventTransformer] where the functions used to | 492 * It is actually an [StreamEventTransformer] where the functions used to |
502 * modify the events are passed as constructor arguments. | 493 * modify the events are passed as constructor arguments. |
503 * | 494 * |
504 * If an argument is omitted, it acts as the default method from | 495 * If an argument is omitted, it acts as the default method from |
505 * [StreamEventTransformer]. | 496 * [StreamEventTransformer]. |
506 */ | 497 */ |
507 class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> { | 498 class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> { |
508 // TODO(ahe): Restore type when feature is implemented in dart2js | 499 // TODO(ahe): Restore type when feature is implemented in dart2js |
509 // checked mode. http://dartbug.com/7733 | 500 // checked mode. http://dartbug.com/7733 |
510 final Function /*_TransformDataHandler<S, T>*/ _handleData; | 501 final Function /*_TransformDataHandler<S, T>*/ _handleData; |
511 final _TransformErrorHandler<T> _handleError; | 502 final _TransformErrorHandler<T> _handleError; |
512 final _TransformDoneHandler<T> _handleDone; | 503 final _TransformDoneHandler<T> _handleDone; |
513 | 504 |
514 _StreamTransformerImpl(void handleData(S data, StreamSink<T> sink), | 505 _StreamTransformerImpl(void handleData(S data, EventSink<T> sink), |
515 void handleError(AsyncError data, StreamSink<T> sink), | 506 void handleError(AsyncError data, EventSink<T> sink), |
516 void handleDone(StreamSink<T> sink)) | 507 void handleDone(EventSink<T> sink)) |
517 : this._handleData = (handleData == null ? _defaultHandleData | 508 : this._handleData = (handleData == null ? _defaultHandleData |
518 : handleData), | 509 : handleData), |
519 this._handleError = (handleError == null ? _defaultHandleError | 510 this._handleError = (handleError == null ? _defaultHandleError |
520 : handleError), | 511 : handleError), |
521 this._handleDone = (handleDone == null ? _defaultHandleDone | 512 this._handleDone = (handleDone == null ? _defaultHandleDone |
522 : handleDone); | 513 : handleDone); |
523 | 514 |
524 void handleData(S data, StreamSink<T> sink) { | 515 void handleData(S data, EventSink<T> sink) { |
525 _handleData(data, sink); | 516 _handleData(data, sink); |
526 } | 517 } |
527 | 518 |
528 void handleError(AsyncError error, StreamSink<T> sink) { | 519 void handleError(AsyncError error, EventSink<T> sink) { |
529 _handleError(error, sink); | 520 _handleError(error, sink); |
530 } | 521 } |
531 | 522 |
532 void handleDone(StreamSink<T> sink) { | 523 void handleDone(EventSink<T> sink) { |
533 _handleDone(sink); | 524 _handleDone(sink); |
534 } | 525 } |
535 } | 526 } |
536 | 527 |
OLD | NEW |