OLD | NEW |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 | 6 |
7 import 'package:async/async.dart'; | 7 import 'package:async/async.dart'; |
8 | 8 |
9 import '../stream_channel.dart'; | 9 import '../stream_channel.dart'; |
10 | 10 |
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
120 _inner.add(data); | 120 _inner.add(data); |
121 } | 121 } |
122 | 122 |
123 void addError(error, [StackTrace stackTrace]) { | 123 void addError(error, [StackTrace stackTrace]) { |
124 if (_closed) throw new StateError("Cannot add event after closing."); | 124 if (_closed) throw new StateError("Cannot add event after closing."); |
125 if (_inAddStream) { | 125 if (_inAddStream) { |
126 throw new StateError("Cannot add event while adding stream."); | 126 throw new StateError("Cannot add event while adding stream."); |
127 } | 127 } |
128 if (_disconnected) return; | 128 if (_disconnected) return; |
129 | 129 |
| 130 _addError(error, stackTrace); |
| 131 } |
| 132 |
| 133 /// Like [addError], but doesn't check to ensure that an error can be added. |
| 134 /// |
| 135 /// This is called from [addStream], so it shouldn't fail if a stream is being |
| 136 /// added. |
| 137 void _addError(error, [StackTrace stackTrace]) { |
130 if (_allowErrors) { | 138 if (_allowErrors) { |
131 _inner.addError(error, stackTrace); | 139 _inner.addError(error, stackTrace); |
132 return; | 140 return; |
133 } | 141 } |
134 | 142 |
135 _doneCompleter.completeError(error, stackTrace); | 143 _doneCompleter.completeError(error, stackTrace); |
136 | 144 |
137 // Treat an error like both the stream and sink disconnecting. | 145 // Treat an error like both the stream and sink disconnecting. |
138 _onStreamDisconnected(); | 146 _onStreamDisconnected(); |
139 _channel._onSinkDisconnected(); | 147 _channel._onSinkDisconnected(); |
140 | 148 |
141 // Ignore errors from the inner sink. We're already surfacing one error, and | 149 // Ignore errors from the inner sink. We're already surfacing one error, and |
142 // if the user handles it we don't want them to have another top-level. | 150 // if the user handles it we don't want them to have another top-level. |
143 _inner.close().catchError((_) {}); | 151 _inner.close().catchError((_) {}); |
144 } | 152 } |
145 | 153 |
146 Future addStream(Stream<T> stream) { | 154 Future addStream(Stream<T> stream) { |
147 if (_closed) throw new StateError("Cannot add stream after closing."); | 155 if (_closed) throw new StateError("Cannot add stream after closing."); |
148 if (_inAddStream) { | 156 if (_inAddStream) { |
149 throw new StateError("Cannot add stream while adding stream."); | 157 throw new StateError("Cannot add stream while adding stream."); |
150 } | 158 } |
151 if (_disconnected) return new Future.value(); | 159 if (_disconnected) return new Future.value(); |
152 | 160 |
153 _addStreamCompleter = new Completer.sync(); | 161 _addStreamCompleter = new Completer.sync(); |
154 _addStreamSubscription = stream.listen( | 162 _addStreamSubscription = stream.listen( |
155 _inner.add, | 163 _inner.add, |
156 onError: _inner.addError, | 164 onError: _addError, |
157 onDone: _addStreamCompleter.complete); | 165 onDone: _addStreamCompleter.complete); |
158 return _addStreamCompleter.future.then((_) { | 166 return _addStreamCompleter.future.then((_) { |
159 _addStreamCompleter = null; | 167 _addStreamCompleter = null; |
160 _addStreamSubscription = null; | 168 _addStreamSubscription = null; |
161 }); | 169 }); |
162 } | 170 } |
163 | 171 |
164 Future close() { | 172 Future close() { |
165 if (_inAddStream) { | 173 if (_inAddStream) { |
166 throw new StateError("Cannot close sink while adding stream."); | 174 throw new StateError("Cannot close sink while adding stream."); |
(...skipping 17 matching lines...) Expand all Loading... |
184 void _onStreamDisconnected() { | 192 void _onStreamDisconnected() { |
185 _disconnected = true; | 193 _disconnected = true; |
186 if (!_doneCompleter.isCompleted) _doneCompleter.complete(); | 194 if (!_doneCompleter.isCompleted) _doneCompleter.complete(); |
187 | 195 |
188 if (!_inAddStream) return; | 196 if (!_inAddStream) return; |
189 _addStreamCompleter.complete(_addStreamSubscription.cancel()); | 197 _addStreamCompleter.complete(_addStreamSubscription.cancel()); |
190 _addStreamCompleter = null; | 198 _addStreamCompleter = null; |
191 _addStreamSubscription = null; | 199 _addStreamSubscription = null; |
192 } | 200 } |
193 } | 201 } |
OLD | NEW |