OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 299 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
310 * | 310 * |
311 * The returned stream is a broadcast stream if this stream is. | 311 * The returned stream is a broadcast stream if this stream is. |
312 */ | 312 */ |
313 Stream asyncMap(convert(T event)) { | 313 Stream asyncMap(convert(T event)) { |
314 StreamController controller; | 314 StreamController controller; |
315 StreamSubscription subscription; | 315 StreamSubscription subscription; |
316 void onListen () { | 316 void onListen () { |
317 final add = controller.add; | 317 final add = controller.add; |
318 assert(controller is _StreamController || | 318 assert(controller is _StreamController || |
319 controller is _BroadcastStreamController); | 319 controller is _BroadcastStreamController); |
320 final eventSink = controller; | 320 final eventSink = controller as _EventSink<T>; |
321 final addError = eventSink._addError; | 321 final addError = eventSink._addError; |
322 subscription = this.listen( | 322 subscription = this.listen( |
323 (T event) { | 323 (T event) { |
324 var newValue; | 324 var newValue; |
325 try { | 325 try { |
326 newValue = convert(event); | 326 newValue = convert(event); |
327 } catch (e, s) { | 327 } catch (e, s) { |
328 controller.addError(e, s); | 328 controller.addError(e, s); |
329 return; | 329 return; |
330 } | 330 } |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
370 * just as if it returned an empty stream. | 370 * just as if it returned an empty stream. |
371 * | 371 * |
372 * The returned stream is a broadcast stream if this stream is. | 372 * The returned stream is a broadcast stream if this stream is. |
373 */ | 373 */ |
374 Stream asyncExpand(Stream convert(T event)) { | 374 Stream asyncExpand(Stream convert(T event)) { |
375 StreamController controller; | 375 StreamController controller; |
376 StreamSubscription subscription; | 376 StreamSubscription subscription; |
377 void onListen() { | 377 void onListen() { |
378 assert(controller is _StreamController || | 378 assert(controller is _StreamController || |
379 controller is _BroadcastStreamController); | 379 controller is _BroadcastStreamController); |
380 final eventSink = controller; | 380 final eventSink = controller as _EventSink<T>; |
381 subscription = this.listen( | 381 subscription = this.listen( |
382 (T event) { | 382 (T event) { |
383 Stream newStream; | 383 Stream newStream; |
384 try { | 384 try { |
385 newStream = convert(event); | 385 newStream = convert(event); |
386 } catch (e, s) { | 386 } catch (e, s) { |
387 controller.addError(e, s); | 387 controller.addError(e, s); |
388 return; | 388 return; |
389 } | 389 } |
390 if (newStream != null) { | 390 if (newStream != null) { |
(...skipping 825 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1216 | 1216 |
1217 void onData(T event) { | 1217 void onData(T event) { |
1218 timer.cancel(); | 1218 timer.cancel(); |
1219 controller.add(event); | 1219 controller.add(event); |
1220 timer = zone.createTimer(timeLimit, timeout); | 1220 timer = zone.createTimer(timeLimit, timeout); |
1221 } | 1221 } |
1222 void onError(error, StackTrace stackTrace) { | 1222 void onError(error, StackTrace stackTrace) { |
1223 timer.cancel(); | 1223 timer.cancel(); |
1224 assert(controller is _StreamController || | 1224 assert(controller is _StreamController || |
1225 controller is _BroadcastStreamController); | 1225 controller is _BroadcastStreamController); |
1226 var eventSink = controller; | 1226 var eventSink = controller as _EventSink<T>; |
1227 eventSink._addError(error, stackTrace); // Avoid Zone error replacement. | 1227 eventSink._addError(error, stackTrace); // Avoid Zone error replacement. |
1228 timer = zone.createTimer(timeLimit, timeout); | 1228 timer = zone.createTimer(timeLimit, timeout); |
1229 } | 1229 } |
1230 void onDone() { | 1230 void onDone() { |
1231 timer.cancel(); | 1231 timer.cancel(); |
1232 controller.close(); | 1232 controller.close(); |
1233 } | 1233 } |
1234 void onListen() { | 1234 void onListen() { |
1235 // This is the onListen callback for of controller. | 1235 // This is the onListen callback for of controller. |
1236 // It runs in the same zone that the subscription was created in. | 1236 // It runs in the same zone that the subscription was created in. |
(...skipping 423 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1660 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1660 class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
1661 EventSink _sink; | 1661 EventSink _sink; |
1662 _ControllerEventSinkWrapper(this._sink); | 1662 _ControllerEventSinkWrapper(this._sink); |
1663 | 1663 |
1664 void add(T data) { _sink.add(data); } | 1664 void add(T data) { _sink.add(data); } |
1665 void addError(error, [StackTrace stackTrace]) { | 1665 void addError(error, [StackTrace stackTrace]) { |
1666 _sink.addError(error, stackTrace); | 1666 _sink.addError(error, stackTrace); |
1667 } | 1667 } |
1668 void close() { _sink.close(); } | 1668 void close() { _sink.close(); } |
1669 } | 1669 } |
OLD | NEW |