| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ | 7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ |
| 8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { | 8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { |
| 9 if (error is AsyncError) return error; | 9 if (error is AsyncError) return error; |
| 10 if (cause == null) return new AsyncError(error, stackTrace); | 10 if (cause == null) return new AsyncError(error, stackTrace); |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 67 StreamSubscription<T> _createSubscription(void onData(T value), | 67 StreamSubscription<T> _createSubscription(void onData(T value), |
| 68 void onError(AsyncError error), | 68 void onError(AsyncError error), |
| 69 void onDone(), | 69 void onDone(), |
| 70 bool unsubscribeOnError) { | 70 bool unsubscribeOnError) { |
| 71 return new _ForwardingStreamSubscription<S, T>( | 71 return new _ForwardingStreamSubscription<S, T>( |
| 72 this, onData, onError, onDone, unsubscribeOnError); | 72 this, onData, onError, onDone, unsubscribeOnError); |
| 73 } | 73 } |
| 74 | 74 |
| 75 // Override the following methods in subclasses to change the behavior. | 75 // Override the following methods in subclasses to change the behavior. |
| 76 | 76 |
| 77 void _handleData(S data, _StreamOutputSink<T> sink) { | 77 void _handleData(S data, _EventOutputSink<T> sink) { |
| 78 var outputData = data; | 78 var outputData = data; |
| 79 sink._sendData(outputData); | 79 sink._sendData(outputData); |
| 80 } | 80 } |
| 81 | 81 |
| 82 void _handleError(AsyncError error, _StreamOutputSink<T> sink) { | 82 void _handleError(AsyncError error, _EventOutputSink<T> sink) { |
| 83 sink._sendError(error); | 83 sink._sendError(error); |
| 84 } | 84 } |
| 85 | 85 |
| 86 void _handleDone(_StreamOutputSink<T> sink) { | 86 void _handleDone(_EventOutputSink<T> sink) { |
| 87 sink._sendDone(); | 87 sink._sendDone(); |
| 88 } | 88 } |
| 89 } | 89 } |
| 90 | 90 |
| 91 /** | 91 /** |
| 92 * Common behavior of [StreamSubscription] classes. | 92 * Common behavior of [StreamSubscription] classes. |
| 93 * | 93 * |
| 94 * Stores and allows updating of the event handlers of a [StreamSubscription]. | 94 * Stores and allows updating of the event handlers of a [StreamSubscription]. |
| 95 */ | 95 */ |
| 96 abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> { | 96 abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> { |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 129 void resume(); | 129 void resume(); |
| 130 | 130 |
| 131 void cancel(); | 131 void cancel(); |
| 132 } | 132 } |
| 133 | 133 |
| 134 | 134 |
| 135 /** | 135 /** |
| 136 * Abstract superclass for subscriptions that forward to other subscriptions. | 136 * Abstract superclass for subscriptions that forward to other subscriptions. |
| 137 */ | 137 */ |
| 138 class _ForwardingStreamSubscription<S, T> | 138 class _ForwardingStreamSubscription<S, T> |
| 139 extends _BaseStreamSubscription<T> implements _StreamOutputSink<T> { | 139 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { |
| 140 final _ForwardingStream<S, T> _stream; | 140 final _ForwardingStream<S, T> _stream; |
| 141 final bool _unsubscribeOnError; | 141 final bool _unsubscribeOnError; |
| 142 | 142 |
| 143 StreamSubscription<S> _subscription; | 143 StreamSubscription<S> _subscription; |
| 144 | 144 |
| 145 _ForwardingStreamSubscription(this._stream, | 145 _ForwardingStreamSubscription(this._stream, |
| 146 void onData(T data), | 146 void onData(T data), |
| 147 void onError(AsyncError error), | 147 void onError(AsyncError error), |
| 148 void onDone(), | 148 void onDone(), |
| 149 this._unsubscribeOnError) | 149 this._unsubscribeOnError) |
| (...skipping 22 matching lines...) Expand all Loading... |
| 172 } | 172 } |
| 173 | 173 |
| 174 void cancel() { | 174 void cancel() { |
| 175 if (_subscription == null) { | 175 if (_subscription == null) { |
| 176 throw new StateError("Subscription has been unsubscribed"); | 176 throw new StateError("Subscription has been unsubscribed"); |
| 177 } | 177 } |
| 178 _subscription.cancel(); | 178 _subscription.cancel(); |
| 179 _subscription = null; | 179 _subscription = null; |
| 180 } | 180 } |
| 181 | 181 |
| 182 // _StreamOutputSink interface. Sends data to this subscription. | 182 // _EventOutputSink interface. Sends data to this subscription. |
| 183 | 183 |
| 184 void _sendData(T data) { | 184 void _sendData(T data) { |
| 185 _onData(data); | 185 _onData(data); |
| 186 } | 186 } |
| 187 | 187 |
| 188 void _sendError(AsyncError error) { | 188 void _sendError(AsyncError error) { |
| 189 _onError(error); | 189 _onError(error); |
| 190 if (_unsubscribeOnError) { | 190 if (_unsubscribeOnError) { |
| 191 _subscription.cancel(); | 191 _subscription.cancel(); |
| 192 _subscription = null; | 192 _subscription = null; |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 226 // ------------------------------------------------------------------- | 226 // ------------------------------------------------------------------- |
| 227 | 227 |
| 228 typedef bool _Predicate<T>(T value); | 228 typedef bool _Predicate<T>(T value); |
| 229 | 229 |
| 230 class _WhereStream<T> extends _ForwardingStream<T, T> { | 230 class _WhereStream<T> extends _ForwardingStream<T, T> { |
| 231 final _Predicate<T> _test; | 231 final _Predicate<T> _test; |
| 232 | 232 |
| 233 _WhereStream(Stream<T> source, bool test(T value)) | 233 _WhereStream(Stream<T> source, bool test(T value)) |
| 234 : _test = test, super(source); | 234 : _test = test, super(source); |
| 235 | 235 |
| 236 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { | 236 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| 237 bool satisfies; | 237 bool satisfies; |
| 238 try { | 238 try { |
| 239 satisfies = _test(inputEvent); | 239 satisfies = _test(inputEvent); |
| 240 } catch (e, s) { | 240 } catch (e, s) { |
| 241 sink._sendError(_asyncError(e, s)); | 241 sink._sendError(_asyncError(e, s)); |
| 242 return; | 242 return; |
| 243 } | 243 } |
| 244 if (satisfies) { | 244 if (satisfies) { |
| 245 sink._sendData(inputEvent); | 245 sink._sendData(inputEvent); |
| 246 } | 246 } |
| 247 } | 247 } |
| 248 } | 248 } |
| 249 | 249 |
| 250 | 250 |
| 251 typedef T _Transformation<S, T>(S value); | 251 typedef T _Transformation<S, T>(S value); |
| 252 | 252 |
| 253 /** | 253 /** |
| 254 * A stream pipe that converts data events before passing them on. | 254 * A stream pipe that converts data events before passing them on. |
| 255 */ | 255 */ |
| 256 class _MapStream<S, T> extends _ForwardingStream<S, T> { | 256 class _MapStream<S, T> extends _ForwardingStream<S, T> { |
| 257 final _Transformation _transform; | 257 final _Transformation _transform; |
| 258 | 258 |
| 259 _MapStream(Stream<S> source, T transform(S event)) | 259 _MapStream(Stream<S> source, T transform(S event)) |
| 260 : this._transform = transform, super(source); | 260 : this._transform = transform, super(source); |
| 261 | 261 |
| 262 void _handleData(S inputEvent, _StreamOutputSink<T> sink) { | 262 void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
| 263 T outputEvent; | 263 T outputEvent; |
| 264 try { | 264 try { |
| 265 outputEvent = _transform(inputEvent); | 265 outputEvent = _transform(inputEvent); |
| 266 } catch (e, s) { | 266 } catch (e, s) { |
| 267 sink._sendError(_asyncError(e, s)); | 267 sink._sendError(_asyncError(e, s)); |
| 268 return; | 268 return; |
| 269 } | 269 } |
| 270 sink._sendData(outputEvent); | 270 sink._sendData(outputEvent); |
| 271 } | 271 } |
| 272 } | 272 } |
| 273 | 273 |
| 274 /** | 274 /** |
| 275 * A stream pipe that converts data events before passing them on. | 275 * A stream pipe that converts data events before passing them on. |
| 276 */ | 276 */ |
| 277 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { | 277 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
| 278 final _Transformation<S, Iterable<T>> _expand; | 278 final _Transformation<S, Iterable<T>> _expand; |
| 279 | 279 |
| 280 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) | 280 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
| 281 : this._expand = expand, super(source); | 281 : this._expand = expand, super(source); |
| 282 | 282 |
| 283 void _handleData(S inputEvent, _StreamOutputSink<T> sink) { | 283 void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
| 284 try { | 284 try { |
| 285 for (T value in _expand(inputEvent)) { | 285 for (T value in _expand(inputEvent)) { |
| 286 sink._sendData(value); | 286 sink._sendData(value); |
| 287 } | 287 } |
| 288 } catch (e, s) { | 288 } catch (e, s) { |
| 289 // If either _expand or iterating the generated iterator throws, | 289 // If either _expand or iterating the generated iterator throws, |
| 290 // we abort the iteration. | 290 // we abort the iteration. |
| 291 sink._sendError(_asyncError(e, s)); | 291 sink._sendError(_asyncError(e, s)); |
| 292 } | 292 } |
| 293 } | 293 } |
| 294 } | 294 } |
| 295 | 295 |
| 296 | 296 |
| 297 typedef void _ErrorTransformation(AsyncError error); | 297 typedef void _ErrorTransformation(AsyncError error); |
| 298 typedef bool _ErrorTest(error); | 298 typedef bool _ErrorTest(error); |
| 299 | 299 |
| 300 /** | 300 /** |
| 301 * A stream pipe that converts or disposes error events | 301 * A stream pipe that converts or disposes error events |
| 302 * before passing them on. | 302 * before passing them on. |
| 303 */ | 303 */ |
| 304 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | 304 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
| 305 final _ErrorTransformation _transform; | 305 final _ErrorTransformation _transform; |
| 306 final _ErrorTest _test; | 306 final _ErrorTest _test; |
| 307 | 307 |
| 308 _HandleErrorStream(Stream<T> source, | 308 _HandleErrorStream(Stream<T> source, |
| 309 void transform(AsyncError event), | 309 void transform(AsyncError event), |
| 310 bool test(error)) | 310 bool test(error)) |
| 311 : this._transform = transform, this._test = test, super(source); | 311 : this._transform = transform, this._test = test, super(source); |
| 312 | 312 |
| 313 void _handleError(AsyncError error, _StreamOutputSink<T> sink) { | 313 void _handleError(AsyncError error, _EventOutputSink<T> sink) { |
| 314 bool matches = true; | 314 bool matches = true; |
| 315 if (_test != null) { | 315 if (_test != null) { |
| 316 try { | 316 try { |
| 317 matches = _test(error.error); | 317 matches = _test(error.error); |
| 318 } catch (e, s) { | 318 } catch (e, s) { |
| 319 sink._sendError(_asyncError(e, s, error)); | 319 sink._sendError(_asyncError(e, s, error)); |
| 320 return; | 320 return; |
| 321 } | 321 } |
| 322 } | 322 } |
| 323 if (matches) { | 323 if (matches) { |
| (...skipping 13 matching lines...) Expand all Loading... |
| 337 class _TakeStream<T> extends _ForwardingStream<T, T> { | 337 class _TakeStream<T> extends _ForwardingStream<T, T> { |
| 338 int _remaining; | 338 int _remaining; |
| 339 | 339 |
| 340 _TakeStream(Stream<T> source, int count) | 340 _TakeStream(Stream<T> source, int count) |
| 341 : this._remaining = count, super(source) { | 341 : this._remaining = count, super(source) { |
| 342 // This test is done early to avoid handling an async error | 342 // This test is done early to avoid handling an async error |
| 343 // in the _handleData method. | 343 // in the _handleData method. |
| 344 if (count is! int) throw new ArgumentError(count); | 344 if (count is! int) throw new ArgumentError(count); |
| 345 } | 345 } |
| 346 | 346 |
| 347 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { | 347 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| 348 if (_remaining > 0) { | 348 if (_remaining > 0) { |
| 349 sink._sendData(inputEvent); | 349 sink._sendData(inputEvent); |
| 350 _remaining -= 1; | 350 _remaining -= 1; |
| 351 if (_remaining == 0) { | 351 if (_remaining == 0) { |
| 352 // Closing also unsubscribes all subscribers, which unsubscribes | 352 // Closing also unsubscribes all subscribers, which unsubscribes |
| 353 // this from source. | 353 // this from source. |
| 354 sink._sendDone(); | 354 sink._sendDone(); |
| 355 } | 355 } |
| 356 } | 356 } |
| 357 } | 357 } |
| 358 } | 358 } |
| 359 | 359 |
| 360 | 360 |
| 361 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { | 361 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
| 362 final _Predicate<T> _test; | 362 final _Predicate<T> _test; |
| 363 | 363 |
| 364 _TakeWhileStream(Stream<T> source, bool test(T value)) | 364 _TakeWhileStream(Stream<T> source, bool test(T value)) |
| 365 : this._test = test, super(source); | 365 : this._test = test, super(source); |
| 366 | 366 |
| 367 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { | 367 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| 368 bool satisfies; | 368 bool satisfies; |
| 369 try { | 369 try { |
| 370 satisfies = _test(inputEvent); | 370 satisfies = _test(inputEvent); |
| 371 } catch (e, s) { | 371 } catch (e, s) { |
| 372 sink._sendError(_asyncError(e, s)); | 372 sink._sendError(_asyncError(e, s)); |
| 373 // The test didn't say true. Didn't say false either, but we stop anyway. | 373 // The test didn't say true. Didn't say false either, but we stop anyway. |
| 374 sink._sendDone(); | 374 sink._sendDone(); |
| 375 return; | 375 return; |
| 376 } | 376 } |
| 377 if (satisfies) { | 377 if (satisfies) { |
| 378 sink._sendData(inputEvent); | 378 sink._sendData(inputEvent); |
| 379 } else { | 379 } else { |
| 380 sink._sendDone(); | 380 sink._sendDone(); |
| 381 } | 381 } |
| 382 } | 382 } |
| 383 } | 383 } |
| 384 | 384 |
| 385 class _SkipStream<T> extends _ForwardingStream<T, T> { | 385 class _SkipStream<T> extends _ForwardingStream<T, T> { |
| 386 int _remaining; | 386 int _remaining; |
| 387 | 387 |
| 388 _SkipStream(Stream<T> source, int count) | 388 _SkipStream(Stream<T> source, int count) |
| 389 : this._remaining = count, super(source) { | 389 : this._remaining = count, super(source) { |
| 390 // This test is done early to avoid handling an async error | 390 // This test is done early to avoid handling an async error |
| 391 // in the _handleData method. | 391 // in the _handleData method. |
| 392 if (count is! int || count < 0) throw new ArgumentError(count); | 392 if (count is! int || count < 0) throw new ArgumentError(count); |
| 393 } | 393 } |
| 394 | 394 |
| 395 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { | 395 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| 396 if (_remaining > 0) { | 396 if (_remaining > 0) { |
| 397 _remaining--; | 397 _remaining--; |
| 398 return; | 398 return; |
| 399 } | 399 } |
| 400 return sink._sendData(inputEvent); | 400 return sink._sendData(inputEvent); |
| 401 } | 401 } |
| 402 } | 402 } |
| 403 | 403 |
| 404 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { | 404 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
| 405 final _Predicate<T> _test; | 405 final _Predicate<T> _test; |
| 406 bool _hasFailed = false; | 406 bool _hasFailed = false; |
| 407 | 407 |
| 408 _SkipWhileStream(Stream<T> source, bool test(T value)) | 408 _SkipWhileStream(Stream<T> source, bool test(T value)) |
| 409 : this._test = test, super(source); | 409 : this._test = test, super(source); |
| 410 | 410 |
| 411 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { | 411 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| 412 if (_hasFailed) { | 412 if (_hasFailed) { |
| 413 sink._sendData(inputEvent); | 413 sink._sendData(inputEvent); |
| 414 } | 414 } |
| 415 bool satisfies; | 415 bool satisfies; |
| 416 try { | 416 try { |
| 417 satisfies = _test(inputEvent); | 417 satisfies = _test(inputEvent); |
| 418 } catch (e, s) { | 418 } catch (e, s) { |
| 419 sink._sendError(_asyncError(e, s)); | 419 sink._sendError(_asyncError(e, s)); |
| 420 // A failure to return a boolean is considered "not matching". | 420 // A failure to return a boolean is considered "not matching". |
| 421 _hasFailed = true; | 421 _hasFailed = true; |
| (...skipping 10 matching lines...) Expand all Loading... |
| 432 | 432 |
| 433 class _DistinctStream<T> extends _ForwardingStream<T, T> { | 433 class _DistinctStream<T> extends _ForwardingStream<T, T> { |
| 434 static var _SENTINEL = new Object(); | 434 static var _SENTINEL = new Object(); |
| 435 | 435 |
| 436 _Equality<T> _equals; | 436 _Equality<T> _equals; |
| 437 var _previous = _SENTINEL; | 437 var _previous = _SENTINEL; |
| 438 | 438 |
| 439 _DistinctStream(Stream<T> source, bool equals(T a, T b)) | 439 _DistinctStream(Stream<T> source, bool equals(T a, T b)) |
| 440 : _equals = equals, super(source); | 440 : _equals = equals, super(source); |
| 441 | 441 |
| 442 void _handleData(T inputEvent, _StreamOutputSink<T> sink) { | 442 void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| 443 if (identical(_previous, _SENTINEL)) { | 443 if (identical(_previous, _SENTINEL)) { |
| 444 _previous = inputEvent; | 444 _previous = inputEvent; |
| 445 return sink._sendData(inputEvent); | 445 return sink._sendData(inputEvent); |
| 446 } else { | 446 } else { |
| 447 bool isEqual; | 447 bool isEqual; |
| 448 try { | 448 try { |
| 449 if (_equals == null) { | 449 if (_equals == null) { |
| 450 isEqual = (_previous == inputEvent); | 450 isEqual = (_previous == inputEvent); |
| 451 } else { | 451 } else { |
| 452 isEqual = _equals(_previous, inputEvent); | 452 isEqual = _equals(_previous, inputEvent); |
| 453 } | 453 } |
| 454 } catch (e, s) { | 454 } catch (e, s) { |
| 455 sink._sendError(_asyncError(e, s)); | 455 sink._sendError(_asyncError(e, s)); |
| 456 return null; | 456 return null; |
| 457 } | 457 } |
| 458 if (!isEqual) { | 458 if (!isEqual) { |
| 459 sink._sendData(inputEvent); | 459 sink._sendData(inputEvent); |
| 460 _previous = inputEvent; | 460 _previous = inputEvent; |
| 461 } | 461 } |
| 462 } | 462 } |
| 463 } | 463 } |
| 464 } | 464 } |
| 465 | 465 |
| 466 // Stream transformations and event transformations. | 466 // Stream transformations and event transformations. |
| 467 | 467 |
| 468 typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink); | 468 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
| 469 typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink); | 469 typedef void _TransformErrorHandler<T>(AsyncError data, EventSink<T> sink); |
| 470 typedef void _TransformDoneHandler<T>(StreamSink<T> sink); | 470 typedef void _TransformDoneHandler<T>(EventSink<T> sink); |
| 471 | 471 |
| 472 /** Default data handler forwards all data. */ | 472 /** Default data handler forwards all data. */ |
| 473 void _defaultHandleData(var data, StreamSink sink) { | 473 void _defaultHandleData(var data, EventSink sink) { |
| 474 sink.add(data); | 474 sink.add(data); |
| 475 } | 475 } |
| 476 | 476 |
| 477 /** Default error handler forwards all errors. */ | 477 /** Default error handler forwards all errors. */ |
| 478 void _defaultHandleError(AsyncError error, StreamSink sink) { | 478 void _defaultHandleError(AsyncError error, EventSink sink) { |
| 479 sink.signalError(error); | 479 sink.addError(error); |
| 480 } | 480 } |
| 481 | 481 |
| 482 /** Default done handler forwards done. */ | 482 /** Default done handler forwards done. */ |
| 483 void _defaultHandleDone(StreamSink sink) { | 483 void _defaultHandleDone(EventSink sink) { |
| 484 sink.close(); | 484 sink.close(); |
| 485 } | 485 } |
| 486 | 486 |
| 487 | 487 |
| 488 /** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */ | |
| 489 class _StreamImplSink<T> implements StreamSink<T> { | |
| 490 _StreamImpl<T> _target; | |
| 491 _StreamImplSink(this._target); | |
| 492 void add(T data) { _target._add(data); } | |
| 493 void signalError(AsyncError error) { _target._signalError(error); } | |
| 494 void close() { _target._close(); } | |
| 495 } | |
| 496 | |
| 497 /** | 488 /** |
| 498 * A [StreamTransformer] that modifies stream events. | 489 * A [StreamTransformer] that modifies stream events. |
| 499 * | 490 * |
| 500 * This class is used by [StreamTransformer]'s factory constructor. | 491 * This class is used by [StreamTransformer]'s factory constructor. |
| 501 * It is actually an [StreamEventTransformer] where the functions used to | 492 * It is actually an [StreamEventTransformer] where the functions used to |
| 502 * modify the events are passed as constructor arguments. | 493 * modify the events are passed as constructor arguments. |
| 503 * | 494 * |
| 504 * If an argument is omitted, it acts as the default method from | 495 * If an argument is omitted, it acts as the default method from |
| 505 * [StreamEventTransformer]. | 496 * [StreamEventTransformer]. |
| 506 */ | 497 */ |
| 507 class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> { | 498 class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> { |
| 508 // TODO(ahe): Restore type when feature is implemented in dart2js | 499 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 509 // checked mode. http://dartbug.com/7733 | 500 // checked mode. http://dartbug.com/7733 |
| 510 final Function /*_TransformDataHandler<S, T>*/ _handleData; | 501 final Function /*_TransformDataHandler<S, T>*/ _handleData; |
| 511 final _TransformErrorHandler<T> _handleError; | 502 final _TransformErrorHandler<T> _handleError; |
| 512 final _TransformDoneHandler<T> _handleDone; | 503 final _TransformDoneHandler<T> _handleDone; |
| 513 | 504 |
| 514 _StreamTransformerImpl(void handleData(S data, StreamSink<T> sink), | 505 _StreamTransformerImpl(void handleData(S data, EventSink<T> sink), |
| 515 void handleError(AsyncError data, StreamSink<T> sink), | 506 void handleError(AsyncError data, EventSink<T> sink), |
| 516 void handleDone(StreamSink<T> sink)) | 507 void handleDone(EventSink<T> sink)) |
| 517 : this._handleData = (handleData == null ? _defaultHandleData | 508 : this._handleData = (handleData == null ? _defaultHandleData |
| 518 : handleData), | 509 : handleData), |
| 519 this._handleError = (handleError == null ? _defaultHandleError | 510 this._handleError = (handleError == null ? _defaultHandleError |
| 520 : handleError), | 511 : handleError), |
| 521 this._handleDone = (handleDone == null ? _defaultHandleDone | 512 this._handleDone = (handleDone == null ? _defaultHandleDone |
| 522 : handleDone); | 513 : handleDone); |
| 523 | 514 |
| 524 void handleData(S data, StreamSink<T> sink) { | 515 void handleData(S data, EventSink<T> sink) { |
| 525 _handleData(data, sink); | 516 _handleData(data, sink); |
| 526 } | 517 } |
| 527 | 518 |
| 528 void handleError(AsyncError error, StreamSink<T> sink) { | 519 void handleError(AsyncError error, EventSink<T> sink) { |
| 529 _handleError(error, sink); | 520 _handleError(error, sink); |
| 530 } | 521 } |
| 531 | 522 |
| 532 void handleDone(StreamSink<T> sink) { | 523 void handleDone(EventSink<T> sink) { |
| 533 _handleDone(sink); | 524 _handleDone(sink); |
| 534 } | 525 } |
| 535 } | 526 } |
| 536 | 527 |
| OLD | NEW |