Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(704)

Side by Side Diff: sdk/lib/io/http_impl.dart

Issue 15842004: Support auto-drain of HttpRequest data. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Add documentation. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/io/http.dart ('k') | sdk/lib/io/http_parser.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 class _HttpIncoming extends Stream<List<int>> { 7 class _HttpIncoming extends Stream<List<int>> {
8 final int _transferLength; 8 final int _transferLength;
9 final Completer _dataCompleter = new Completer(); 9 final Completer _dataCompleter = new Completer();
10 Stream<List<int>> _stream; 10 Stream<List<int>> _stream;
11 11
12 bool fullBodyRead = false; 12 bool fullBodyRead = false;
13 13
14 // Common properties. 14 // Common properties.
15 final _HttpHeaders headers; 15 final _HttpHeaders headers;
16 bool upgraded = false; 16 bool upgraded = false;
17 17
18 // ClientResponse properties. 18 // ClientResponse properties.
19 int statusCode; 19 int statusCode;
20 String reasonPhrase; 20 String reasonPhrase;
21 21
22 // Request properties. 22 // Request properties.
23 String method; 23 String method;
24 Uri uri; 24 Uri uri;
25 25
26 bool hasSubscriber = false;
27
26 // The transfer length if the length of the message body as it 28 // The transfer length if the length of the message body as it
27 // appears in the message (RFC 2616 section 4.4). This can be -1 if 29 // appears in the message (RFC 2616 section 4.4). This can be -1 if
28 // the length of the massage body is not known due to transfer 30 // the length of the massage body is not known due to transfer
29 // codings. 31 // codings.
30 int get transferLength => _transferLength; 32 int get transferLength => _transferLength;
31 33
32 _HttpIncoming(_HttpHeaders this.headers, 34 _HttpIncoming(_HttpHeaders this.headers,
33 int this._transferLength, 35 int this._transferLength,
34 Stream<List<int>> this._stream) { 36 Stream<List<int>> this._stream) {
35 } 37 }
36 38
37 StreamSubscription<List<int>> listen(void onData(List<int> event), 39 StreamSubscription<List<int>> listen(void onData(List<int> event),
38 {void onError(error), 40 {void onError(error),
39 void onDone(), 41 void onDone(),
40 bool cancelOnError}) { 42 bool cancelOnError}) {
43 hasSubscriber = true;
41 return _stream.listen(onData, 44 return _stream.listen(onData,
42 onError: onError, 45 onError: onError,
43 onDone: onDone, 46 onDone: onDone,
44 cancelOnError: cancelOnError); 47 cancelOnError: cancelOnError);
45 } 48 }
46 49
47 // Is completed once all data have been received. 50 // Is completed once all data have been received.
48 Future get dataDone => _dataCompleter.future; 51 Future get dataDone => _dataCompleter.future;
49 52
50 void close(bool closing) { 53 void close(bool closing) {
51 fullBodyRead = true; 54 fullBodyRead = true;
55 hasSubscriber = true;
52 _dataCompleter.complete(closing); 56 _dataCompleter.complete(closing);
53 } 57 }
54 } 58 }
55 59
56 abstract class _HttpInboundMessage extends Stream<List<int>> { 60 abstract class _HttpInboundMessage extends Stream<List<int>> {
57 final _HttpIncoming _incoming; 61 final _HttpIncoming _incoming;
58 List<Cookie> _cookies; 62 List<Cookie> _cookies;
59 63
60 _HttpInboundMessage(_HttpIncoming this._incoming); 64 _HttpInboundMessage(_HttpIncoming this._incoming);
61 65
(...skipping 205 matching lines...) Expand 10 before | Expand all | Expand 10 after
267 bool get _shouldAuthenticate { 271 bool get _shouldAuthenticate {
268 // Only try to authenticate if there is a challenge in the response. 272 // Only try to authenticate if there is a challenge in the response.
269 List<String> challenge = headers[HttpHeaders.WWW_AUTHENTICATE]; 273 List<String> challenge = headers[HttpHeaders.WWW_AUTHENTICATE];
270 return statusCode == HttpStatus.UNAUTHORIZED && 274 return statusCode == HttpStatus.UNAUTHORIZED &&
271 challenge != null && challenge.length == 1; 275 challenge != null && challenge.length == 1;
272 } 276 }
273 277
274 Future<HttpClientResponse> _authenticate(bool proxyAuth) { 278 Future<HttpClientResponse> _authenticate(bool proxyAuth) {
275 Future<HttpClientResponse> retry() { 279 Future<HttpClientResponse> retry() {
276 // Drain body and retry. 280 // Drain body and retry.
277 return fold(null, (x, y) {}).then((_) { 281 return drain().then((_) {
278 return _httpClient._openUrlFromRequest(_httpRequest.method, 282 return _httpClient._openUrlFromRequest(_httpRequest.method,
279 _httpRequest.uri, 283 _httpRequest.uri,
280 _httpRequest) 284 _httpRequest)
281 .then((request) => request.close()); 285 .then((request) => request.close());
282 }); 286 });
283 } 287 }
284 288
285 List<String> authChallenge() { 289 List<String> authChallenge() {
286 if (proxyAuth) { 290 if (proxyAuth) {
287 return headers[HttpHeaders.PROXY_AUTHENTICATE]; 291 return headers[HttpHeaders.PROXY_AUTHENTICATE];
(...skipping 175 matching lines...) Expand 10 before | Expand all | Expand 10 after
463 Future<T> addStream(Stream<List<int>> stream) { 467 Future<T> addStream(Stream<List<int>> stream) {
464 return _dataSink.addStream(stream); 468 return _dataSink.addStream(stream);
465 } 469 }
466 470
467 Future close() { 471 Future close() {
468 return _dataSink.close(); 472 return _dataSink.close();
469 } 473 }
470 474
471 Future<T> get done => _dataSink.done; 475 Future<T> get done => _dataSink.done;
472 476
473 void _writeHeaders() { 477 Future _writeHeaders({drainRequest: true}) {
474 if (_headersWritten) return; 478 if (_headersWritten) return new Future.value();
475 _headersWritten = true; 479 _headersWritten = true;
476 headers._synchronize(); // Be sure the 'chunked' option is updated. 480 headers._synchronize(); // Be sure the 'chunked' option is updated.
477 bool isServerSide = this is _HttpResponse; 481 bool isServerSide = this is _HttpResponse;
478 if (isServerSide && headers.chunkedTransferEncoding) { 482 if (isServerSide) {
479 var response = this; 483 var response = this;
480 List acceptEncodings = 484 if (headers.chunkedTransferEncoding) {
481 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; 485 List acceptEncodings =
482 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; 486 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING];
483 if (acceptEncodings != null && 487 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING];
484 acceptEncodings 488 if (acceptEncodings != null &&
485 .expand((list) => list.split(",")) 489 acceptEncodings
486 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && 490 .expand((list) => list.split(","))
487 contentEncoding == null) { 491 .any((encoding) => encoding.trim().toLowerCase() == "gzip") &&
488 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); 492 contentEncoding == null) {
489 _asGZip = true; 493 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip");
494 _asGZip = true;
495 }
496 }
497 if (drainRequest && !response._httpRequest._incoming.hasSubscriber) {
498 return response._httpRequest.drain()
499 // TODO(ajohnsen): Timeout on drain?
500 .catchError((_) {}) // Ignore errors.
501 .then((_) => _writeHeader());
490 } 502 }
491 } 503 }
492 _writeHeader(); 504 return new Future.sync(_writeHeader);
493 } 505 }
494 506
495 Future _addStream(Stream<List<int>> stream) { 507 Future _addStream(Stream<List<int>> stream) {
496 _writeHeaders(); 508 return _writeHeaders()
497 int contentLength = headers.contentLength; 509 .then((_) {
498 if (_ignoreBody) { 510 int contentLength = headers.contentLength;
499 stream.fold(null, (x, y) {}).catchError((_) {}); 511 if (_ignoreBody) {
500 return _headersSink.close(); 512 stream.drain().catchError((_) {});
501 } 513 return _headersSink.close();
502 stream = stream.transform(new _BufferTransformer()); 514 }
503 if (headers.chunkedTransferEncoding) { 515 stream = stream.transform(new _BufferTransformer());
504 if (_asGZip) { 516 if (headers.chunkedTransferEncoding) {
505 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); 517 if (_asGZip) {
506 } 518 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6));
507 stream = stream.transform(new _ChunkedTransformer()); 519 }
508 } else if (contentLength >= 0) { 520 stream = stream.transform(new _ChunkedTransformer());
509 stream = stream.transform(new _ContentLengthValidator(contentLength)); 521 } else if (contentLength >= 0) {
510 } 522 stream = stream.transform(
511 return _headersSink.addStream(stream); 523 new _ContentLengthValidator(contentLength));
524 }
525 return _headersSink.addStream(stream);
526 });
512 } 527 }
513 528
514 Future _close() { 529 Future _close() {
515 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and 530 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and
516 // persistentConnection is not guaranteed to be in sync. 531 // persistentConnection is not guaranteed to be in sync.
517 if (!_headersWritten) { 532 if (!_headersWritten) {
518 if (!_ignoreBody && headers.contentLength == -1) { 533 if (!_ignoreBody && headers.contentLength == -1) {
519 // If no body was written, _ignoreBody is false (it's not a HEAD 534 // If no body was written, _ignoreBody is false (it's not a HEAD
520 // request) and the content-length is unspecified, set contentLength to 535 // request) and the content-length is unspecified, set contentLength to
521 // 0. 536 // 0.
522 headers.chunkedTransferEncoding = false; 537 headers.chunkedTransferEncoding = false;
523 headers.contentLength = 0; 538 headers.contentLength = 0;
524 } else if (!_ignoreBody && headers.contentLength > 0) { 539 } else if (!_ignoreBody && headers.contentLength > 0) {
525 _headersSink.close().catchError((_) {}); 540 _headersSink.close().catchError((_) {});
526 return new Future.error(new HttpException( 541 return new Future.error(new HttpException(
527 "No content while contentLength was specified to be greater " 542 "No content while contentLength was specified to be greater "
528 " than 0: ${headers.contentLength}.")); 543 " than 0: ${headers.contentLength}."));
529 } 544 }
530 } 545 }
531 _writeHeaders(); 546 return _writeHeaders().then((_) => _headersSink.close());
532 return _headersSink.close();
533 } 547 }
534 548
535 void _writeHeader(); // TODO(ajohnsen): Better name. 549 void _writeHeader(); // TODO(ajohnsen): Better name.
536 } 550 }
537 551
538 552
539 class _HttpOutboundConsumer implements StreamConsumer { 553 class _HttpOutboundConsumer implements StreamConsumer {
540 final _HttpOutboundMessage _outbound; 554 final _HttpOutboundMessage _outbound;
541 StreamController _controller; 555 StreamController _controller;
542 StreamSubscription _subscription; 556 StreamSubscription _subscription;
543 Completer _closeCompleter = new Completer(); 557 Completer _closeCompleter = new Completer();
544 Completer _completer; 558 Completer _completer;
545 bool _socketError = false; 559 bool _socketError = false;
546 560
547 _HttpOutboundConsumer(_HttpOutboundMessage this._outbound); 561 _HttpOutboundConsumer(_HttpOutboundMessage this._outbound);
548 562
549 void _cancel() { 563 void _cancel() {
550 if (_subscription != null) { 564 if (_subscription != null) {
551 _subscription.cancel(); 565 _subscription.cancel();
552 } 566 }
553 } 567 }
554 568
555 _ensureController() { 569 _ensureController() {
556 if (_controller != null) return; 570 if (_controller != null) return;
557 _controller = new StreamController(onPause: () => _subscription.pause(), 571 _controller = new StreamController(onPause: () => _subscription.pause(),
558 onResume: () => _subscription.resume(), 572 onResume: () => _subscription.resume(),
573 onListen: () => _subscription.resume(),
559 onCancel: _cancel); 574 onCancel: _cancel);
560 _outbound._addStream(_controller.stream) 575 _outbound._addStream(_controller.stream)
561 .then((_) { 576 .then((_) {
562 _cancel(); 577 _cancel();
563 _done(); 578 _done();
564 _closeCompleter.complete(_outbound); 579 _closeCompleter.complete(_outbound);
565 }, 580 },
566 onError: (error) { 581 onError: (error) {
567 _socketError = true; 582 _socketError = true;
568 if (error is SocketIOException && 583 if (error is SocketIOException &&
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
601 (data) { 616 (data) {
602 _controller.add(data); 617 _controller.add(data);
603 }, 618 },
604 onDone: () { 619 onDone: () {
605 _done(); 620 _done();
606 }, 621 },
607 onError: (error) { 622 onError: (error) {
608 _done(error); 623 _done(error);
609 }, 624 },
610 cancelOnError: true); 625 cancelOnError: true);
626 // Pause the first request.
627 if (_controller == null) _subscription.pause();
611 _ensureController(); 628 _ensureController();
612 return _completer.future; 629 return _completer.future;
613 } 630 }
614 631
615 Future close() { 632 Future close() {
616 Future closeOutbound() { 633 Future closeOutbound() {
617 if (_socketError) return new Future.value(_outbound); 634 if (_socketError) return new Future.value(_outbound);
618 return _outbound._close().then((_) => _outbound); 635 return _outbound._close().then((_) => _outbound);
619 } 636 }
620 if (_controller == null) return closeOutbound(); 637 if (_controller == null) return closeOutbound();
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
675 } 692 }
676 693
677 String get reasonPhrase => _findReasonPhrase(statusCode); 694 String get reasonPhrase => _findReasonPhrase(statusCode);
678 void set reasonPhrase(String reasonPhrase) { 695 void set reasonPhrase(String reasonPhrase) {
679 if (_headersWritten) throw new StateError("Header already sent"); 696 if (_headersWritten) throw new StateError("Header already sent");
680 _reasonPhrase = reasonPhrase; 697 _reasonPhrase = reasonPhrase;
681 } 698 }
682 699
683 Future<Socket> detachSocket() { 700 Future<Socket> detachSocket() {
684 if (_headersWritten) throw new StateError("Headers already sent"); 701 if (_headersWritten) throw new StateError("Headers already sent");
685 _writeHeaders();
686 var future = _httpRequest._httpConnection.detachSocket(); 702 var future = _httpRequest._httpConnection.detachSocket();
703 _writeHeaders(drainRequest: false).then((_) => close());
687 // Close connection so the socket is 'free'. 704 // Close connection so the socket is 'free'.
688 close(); 705 close();
689 done.catchError((_) { 706 done.catchError((_) {
690 // Catch any error on done, as they automatically will be 707 // Catch any error on done, as they automatically will be
691 // propagated to the websocket. 708 // propagated to the websocket.
692 }); 709 });
693 return future; 710 return future;
694 } 711 }
695 712
696 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; 713 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo;
(...skipping 174 matching lines...) Expand 10 before | Expand all | Expand 10 after
871 HttpConnectionInfo get connectionInfo => _httpClientConnection.connectionInfo; 888 HttpConnectionInfo get connectionInfo => _httpClientConnection.connectionInfo;
872 889
873 void _onIncoming(_HttpIncoming incoming) { 890 void _onIncoming(_HttpIncoming incoming) {
874 var response = new _HttpClientResponse(incoming, 891 var response = new _HttpClientResponse(incoming,
875 this, 892 this,
876 _httpClient); 893 _httpClient);
877 Future<HttpClientResponse> future; 894 Future<HttpClientResponse> future;
878 if (followRedirects && response.isRedirect) { 895 if (followRedirects && response.isRedirect) {
879 if (response.redirects.length < maxRedirects) { 896 if (response.redirects.length < maxRedirects) {
880 // Redirect and drain response. 897 // Redirect and drain response.
881 future = response.fold(null, (x, y) {}) 898 future = response.drain()
882 .then((_) => response.redirect()); 899 .then((_) => response.redirect());
883 } else { 900 } else {
884 // End with exception, too many redirects. 901 // End with exception, too many redirects.
885 future = response.fold(null, (x, y) {}) 902 future = response.drain()
886 .then((_) => new Future.error( 903 .then((_) => new Future.error(
887 new RedirectLimitExceededException(response.redirects))); 904 new RedirectLimitExceededException(response.redirects)));
888 } 905 }
889 } else if (response._shouldAuthenticateProxy) { 906 } else if (response._shouldAuthenticateProxy) {
890 future = response._authenticate(true); 907 future = response._authenticate(true);
891 } else if (response._shouldAuthenticate) { 908 } else if (response._shouldAuthenticate) {
892 future = response._authenticate(false); 909 future = response._authenticate(false);
893 } else { 910 } else {
894 future = new Future<HttpClientResponse>.value(response); 911 future = new Future<HttpClientResponse>.value(response);
895 } 912 }
(...skipping 1423 matching lines...) Expand 10 before | Expand all | Expand 10 after
2319 2336
2320 2337
2321 class _RedirectInfo implements RedirectInfo { 2338 class _RedirectInfo implements RedirectInfo {
2322 const _RedirectInfo(int this.statusCode, 2339 const _RedirectInfo(int this.statusCode,
2323 String this.method, 2340 String this.method,
2324 Uri this.location); 2341 Uri this.location);
2325 final int statusCode; 2342 final int statusCode;
2326 final String method; 2343 final String method;
2327 final Uri location; 2344 final Uri location;
2328 } 2345 }
OLDNEW
« no previous file with comments | « sdk/lib/io/http.dart ('k') | sdk/lib/io/http_parser.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698