OLD | NEW |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library async.single_subscription_transformer; | |
6 | |
7 import 'dart:async'; | 5 import 'dart:async'; |
8 | 6 |
9 /// A transformer that converts a broadcast stream into a single-subscription | 7 /// A transformer that converts a broadcast stream into a single-subscription |
10 /// stream. | 8 /// stream. |
11 /// | 9 /// |
12 /// This buffers the broadcast stream's events, which means that it starts | 10 /// This buffers the broadcast stream's events, which means that it starts |
13 /// listening to a stream as soon as it's bound. | 11 /// listening to a stream as soon as it's bound. |
14 class SingleSubscriptionTransformer<S, T> implements StreamTransformer<S, T> { | 12 class SingleSubscriptionTransformer<S, T> implements StreamTransformer<S, T> { |
15 const SingleSubscriptionTransformer(); | 13 const SingleSubscriptionTransformer(); |
16 | 14 |
17 Stream<T> bind(Stream<S> stream) { | 15 Stream<T> bind(Stream<S> stream) { |
18 var subscription; | 16 var subscription; |
19 var controller = new StreamController(sync: true, | 17 var controller = new StreamController(sync: true, |
20 onCancel: () => subscription.cancel()); | 18 onCancel: () => subscription.cancel()); |
21 subscription = stream.listen(controller.add, | 19 subscription = stream.listen(controller.add, |
22 onError: controller.addError, onDone: controller.close); | 20 onError: controller.addError, onDone: controller.close); |
23 return controller.stream; | 21 return controller.stream; |
24 } | 22 } |
25 } | 23 } |
OLD | NEW |