| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 class _BroadcastStream<T> extends _ControllerStream<T> { | 7 class _BroadcastStream<T> extends _ControllerStream<T> { |
| 8 _BroadcastStream(_StreamControllerLifecycle controller) : super(controller); | 8 _BroadcastStream(_StreamControllerLifecycle controller) : super(controller); |
| 9 | 9 |
| 10 bool get isBroadcast => true; | 10 bool get isBroadcast => true; |
| (...skipping 12 matching lines...) Expand all Loading... |
| 23 // TODO(lrn): Use the _state field on _ControllerSubscription to | 23 // TODO(lrn): Use the _state field on _ControllerSubscription to |
| 24 // also store this state. Requires that the subscription implementation | 24 // also store this state. Requires that the subscription implementation |
| 25 // does not assume that it's use of the state integer is the only use. | 25 // does not assume that it's use of the state integer is the only use. |
| 26 int _eventState; | 26 int _eventState; |
| 27 | 27 |
| 28 _BroadcastSubscriptionLink _next; | 28 _BroadcastSubscriptionLink _next; |
| 29 _BroadcastSubscriptionLink _previous; | 29 _BroadcastSubscriptionLink _previous; |
| 30 | 30 |
| 31 _BroadcastSubscription(_StreamControllerLifecycle controller, | 31 _BroadcastSubscription(_StreamControllerLifecycle controller, |
| 32 void onData(T data), | 32 void onData(T data), |
| 33 void onError(Object error), | 33 Function onError, |
| 34 void onDone(), | 34 void onDone(), |
| 35 bool cancelOnError) | 35 bool cancelOnError) |
| 36 : super(controller, onData, onError, onDone, cancelOnError) { | 36 : super(controller, onData, onError, onDone, cancelOnError) { |
| 37 _next = _previous = this; | 37 _next = _previous = this; |
| 38 } | 38 } |
| 39 | 39 |
| 40 _BroadcastStreamController get _controller => super._controller; | 40 _BroadcastStreamController get _controller => super._controller; |
| 41 | 41 |
| 42 bool _expectsEvent(int eventId) => | 42 bool _expectsEvent(int eventId) => |
| 43 (_eventState & _STATE_EVENT_ID) == eventId; | 43 (_eventState & _STATE_EVENT_ID) == eventId; |
| (...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 166 _BroadcastSubscriptionLink previous = subscription._previous; | 166 _BroadcastSubscriptionLink previous = subscription._previous; |
| 167 _BroadcastSubscriptionLink next = subscription._next; | 167 _BroadcastSubscriptionLink next = subscription._next; |
| 168 previous._next = next; | 168 previous._next = next; |
| 169 next._previous = previous; | 169 next._previous = previous; |
| 170 subscription._next = subscription._previous = subscription; | 170 subscription._next = subscription._previous = subscription; |
| 171 } | 171 } |
| 172 | 172 |
| 173 // _StreamControllerLifecycle interface. | 173 // _StreamControllerLifecycle interface. |
| 174 | 174 |
| 175 StreamSubscription<T> _subscribe(void onData(T data), | 175 StreamSubscription<T> _subscribe(void onData(T data), |
| 176 void onError(Object error), | 176 Function onError, |
| 177 void onDone(), | 177 void onDone(), |
| 178 bool cancelOnError) { | 178 bool cancelOnError) { |
| 179 if (isClosed) { | 179 if (isClosed) { |
| 180 throw new StateError("Subscribing to closed stream"); | 180 throw new StateError("Subscribing to closed stream"); |
| 181 } | 181 } |
| 182 StreamSubscription subscription = new _BroadcastSubscription<T>( | 182 StreamSubscription subscription = new _BroadcastSubscription<T>( |
| 183 this, onData, onError, onDone, cancelOnError); | 183 this, onData, onError, onDone, cancelOnError); |
| 184 _addListener(subscription); | 184 _addListener(subscription); |
| 185 if (identical(_next, _previous)) { | 185 if (identical(_next, _previous)) { |
| 186 // Only one listener, so it must be the first listener. | 186 // Only one listener, so it must be the first listener. |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 220 } | 220 } |
| 221 | 221 |
| 222 void add(T data) { | 222 void add(T data) { |
| 223 if (!_mayAddEvent) throw _addEventError(); | 223 if (!_mayAddEvent) throw _addEventError(); |
| 224 _sendData(data); | 224 _sendData(data); |
| 225 } | 225 } |
| 226 | 226 |
| 227 void addError(Object error, [Object stackTrace]) { | 227 void addError(Object error, [Object stackTrace]) { |
| 228 if (!_mayAddEvent) throw _addEventError(); | 228 if (!_mayAddEvent) throw _addEventError(); |
| 229 if (stackTrace != null) _attachStackTrace(error, stackTrace); | 229 if (stackTrace != null) _attachStackTrace(error, stackTrace); |
| 230 _sendError(error); | 230 _sendError(error, stackTrace); |
| 231 } | 231 } |
| 232 | 232 |
| 233 Future close() { | 233 Future close() { |
| 234 if (isClosed) { | 234 if (isClosed) { |
| 235 assert(_doneFuture != null); | 235 assert(_doneFuture != null); |
| 236 return _doneFuture; | 236 return _doneFuture; |
| 237 } | 237 } |
| 238 if (!_mayAddEvent) throw _addEventError(); | 238 if (!_mayAddEvent) throw _addEventError(); |
| 239 _state |= _STATE_CLOSED; | 239 _state |= _STATE_CLOSED; |
| 240 Future doneFuture = _ensureDoneFuture(); | 240 Future doneFuture = _ensureDoneFuture(); |
| 241 _sendDone(); | 241 _sendDone(); |
| 242 return doneFuture; | 242 return doneFuture; |
| 243 } | 243 } |
| 244 | 244 |
| 245 Future get done => _ensureDoneFuture(); | 245 Future get done => _ensureDoneFuture(); |
| 246 | 246 |
| 247 Future addStream(Stream<T> stream) { | 247 Future addStream(Stream<T> stream) { |
| 248 if (!_mayAddEvent) throw _addEventError(); | 248 if (!_mayAddEvent) throw _addEventError(); |
| 249 _state |= _STATE_ADDSTREAM; | 249 _state |= _STATE_ADDSTREAM; |
| 250 _addStreamState = new _AddStreamState(this, stream); | 250 _addStreamState = new _AddStreamState(this, stream); |
| 251 return _addStreamState.addStreamFuture; | 251 return _addStreamState.addStreamFuture; |
| 252 } | 252 } |
| 253 | 253 |
| 254 // _EventSink interface, called from AddStreamState. | 254 // _EventSink interface, called from AddStreamState. |
| 255 void _add(T data) { | 255 void _add(T data) { |
| 256 _sendData(data); | 256 _sendData(data); |
| 257 } | 257 } |
| 258 | 258 |
| 259 void _addError(Object error) { | 259 void _addError(Object error, StackTrace stackTrace) { |
| 260 assert(_isAddingStream); | 260 assert(_isAddingStream); |
| 261 _sendError(error); | 261 _sendError(error, stackTrace); |
| 262 } | 262 } |
| 263 | 263 |
| 264 void _close() { | 264 void _close() { |
| 265 assert(_isAddingStream); | 265 assert(_isAddingStream); |
| 266 _AddStreamState addState = _addStreamState; | 266 _AddStreamState addState = _addStreamState; |
| 267 _addStreamState = null; | 267 _addStreamState = null; |
| 268 _state &= ~_STATE_ADDSTREAM; | 268 _state &= ~_STATE_ADDSTREAM; |
| 269 addState.complete(); | 269 addState.complete(); |
| 270 } | 270 } |
| 271 | 271 |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 326 | 326 |
| 327 // EventDispatch interface. | 327 // EventDispatch interface. |
| 328 | 328 |
| 329 void _sendData(T data) { | 329 void _sendData(T data) { |
| 330 if (_isEmpty) return; | 330 if (_isEmpty) return; |
| 331 _forEachListener((_BufferingStreamSubscription<T> subscription) { | 331 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 332 subscription._add(data); | 332 subscription._add(data); |
| 333 }); | 333 }); |
| 334 } | 334 } |
| 335 | 335 |
| 336 void _sendError(Object error) { | 336 void _sendError(Object error, StackTrace stackTrace) { |
| 337 if (_isEmpty) return; | 337 if (_isEmpty) return; |
| 338 _forEachListener((_BufferingStreamSubscription<T> subscription) { | 338 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 339 subscription._addError(error); | 339 subscription._addError(error, stackTrace); |
| 340 }); | 340 }); |
| 341 } | 341 } |
| 342 | 342 |
| 343 void _sendDone() { | 343 void _sendDone() { |
| 344 if (!_isEmpty) { | 344 if (!_isEmpty) { |
| 345 _forEachListener((_BroadcastSubscription<T> subscription) { | 345 _forEachListener((_BroadcastSubscription<T> subscription) { |
| 346 subscription._close(); | 346 subscription._close(); |
| 347 }); | 347 }); |
| 348 } else { | 348 } else { |
| 349 assert(_doneFuture != null); | 349 assert(_doneFuture != null); |
| (...skipping 11 matching lines...) Expand all Loading... |
| 361 | 361 |
| 362 void _sendData(T data) { | 362 void _sendData(T data) { |
| 363 for (_BroadcastSubscriptionLink link = _next; | 363 for (_BroadcastSubscriptionLink link = _next; |
| 364 !identical(link, this); | 364 !identical(link, this); |
| 365 link = link._next) { | 365 link = link._next) { |
| 366 _BroadcastSubscription<T> subscription = link; | 366 _BroadcastSubscription<T> subscription = link; |
| 367 subscription._addPending(new _DelayedData(data)); | 367 subscription._addPending(new _DelayedData(data)); |
| 368 } | 368 } |
| 369 } | 369 } |
| 370 | 370 |
| 371 void _sendError(Object error) { | 371 void _sendError(Object error, StackTrace stackTrace) { |
| 372 for (_BroadcastSubscriptionLink link = _next; | 372 for (_BroadcastSubscriptionLink link = _next; |
| 373 !identical(link, this); | 373 !identical(link, this); |
| 374 link = link._next) { | 374 link = link._next) { |
| 375 _BroadcastSubscription<T> subscription = link; | 375 _BroadcastSubscription<T> subscription = link; |
| 376 subscription._addPending(new _DelayedError(error)); | 376 subscription._addPending(new _DelayedError(error, stackTrace)); |
| 377 } | 377 } |
| 378 } | 378 } |
| 379 | 379 |
| 380 void _sendDone() { | 380 void _sendDone() { |
| 381 if (!_isEmpty) { | 381 if (!_isEmpty) { |
| 382 for (_BroadcastSubscriptionLink link = _next; | 382 for (_BroadcastSubscriptionLink link = _next; |
| 383 !identical(link, this); | 383 !identical(link, this); |
| 384 link = link._next) { | 384 link = link._next) { |
| 385 _BroadcastSubscription<T> subscription = link; | 385 _BroadcastSubscription<T> subscription = link; |
| 386 subscription._addPending(const _DelayedDone()); | 386 subscription._addPending(const _DelayedDone()); |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 427 return; | 427 return; |
| 428 } | 428 } |
| 429 super.add(data); | 429 super.add(data); |
| 430 while (_hasPending) { | 430 while (_hasPending) { |
| 431 _pending.handleNext(this); | 431 _pending.handleNext(this); |
| 432 } | 432 } |
| 433 } | 433 } |
| 434 | 434 |
| 435 void addError(Object error, [StackTrace stackTrace]) { | 435 void addError(Object error, [StackTrace stackTrace]) { |
| 436 if (!isClosed && _isFiring) { | 436 if (!isClosed && _isFiring) { |
| 437 _addPendingEvent(new _DelayedError(error)); | 437 _addPendingEvent(new _DelayedError(error, stackTrace)); |
| 438 return; | 438 return; |
| 439 } | 439 } |
| 440 super.addError(error, stackTrace); | 440 super.addError(error, stackTrace); |
| 441 while (_hasPending) { | 441 while (_hasPending) { |
| 442 _pending.handleNext(this); | 442 _pending.handleNext(this); |
| 443 } | 443 } |
| 444 } | 444 } |
| 445 | 445 |
| 446 Future close() { | 446 Future close() { |
| 447 if (!isClosed && _isFiring) { | 447 if (!isClosed && _isFiring) { |
| (...skipping 13 matching lines...) Expand all Loading... |
| 461 } | 461 } |
| 462 super._callOnCancel(); | 462 super._callOnCancel(); |
| 463 } | 463 } |
| 464 } | 464 } |
| 465 | 465 |
| 466 // A subscription that never receives any events. | 466 // A subscription that never receives any events. |
| 467 // It can simulate pauses, but otherwise does nothing. | 467 // It can simulate pauses, but otherwise does nothing. |
| 468 class _DoneSubscription<T> implements StreamSubscription<T> { | 468 class _DoneSubscription<T> implements StreamSubscription<T> { |
| 469 int _pauseCount = 0; | 469 int _pauseCount = 0; |
| 470 void onData(void handleData(T data)) {} | 470 void onData(void handleData(T data)) {} |
| 471 void onError(void handleErrr(Object error)) {} | 471 void onError(Function handleError) {} |
| 472 void onDone(void handleDone()) {} | 472 void onDone(void handleDone()) {} |
| 473 void pause([Future resumeSignal]) { | 473 void pause([Future resumeSignal]) { |
| 474 if (resumeSignal != null) resumeSignal.then(_resume); | 474 if (resumeSignal != null) resumeSignal.then(_resume); |
| 475 _pauseCount++; | 475 _pauseCount++; |
| 476 } | 476 } |
| 477 void resume() { _resume(null); } | 477 void resume() { _resume(null); } |
| 478 void _resume(_) { | 478 void _resume(_) { |
| 479 if (_pauseCount > 0) _pauseCount--; | 479 if (_pauseCount > 0) _pauseCount--; |
| 480 } | 480 } |
| 481 void cancel() {} | 481 void cancel() {} |
| 482 bool get isPaused => _pauseCount > 0; | 482 bool get isPaused => _pauseCount > 0; |
| 483 Future asFuture([Object value]) => new _Future(); | 483 Future asFuture([Object value]) => new _Future(); |
| 484 } | 484 } |
| OLD | NEW |