Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(743)

Side by Side Diff: sdk/lib/io/http_impl.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Made tests run (mostly) Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 class _HttpIncoming extends Stream<List<int>> { 7 class _HttpIncoming extends Stream<List<int>> {
8 final int _transferLength; 8 final int _transferLength;
9 final Completer _dataCompleter = new Completer(); 9 final Completer _dataCompleter = new Completer();
10 Stream<List<int>> _stream; 10 Stream<List<int>> _stream;
(...skipping 528 matching lines...) Expand 10 before | Expand all | Expand 10 after
539 class _HttpOutboundConsumer implements StreamConsumer { 539 class _HttpOutboundConsumer implements StreamConsumer {
540 final _HttpOutboundMessage _outbound; 540 final _HttpOutboundMessage _outbound;
541 StreamController _controller; 541 StreamController _controller;
542 StreamSubscription _subscription; 542 StreamSubscription _subscription;
543 Completer _closeCompleter = new Completer(); 543 Completer _closeCompleter = new Completer();
544 Completer _completer; 544 Completer _completer;
545 545
546 _HttpOutboundConsumer(_HttpOutboundMessage this._outbound); 546 _HttpOutboundConsumer(_HttpOutboundMessage this._outbound);
547 547
548 void _onPause() { 548 void _onPause() {
549 if (_controller.isPaused) { 549 _subscription.pause();
550 _subscription.pause(); 550 }
551 } else { 551 void _onResume() {
552 _subscription.resume(); 552 _subscription.resume();
553 }
554 } 553 }
555 554
556 void _onListen() { 555 void _onCancel() {
557 if (!_controller.hasListener && _subscription != null) { 556 if (_subscription != null) {
558 _subscription.cancel(); 557 StreamSubscription subscription = _subscription;
558 _subscription = null;
559 subscription.cancel();
559 } 560 }
560 } 561 }
561 562
562 _ensureController() { 563 _ensureController() {
563 if (_controller != null) return; 564 if (_controller != null) return;
564 _controller = new StreamController(onPause: _onPause, 565 _controller = new StreamController(onPause: _onPause,
565 onResume: _onPause, 566 onResume: _onResume,
566 onListen: _onListen, 567 onCancel: _onCancel);
567 onCancel: _onListen);
568 _outbound._addStream(_controller.stream) 568 _outbound._addStream(_controller.stream)
569 .then((_) { 569 .then((_) {
570 _onListen(); // Make sure we unsubscribe. 570 _onCancel(); // Make sure we unsubscribe.
571 _done(); 571 _done();
572 _closeCompleter.complete(_outbound); 572 _closeCompleter.complete(_outbound);
573 }, 573 },
574 onError: (error) { 574 onError: (error) {
575 if (!_done(error)) { 575 if (!_done(error)) {
576 _closeCompleter.completeError(error); 576 _closeCompleter.completeError(error);
577 } 577 }
578 }); 578 });
579 } 579 }
580 580
581 bool _done([error]) { 581 bool _done([error]) {
582 if (_completer == null) return false; 582 if (_completer == null) return false;
583 if (error != null) { 583 if (error != null) {
584 _completer.completeError(error); 584 _completer.completeError(error);
585 } else { 585 } else {
586 _completer.complete(_outbound); 586 _completer.complete(_outbound);
587 } 587 }
588 _completer = null; 588 _completer = null;
589 return true; 589 return true;
590 } 590 }
591 591
592 Future addStream(var stream) { 592 Future addStream(var stream) {
593 _ensureController(); 593 _ensureController();
594 _completer = new Completer(); 594 _completer = new Completer();
595 Future result = _completer.future;
Anders Johnsen 2013/05/22 13:07:08 Nooo... :)
floitsch 2013/05/22 16:26:29 Apparently this was necessary. Add comment. (it's
Lasse Reichstein Nielsen 2013/05/24 06:02:49 It should not be necessary any more after I preven
595 _subscription = stream.listen( 596 _subscription = stream.listen(
596 (data) { 597 (data) {
597 _controller.add(data); 598 _controller.add(data);
598 }, 599 },
599 onDone: () { 600 onDone: () {
600 _done(); 601 _done();
601 }, 602 },
602 onError: (error) { 603 onError: (error) {
603 _done(error); 604 _done(error);
604 }, 605 },
605 cancelOnError: true); 606 cancelOnError: true);
606 return _completer.future; 607 return result;
607 } 608 }
608 609
609 Future close() { 610 Future close() {
610 Future closeOutbound() { 611 Future closeOutbound() {
611 return _outbound._close().then((_) => _outbound); 612 return _outbound._close().then((_) => _outbound);
612 } 613 }
613 if (_controller == null) return closeOutbound(); 614 if (_controller == null) return closeOutbound();
614 _controller.close(); 615 _controller.close();
615 return _closeCompleter.future.then((_) => closeOutbound()); 616 return _closeCompleter.future.then((_) => closeOutbound());
616 } 617 }
(...skipping 1695 matching lines...) Expand 10 before | Expand all | Expand 10 after
2312 2313
2313 2314
2314 class _RedirectInfo implements RedirectInfo { 2315 class _RedirectInfo implements RedirectInfo {
2315 const _RedirectInfo(int this.statusCode, 2316 const _RedirectInfo(int this.statusCode,
2316 String this.method, 2317 String this.method,
2317 Uri this.location); 2318 Uri this.location);
2318 final int statusCode; 2319 final int statusCode;
2319 final String method; 2320 final String method;
2320 final Uri location; 2321 final Uri location;
2321 } 2322 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698