OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 class _BroadcastStream<T> extends _ControllerStream<T> { | 7 class _BroadcastStream<T> extends _ControllerStream<T> { |
8 _BroadcastStream(_StreamControllerLifecycle controller) : super(controller); | 8 _BroadcastStream(_StreamControllerLifecycle controller) : super(controller); |
9 | 9 |
10 bool get isBroadcast => true; | 10 bool get isBroadcast => true; |
(...skipping 12 matching lines...) Expand all Loading... |
23 // TODO(lrn): Use the _state field on _ControllerSubscription to | 23 // TODO(lrn): Use the _state field on _ControllerSubscription to |
24 // also store this state. Requires that the subscription implementation | 24 // also store this state. Requires that the subscription implementation |
25 // does not assume that it's use of the state integer is the only use. | 25 // does not assume that it's use of the state integer is the only use. |
26 int _eventState; | 26 int _eventState; |
27 | 27 |
28 _BroadcastSubscriptionLink _next; | 28 _BroadcastSubscriptionLink _next; |
29 _BroadcastSubscriptionLink _previous; | 29 _BroadcastSubscriptionLink _previous; |
30 | 30 |
31 _BroadcastSubscription(_StreamControllerLifecycle controller, | 31 _BroadcastSubscription(_StreamControllerLifecycle controller, |
32 void onData(T data), | 32 void onData(T data), |
33 void onError(Object error), | 33 Function onError, |
34 void onDone(), | 34 void onDone(), |
35 bool cancelOnError) | 35 bool cancelOnError) |
36 : super(controller, onData, onError, onDone, cancelOnError) { | 36 : super(controller, onData, onError, onDone, cancelOnError) { |
37 _next = _previous = this; | 37 _next = _previous = this; |
38 } | 38 } |
39 | 39 |
40 _BroadcastStreamController get _controller => super._controller; | 40 _BroadcastStreamController get _controller => super._controller; |
41 | 41 |
42 bool _expectsEvent(int eventId) => | 42 bool _expectsEvent(int eventId) => |
43 (_eventState & _STATE_EVENT_ID) == eventId; | 43 (_eventState & _STATE_EVENT_ID) == eventId; |
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
166 _BroadcastSubscriptionLink previous = subscription._previous; | 166 _BroadcastSubscriptionLink previous = subscription._previous; |
167 _BroadcastSubscriptionLink next = subscription._next; | 167 _BroadcastSubscriptionLink next = subscription._next; |
168 previous._next = next; | 168 previous._next = next; |
169 next._previous = previous; | 169 next._previous = previous; |
170 subscription._next = subscription._previous = subscription; | 170 subscription._next = subscription._previous = subscription; |
171 } | 171 } |
172 | 172 |
173 // _StreamControllerLifecycle interface. | 173 // _StreamControllerLifecycle interface. |
174 | 174 |
175 StreamSubscription<T> _subscribe(void onData(T data), | 175 StreamSubscription<T> _subscribe(void onData(T data), |
176 void onError(Object error), | 176 Function onError, |
177 void onDone(), | 177 void onDone(), |
178 bool cancelOnError) { | 178 bool cancelOnError) { |
179 if (isClosed) { | 179 if (isClosed) { |
180 throw new StateError("Subscribing to closed stream"); | 180 throw new StateError("Subscribing to closed stream"); |
181 } | 181 } |
182 StreamSubscription subscription = new _BroadcastSubscription<T>( | 182 StreamSubscription subscription = new _BroadcastSubscription<T>( |
183 this, onData, onError, onDone, cancelOnError); | 183 this, onData, onError, onDone, cancelOnError); |
184 _addListener(subscription); | 184 _addListener(subscription); |
185 if (identical(_next, _previous)) { | 185 if (identical(_next, _previous)) { |
186 // Only one listener, so it must be the first listener. | 186 // Only one listener, so it must be the first listener. |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
220 } | 220 } |
221 | 221 |
222 void add(T data) { | 222 void add(T data) { |
223 if (!_mayAddEvent) throw _addEventError(); | 223 if (!_mayAddEvent) throw _addEventError(); |
224 _sendData(data); | 224 _sendData(data); |
225 } | 225 } |
226 | 226 |
227 void addError(Object error, [Object stackTrace]) { | 227 void addError(Object error, [Object stackTrace]) { |
228 if (!_mayAddEvent) throw _addEventError(); | 228 if (!_mayAddEvent) throw _addEventError(); |
229 if (stackTrace != null) _attachStackTrace(error, stackTrace); | 229 if (stackTrace != null) _attachStackTrace(error, stackTrace); |
230 _sendError(error); | 230 _sendError(error, stackTrace); |
231 } | 231 } |
232 | 232 |
233 Future close() { | 233 Future close() { |
234 if (isClosed) { | 234 if (isClosed) { |
235 assert(_doneFuture != null); | 235 assert(_doneFuture != null); |
236 return _doneFuture; | 236 return _doneFuture; |
237 } | 237 } |
238 if (!_mayAddEvent) throw _addEventError(); | 238 if (!_mayAddEvent) throw _addEventError(); |
239 _state |= _STATE_CLOSED; | 239 _state |= _STATE_CLOSED; |
240 Future doneFuture = _ensureDoneFuture(); | 240 Future doneFuture = _ensureDoneFuture(); |
241 _sendDone(); | 241 _sendDone(); |
242 return doneFuture; | 242 return doneFuture; |
243 } | 243 } |
244 | 244 |
245 Future get done => _ensureDoneFuture(); | 245 Future get done => _ensureDoneFuture(); |
246 | 246 |
247 Future addStream(Stream<T> stream) { | 247 Future addStream(Stream<T> stream) { |
248 if (!_mayAddEvent) throw _addEventError(); | 248 if (!_mayAddEvent) throw _addEventError(); |
249 _state |= _STATE_ADDSTREAM; | 249 _state |= _STATE_ADDSTREAM; |
250 _addStreamState = new _AddStreamState(this, stream); | 250 _addStreamState = new _AddStreamState(this, stream); |
251 return _addStreamState.addStreamFuture; | 251 return _addStreamState.addStreamFuture; |
252 } | 252 } |
253 | 253 |
254 // _EventSink interface, called from AddStreamState. | 254 // _EventSink interface, called from AddStreamState. |
255 void _add(T data) { | 255 void _add(T data) { |
256 _sendData(data); | 256 _sendData(data); |
257 } | 257 } |
258 | 258 |
259 void _addError(Object error) { | 259 void _addError(Object error, StackTrace stackTrace) { |
260 assert(_isAddingStream); | 260 assert(_isAddingStream); |
261 _sendError(error); | 261 _sendError(error, stackTrace); |
262 } | 262 } |
263 | 263 |
264 void _close() { | 264 void _close() { |
265 assert(_isAddingStream); | 265 assert(_isAddingStream); |
266 _AddStreamState addState = _addStreamState; | 266 _AddStreamState addState = _addStreamState; |
267 _addStreamState = null; | 267 _addStreamState = null; |
268 _state &= ~_STATE_ADDSTREAM; | 268 _state &= ~_STATE_ADDSTREAM; |
269 addState.complete(); | 269 addState.complete(); |
270 } | 270 } |
271 | 271 |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
326 | 326 |
327 // EventDispatch interface. | 327 // EventDispatch interface. |
328 | 328 |
329 void _sendData(T data) { | 329 void _sendData(T data) { |
330 if (_isEmpty) return; | 330 if (_isEmpty) return; |
331 _forEachListener((_BufferingStreamSubscription<T> subscription) { | 331 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
332 subscription._add(data); | 332 subscription._add(data); |
333 }); | 333 }); |
334 } | 334 } |
335 | 335 |
336 void _sendError(Object error) { | 336 void _sendError(Object error, StackTrace stackTrace) { |
337 if (_isEmpty) return; | 337 if (_isEmpty) return; |
338 _forEachListener((_BufferingStreamSubscription<T> subscription) { | 338 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
339 subscription._addError(error); | 339 subscription._addError(error, stackTrace); |
340 }); | 340 }); |
341 } | 341 } |
342 | 342 |
343 void _sendDone() { | 343 void _sendDone() { |
344 if (!_isEmpty) { | 344 if (!_isEmpty) { |
345 _forEachListener((_BroadcastSubscription<T> subscription) { | 345 _forEachListener((_BroadcastSubscription<T> subscription) { |
346 subscription._close(); | 346 subscription._close(); |
347 }); | 347 }); |
348 } else { | 348 } else { |
349 assert(_doneFuture != null); | 349 assert(_doneFuture != null); |
(...skipping 11 matching lines...) Expand all Loading... |
361 | 361 |
362 void _sendData(T data) { | 362 void _sendData(T data) { |
363 for (_BroadcastSubscriptionLink link = _next; | 363 for (_BroadcastSubscriptionLink link = _next; |
364 !identical(link, this); | 364 !identical(link, this); |
365 link = link._next) { | 365 link = link._next) { |
366 _BroadcastSubscription<T> subscription = link; | 366 _BroadcastSubscription<T> subscription = link; |
367 subscription._addPending(new _DelayedData(data)); | 367 subscription._addPending(new _DelayedData(data)); |
368 } | 368 } |
369 } | 369 } |
370 | 370 |
371 void _sendError(Object error) { | 371 void _sendError(Object error, StackTrace stackTrace) { |
372 for (_BroadcastSubscriptionLink link = _next; | 372 for (_BroadcastSubscriptionLink link = _next; |
373 !identical(link, this); | 373 !identical(link, this); |
374 link = link._next) { | 374 link = link._next) { |
375 _BroadcastSubscription<T> subscription = link; | 375 _BroadcastSubscription<T> subscription = link; |
376 subscription._addPending(new _DelayedError(error)); | 376 subscription._addPending(new _DelayedError(error, stackTrace)); |
377 } | 377 } |
378 } | 378 } |
379 | 379 |
380 void _sendDone() { | 380 void _sendDone() { |
381 if (!_isEmpty) { | 381 if (!_isEmpty) { |
382 for (_BroadcastSubscriptionLink link = _next; | 382 for (_BroadcastSubscriptionLink link = _next; |
383 !identical(link, this); | 383 !identical(link, this); |
384 link = link._next) { | 384 link = link._next) { |
385 _BroadcastSubscription<T> subscription = link; | 385 _BroadcastSubscription<T> subscription = link; |
386 subscription._addPending(const _DelayedDone()); | 386 subscription._addPending(const _DelayedDone()); |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
427 return; | 427 return; |
428 } | 428 } |
429 super.add(data); | 429 super.add(data); |
430 while (_hasPending) { | 430 while (_hasPending) { |
431 _pending.handleNext(this); | 431 _pending.handleNext(this); |
432 } | 432 } |
433 } | 433 } |
434 | 434 |
435 void addError(Object error, [StackTrace stackTrace]) { | 435 void addError(Object error, [StackTrace stackTrace]) { |
436 if (!isClosed && _isFiring) { | 436 if (!isClosed && _isFiring) { |
437 _addPendingEvent(new _DelayedError(error)); | 437 _addPendingEvent(new _DelayedError(error, stackTrace)); |
438 return; | 438 return; |
439 } | 439 } |
440 super.addError(error, stackTrace); | 440 super.addError(error, stackTrace); |
441 while (_hasPending) { | 441 while (_hasPending) { |
442 _pending.handleNext(this); | 442 _pending.handleNext(this); |
443 } | 443 } |
444 } | 444 } |
445 | 445 |
446 Future close() { | 446 Future close() { |
447 if (!isClosed && _isFiring) { | 447 if (!isClosed && _isFiring) { |
(...skipping 13 matching lines...) Expand all Loading... |
461 } | 461 } |
462 super._callOnCancel(); | 462 super._callOnCancel(); |
463 } | 463 } |
464 } | 464 } |
465 | 465 |
466 // A subscription that never receives any events. | 466 // A subscription that never receives any events. |
467 // It can simulate pauses, but otherwise does nothing. | 467 // It can simulate pauses, but otherwise does nothing. |
468 class _DoneSubscription<T> implements StreamSubscription<T> { | 468 class _DoneSubscription<T> implements StreamSubscription<T> { |
469 int _pauseCount = 0; | 469 int _pauseCount = 0; |
470 void onData(void handleData(T data)) {} | 470 void onData(void handleData(T data)) {} |
471 void onError(void handleErrr(Object error)) {} | 471 void onError(Function handleError) {} |
472 void onDone(void handleDone()) {} | 472 void onDone(void handleDone()) {} |
473 void pause([Future resumeSignal]) { | 473 void pause([Future resumeSignal]) { |
474 if (resumeSignal != null) resumeSignal.then(_resume); | 474 if (resumeSignal != null) resumeSignal.then(_resume); |
475 _pauseCount++; | 475 _pauseCount++; |
476 } | 476 } |
477 void resume() { _resume(null); } | 477 void resume() { _resume(null); } |
478 void _resume(_) { | 478 void _resume(_) { |
479 if (_pauseCount > 0) _pauseCount--; | 479 if (_pauseCount > 0) _pauseCount--; |
480 } | 480 } |
481 void cancel() {} | 481 void cancel() {} |
482 bool get isPaused => _pauseCount > 0; | 482 bool get isPaused => _pauseCount > 0; |
483 Future asFuture([Object value]) => new _Future(); | 483 Future asFuture([Object value]) => new _Future(); |
484 } | 484 } |
OLD | NEW |