| OLD | NEW |
| 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 // Test merging streams. | 5 // Test merging streams. |
| 6 library merge_stream_test; | 6 library merge_stream_test; |
| 7 | 7 |
| 8 import "dart:async"; | 8 import "dart:async"; |
| 9 import '../../../pkg/unittest/lib/unittest.dart'; | 9 import '../../../pkg/unittest/lib/unittest.dart'; |
| 10 import 'event_helper.dart'; | 10 import 'event_helper.dart'; |
| 11 | 11 |
| 12 testSupercedeStream() { | 12 testSupercedeStream() { |
| 13 { // Simple case of superceding lower priority streams. | 13 { // Simple case of superceding lower priority streams. |
| 14 StreamController s1 = new StreamController.multiSubscription(); | 14 StreamController s1 = new StreamController.broadcast(); |
| 15 StreamController s2 = new StreamController.multiSubscription(); | 15 StreamController s2 = new StreamController.broadcast(); |
| 16 StreamController s3 = new StreamController.multiSubscription(); | 16 StreamController s3 = new StreamController.broadcast(); |
| 17 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); | 17 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); |
| 18 Events expected = new Events()..add(1)..add(2)..add(3)..add(4)..close(); | 18 Events expected = new Events()..add(1)..add(2)..add(3)..add(4)..close(); |
| 19 Events actual = new Events.capture(merge); | 19 Events actual = new Events.capture(merge); |
| 20 s1.add(1); | 20 s1.add(1); |
| 21 s2.add(2); | 21 s2.add(2); |
| 22 s1.add(1); // Ignored. | 22 s1.add(1); // Ignored. |
| 23 s2.add(3); | 23 s2.add(3); |
| 24 s3.add(4); | 24 s3.add(4); |
| 25 s2.add(3); // Ignored. | 25 s2.add(3); // Ignored. |
| 26 s3.close(); | 26 s3.close(); |
| 27 Expect.listEquals(expected.events, actual.events); | 27 Expect.listEquals(expected.events, actual.events); |
| 28 } | 28 } |
| 29 | 29 |
| 30 { // Superceding more than one stream at a time. | 30 { // Superceding more than one stream at a time. |
| 31 StreamController s1 = new StreamController.multiSubscription(); | 31 StreamController s1 = new StreamController.broadcast(); |
| 32 StreamController s2 = new StreamController.multiSubscription(); | 32 StreamController s2 = new StreamController.broadcast(); |
| 33 StreamController s3 = new StreamController.multiSubscription(); | 33 StreamController s3 = new StreamController.broadcast(); |
| 34 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); | 34 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); |
| 35 Events expected = new Events()..add(1)..add(2)..close(); | 35 Events expected = new Events()..add(1)..add(2)..close(); |
| 36 Events actual = new Events.capture(merge); | 36 Events actual = new Events.capture(merge); |
| 37 s1.add(1); | 37 s1.add(1); |
| 38 s3.add(2); | 38 s3.add(2); |
| 39 s1.add(1); // Ignored. | 39 s1.add(1); // Ignored. |
| 40 s2.add(1); // Ignored. | 40 s2.add(1); // Ignored. |
| 41 s3.close(); | 41 s3.close(); |
| 42 Expect.listEquals(expected.events, actual.events); | 42 Expect.listEquals(expected.events, actual.events); |
| 43 } | 43 } |
| 44 | 44 |
| 45 { // Closing a stream before superceding it. | 45 { // Closing a stream before superceding it. |
| 46 StreamController s1 = new StreamController.multiSubscription(); | 46 StreamController s1 = new StreamController.broadcast(); |
| 47 StreamController s2 = new StreamController.multiSubscription(); | 47 StreamController s2 = new StreamController.broadcast(); |
| 48 StreamController s3 = new StreamController.multiSubscription(); | 48 StreamController s3 = new StreamController.broadcast(); |
| 49 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); | 49 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); |
| 50 Events expected = new Events()..add(1)..add(2)..add(3)..close(); | 50 Events expected = new Events()..add(1)..add(2)..add(3)..close(); |
| 51 Events actual = new Events.capture(merge); | 51 Events actual = new Events.capture(merge); |
| 52 s1.add(1); | 52 s1.add(1); |
| 53 s1.close(); | 53 s1.close(); |
| 54 s3.close(); | 54 s3.close(); |
| 55 s2.add(2); | 55 s2.add(2); |
| 56 s2.add(3); | 56 s2.add(3); |
| 57 s2.close(); | 57 s2.close(); |
| 58 Expect.listEquals(expected.events, actual.events); | 58 Expect.listEquals(expected.events, actual.events); |
| 59 } | 59 } |
| 60 | 60 |
| 61 { // Errors from all non-superceded streams are forwarded. | 61 { // Errors from all non-superceded streams are forwarded. |
| 62 StreamController s1 = new StreamController.multiSubscription(); | 62 StreamController s1 = new StreamController.broadcast(); |
| 63 StreamController s2 = new StreamController.multiSubscription(); | 63 StreamController s2 = new StreamController.broadcast(); |
| 64 StreamController s3 = new StreamController.multiSubscription(); | 64 StreamController s3 = new StreamController.broadcast(); |
| 65 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); | 65 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); |
| 66 Events expected = | 66 Events expected = |
| 67 new Events()..add(1)..error("1")..error("2")..error("3") | 67 new Events()..add(1)..error("1")..error("2")..error("3") |
| 68 ..add(3)..error("6")..add(4)..close(); | 68 ..add(3)..error("6")..add(4)..close(); |
| 69 Events actual = new Events.capture(merge); | 69 Events actual = new Events.capture(merge); |
| 70 s1.add(1); | 70 s1.add(1); |
| 71 s1.signalError(new AsyncError("1")); | 71 s1.signalError(new AsyncError("1")); |
| 72 s2.signalError(new AsyncError("2")); | 72 s2.signalError(new AsyncError("2")); |
| 73 s3.signalError(new AsyncError("3")); | 73 s3.signalError(new AsyncError("3")); |
| 74 s3.add(3); | 74 s3.add(3); |
| 75 s1.signalError(new AsyncError("4")); | 75 s1.signalError(new AsyncError("4")); |
| 76 s2.signalError(new AsyncError("5")); | 76 s2.signalError(new AsyncError("5")); |
| 77 s3.signalError(new AsyncError("6")); | 77 s3.signalError(new AsyncError("6")); |
| 78 s1.close(); | 78 s1.close(); |
| 79 s2.close(); | 79 s2.close(); |
| 80 s3.add(4); | 80 s3.add(4); |
| 81 s3.close(); | 81 s3.close(); |
| 82 Expect.listEquals(expected.events, actual.events); | 82 Expect.listEquals(expected.events, actual.events); |
| 83 } | 83 } |
| 84 | 84 |
| 85 test("Pausing on a superceding stream", () { | 85 test("Pausing on a superceding stream", () { |
| 86 StreamController s1 = new StreamController.multiSubscription(); | 86 StreamController s1 = new StreamController.broadcast(); |
| 87 StreamController s2 = new StreamController.multiSubscription(); | 87 StreamController s2 = new StreamController.broadcast(); |
| 88 StreamController s3 = new StreamController.multiSubscription(); | 88 StreamController s3 = new StreamController.broadcast(); |
| 89 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); | 89 Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]); |
| 90 Events expected = new Events()..add(1)..add(2)..add(3); | 90 Events expected = new Events()..add(1)..add(2)..add(3); |
| 91 Events actual = new Events.capture(merge); | 91 Events actual = new Events.capture(merge); |
| 92 s1.add(1); | 92 s1.add(1); |
| 93 s2.add(2); | 93 s2.add(2); |
| 94 s2.add(3); | 94 s2.add(3); |
| 95 Expect.listEquals(expected.events, actual.events); | 95 Expect.listEquals(expected.events, actual.events); |
| 96 actual.pause(); // Pauses the stream that feeds the actual Events. | 96 actual.pause(); // Pauses the stream that feeds the actual Events. |
| 97 Events expected2 = expected.copy(); | 97 Events expected2 = expected.copy(); |
| 98 expected..add(5)..add(6)..close(); | 98 expected..add(5)..add(6)..close(); |
| 99 expected2..add(6)..close(); | 99 expected2..add(6)..close(); |
| 100 s1.add(4); | 100 s1.add(4); |
| 101 s2.add(5); // May or may not arrive before '6' when resuming. | 101 s2.add(5); // May or may not arrive before '6' when resuming. |
| 102 s3.add(6); | 102 s3.add(6); |
| 103 s3.close(); | 103 s3.close(); |
| 104 actual.onDone(expectAsync0(() { | 104 actual.onDone(expectAsync0(() { |
| 105 if (expected.events.length == actual.events.length) { | 105 if (expected.events.length == actual.events.length) { |
| 106 Expect.listEquals(expected.events, actual.events); | 106 Expect.listEquals(expected.events, actual.events); |
| 107 } else { | 107 } else { |
| 108 Expect.listEquals(expected2.events, actual.events); | 108 Expect.listEquals(expected2.events, actual.events); |
| 109 } | 109 } |
| 110 })); | 110 })); |
| 111 actual.resume(); | 111 actual.resume(); |
| 112 }); | 112 }); |
| 113 } | 113 } |
| 114 | 114 |
| 115 void testCyclicStream() { | 115 void testCyclicStream() { |
| 116 test("Simple case of superceding lower priority streams", () { | 116 test("Simple case of superceding lower priority streams", () { |
| 117 StreamController s1 = new StreamController.multiSubscription(); | 117 StreamController s1 = new StreamController.broadcast(); |
| 118 StreamController s2 = new StreamController.multiSubscription(); | 118 StreamController s2 = new StreamController.broadcast(); |
| 119 StreamController s3 = new StreamController.multiSubscription(); | 119 StreamController s3 = new StreamController.broadcast(); |
| 120 Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]); | 120 Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]); |
| 121 Events expected = | 121 Events expected = |
| 122 new Events()..add(1)..add(2)..add(3)..add(4)..add(5)..add(6)..close(); | 122 new Events()..add(1)..add(2)..add(3)..add(4)..add(5)..add(6)..close(); |
| 123 Events actual = new Events.capture(merge); | 123 Events actual = new Events.capture(merge); |
| 124 Expect.isFalse(s1.isPaused); | 124 Expect.isFalse(s1.isPaused); |
| 125 Expect.isTrue(s2.isPaused); | 125 Expect.isTrue(s2.isPaused); |
| 126 Expect.isTrue(s3.isPaused); | 126 Expect.isTrue(s3.isPaused); |
| 127 s3.add(3); | 127 s3.add(3); |
| 128 s1.add(1); | 128 s1.add(1); |
| 129 s1.add(4); | 129 s1.add(4); |
| 130 s1.add(6); | 130 s1.add(6); |
| 131 s1.close(); | 131 s1.close(); |
| 132 s2.add(2); | 132 s2.add(2); |
| 133 s2.add(5); | 133 s2.add(5); |
| 134 s2.close(); | 134 s2.close(); |
| 135 s3.close(); | 135 s3.close(); |
| 136 actual.onDone(expectAsync0(() { | 136 actual.onDone(expectAsync0(() { |
| 137 Expect.listEquals(expected.events, actual.events); | 137 Expect.listEquals(expected.events, actual.events); |
| 138 })); | 138 })); |
| 139 }); | 139 }); |
| 140 | 140 |
| 141 test("Cyclic merge with errors", () { | 141 test("Cyclic merge with errors", () { |
| 142 StreamController s1 = new StreamController.multiSubscription(); | 142 StreamController s1 = new StreamController.broadcast(); |
| 143 StreamController s2 = new StreamController.multiSubscription(); | 143 StreamController s2 = new StreamController.broadcast(); |
| 144 StreamController s3 = new StreamController.multiSubscription(); | 144 StreamController s3 = new StreamController.broadcast(); |
| 145 Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]); | 145 Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]); |
| 146 Events expected = | 146 Events expected = |
| 147 new Events()..add(1)..error("1")..add(2)..add(3)..error("2") | 147 new Events()..add(1)..error("1")..add(2)..add(3)..error("2") |
| 148 ..add(4)..add(5)..error("3")..add(6)..close(); | 148 ..add(4)..add(5)..error("3")..add(6)..close(); |
| 149 Events actual = new Events.capture(merge); | 149 Events actual = new Events.capture(merge); |
| 150 Expect.isFalse(s1.isPaused); | 150 Expect.isFalse(s1.isPaused); |
| 151 Expect.isTrue(s2.isPaused); | 151 Expect.isTrue(s2.isPaused); |
| 152 Expect.isTrue(s3.isPaused); | 152 Expect.isTrue(s3.isPaused); |
| 153 s3.add(3); | 153 s3.add(3); |
| 154 s3.signalError(new AsyncError("3")); // Error just before a "done". | 154 s3.signalError(new AsyncError("3")); // Error just before a "done". |
| (...skipping 10 matching lines...) Expand all Loading... |
| 165 actual.onDone(expectAsync0(() { | 165 actual.onDone(expectAsync0(() { |
| 166 Expect.listEquals(expected.events, actual.events); | 166 Expect.listEquals(expected.events, actual.events); |
| 167 })); | 167 })); |
| 168 }); | 168 }); |
| 169 } | 169 } |
| 170 | 170 |
| 171 main() { | 171 main() { |
| 172 testSupercedeStream(); | 172 testSupercedeStream(); |
| 173 testCyclicStream(); | 173 testCyclicStream(); |
| 174 } | 174 } |
| OLD | NEW |