Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(542)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 13814010: Fix stream_controller_test failure. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/lib/async/stream_controller_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 1117 matching lines...) Expand 10 before | Expand all | Expand 10 after
1128 } 1128 }
1129 } 1129 }
1130 1130
1131 class _EventTransformStreamSubscription<S, T> 1131 class _EventTransformStreamSubscription<S, T>
1132 extends _BaseStreamSubscription<T> 1132 extends _BaseStreamSubscription<T>
1133 implements _EventOutputSink<T> { 1133 implements _EventOutputSink<T> {
1134 /** The transformer used to transform events. */ 1134 /** The transformer used to transform events. */
1135 final StreamEventTransformer<S, T> _transformer; 1135 final StreamEventTransformer<S, T> _transformer;
1136 /** Whether to unsubscribe when emitting an error. */ 1136 /** Whether to unsubscribe when emitting an error. */
1137 final bool _unsubscribeOnError; 1137 final bool _unsubscribeOnError;
1138 /** Whether this stream has sent a done event. */
1139 bool _isClosed = false;
1138 /** Source of incoming events. */ 1140 /** Source of incoming events. */
1139 StreamSubscription<S> _subscription; 1141 StreamSubscription<S> _subscription;
1140 /** Cached EventSink wrapper for this class. */ 1142 /** Cached EventSink wrapper for this class. */
1141 EventSink<T> _sink; 1143 EventSink<T> _sink;
1142 1144
1143 _EventTransformStreamSubscription(Stream<S> source, 1145 _EventTransformStreamSubscription(Stream<S> source,
1144 this._transformer, 1146 this._transformer,
1145 void onData(T data), 1147 void onData(T data),
1146 void onError(AsyncError error), 1148 void onError(AsyncError error),
1147 void onDone(), 1149 void onDone(),
(...skipping 11 matching lines...) Expand all
1159 void pause([Future pauseSignal]) { 1161 void pause([Future pauseSignal]) {
1160 if (_isSubscribed) _subscription.pause(pauseSignal); 1162 if (_isSubscribed) _subscription.pause(pauseSignal);
1161 } 1163 }
1162 1164
1163 void resume() { 1165 void resume() {
1164 if (_isSubscribed) _subscription.resume(); 1166 if (_isSubscribed) _subscription.resume();
1165 } 1167 }
1166 1168
1167 void cancel() { 1169 void cancel() {
1168 if (_isSubscribed) { 1170 if (_isSubscribed) {
1169 _subscription.cancel(); 1171 StreamSubscription subscription = _subscription;
1170 _subscription = null; 1172 _subscription = null;
1173 subscription.cancel();
1171 } 1174 }
1175 _isClosed = true;
1172 } 1176 }
1173 1177
1174 void _handleData(S data) { 1178 void _handleData(S data) {
1175 try { 1179 try {
1176 _transformer.handleData(data, _sink); 1180 _transformer.handleData(data, _sink);
1177 } catch (e, s) { 1181 } catch (e, s) {
1178 _sendError(_asyncError(e, s)); 1182 _sendError(_asyncError(e, s));
1179 } 1183 }
1180 } 1184 }
1181 1185
1182 void _handleError(AsyncError error) { 1186 void _handleError(AsyncError error) {
1183 try { 1187 try {
1184 _transformer.handleError(error, _sink); 1188 _transformer.handleError(error, _sink);
1185 } catch (e, s) { 1189 } catch (e, s) {
1186 _sendError(_asyncError(e, s, error)); 1190 _sendError(_asyncError(e, s, error));
1187 } 1191 }
1188 } 1192 }
1189 1193
1190 void _handleDone() { 1194 void _handleDone() {
1191 try { 1195 try {
1196 _subscription = null;
1192 _transformer.handleDone(_sink); 1197 _transformer.handleDone(_sink);
1193 } catch (e, s) { 1198 } catch (e, s) {
1194 _sendError(_asyncError(e, s)); 1199 _sendError(_asyncError(e, s));
1195 } 1200 }
1196 } 1201 }
1197 1202
1198 // EventOutputSink interface. 1203 // EventOutputSink interface.
1199 void _sendData(T data) { 1204 void _sendData(T data) {
1200 if (!_isSubscribed) return; 1205 if (_isClosed) return;
1201 _onData(data); 1206 _onData(data);
1202 } 1207 }
1203 1208
1204 void _sendError(AsyncError error) { 1209 void _sendError(AsyncError error) {
1205 if (!_isSubscribed) return; 1210 if (_isClosed) return;
1206 _onError(error); 1211 _onError(error);
1207 if (_unsubscribeOnError) { 1212 if (_unsubscribeOnError) {
1208 cancel(); 1213 cancel();
1209 } 1214 }
1210 } 1215 }
1211 1216
1212 void _sendDone() { 1217 void _sendDone() {
1213 if (!_isSubscribed) return; 1218 if (_isClosed) throw new StateError("Already closed.");
1214 _subscription.cancel(); 1219 _isClosed = true;
1215 _subscription = null; 1220 if (_isSubscribed) {
1221 _subscription.cancel();
1222 _subscription = null;
1223 }
1216 _onDone(); 1224 _onDone();
1217 } 1225 }
1218 } 1226 }
1219 1227
1220 /* TODO(8997): Implement EventSink instead, */ 1228 /* TODO(8997): Implement EventSink instead, */
1221 class _EventOutputSinkWrapper<T> extends StreamSink<T> { 1229 class _EventOutputSinkWrapper<T> extends StreamSink<T> {
1222 _EventOutputSink _sink; 1230 _EventOutputSink _sink;
1223 _EventOutputSinkWrapper(this._sink); 1231 _EventOutputSinkWrapper(this._sink);
1224 1232
1225 void add(T data) { _sink._sendData(data); } 1233 void add(T data) { _sink._sendData(data); }
1226 void addError(AsyncError error) { _sink._sendError(error); } 1234 void addError(AsyncError error) { _sink._sendError(error); }
1227 void close() { _sink._sendDone(); } 1235 void close() { _sink._sendDone(); }
1228 } 1236 }
OLDNEW
« no previous file with comments | « no previous file | tests/lib/async/stream_controller_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698