OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 1147 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1158 * If an error event occurs before the value is found, the future completes | 1158 * If an error event occurs before the value is found, the future completes |
1159 * with this error. | 1159 * with this error. |
1160 * | 1160 * |
1161 * If a done event occurs before the value is found, the future completes | 1161 * If a done event occurs before the value is found, the future completes |
1162 * with a [RangeError]. | 1162 * with a [RangeError]. |
1163 */ | 1163 */ |
1164 Future<T> elementAt(int index) { | 1164 Future<T> elementAt(int index) { |
1165 if (index is! int || index < 0) throw new ArgumentError(index); | 1165 if (index is! int || index < 0) throw new ArgumentError(index); |
1166 _Future<T> future = new _Future<T>(); | 1166 _Future<T> future = new _Future<T>(); |
1167 StreamSubscription subscription; | 1167 StreamSubscription subscription; |
| 1168 int elementIndex = 0; |
1168 subscription = this.listen( | 1169 subscription = this.listen( |
1169 (T value) { | 1170 (T value) { |
1170 if (index == 0) { | 1171 if (index == elementIndex) { |
1171 _cancelAndValue(subscription, future, value); | 1172 _cancelAndValue(subscription, future, value); |
1172 return; | 1173 return; |
1173 } | 1174 } |
1174 index -= 1; | 1175 elementIndex += 1; |
1175 }, | 1176 }, |
1176 onError: future._completeError, | 1177 onError: future._completeError, |
1177 onDone: () { | 1178 onDone: () { |
1178 future._completeError(new RangeError.value(index)); | 1179 future._completeError( |
| 1180 new RangeError.index(index, this, "index", null, elementIndex)); |
1179 }, | 1181 }, |
1180 cancelOnError: true); | 1182 cancelOnError: true); |
1181 return future; | 1183 return future; |
1182 } | 1184 } |
1183 | 1185 |
1184 /** | 1186 /** |
1185 * Creates a new stream with the same events as this stream. | 1187 * Creates a new stream with the same events as this stream. |
1186 * | 1188 * |
1187 * Whenever more than [timeLimit] passes between two events from this stream, | 1189 * Whenever more than [timeLimit] passes between two events from this stream, |
1188 * the [onTimeout] function is called. | 1190 * the [onTimeout] function is called. |
(...skipping 468 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1657 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1659 class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
1658 EventSink _sink; | 1660 EventSink _sink; |
1659 _ControllerEventSinkWrapper(this._sink); | 1661 _ControllerEventSinkWrapper(this._sink); |
1660 | 1662 |
1661 void add(T data) { _sink.add(data); } | 1663 void add(T data) { _sink.add(data); } |
1662 void addError(error, [StackTrace stackTrace]) { | 1664 void addError(error, [StackTrace stackTrace]) { |
1663 _sink.addError(error, stackTrace); | 1665 _sink.addError(error, stackTrace); |
1664 } | 1666 } |
1665 void close() { _sink.close(); } | 1667 void close() { _sink.close(); } |
1666 } | 1668 } |
OLD | NEW |