Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(79)

Side by Side Diff: test/dart_codegen/expect/async/stream_controller.dart

Issue 1148283010: Remove dart backend (Closed) Base URL: https://github.com/dart-lang/dev_compiler.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 part of dart.async;
2 abstract class StreamController<T> implements StreamSink<T> {Stream<T> get stre am;
3 factory StreamController({
4 void onListen(), void onPause(), void onResume(), onCancel(), bool sync : fals e}
5 ) {
6 if (onListen == null && onPause == null && onResume == null && onCancel == nul l) {
7 return ((__x42) => DEVC$RT.cast(__x42, DEVC$RT.type((_StreamController<dynam ic> _) {
8 }
9 ), DEVC$RT.type((StreamController<T> _) {
10 }
11 ), "CompositeCast", """line 83, column 14 of dart:async/stream_controller.da rt: """, __x42 is StreamController<T>, false))(sync ? new _NoCallbackSyncStreamC ontroller() : new _NoCallbackAsyncStreamController());
12 }
13 return sync ? new _SyncStreamController<T>(onListen, onPause, onResume, onCan cel) : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
14 }
15 factory StreamController.broadcast({
16 void onListen(), void onCancel(), bool sync : false}
17 ) {
18 return sync ? new _SyncBroadcastStreamController<T>(onListen, onCancel) : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
19 }
20 StreamSink<T> get sink;
21 bool get isClosed;
22 bool get isPaused;
23 bool get hasListener;
24 void addError(Object error, [StackTrace stackTrace]);
25 Future addStream(Stream<T> source, {
26 bool cancelOnError : true}
27 );
28 }
29 abstract class _StreamControllerLifecycle<T> {StreamSubscription<T> _subscribe( void onData(T data), Function onError, void onDone(), bool cancelOnError);
30 void _recordPause(StreamSubscription<T> subscription) {
31 }
32 void _recordResume(StreamSubscription<T> subscription) {
33 }
34 Future _recordCancel(StreamSubscription<T> subscription) => null;
35 }
36 abstract class _StreamController<T> implements StreamController<T>, _StreamCont rollerLifecycle<T>, _EventSink<T>, _EventDispatch<T> {static const int _STATE_IN ITIAL = 0;
37 static const int _STATE_SUBSCRIBED = 1;
38 static const int _STATE_CANCELED = 2;
39 static const int _STATE_SUBSCRIPTION_MASK = 3;
40 static const int _STATE_CLOSED = 4;
41 static const int _STATE_ADDSTREAM = 8;
42 var _varData;
43 int _state = _STATE_INITIAL;
44 _Future _doneFuture;
45 _StreamController();
46 _NotificationHandler get _onListen;
47 _NotificationHandler get _onPause;
48 _NotificationHandler get _onResume;
49 _NotificationHandler get _onCancel;
50 Stream<T> get stream => new _ControllerStream<T>(this);
51 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
52 bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
53 bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0;
54 bool get _isInitialState => (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITI AL;
55 bool get isClosed => (_state & _STATE_CLOSED) != 0;
56 bool get isPaused => hasListener ? _subscription._isInputPaused : !_isCanceled;
57 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0;
58 bool get _mayAddEvent => (_state < _STATE_CLOSED);
59 _PendingEvents get _pendingEvents {
60 assert (_isInitialState); if (!_isAddingStream) {
61 return DEVC$RT.cast(_varData, dynamic, _PendingEvents, "DynamicCast", """line 33 4, column 14 of dart:async/stream_controller.dart: """, _varData is _PendingEven ts, true);
62 }
63 _StreamControllerAddStreamState state = DEVC$RT.cast(_varData, dynamic, DEVC$RT .type((_StreamControllerAddStreamState<dynamic> _) {
64 }
65 ), "DynamicCast", """line 336, column 45 of dart:async/stream_controller.dart: " "", _varData is _StreamControllerAddStreamState<dynamic>, true);
66 return DEVC$RT.cast(state.varData, dynamic, _PendingEvents, "DynamicCast", """l ine 337, column 12 of dart:async/stream_controller.dart: """, state.varData is _ PendingEvents, true);
67 }
68 _StreamImplEvents _ensurePendingEvents() {
69 assert (_isInitialState); if (!_isAddingStream) {
70 if (_varData == null) _varData = new _StreamImplEvents();
71 return DEVC$RT.cast(_varData, dynamic, _StreamImplEvents, "DynamicCast", """lin e 345, column 14 of dart:async/stream_controller.dart: """, _varData is _StreamI mplEvents, true);
72 }
73 _StreamControllerAddStreamState state = DEVC$RT.cast(_varData, dynamic, DEVC$RT .type((_StreamControllerAddStreamState<dynamic> _) {
74 }
75 ), "DynamicCast", """line 347, column 45 of dart:async/stream_controller.dart: " "", _varData is _StreamControllerAddStreamState<dynamic>, true);
76 if (state.varData == null) state.varData = new _StreamImplEvents();
77 return DEVC$RT.cast(state.varData, dynamic, _StreamImplEvents, "DynamicCast", " ""line 349, column 12 of dart:async/stream_controller.dart: """, state.varData i s _StreamImplEvents, true);
78 }
79 _ControllerSubscription get _subscription {
80 assert (hasListener); if (_isAddingStream) {
81 _StreamControllerAddStreamState addState = DEVC$RT.cast(_varData, dynamic, DEVC$ RT.type((_StreamControllerAddStreamState<dynamic> _) {
82 }
83 ), "DynamicCast", """line 358, column 50 of dart:async/stream_controller.dart: " "", _varData is _StreamControllerAddStreamState<dynamic>, true);
84 return DEVC$RT.cast(addState.varData, dynamic, DEVC$RT.type((_ControllerSubscri ption<dynamic> _) {
85 }
86 ), "DynamicCast", """line 359, column 14 of dart:async/stream_controller.dart: " "", addState.varData is _ControllerSubscription<dynamic>, true);
87 }
88 return DEVC$RT.cast(_varData, dynamic, DEVC$RT.type((_ControllerSubscription<dy namic> _) {
89 }
90 ), "DynamicCast", """line 361, column 12 of dart:async/stream_controller.dart: " "", _varData is _ControllerSubscription<dynamic>, true);
91 }
92 Error _badEventState() {
93 if (isClosed) {
94 return new StateError("Cannot add event after closing");
95 }
96 assert (_isAddingStream); return new StateError("Cannot add event while adding a stream");
97 }
98 Future addStream(Stream<T> source, {
99 bool cancelOnError : true}
100 ) {
101 if (!_mayAddEvent) throw _badEventState();
102 if (_isCanceled) return new _Future.immediate(null);
103 _StreamControllerAddStreamState addState = new _StreamControllerAddStreamState( this, _varData, source, cancelOnError);
104 _varData = addState;
105 _state |= _STATE_ADDSTREAM;
106 return addState.addStreamFuture;
107 }
108 Future get done => _ensureDoneFuture();
109 Future _ensureDoneFuture() {
110 if (_doneFuture == null) {
111 _doneFuture = _isCanceled ? Future._nullFuture : new _Future();
112 }
113 return _doneFuture;
114 }
115 void add(T value) {
116 if (!_mayAddEvent) throw _badEventState();
117 _add(value);
118 }
119 void addError(Object error, [StackTrace stackTrace]) {
120 error = _nonNullError(error);
121 if (!_mayAddEvent) throw _badEventState();
122 AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
123 if (replacement != null) {
124 error = _nonNullError(replacement.error);
125 stackTrace = replacement.stackTrace;
126 }
127 _addError(error, stackTrace);
128 }
129 Future close() {
130 if (isClosed) {
131 return _ensureDoneFuture();
132 }
133 if (!_mayAddEvent) throw _badEventState();
134 _closeUnchecked();
135 return _ensureDoneFuture();
136 }
137 void _closeUnchecked() {
138 _state |= _STATE_CLOSED;
139 if (hasListener) {
140 _sendDone();
141 }
142 else if (_isInitialState) {
143 _ensurePendingEvents().add(const _DelayedDone());
144 }
145 }
146 void _add(T value) {
147 if (hasListener) {
148 _sendData(value);
149 }
150 else if (_isInitialState) {
151 _ensurePendingEvents().add(new _DelayedData<T>(value));
152 }
153 }
154 void _addError(Object error, StackTrace stackTrace) {
155 if (hasListener) {
156 _sendError(error, stackTrace);
157 }
158 else if (_isInitialState) {
159 _ensurePendingEvents().add(new _DelayedError(error, stackTrace));
160 }
161 }
162 void _close() {
163 assert (_isAddingStream); _StreamControllerAddStreamState addState = DEVC$RT.cas t(_varData, dynamic, DEVC$RT.type((_StreamControllerAddStreamState<dynamic> _) {
164 }
165 ), "DynamicCast", """line 482, column 48 of dart:async/stream_controller.dart: " "", _varData is _StreamControllerAddStreamState<dynamic>, true);
166 _varData = addState.varData;
167 _state &= ~_STATE_ADDSTREAM;
168 addState.complete();
169 }
170 StreamSubscription<T> _subscribe(void onData(T data), Function onError, void on Done(), bool cancelOnError) {
171 if (!_isInitialState) {
172 throw new StateError("Stream has already been listened to.");
173 }
174 _ControllerSubscription subscription = new _ControllerSubscription(this, onData , onError, onDone, cancelOnError);
175 _PendingEvents pendingEvents = _pendingEvents;
176 _state |= _STATE_SUBSCRIBED;
177 if (_isAddingStream) {
178 _StreamControllerAddStreamState addState = DEVC$RT.cast(_varData, dynamic, DEVC$ RT.type((_StreamControllerAddStreamState<dynamic> _) {
179 }
180 ), "DynamicCast", """line 505, column 50 of dart:async/stream_controller.dart: " "", _varData is _StreamControllerAddStreamState<dynamic>, true);
181 addState.varData = subscription;
182 addState.resume();
183 }
184 else {
185 _varData = subscription;
186 }
187 subscription._setPendingEvents(pendingEvents);
188 subscription._guardCallback(() {
189 _runGuarded(_onListen);
190 }
191 );
192 return DEVC$RT.cast(subscription, DEVC$RT.type((_ControllerSubscription<dynamic > _) {
193 }
194 ), DEVC$RT.type((StreamSubscription<T> _) {
195 }
196 ), "CompositeCast", """line 516, column 12 of dart:async/stream_controller.dart: """, subscription is StreamSubscription<T>, false);
197 }
198 Future _recordCancel(StreamSubscription<T> subscription) {
199 Future result;
200 if (_isAddingStream) {
201 _StreamControllerAddStreamState addState = DEVC$RT.cast(_varData, dynamic, DEVC$ RT.type((_StreamControllerAddStreamState<dynamic> _) {
202 }
203 ), "DynamicCast", """line 530, column 50 of dart:async/stream_controller.dart: " "", _varData is _StreamControllerAddStreamState<dynamic>, true);
204 result = addState.cancel();
205 }
206 _varData = null;
207 _state = (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
208 if (_onCancel != null) {
209 if (result == null) {
210 try {
211 result = ((__x43) => DEVC$RT.cast(__x43, dynamic, DEVC$RT.type((Future<dynam ic> _) {
212 }
213 ), "DynamicCast", """line 542, column 20 of dart:async/stream_controller.dar t: """, __x43 is Future<dynamic>, true))(_onCancel());
214 }
215 catch (e, s) {
216 result = new _Future().._asyncCompleteError(e, s);
217 }
218 }
219 else {
220 result = result.whenComplete(_onCancel);
221 }
222 }
223 void complete() {
224 if (_doneFuture != null && _doneFuture._mayComplete) {
225 _doneFuture._asyncComplete(null);
226 }
227 }
228 if (result != null) {
229 result = result.whenComplete(complete);
230 }
231 else {
232 complete();
233 }
234 return result;
235 }
236 void _recordPause(StreamSubscription<T> subscription) {
237 if (_isAddingStream) {
238 _StreamControllerAddStreamState addState = DEVC$RT.cast(_varData, dynamic, DEVC$ RT.type((_StreamControllerAddStreamState<dynamic> _) {
239 }
240 ), "DynamicCast", """line 572, column 50 of dart:async/stream_controller.dart: " "", _varData is _StreamControllerAddStreamState<dynamic>, true);
241 addState.pause();
242 }
243 _runGuarded(_onPause);
244 }
245 void _recordResume(StreamSubscription<T> subscription) {
246 if (_isAddingStream) {
247 _StreamControllerAddStreamState addState = DEVC$RT.cast(_varData, dynamic, DEVC$ RT.type((_StreamControllerAddStreamState<dynamic> _) {
248 }
249 ), "DynamicCast", """line 580, column 50 of dart:async/stream_controller.dart: " "", _varData is _StreamControllerAddStreamState<dynamic>, true);
250 addState.resume();
251 }
252 _runGuarded(_onResume);
253 }
254 }
255 abstract class _SyncStreamControllerDispatch<T> implements _StreamController<T> {void _sendData(T data) {
256 _subscription._add(data);
257 }
258 void _sendError(Object error, StackTrace stackTrace) {
259 _subscription._addError(error, stackTrace);
260 }
261 void _sendDone() {
262 _subscription._close();
263 }
264 }
265 abstract class _AsyncStreamControllerDispatch<T> implements _StreamController<T > {void _sendData(T data) {
266 _subscription._addPending(new _DelayedData(data));
267 }
268 void _sendError(Object error, StackTrace stackTrace) {
269 _subscription._addPending(new _DelayedError(error, stackTrace));
270 }
271 void _sendDone() {
272 _subscription._addPending(const _DelayedDone());
273 }
274 }
275 class _AsyncStreamController<T> extends _StreamController<T> with _AsyncStreamC ontrollerDispatch<T> {final _NotificationHandler _onListen;
276 final _NotificationHandler _onPause;
277 final _NotificationHandler _onResume;
278 final _NotificationHandler _onCancel;
279 _AsyncStreamController(void this._onListen(), void this._onPause(), void this._ onResume(), this._onCancel());
280 }
281 class _SyncStreamController<T> extends _StreamController<T> with _SyncStreamCon trollerDispatch<T> {final _NotificationHandler _onListen;
282 final _NotificationHandler _onPause;
283 final _NotificationHandler _onResume;
284 final _NotificationHandler _onCancel;
285 _SyncStreamController(void this._onListen(), void this._onPause(), void this._o nResume(), this._onCancel());
286 }
287 abstract class _NoCallbacks {_NotificationHandler get _onListen => null;
288 _NotificationHandler get _onPause => null;
289 _NotificationHandler get _onResume => null;
290 _NotificationHandler get _onCancel => null;
291 }
292 class _NoCallbackAsyncStreamController = _StreamController with _AsyncStreamCon trollerDispatch, _NoCallbacks;
293 class _NoCallbackSyncStreamController = _StreamController with _SyncStreamContr ollerDispatch, _NoCallbacks;
294 typedef _NotificationHandler();
295 Future _runGuarded(_NotificationHandler notificationHandler) {
296 if (notificationHandler == null) return null;
297 try {
298 var result = notificationHandler();
299 if (result is Future) return DEVC$RT.cast(result, dynamic, DEVC$RT.type((Future <dynamic> _) {
300 }
301 ), "DynamicCast", """line 665, column 34 of dart:async/stream_controller.dart: " "", result is Future<dynamic>, true);
302 return null;
303 }
304 catch (e, s) {
305 Zone.current.handleUncaughtError(e, s);
306 }
307 }
308 class _ControllerStream<T> extends _StreamImpl<T> {_StreamControllerLifecycle<T > _controller;
309 _ControllerStream(this._controller);
310 StreamSubscription<T> _createSubscription(void onData(T data), Function onError , void onDone(), bool cancelOnError) => _controller._subscribe(onData, onError, onDone, cancelOnError);
311 int get hashCode => _controller.hashCode ^ 0x35323532;
312 bool operator ==(Object other) {
313 if (identical(this, other)) return true;
314 if (other is! _ControllerStream) return false;
315 _ControllerStream otherStream = DEVC$RT.cast(other, Object, DEVC$RT.type((_Cont rollerStream<dynamic> _) {
316 }
317 ), "AssignmentCast", """line 693, column 37 of dart:async/stream_controller.dart : """, other is _ControllerStream<dynamic>, true);
318 return identical(otherStream._controller, this._controller);
319 }
320 }
321 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {final _StreamControllerLifecycle<T> _controller;
322 _ControllerSubscription(this._controller, void onData(T data), Function onError , void onDone(), bool cancelOnError) : super(onData, onError, onDone, cancelOnEr ror);
323 Future _onCancel() {
324 return _controller._recordCancel(this);
325 }
326 void _onPause() {
327 _controller._recordPause(this);
328 }
329 void _onResume() {
330 _controller._recordResume(this);
331 }
332 }
333 class _StreamSinkWrapper<T> implements StreamSink<T> {final StreamController _t arget;
334 _StreamSinkWrapper(this._target);
335 void add(T data) {
336 _target.add(data);
337 }
338 void addError(Object error, [StackTrace stackTrace]) {
339 _target.addError(error, stackTrace);
340 }
341 Future close() => _target.close();
342 Future addStream(Stream<T> source, {
343 bool cancelOnError : true}
344 ) => _target.addStream(source, cancelOnError: cancelOnError);
345 Future get done => _target.done;
346 }
347 class _AddStreamState<T> {final _Future addStreamFuture;
348 final StreamSubscription addSubscription;
349 _AddStreamState(_EventSink<T> controller, Stream source, bool cancelOnError) : addStreamFuture = new _Future(), addSubscription = source.listen(controller._add , onError: cancelOnError ? ((__x44) => DEVC$RT.cast(__x44, dynamic, Function, "D ynamicCast", """line 747, column 48 of dart:async/stream_controller.dart: """, _ _x44 is Function, true))(makeErrorHandler(controller)) : controller._addError, o nDone: controller._close, cancelOnError: cancelOnError);
350 static makeErrorHandler(_EventSink controller) => (e, StackTrace s) {
351 controller._addError(e, s);
352 controller._close();
353 }
354 ;
355 void pause() {
356 addSubscription.pause();
357 }
358 void resume() {
359 addSubscription.resume();
360 }
361 Future cancel() {
362 var cancel = addSubscription.cancel();
363 if (cancel == null) {
364 addStreamFuture._asyncComplete(null);
365 return null;
366 }
367 return cancel.whenComplete(() {
368 addStreamFuture._asyncComplete(null);
369 }
370 );
371 }
372 void complete() {
373 addStreamFuture._asyncComplete(null);
374 }
375 }
376 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> {var varDat a;
377 _StreamControllerAddStreamState(_StreamController controller, this.varData, Str eam source, bool cancelOnError) : super(DEVC$RT.cast(controller, DEVC$RT.type((_ StreamController<dynamic> _) {
378 }
379 ), DEVC$RT.type((_EventSink<T> _) {
380 }
381 ), "CompositeCast", """line 798, column 15 of dart:async/stream_controller.dart: """, controller is _EventSink<T>, false), source, cancelOnError) {
382 if (controller.isPaused) {
383 addSubscription.pause();
384 }
385 }
386 }
OLDNEW
« no previous file with comments | « test/dart_codegen/expect/async/stream.dart ('k') | test/dart_codegen/expect/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698