OLD | NEW |
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library http_multi_server.utils; | 5 library http_multi_server.utils; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:io'; | 8 import 'dart:io'; |
9 | 9 |
10 // TODO(nweiz): Revert this to the version of [mergeStreams] found elsewhere in | |
11 // the repo once issue 19815 is fixed in dart:io. | |
12 /// Merges all streams in [streams] into a single stream that emits all of their | 10 /// Merges all streams in [streams] into a single stream that emits all of their |
13 /// values. | 11 /// values. |
14 /// | 12 /// |
15 /// The returned stream will be closed only when every stream in [streams] is | 13 /// The returned stream will be closed only when every stream in [streams] is |
16 /// closed. | 14 /// closed. |
17 Stream mergeStreams(Iterable<Stream> streams) { | 15 Stream mergeStreams(Iterable<Stream> streams) { |
18 var subscriptions = new Set(); | 16 var subscriptions = new Set(); |
19 var controller; | 17 var controller; |
20 controller = new StreamController(onListen: () { | 18 controller = new StreamController(onListen: () { |
21 for (var stream in streams) { | 19 for (var stream in streams) { |
22 var subscription; | 20 var subscription; |
23 subscription = stream.listen(controller.add, onError: (error, trace) { | 21 subscription = stream.listen(controller.add, |
24 if (subscriptions.length == 1) { | 22 onError: controller.addError, |
25 // If the last subscription errored, pass it on. | 23 onDone: () { |
26 controller.addError(error, trace); | |
27 } else { | |
28 // If only one of the subscriptions has an error (usually IPv6 failing | |
29 // late), then just remove that subscription and ignore the error. | |
30 subscriptions.remove(subscription); | |
31 subscription.cancel(); | |
32 } | |
33 }, onDone: () { | |
34 subscriptions.remove(subscription); | 24 subscriptions.remove(subscription); |
35 if (subscriptions.isEmpty) controller.close(); | 25 if (subscriptions.isEmpty) controller.close(); |
36 }); | 26 }); |
37 subscriptions.add(subscription); | 27 subscriptions.add(subscription); |
38 } | 28 } |
39 }, onCancel: () { | 29 }, onCancel: () { |
40 for (var subscription in subscriptions) { | 30 for (var subscription in subscriptions) { |
41 subscription.cancel(); | 31 subscription.cancel(); |
42 } | 32 } |
43 }, onPause: () { | 33 }, onPause: () { |
44 for (var subscription in subscriptions) { | 34 for (var subscription in subscriptions) { |
45 subscription.pause(); | 35 subscription.pause(); |
46 } | 36 } |
47 }, onResume: () { | 37 }, onResume: () { |
48 for (var subscription in subscriptions) { | 38 for (var subscription in subscriptions) { |
49 subscription.resume(); | 39 subscription.resume(); |
50 } | 40 } |
51 }, sync: true); | 41 }, sync: true); |
52 | 42 |
53 return controller.stream; | 43 return controller.stream; |
54 } | 44 } |
| 45 |
| 46 /// A cache for [supportsIpV6]. |
| 47 bool _supportsIpV6; |
| 48 |
| 49 /// Returns whether this computer supports binding to IPv6 addresses. |
| 50 Future<bool> get supportsIpV6 { |
| 51 if (_supportsIpV6 != null) return new Future.value(_supportsIpV6); |
| 52 |
| 53 return ServerSocket.bind(InternetAddress.LOOPBACK_IP_V6, 0).then((socket) { |
| 54 _supportsIpV6 = true; |
| 55 socket.close(); |
| 56 return true; |
| 57 }).catchError((error) { |
| 58 if (error is! SocketException) throw error; |
| 59 _supportsIpV6 = false; |
| 60 return false; |
| 61 }); |
| 62 } |
OLD | NEW |