Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(65)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 25354003: Redo StreamTransformers so they work with Stack traces. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error, StackTrace stackTrace); 10 void _addError(Object error, StackTrace stackTrace);
(...skipping 18 matching lines...) Expand all
29 * 29 *
30 * The only public methods are those of [StreamSubscription], so instances of 30 * The only public methods are those of [StreamSubscription], so instances of
31 * [_BufferingStreamSubscription] can be returned directly as a 31 * [_BufferingStreamSubscription] can be returned directly as a
32 * [StreamSubscription] without exposing internal functionality. 32 * [StreamSubscription] without exposing internal functionality.
33 * 33 *
34 * The [StreamController] is a public facing version of [Stream] and this class, 34 * The [StreamController] is a public facing version of [Stream] and this class,
35 * with some methods made public. 35 * with some methods made public.
36 * 36 *
37 * The user interface of [_BufferingStreamSubscription] are the following 37 * The user interface of [_BufferingStreamSubscription] are the following
38 * methods: 38 * methods:
39 *
39 * * [_add]: Add a data event to the stream. 40 * * [_add]: Add a data event to the stream.
40 * * [_addError]: Add an error event to the stream. 41 * * [_addError]: Add an error event to the stream.
41 * * [_close]: Request to close the stream. 42 * * [_close]: Request to close the stream.
42 * * [_onCancel]: Called when the subscription will provide no more events, 43 * * [_onCancel]: Called when the subscription will provide no more events,
43 * either due to being actively canceled, or after sending a done event. 44 * either due to being actively canceled, or after sending a done event.
44 * * [_onPause]: Called when the subscription wants the event source to pause. 45 * * [_onPause]: Called when the subscription wants the event source to pause.
45 * * [_onResume]: Called when allowing new events after a pause. 46 * * [_onResume]: Called when allowing new events after a pause.
47 *
46 * The user should not add new events when the subscription requests a paused, 48 * The user should not add new events when the subscription requests a paused,
47 * but if it happens anyway, the subscription will enqueue the events just as 49 * but if it happens anyway, the subscription will enqueue the events just as
48 * when new events arrive while still firing an old event. 50 * when new events arrive while still firing an old event.
49 */ 51 */
50 class _BufferingStreamSubscription<T> implements StreamSubscription<T>, 52 class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
51 _EventSink<T>, 53 _EventSink<T>,
52 _EventDispatch<T> { 54 _EventDispatch<T> {
53 /** The `cancelOnError` flag from the `listen` call. */ 55 /** The `cancelOnError` flag from the `listen` call. */
54 static const int _STATE_CANCEL_ON_ERROR = 1; 56 static const int _STATE_CANCEL_ON_ERROR = 1;
55 /** 57 /**
(...skipping 29 matching lines...) Expand all
85 /** Bit vector based on state-constants above. */ 87 /** Bit vector based on state-constants above. */
86 int _state; 88 int _state;
87 89
88 /** 90 /**
89 * Queue of pending events. 91 * Queue of pending events.
90 * 92 *
91 * Is created when necessary, or set in constructor for preconfigured events. 93 * Is created when necessary, or set in constructor for preconfigured events.
92 */ 94 */
93 _PendingEvents _pending; 95 _PendingEvents _pending;
94 96
95 _BufferingStreamSubscription(void onData(T data), 97 _BufferingStreamSubscription(void onDataHandler(T data),
96 Function onError, 98 Function onErrorHandler,
97 void onDone(), 99 void onDoneHandler(),
98 bool cancelOnError) 100 bool cancelOnError)
99 : _onData = Zone.current.registerUnaryCallback(onData), 101 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
100 _onError = _registerErrorHandler(onError, Zone.current), 102 onData(onDataHandler);
101 _onDone = Zone.current.registerCallback(onDone), 103 onError(onErrorHandler);
102 _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { 104 onDone(onDoneHandler);
103 assert(_onData != null);
104 assert(_onError != null);
105 assert(_onDone != null);
106 } 105 }
107 106
108 /** 107 /**
109 * Sets the subscription's pending events object. 108 * Sets the subscription's pending events object.
110 * 109 *
111 * This can only be done once. The pending events object is used for the 110 * This can only be done once. The pending events object is used for the
112 * rest of the subscription's life cycle. 111 * rest of the subscription's life cycle.
113 */ 112 */
114 void _setPendingEvents(_PendingEvents pendingEvents) { 113 void _setPendingEvents(_PendingEvents pendingEvents) {
115 assert(_pending == null); 114 assert(_pending == null);
(...skipping 882 matching lines...) Expand 10 before | Expand all | Expand 10 after
998 _Future<bool> hasNext = _futureOrPrefetch; 997 _Future<bool> hasNext = _futureOrPrefetch;
999 _clear(); 998 _clear();
1000 hasNext._complete(false); 999 hasNext._complete(false);
1001 return; 1000 return;
1002 } 1001 }
1003 _subscription.pause(); 1002 _subscription.pause();
1004 _futureOrPrefetch = null; 1003 _futureOrPrefetch = null;
1005 _state = _STATE_EXTRA_DONE; 1004 _state = _STATE_EXTRA_DONE;
1006 } 1005 }
1007 } 1006 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698