OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 class _HttpIncoming extends Stream<List<int>> { | 7 class _HttpIncoming extends Stream<List<int>> { |
8 final int _transferLength; | 8 final int _transferLength; |
9 final Completer _dataCompleter = new Completer(); | 9 final Completer _dataCompleter = new Completer(); |
10 Stream<List<int>> _stream; | 10 Stream<List<int>> _stream; |
(...skipping 528 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
539 class _HttpOutboundConsumer implements StreamConsumer { | 539 class _HttpOutboundConsumer implements StreamConsumer { |
540 final _HttpOutboundMessage _outbound; | 540 final _HttpOutboundMessage _outbound; |
541 StreamController _controller; | 541 StreamController _controller; |
542 StreamSubscription _subscription; | 542 StreamSubscription _subscription; |
543 Completer _closeCompleter = new Completer(); | 543 Completer _closeCompleter = new Completer(); |
544 Completer _completer; | 544 Completer _completer; |
545 | 545 |
546 _HttpOutboundConsumer(_HttpOutboundMessage this._outbound); | 546 _HttpOutboundConsumer(_HttpOutboundMessage this._outbound); |
547 | 547 |
548 void _onPause() { | 548 void _onPause() { |
549 if (_controller.isPaused) { | 549 _subscription.pause(); |
550 _subscription.pause(); | 550 } |
551 } else { | 551 void _onResume() { |
552 _subscription.resume(); | 552 _subscription.resume(); |
553 } | |
554 } | 553 } |
555 | 554 |
556 void _onListen() { | 555 void _onCancel() { |
557 if (!_controller.hasListener && _subscription != null) { | 556 if (_subscription != null) { |
558 _subscription.cancel(); | 557 StreamSubscription subscription = _subscription; |
558 _subscription = null; | |
559 subscription.cancel(); | |
559 } | 560 } |
560 } | 561 } |
561 | 562 |
562 _ensureController() { | 563 _ensureController() { |
563 if (_controller != null) return; | 564 if (_controller != null) return; |
564 _controller = new StreamController(onPause: _onPause, | 565 _controller = new StreamController(onPause: _onPause, |
565 onResume: _onPause, | 566 onResume: _onResume, |
566 onListen: _onListen, | 567 onCancel: _onCancel); |
567 onCancel: _onListen); | |
568 _outbound._addStream(_controller.stream) | 568 _outbound._addStream(_controller.stream) |
569 .then((_) { | 569 .then((_) { |
570 _onListen(); // Make sure we unsubscribe. | 570 _onCancel(); // Make sure we unsubscribe. |
571 _done(); | 571 _done(); |
572 _closeCompleter.complete(_outbound); | 572 _closeCompleter.complete(_outbound); |
573 }, | 573 }, |
574 onError: (error) { | 574 onError: (error) { |
575 if (!_done(error)) { | 575 if (!_done(error)) { |
576 _closeCompleter.completeError(error); | 576 _closeCompleter.completeError(error); |
577 } | 577 } |
578 }); | 578 }); |
579 } | 579 } |
580 | 580 |
581 bool _done([error]) { | 581 bool _done([error]) { |
582 if (_completer == null) return false; | 582 if (_completer == null) return false; |
583 if (error != null) { | 583 if (error != null) { |
584 _completer.completeError(error); | 584 _completer.completeError(error); |
585 } else { | 585 } else { |
586 _completer.complete(_outbound); | 586 _completer.complete(_outbound); |
587 } | 587 } |
588 _completer = null; | 588 _completer = null; |
589 return true; | 589 return true; |
590 } | 590 } |
591 | 591 |
592 Future addStream(var stream) { | 592 Future addStream(var stream) { |
593 _ensureController(); | 593 _ensureController(); |
594 _completer = new Completer(); | 594 _completer = new Completer(); |
595 Future result = _completer.future; | |
Anders Johnsen
2013/05/22 13:07:08
Nooo... :)
floitsch
2013/05/22 16:26:29
Apparently this was necessary.
Add comment. (it's
Lasse Reichstein Nielsen
2013/05/24 06:02:49
It should not be necessary any more after I preven
| |
595 _subscription = stream.listen( | 596 _subscription = stream.listen( |
596 (data) { | 597 (data) { |
597 _controller.add(data); | 598 _controller.add(data); |
598 }, | 599 }, |
599 onDone: () { | 600 onDone: () { |
600 _done(); | 601 _done(); |
601 }, | 602 }, |
602 onError: (error) { | 603 onError: (error) { |
603 _done(error); | 604 _done(error); |
604 }, | 605 }, |
605 cancelOnError: true); | 606 cancelOnError: true); |
606 return _completer.future; | 607 return result; |
607 } | 608 } |
608 | 609 |
609 Future close() { | 610 Future close() { |
610 Future closeOutbound() { | 611 Future closeOutbound() { |
611 return _outbound._close().then((_) => _outbound); | 612 return _outbound._close().then((_) => _outbound); |
612 } | 613 } |
613 if (_controller == null) return closeOutbound(); | 614 if (_controller == null) return closeOutbound(); |
614 _controller.close(); | 615 _controller.close(); |
615 return _closeCompleter.future.then((_) => closeOutbound()); | 616 return _closeCompleter.future.then((_) => closeOutbound()); |
616 } | 617 } |
(...skipping 1695 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
2312 | 2313 |
2313 | 2314 |
2314 class _RedirectInfo implements RedirectInfo { | 2315 class _RedirectInfo implements RedirectInfo { |
2315 const _RedirectInfo(int this.statusCode, | 2316 const _RedirectInfo(int this.statusCode, |
2316 String this.method, | 2317 String this.method, |
2317 Uri this.location); | 2318 Uri this.location); |
2318 final int statusCode; | 2319 final int statusCode; |
2319 final String method; | 2320 final String method; |
2320 final Uri location; | 2321 final Uri location; |
2321 } | 2322 } |
OLD | NEW |