| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import 'dart:async'; | 5 import 'dart:async'; |
| 6 | 6 |
| 7 import 'future_group.dart'; | 7 import 'future_group.dart'; |
| 8 import 'result.dart'; | 8 import 'result.dart'; |
| 9 | 9 |
| 10 /// A class that splits a single source stream into an arbitrary number of | 10 /// A class that splits a single source stream into an arbitrary number of |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 50 /// Whether [_stream] is done emitting events. | 50 /// Whether [_stream] is done emitting events. |
| 51 var _isDone = false; | 51 var _isDone = false; |
| 52 | 52 |
| 53 /// Whether [close] has been called. | 53 /// Whether [close] has been called. |
| 54 var _isClosed = false; | 54 var _isClosed = false; |
| 55 | 55 |
| 56 /// Splits [stream] into [count] identical streams. | 56 /// Splits [stream] into [count] identical streams. |
| 57 /// | 57 /// |
| 58 /// [count] defaults to 2. This is the same as creating [count] branches and | 58 /// [count] defaults to 2. This is the same as creating [count] branches and |
| 59 /// then closing the [StreamSplitter]. | 59 /// then closing the [StreamSplitter]. |
| 60 static List<Stream> splitFrom(Stream stream, [int count]) { | 60 static List<Stream/*<T>*/> splitFrom/*<T>*/(Stream/*<T>*/ stream, |
| 61 [int count]) { |
| 61 if (count == null) count = 2; | 62 if (count == null) count = 2; |
| 62 var splitter = new StreamSplitter(stream); | 63 var splitter = new StreamSplitter/*<T>*/(stream); |
| 63 var streams = new List.generate(count, (_) => splitter.split()); | 64 var streams = new List<Stream>.generate(count, (_) => splitter.split()); |
| 64 splitter.close(); | 65 splitter.close(); |
| 65 return streams; | 66 return streams; |
| 66 } | 67 } |
| 67 | 68 |
| 68 StreamSplitter(this._stream); | 69 StreamSplitter(this._stream); |
| 69 | 70 |
| 70 /// Returns a single-subscription stream that's a copy of the input stream. | 71 /// Returns a single-subscription stream that's a copy of the input stream. |
| 71 /// | 72 /// |
| 72 /// This will throw a [StateError] if [close] has been called. | 73 /// This will throw a [StateError] if [close] has been called. |
| 73 Stream<T> split() { | 74 Stream<T> split() { |
| 74 if (_isClosed) { | 75 if (_isClosed) { |
| 75 throw new StateError("Can't call split() on a closed StreamSplitter."); | 76 throw new StateError("Can't call split() on a closed StreamSplitter."); |
| 76 } | 77 } |
| 77 | 78 |
| 78 var controller; | 79 var controller = new StreamController<T>( |
| 79 controller = new StreamController<T>( | |
| 80 onListen: _onListen, | 80 onListen: _onListen, |
| 81 onPause: _onPause, | 81 onPause: _onPause, |
| 82 onResume: _onResume, | 82 onResume: _onResume); |
| 83 onCancel: () => _onCancel(controller)); | 83 controller.onCancel = () => _onCancel(controller); |
| 84 | 84 |
| 85 for (var result in _buffer) { | 85 for (var result in _buffer) { |
| 86 result.addTo(controller); | 86 result.addTo(controller); |
| 87 } | 87 } |
| 88 | 88 |
| 89 if (_isDone) { | 89 if (_isDone) { |
| 90 _closeGroup.add(controller.close()); | 90 _closeGroup.add(controller.close()); |
| 91 } else { | 91 } else { |
| 92 _controllers.add(controller); | 92 _controllers.add(controller); |
| 93 } | 93 } |
| (...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 201 } | 201 } |
| 202 | 202 |
| 203 /// Marks [_controllers] as done. | 203 /// Marks [_controllers] as done. |
| 204 void _onDone() { | 204 void _onDone() { |
| 205 _isDone = true; | 205 _isDone = true; |
| 206 for (var controller in _controllers) { | 206 for (var controller in _controllers) { |
| 207 _closeGroup.add(controller.close()); | 207 _closeGroup.add(controller.close()); |
| 208 } | 208 } |
| 209 } | 209 } |
| 210 } | 210 } |
| OLD | NEW |