Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(156)

Side by Side Diff: lib/src/guarantee_channel.dart

Issue 1671763002: Make IsolateChannel use StreamChannelCompleter. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | lib/src/isolate_channel.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 6
7 import 'package:async/async.dart'; 7 import 'package:async/async.dart';
8 8
9 import '../stream_channel.dart'; 9 import '../stream_channel.dart';
10 10
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after
120 _inner.add(data); 120 _inner.add(data);
121 } 121 }
122 122
123 void addError(error, [StackTrace stackTrace]) { 123 void addError(error, [StackTrace stackTrace]) {
124 if (_closed) throw new StateError("Cannot add event after closing."); 124 if (_closed) throw new StateError("Cannot add event after closing.");
125 if (_inAddStream) { 125 if (_inAddStream) {
126 throw new StateError("Cannot add event while adding stream."); 126 throw new StateError("Cannot add event while adding stream.");
127 } 127 }
128 if (_disconnected) return; 128 if (_disconnected) return;
129 129
130 _addError(error, stackTrace);
131 }
132
133 /// Like [addError], but doesn't check to ensure that an error can be added.
134 ///
135 /// This is called from [addStream], so it shouldn't fail if a stream is being
136 /// added.
137 void _addError(error, [StackTrace stackTrace]) {
130 if (_allowErrors) { 138 if (_allowErrors) {
131 _inner.addError(error, stackTrace); 139 _inner.addError(error, stackTrace);
132 return; 140 return;
133 } 141 }
134 142
135 _doneCompleter.completeError(error, stackTrace); 143 _doneCompleter.completeError(error, stackTrace);
136 144
137 // Treat an error like both the stream and sink disconnecting. 145 // Treat an error like both the stream and sink disconnecting.
138 _onStreamDisconnected(); 146 _onStreamDisconnected();
139 _channel._onSinkDisconnected(); 147 _channel._onSinkDisconnected();
140 148
141 // Ignore errors from the inner sink. We're already surfacing one error, and 149 // Ignore errors from the inner sink. We're already surfacing one error, and
142 // if the user handles it we don't want them to have another top-level. 150 // if the user handles it we don't want them to have another top-level.
143 _inner.close().catchError((_) {}); 151 _inner.close().catchError((_) {});
144 } 152 }
145 153
146 Future addStream(Stream<T> stream) { 154 Future addStream(Stream<T> stream) {
147 if (_closed) throw new StateError("Cannot add stream after closing."); 155 if (_closed) throw new StateError("Cannot add stream after closing.");
148 if (_inAddStream) { 156 if (_inAddStream) {
149 throw new StateError("Cannot add stream while adding stream."); 157 throw new StateError("Cannot add stream while adding stream.");
150 } 158 }
151 if (_disconnected) return new Future.value(); 159 if (_disconnected) return new Future.value();
152 160
153 _addStreamCompleter = new Completer.sync(); 161 _addStreamCompleter = new Completer.sync();
154 _addStreamSubscription = stream.listen( 162 _addStreamSubscription = stream.listen(
155 _inner.add, 163 _inner.add,
156 onError: _inner.addError, 164 onError: _addError,
157 onDone: _addStreamCompleter.complete); 165 onDone: _addStreamCompleter.complete);
158 return _addStreamCompleter.future.then((_) { 166 return _addStreamCompleter.future.then((_) {
159 _addStreamCompleter = null; 167 _addStreamCompleter = null;
160 _addStreamSubscription = null; 168 _addStreamSubscription = null;
161 }); 169 });
162 } 170 }
163 171
164 Future close() { 172 Future close() {
165 if (_inAddStream) { 173 if (_inAddStream) {
166 throw new StateError("Cannot close sink while adding stream."); 174 throw new StateError("Cannot close sink while adding stream.");
(...skipping 17 matching lines...) Expand all
184 void _onStreamDisconnected() { 192 void _onStreamDisconnected() {
185 _disconnected = true; 193 _disconnected = true;
186 if (!_doneCompleter.isCompleted) _doneCompleter.complete(); 194 if (!_doneCompleter.isCompleted) _doneCompleter.complete();
187 195
188 if (!_inAddStream) return; 196 if (!_inAddStream) return;
189 _addStreamCompleter.complete(_addStreamSubscription.cancel()); 197 _addStreamCompleter.complete(_addStreamSubscription.cancel());
190 _addStreamCompleter = null; 198 _addStreamCompleter = null;
191 _addStreamSubscription = null; 199 _addStreamSubscription = null;
192 } 200 }
193 } 201 }
OLDNEW
« no previous file with comments | « no previous file | lib/src/isolate_channel.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698