OLD | NEW |
1 // Copyright 2013 Google Inc. All Rights Reserved. | 1 // Copyright 2013 Google Inc. All Rights Reserved. |
2 // | 2 // |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
6 // | 6 // |
7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
8 // | 8 // |
9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
13 // limitations under the License. | 13 // limitations under the License. |
14 | 14 |
15 part of quiver.async; | 15 part of quiver.async; |
16 | 16 |
17 /** | 17 /// Splits a [Stream] of events into multiple Streams based on a set of |
18 * Splits a [Stream] of events into multiple Streams based on a set of | 18 /// predicates. |
19 * predicates. | 19 /// |
20 * | 20 /// Using StreamRouter differs from [Stream.where] because events are only sent |
21 * Using StreamRouter differs from [Stream.where] because events are only sent | 21 /// to one Stream. If more than one predicate matches the event, the event is |
22 * to one Stream. If more than one predicate matches the event, the event is | 22 /// sent to the stream created by the earlier call to [route]. Events not |
23 * sent to the stream created by the earlier call to [route]. Events not matched | 23 /// matched by a call to [route] are sent to the [defaultStream]. |
24 * by a call to [route] are sent to the [defaultStream]. | 24 /// |
25 * | 25 /// Example: |
26 * Example: | 26 /// import 'dart:html'; |
27 * import 'dart:html'; | 27 /// import 'package:quiver/async.dart'; |
28 * import 'package:quiver/async.dart'; | 28 /// |
29 * | 29 /// var router = new StreamRouter(window.onClick); |
30 * var router = new StreamRouter(window.onClick); | 30 /// var onRightClick = router.route((e) => e.button == 2); |
31 * var onRightClick = router.route((e) => e.button == 2); | 31 /// var onAltClick = router.route((e) => e.altKey); |
32 * var onAltClick = router.route((e) => e.altKey); | 32 /// var onOtherClick router.defaultStream; |
33 * var onOtherClick router.defaultStream; | |
34 */ | |
35 class StreamRouter<T> { | 33 class StreamRouter<T> { |
36 final Stream<T> _incoming; | 34 final Stream<T> _incoming; |
37 StreamSubscription _subscription; | 35 StreamSubscription _subscription; |
38 | 36 |
39 final List<_Route> _routes = <_Route>[]; | 37 final List<_Route<T>> _routes = <_Route<T>>[]; |
40 final StreamController<T> _defaultController = | 38 final StreamController<T> _defaultController = |
41 new StreamController<T>.broadcast(); | 39 new StreamController<T>.broadcast(); |
42 | 40 |
43 /** | 41 /// Create a new StreamRouter that listens to the [incoming] stream. |
44 * Create a new StreamRouter that listens to the [incoming] stream. | |
45 */ | |
46 StreamRouter(Stream<T> incoming) : _incoming = incoming { | 42 StreamRouter(Stream<T> incoming) : _incoming = incoming { |
47 _subscription = _incoming.listen(_handle, onDone: close); | 43 _subscription = _incoming.listen(_handle, onDone: close); |
48 } | 44 } |
49 | 45 |
50 /** | 46 /// Events that match [predicate] are sent to the stream created by this |
51 * Events that match [predicate] are sent to the stream created by this | 47 /// method, and not sent to any other router streams. |
52 * method, and not sent to any other router streams. | |
53 */ | |
54 Stream<T> route(bool predicate(T event)) { | 48 Stream<T> route(bool predicate(T event)) { |
55 var controller = new StreamController<T>.broadcast(); | 49 var controller = new StreamController<T>.broadcast(); |
56 _routes.add(new _Route(predicate, controller)); | 50 _routes.add(new _Route(predicate, controller)); |
57 return controller.stream; | 51 return controller.stream; |
58 } | 52 } |
59 | 53 |
60 Stream<T> get defaultStream => _defaultController.stream; | 54 Stream<T> get defaultStream => _defaultController.stream; |
61 | 55 |
62 Future close() { | 56 Future close() { |
63 return Future.wait(_routes.map((r) => r.controller.close())).then((_) { | 57 return Future.wait(_routes.map((r) => r.controller.close())).then((_) { |
64 _subscription.cancel(); | 58 _subscription.cancel(); |
65 }); | 59 }); |
66 } | 60 } |
67 | 61 |
68 void _handle(T event) { | 62 void _handle(T event) { |
69 var route = | 63 var route = |
70 _routes.firstWhere((r) => r.predicate(event), orElse: () => null); | 64 _routes.firstWhere((r) => r.predicate(event), orElse: () => null); |
71 var controller = (route != null) ? route.controller : _defaultController; | 65 var controller = (route != null) ? route.controller : _defaultController; |
72 controller.add(event); | 66 controller.add(event); |
73 } | 67 } |
74 } | 68 } |
75 | 69 |
76 typedef bool _Predicate(event); | 70 typedef bool _Predicate(event); |
77 | 71 |
78 class _Route { | 72 class _Route<T> { |
79 final _Predicate predicate; | 73 final _Predicate predicate; |
80 final StreamController controller; | 74 final StreamController<T> controller; |
81 | 75 |
82 _Route(this.predicate, this.controller); | 76 _Route(this.predicate, this.controller); |
83 } | 77 } |
OLD | NEW |