Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(165)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 2529393002: Make core libraries use generic method syntax. (Closed)
Patch Set: Update status files. Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/timer.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error, StackTrace stackTrace); 10 void _addError(Object error, StackTrace stackTrace);
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
132 _pending.schedule(this); 132 _pending.schedule(this);
133 } 133 }
134 } 134 }
135 135
136 // StreamSubscription interface. 136 // StreamSubscription interface.
137 137
138 void onData(void handleData(T event)) { 138 void onData(void handleData(T event)) {
139 if (handleData == null) handleData = _nullDataHandler; 139 if (handleData == null) handleData = _nullDataHandler;
140 // TODO(floitsch): the return type should be 'void', and the type 140 // TODO(floitsch): the return type should be 'void', and the type
141 // should be inferred. 141 // should be inferred.
142 _onData = _zone.registerUnaryCallback/*<dynamic, T>*/(handleData); 142 _onData = _zone.registerUnaryCallback<dynamic, T>(handleData);
143 } 143 }
144 144
145 void onError(Function handleError) { 145 void onError(Function handleError) {
146 if (handleError == null) handleError = _nullErrorHandler; 146 if (handleError == null) handleError = _nullErrorHandler;
147 // We are not allowed to use 'void' as type argument for the generic type, 147 // We are not allowed to use 'void' as type argument for the generic type,
148 // so we use 'dynamic' instead. 148 // so we use 'dynamic' instead.
149 _onError = _registerErrorHandler/*<dynamic>*/(handleError, _zone); 149 _onError = _registerErrorHandler<dynamic>(handleError, _zone);
150 } 150 }
151 151
152 void onDone(void handleDone()) { 152 void onDone(void handleDone()) {
153 if (handleDone == null) handleDone = _nullDoneHandler; 153 if (handleDone == null) handleDone = _nullDoneHandler;
154 _onDone = _zone.registerCallback(handleDone); 154 _onDone = _zone.registerCallback(handleDone);
155 } 155 }
156 156
157 void pause([Future resumeSignal]) { 157 void pause([Future resumeSignal]) {
158 if (_isCanceled) return; 158 if (_isCanceled) return;
159 bool wasPaused = _isPaused; 159 bool wasPaused = _isPaused;
(...skipping 26 matching lines...) Expand all
186 // The user doesn't want to receive any further events. If there is an 186 // The user doesn't want to receive any further events. If there is an
187 // error or done event pending (waiting for the cancel to be done) discard 187 // error or done event pending (waiting for the cancel to be done) discard
188 // that event. 188 // that event.
189 _state &= ~_STATE_WAIT_FOR_CANCEL; 189 _state &= ~_STATE_WAIT_FOR_CANCEL;
190 if (!_isCanceled) { 190 if (!_isCanceled) {
191 _cancel(); 191 _cancel();
192 } 192 }
193 return _cancelFuture ?? Future._nullFuture; 193 return _cancelFuture ?? Future._nullFuture;
194 } 194 }
195 195
196 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { 196 Future<E> asFuture<E>([E futureValue]) {
197 _Future/*<E>*/ result = new _Future/*<E>*/(); 197 _Future<E> result = new _Future<E>();
198 198
199 // Overwrite the onDone and onError handlers. 199 // Overwrite the onDone and onError handlers.
200 _onDone = () { result._complete(futureValue); }; 200 _onDone = () { result._complete(futureValue); };
201 _onError = (error, stackTrace) { 201 _onError = (error, stackTrace) {
202 Future cancelFuture = cancel(); 202 Future cancelFuture = cancel();
203 if (!identical(cancelFuture, Future._nullFuture)) { 203 if (!identical(cancelFuture, Future._nullFuture)) {
204 cancelFuture.whenComplete(() { 204 cancelFuture.whenComplete(() {
205 result._completeError(error, stackTrace); 205 result._completeError(error, stackTrace);
206 }); 206 });
207 } else { 207 } else {
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
250 } 250 }
251 251
252 // _EventSink interface. 252 // _EventSink interface.
253 253
254 void _add(T data) { 254 void _add(T data) {
255 assert(!_isClosed); 255 assert(!_isClosed);
256 if (_isCanceled) return; 256 if (_isCanceled) return;
257 if (_canFire) { 257 if (_canFire) {
258 _sendData(data); 258 _sendData(data);
259 } else { 259 } else {
260 _addPending(new _DelayedData<dynamic /*=T*/>(data)); 260 _addPending(new _DelayedData<T>(data));
261 } 261 }
262 } 262 }
263 263
264 void _addError(Object error, StackTrace stackTrace) { 264 void _addError(Object error, StackTrace stackTrace) {
265 if (_isCanceled) return; 265 if (_isCanceled) return;
266 if (_canFire) { 266 if (_canFire) {
267 _sendError(error, stackTrace); // Reports cancel after sending. 267 _sendError(error, stackTrace); // Reports cancel after sending.
268 } else { 268 } else {
269 _addPending(new _DelayedError(error, stackTrace)); 269 _addPending(new _DelayedError(error, stackTrace));
270 } 270 }
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
302 302
303 /** 303 /**
304 * Add a pending event. 304 * Add a pending event.
305 * 305 *
306 * If the subscription is not paused, this also schedules a firing 306 * If the subscription is not paused, this also schedules a firing
307 * of pending events later (if necessary). 307 * of pending events later (if necessary).
308 */ 308 */
309 void _addPending(_DelayedEvent event) { 309 void _addPending(_DelayedEvent event) {
310 _StreamImplEvents<T> pending = _pending; 310 _StreamImplEvents<T> pending = _pending;
311 if (_pending == null) { 311 if (_pending == null) {
312 pending = _pending = new _StreamImplEvents<dynamic /*=T*/>(); 312 pending = _pending = new _StreamImplEvents<T>();
313 } 313 }
314 pending.add(event); 314 pending.add(event);
315 if (!_hasPending) { 315 if (!_hasPending) {
316 _state |= _STATE_HAS_PENDING; 316 _state |= _STATE_HAS_PENDING;
317 if (!_isPaused) { 317 if (!_isPaused) {
318 _pending.schedule(this); 318 _pending.schedule(this);
319 } 319 }
320 } 320 }
321 } 321 }
322 322
(...skipping 19 matching lines...) Expand all
342 void sendError() { 342 void sendError() {
343 // If the subscription has been canceled while waiting for the cancel 343 // If the subscription has been canceled while waiting for the cancel
344 // future to finish we must not report the error. 344 // future to finish we must not report the error.
345 if (_isCanceled && !_waitsForCancel) return; 345 if (_isCanceled && !_waitsForCancel) return;
346 _state |= _STATE_IN_CALLBACK; 346 _state |= _STATE_IN_CALLBACK;
347 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { 347 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) {
348 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError 348 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError
349 as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; 349 as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/;
350 _zone.runBinaryGuarded(errorCallback, error, stackTrace); 350 _zone.runBinaryGuarded(errorCallback, error, stackTrace);
351 } else { 351 } else {
352 _zone.runUnaryGuarded/*<dynamic, dynamic>*/( 352 _zone.runUnaryGuarded<dynamic, dynamic>(
353 _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error); 353 _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error);
354 } 354 }
355 _state &= ~_STATE_IN_CALLBACK; 355 _state &= ~_STATE_IN_CALLBACK;
356 } 356 }
357 357
358 if (_cancelOnError) { 358 if (_cancelOnError) {
359 _state |= _STATE_WAIT_FOR_CANCEL; 359 _state |= _STATE_WAIT_FOR_CANCEL;
360 _cancel(); 360 _cancel();
361 if (_cancelFuture is Future && 361 if (_cancelFuture is Future &&
362 !identical(_cancelFuture, Future._nullFuture)) { 362 !identical(_cancelFuture, Future._nullFuture)) {
(...skipping 390 matching lines...) Expand 10 before | Expand all | Expand 10 after
753 if (isPaused) { 753 if (isPaused) {
754 _state -= _PAUSED; 754 _state -= _PAUSED;
755 if (!isPaused && !_isSent) { 755 if (!isPaused && !_isSent) {
756 _schedule(); 756 _schedule();
757 } 757 }
758 } 758 }
759 } 759 }
760 760
761 Future cancel() => Future._nullFuture; 761 Future cancel() => Future._nullFuture;
762 762
763 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { 763 Future<E> asFuture<E>([E futureValue]) {
764 _Future/*<E>*/ result = new _Future/*<E>*/(); 764 _Future<E> result = new _Future<E>();
765 _onDone = () { result._completeWithValue(null); }; 765 _onDone = () { result._completeWithValue(null); };
766 return result; 766 return result;
767 } 767 }
768 768
769 void _sendDone() { 769 void _sendDone() {
770 _state &= ~_SCHEDULED; 770 _state &= ~_SCHEDULED;
771 if (isPaused) return; 771 if (isPaused) return;
772 _state |= _DONE_SENT; 772 _state |= _DONE_SENT;
773 if (_onDone != null) _zone.runGuarded(_onDone); 773 if (_onDone != null) _zone.runGuarded(_onDone);
774 } 774 }
775 } 775 }
776 776
777 class _AsBroadcastStream<T> extends Stream<T> { 777 class _AsBroadcastStream<T> extends Stream<T> {
778 final Stream<T> _source; 778 final Stream<T> _source;
779 final _BroadcastCallback<T> _onListenHandler; 779 final _BroadcastCallback<T> _onListenHandler;
780 final _BroadcastCallback<T> _onCancelHandler; 780 final _BroadcastCallback<T> _onCancelHandler;
781 final Zone _zone; 781 final Zone _zone;
782 782
783 _AsBroadcastStreamController<T> _controller; 783 _AsBroadcastStreamController<T> _controller;
784 StreamSubscription<T> _subscription; 784 StreamSubscription<T> _subscription;
785 785
786 _AsBroadcastStream(this._source, 786 _AsBroadcastStream(this._source,
787 void onListenHandler(StreamSubscription<T> subscription), 787 void onListenHandler(StreamSubscription<T> subscription),
788 void onCancelHandler(StreamSubscription<T> subscription)) 788 void onCancelHandler(StreamSubscription<T> subscription))
789 // TODO(floitsch): the return type should be void and should be 789 // TODO(floitsch): the return type should be void and should be
790 // inferred. 790 // inferred.
791 : _onListenHandler = Zone.current.registerUnaryCallback 791 : _onListenHandler = Zone.current.registerUnaryCallback
792 /*<dynamic, StreamSubscription<T>>*/(onListenHandler), 792 <dynamic, StreamSubscription<T>>(onListenHandler),
793 _onCancelHandler = Zone.current.registerUnaryCallback 793 _onCancelHandler = Zone.current.registerUnaryCallback
794 /*<dynamic, StreamSubscription<T>>*/(onCancelHandler), 794 <dynamic, StreamSubscription<T>>(onCancelHandler),
795 _zone = Zone.current { 795 _zone = Zone.current {
796 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); 796 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
797 } 797 }
798 798
799 bool get isBroadcast => true; 799 bool get isBroadcast => true;
800 800
801 StreamSubscription<T> listen(void onData(T data), 801 StreamSubscription<T> listen(void onData(T data),
802 { Function onError, 802 { Function onError,
803 void onDone(), 803 void onDone(),
804 bool cancelOnError}) { 804 bool cancelOnError}) {
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after
896 896
897 Future cancel() { 897 Future cancel() {
898 _stream._cancelSubscription(); 898 _stream._cancelSubscription();
899 return Future._nullFuture; 899 return Future._nullFuture;
900 } 900 }
901 901
902 bool get isPaused { 902 bool get isPaused {
903 return _stream._isSubscriptionPaused; 903 return _stream._isSubscriptionPaused;
904 } 904 }
905 905
906 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { 906 Future<E> asFuture<E>([E futureValue]) {
907 throw new UnsupportedError( 907 throw new UnsupportedError(
908 "Cannot change handlers of asBroadcastStream source subscription."); 908 "Cannot change handlers of asBroadcastStream source subscription.");
909 } 909 }
910 } 910 }
911 911
912 912
913 /** 913 /**
914 * Simple implementation of [StreamIterator]. 914 * Simple implementation of [StreamIterator].
915 * 915 *
916 * Pauses the stream between calls to [moveNext]. 916 * Pauses the stream between calls to [moveNext].
(...skipping 135 matching lines...) Expand 10 before | Expand all | Expand 10 after
1052 class _EmptyStream<T> extends Stream<T> { 1052 class _EmptyStream<T> extends Stream<T> {
1053 const _EmptyStream() : super._internal(); 1053 const _EmptyStream() : super._internal();
1054 bool get isBroadcast => true; 1054 bool get isBroadcast => true;
1055 StreamSubscription<T> listen(void onData(T data), 1055 StreamSubscription<T> listen(void onData(T data),
1056 {Function onError, 1056 {Function onError,
1057 void onDone(), 1057 void onDone(),
1058 bool cancelOnError}) { 1058 bool cancelOnError}) {
1059 return new _DoneStreamSubscription<T>(onDone); 1059 return new _DoneStreamSubscription<T>(onDone);
1060 } 1060 }
1061 } 1061 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/timer.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698