| OLD | NEW |
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import 'dart:async'; | 5 import 'dart:async'; |
| 6 | 6 |
| 7 import 'package:async/async.dart'; | 7 import 'package:async/async.dart'; |
| 8 | 8 |
| 9 import '../stream_channel.dart'; | 9 import '../stream_channel.dart'; |
| 10 | 10 |
| (...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 120 _inner.add(data); | 120 _inner.add(data); |
| 121 } | 121 } |
| 122 | 122 |
| 123 void addError(error, [StackTrace stackTrace]) { | 123 void addError(error, [StackTrace stackTrace]) { |
| 124 if (_closed) throw new StateError("Cannot add event after closing."); | 124 if (_closed) throw new StateError("Cannot add event after closing."); |
| 125 if (_inAddStream) { | 125 if (_inAddStream) { |
| 126 throw new StateError("Cannot add event while adding stream."); | 126 throw new StateError("Cannot add event while adding stream."); |
| 127 } | 127 } |
| 128 if (_disconnected) return; | 128 if (_disconnected) return; |
| 129 | 129 |
| 130 _addError(error, stackTrace); |
| 131 } |
| 132 |
| 133 /// Like [addError], but doesn't check to ensure that an error can be added. |
| 134 /// |
| 135 /// This is called from [addStream], so it shouldn't fail if a stream is being |
| 136 /// added. |
| 137 void _addError(error, [StackTrace stackTrace]) { |
| 130 if (_allowErrors) { | 138 if (_allowErrors) { |
| 131 _inner.addError(error, stackTrace); | 139 _inner.addError(error, stackTrace); |
| 132 return; | 140 return; |
| 133 } | 141 } |
| 134 | 142 |
| 135 _doneCompleter.completeError(error, stackTrace); | 143 _doneCompleter.completeError(error, stackTrace); |
| 136 | 144 |
| 137 // Treat an error like both the stream and sink disconnecting. | 145 // Treat an error like both the stream and sink disconnecting. |
| 138 _onStreamDisconnected(); | 146 _onStreamDisconnected(); |
| 139 _channel._onSinkDisconnected(); | 147 _channel._onSinkDisconnected(); |
| 140 | 148 |
| 141 // Ignore errors from the inner sink. We're already surfacing one error, and | 149 // Ignore errors from the inner sink. We're already surfacing one error, and |
| 142 // if the user handles it we don't want them to have another top-level. | 150 // if the user handles it we don't want them to have another top-level. |
| 143 _inner.close().catchError((_) {}); | 151 _inner.close().catchError((_) {}); |
| 144 } | 152 } |
| 145 | 153 |
| 146 Future addStream(Stream<T> stream) { | 154 Future addStream(Stream<T> stream) { |
| 147 if (_closed) throw new StateError("Cannot add stream after closing."); | 155 if (_closed) throw new StateError("Cannot add stream after closing."); |
| 148 if (_inAddStream) { | 156 if (_inAddStream) { |
| 149 throw new StateError("Cannot add stream while adding stream."); | 157 throw new StateError("Cannot add stream while adding stream."); |
| 150 } | 158 } |
| 151 if (_disconnected) return new Future.value(); | 159 if (_disconnected) return new Future.value(); |
| 152 | 160 |
| 153 _addStreamCompleter = new Completer.sync(); | 161 _addStreamCompleter = new Completer.sync(); |
| 154 _addStreamSubscription = stream.listen( | 162 _addStreamSubscription = stream.listen( |
| 155 _inner.add, | 163 _inner.add, |
| 156 onError: _inner.addError, | 164 onError: _addError, |
| 157 onDone: _addStreamCompleter.complete); | 165 onDone: _addStreamCompleter.complete); |
| 158 return _addStreamCompleter.future.then((_) { | 166 return _addStreamCompleter.future.then((_) { |
| 159 _addStreamCompleter = null; | 167 _addStreamCompleter = null; |
| 160 _addStreamSubscription = null; | 168 _addStreamSubscription = null; |
| 161 }); | 169 }); |
| 162 } | 170 } |
| 163 | 171 |
| 164 Future close() { | 172 Future close() { |
| 165 if (_inAddStream) { | 173 if (_inAddStream) { |
| 166 throw new StateError("Cannot close sink while adding stream."); | 174 throw new StateError("Cannot close sink while adding stream."); |
| (...skipping 17 matching lines...) Expand all Loading... |
| 184 void _onStreamDisconnected() { | 192 void _onStreamDisconnected() { |
| 185 _disconnected = true; | 193 _disconnected = true; |
| 186 if (!_doneCompleter.isCompleted) _doneCompleter.complete(); | 194 if (!_doneCompleter.isCompleted) _doneCompleter.complete(); |
| 187 | 195 |
| 188 if (!_inAddStream) return; | 196 if (!_inAddStream) return; |
| 189 _addStreamCompleter.complete(_addStreamSubscription.cancel()); | 197 _addStreamCompleter.complete(_addStreamSubscription.cancel()); |
| 190 _addStreamCompleter = null; | 198 _addStreamCompleter = null; |
| 191 _addStreamSubscription = null; | 199 _addStreamSubscription = null; |
| 192 } | 200 } |
| 193 } | 201 } |
| OLD | NEW |