| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.io; | 5 part of dart.io; |
| 6 | 6 |
| 7 class _HttpIncoming extends Stream<List<int>> { | 7 class _HttpIncoming extends Stream<List<int>> { |
| 8 final int _transferLength; | 8 final int _transferLength; |
| 9 final Completer _dataCompleter = new Completer(); | 9 final Completer _dataCompleter = new Completer(); |
| 10 Stream<List<int>> _stream; | 10 Stream<List<int>> _stream; |
| 11 | 11 |
| 12 bool fullBodyRead = false; | 12 bool fullBodyRead = false; |
| 13 | 13 |
| 14 // Common properties. | 14 // Common properties. |
| 15 final _HttpHeaders headers; | 15 final _HttpHeaders headers; |
| 16 bool upgraded = false; | 16 bool upgraded = false; |
| 17 | 17 |
| 18 // ClientResponse properties. | 18 // ClientResponse properties. |
| 19 int statusCode; | 19 int statusCode; |
| 20 String reasonPhrase; | 20 String reasonPhrase; |
| 21 | 21 |
| 22 // Request properties. | 22 // Request properties. |
| 23 String method; | 23 String method; |
| 24 Uri uri; | 24 Uri uri; |
| 25 | 25 |
| 26 bool hasSubscriber = false; |
| 27 |
| 26 // The transfer length if the length of the message body as it | 28 // The transfer length if the length of the message body as it |
| 27 // appears in the message (RFC 2616 section 4.4). This can be -1 if | 29 // appears in the message (RFC 2616 section 4.4). This can be -1 if |
| 28 // the length of the massage body is not known due to transfer | 30 // the length of the massage body is not known due to transfer |
| 29 // codings. | 31 // codings. |
| 30 int get transferLength => _transferLength; | 32 int get transferLength => _transferLength; |
| 31 | 33 |
| 32 _HttpIncoming(_HttpHeaders this.headers, | 34 _HttpIncoming(_HttpHeaders this.headers, |
| 33 int this._transferLength, | 35 int this._transferLength, |
| 34 Stream<List<int>> this._stream) { | 36 Stream<List<int>> this._stream) { |
| 35 } | 37 } |
| 36 | 38 |
| 37 StreamSubscription<List<int>> listen(void onData(List<int> event), | 39 StreamSubscription<List<int>> listen(void onData(List<int> event), |
| 38 {void onError(error), | 40 {void onError(error), |
| 39 void onDone(), | 41 void onDone(), |
| 40 bool cancelOnError}) { | 42 bool cancelOnError}) { |
| 43 hasSubscriber = true; |
| 41 return _stream.listen(onData, | 44 return _stream.listen(onData, |
| 42 onError: onError, | 45 onError: onError, |
| 43 onDone: onDone, | 46 onDone: onDone, |
| 44 cancelOnError: cancelOnError); | 47 cancelOnError: cancelOnError); |
| 45 } | 48 } |
| 46 | 49 |
| 47 // Is completed once all data have been received. | 50 // Is completed once all data have been received. |
| 48 Future get dataDone => _dataCompleter.future; | 51 Future get dataDone => _dataCompleter.future; |
| 49 | 52 |
| 50 void close(bool closing) { | 53 void close(bool closing) { |
| 51 fullBodyRead = true; | 54 fullBodyRead = true; |
| 55 hasSubscriber = true; |
| 52 _dataCompleter.complete(closing); | 56 _dataCompleter.complete(closing); |
| 53 } | 57 } |
| 54 } | 58 } |
| 55 | 59 |
| 56 abstract class _HttpInboundMessage extends Stream<List<int>> { | 60 abstract class _HttpInboundMessage extends Stream<List<int>> { |
| 57 final _HttpIncoming _incoming; | 61 final _HttpIncoming _incoming; |
| 58 List<Cookie> _cookies; | 62 List<Cookie> _cookies; |
| 59 | 63 |
| 60 _HttpInboundMessage(_HttpIncoming this._incoming); | 64 _HttpInboundMessage(_HttpIncoming this._incoming); |
| 61 | 65 |
| (...skipping 205 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 267 bool get _shouldAuthenticate { | 271 bool get _shouldAuthenticate { |
| 268 // Only try to authenticate if there is a challenge in the response. | 272 // Only try to authenticate if there is a challenge in the response. |
| 269 List<String> challenge = headers[HttpHeaders.WWW_AUTHENTICATE]; | 273 List<String> challenge = headers[HttpHeaders.WWW_AUTHENTICATE]; |
| 270 return statusCode == HttpStatus.UNAUTHORIZED && | 274 return statusCode == HttpStatus.UNAUTHORIZED && |
| 271 challenge != null && challenge.length == 1; | 275 challenge != null && challenge.length == 1; |
| 272 } | 276 } |
| 273 | 277 |
| 274 Future<HttpClientResponse> _authenticate(bool proxyAuth) { | 278 Future<HttpClientResponse> _authenticate(bool proxyAuth) { |
| 275 Future<HttpClientResponse> retry() { | 279 Future<HttpClientResponse> retry() { |
| 276 // Drain body and retry. | 280 // Drain body and retry. |
| 277 return fold(null, (x, y) {}).then((_) { | 281 return drain().then((_) { |
| 278 return _httpClient._openUrlFromRequest(_httpRequest.method, | 282 return _httpClient._openUrlFromRequest(_httpRequest.method, |
| 279 _httpRequest.uri, | 283 _httpRequest.uri, |
| 280 _httpRequest) | 284 _httpRequest) |
| 281 .then((request) => request.close()); | 285 .then((request) => request.close()); |
| 282 }); | 286 }); |
| 283 } | 287 } |
| 284 | 288 |
| 285 List<String> authChallenge() { | 289 List<String> authChallenge() { |
| 286 if (proxyAuth) { | 290 if (proxyAuth) { |
| 287 return headers[HttpHeaders.PROXY_AUTHENTICATE]; | 291 return headers[HttpHeaders.PROXY_AUTHENTICATE]; |
| (...skipping 175 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 463 Future<T> addStream(Stream<List<int>> stream) { | 467 Future<T> addStream(Stream<List<int>> stream) { |
| 464 return _dataSink.addStream(stream); | 468 return _dataSink.addStream(stream); |
| 465 } | 469 } |
| 466 | 470 |
| 467 Future close() { | 471 Future close() { |
| 468 return _dataSink.close(); | 472 return _dataSink.close(); |
| 469 } | 473 } |
| 470 | 474 |
| 471 Future<T> get done => _dataSink.done; | 475 Future<T> get done => _dataSink.done; |
| 472 | 476 |
| 473 void _writeHeaders() { | 477 Future _writeHeaders({drainRequest: true}) { |
| 474 if (_headersWritten) return; | 478 if (_headersWritten) return new Future.value(); |
| 475 _headersWritten = true; | 479 _headersWritten = true; |
| 476 headers._synchronize(); // Be sure the 'chunked' option is updated. | 480 headers._synchronize(); // Be sure the 'chunked' option is updated. |
| 477 bool isServerSide = this is _HttpResponse; | 481 bool isServerSide = this is _HttpResponse; |
| 478 if (isServerSide && headers.chunkedTransferEncoding) { | 482 if (isServerSide) { |
| 479 var response = this; | 483 var response = this; |
| 480 List acceptEncodings = | 484 if (headers.chunkedTransferEncoding) { |
| 481 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; | 485 List acceptEncodings = |
| 482 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; | 486 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; |
| 483 if (acceptEncodings != null && | 487 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; |
| 484 acceptEncodings | 488 if (acceptEncodings != null && |
| 485 .expand((list) => list.split(",")) | 489 acceptEncodings |
| 486 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && | 490 .expand((list) => list.split(",")) |
| 487 contentEncoding == null) { | 491 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && |
| 488 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); | 492 contentEncoding == null) { |
| 489 _asGZip = true; | 493 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); |
| 494 _asGZip = true; |
| 495 } |
| 496 } |
| 497 if (drainRequest && !response._httpRequest._incoming.hasSubscriber) { |
| 498 return response._httpRequest.drain() |
| 499 // TODO(ajohnsen): Timeout on drain? |
| 500 .catchError((_) {}) // Ignore errors. |
| 501 .then((_) => _writeHeader()); |
| 490 } | 502 } |
| 491 } | 503 } |
| 492 _writeHeader(); | 504 return new Future.sync(_writeHeader); |
| 493 } | 505 } |
| 494 | 506 |
| 495 Future _addStream(Stream<List<int>> stream) { | 507 Future _addStream(Stream<List<int>> stream) { |
| 496 _writeHeaders(); | 508 return _writeHeaders() |
| 497 int contentLength = headers.contentLength; | 509 .then((_) { |
| 498 if (_ignoreBody) { | 510 int contentLength = headers.contentLength; |
| 499 stream.fold(null, (x, y) {}).catchError((_) {}); | 511 if (_ignoreBody) { |
| 500 return _headersSink.close(); | 512 stream.drain().catchError((_) {}); |
| 501 } | 513 return _headersSink.close(); |
| 502 stream = stream.transform(new _BufferTransformer()); | 514 } |
| 503 if (headers.chunkedTransferEncoding) { | 515 stream = stream.transform(new _BufferTransformer()); |
| 504 if (_asGZip) { | 516 if (headers.chunkedTransferEncoding) { |
| 505 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); | 517 if (_asGZip) { |
| 506 } | 518 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); |
| 507 stream = stream.transform(new _ChunkedTransformer()); | 519 } |
| 508 } else if (contentLength >= 0) { | 520 stream = stream.transform(new _ChunkedTransformer()); |
| 509 stream = stream.transform(new _ContentLengthValidator(contentLength)); | 521 } else if (contentLength >= 0) { |
| 510 } | 522 stream = stream.transform( |
| 511 return _headersSink.addStream(stream); | 523 new _ContentLengthValidator(contentLength)); |
| 524 } |
| 525 return _headersSink.addStream(stream); |
| 526 }); |
| 512 } | 527 } |
| 513 | 528 |
| 514 Future _close() { | 529 Future _close() { |
| 515 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and | 530 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and |
| 516 // persistentConnection is not guaranteed to be in sync. | 531 // persistentConnection is not guaranteed to be in sync. |
| 517 if (!_headersWritten) { | 532 if (!_headersWritten) { |
| 518 if (!_ignoreBody && headers.contentLength == -1) { | 533 if (!_ignoreBody && headers.contentLength == -1) { |
| 519 // If no body was written, _ignoreBody is false (it's not a HEAD | 534 // If no body was written, _ignoreBody is false (it's not a HEAD |
| 520 // request) and the content-length is unspecified, set contentLength to | 535 // request) and the content-length is unspecified, set contentLength to |
| 521 // 0. | 536 // 0. |
| 522 headers.chunkedTransferEncoding = false; | 537 headers.chunkedTransferEncoding = false; |
| 523 headers.contentLength = 0; | 538 headers.contentLength = 0; |
| 524 } else if (!_ignoreBody && headers.contentLength > 0) { | 539 } else if (!_ignoreBody && headers.contentLength > 0) { |
| 525 _headersSink.close().catchError((_) {}); | 540 _headersSink.close().catchError((_) {}); |
| 526 return new Future.error(new HttpException( | 541 return new Future.error(new HttpException( |
| 527 "No content while contentLength was specified to be greater " | 542 "No content while contentLength was specified to be greater " |
| 528 " than 0: ${headers.contentLength}.")); | 543 " than 0: ${headers.contentLength}.")); |
| 529 } | 544 } |
| 530 } | 545 } |
| 531 _writeHeaders(); | 546 return _writeHeaders().then((_) => _headersSink.close()); |
| 532 return _headersSink.close(); | |
| 533 } | 547 } |
| 534 | 548 |
| 535 void _writeHeader(); // TODO(ajohnsen): Better name. | 549 void _writeHeader(); // TODO(ajohnsen): Better name. |
| 536 } | 550 } |
| 537 | 551 |
| 538 | 552 |
| 539 class _HttpOutboundConsumer implements StreamConsumer { | 553 class _HttpOutboundConsumer implements StreamConsumer { |
| 540 final _HttpOutboundMessage _outbound; | 554 final _HttpOutboundMessage _outbound; |
| 541 StreamController _controller; | 555 StreamController _controller; |
| 542 StreamSubscription _subscription; | 556 StreamSubscription _subscription; |
| 543 Completer _closeCompleter = new Completer(); | 557 Completer _closeCompleter = new Completer(); |
| 544 Completer _completer; | 558 Completer _completer; |
| 545 bool _socketError = false; | 559 bool _socketError = false; |
| 546 | 560 |
| 547 _HttpOutboundConsumer(_HttpOutboundMessage this._outbound); | 561 _HttpOutboundConsumer(_HttpOutboundMessage this._outbound); |
| 548 | 562 |
| 549 void _cancel() { | 563 void _cancel() { |
| 550 if (_subscription != null) { | 564 if (_subscription != null) { |
| 551 _subscription.cancel(); | 565 _subscription.cancel(); |
| 552 } | 566 } |
| 553 } | 567 } |
| 554 | 568 |
| 555 _ensureController() { | 569 _ensureController() { |
| 556 if (_controller != null) return; | 570 if (_controller != null) return; |
| 557 _controller = new StreamController(onPause: () => _subscription.pause(), | 571 _controller = new StreamController(onPause: () => _subscription.pause(), |
| 558 onResume: () => _subscription.resume(), | 572 onResume: () => _subscription.resume(), |
| 573 onListen: () => _subscription.resume(), |
| 559 onCancel: _cancel); | 574 onCancel: _cancel); |
| 560 _outbound._addStream(_controller.stream) | 575 _outbound._addStream(_controller.stream) |
| 561 .then((_) { | 576 .then((_) { |
| 562 _cancel(); | 577 _cancel(); |
| 563 _done(); | 578 _done(); |
| 564 _closeCompleter.complete(_outbound); | 579 _closeCompleter.complete(_outbound); |
| 565 }, | 580 }, |
| 566 onError: (error) { | 581 onError: (error) { |
| 567 _socketError = true; | 582 _socketError = true; |
| 568 if (error is SocketIOException && | 583 if (error is SocketIOException && |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 601 (data) { | 616 (data) { |
| 602 _controller.add(data); | 617 _controller.add(data); |
| 603 }, | 618 }, |
| 604 onDone: () { | 619 onDone: () { |
| 605 _done(); | 620 _done(); |
| 606 }, | 621 }, |
| 607 onError: (error) { | 622 onError: (error) { |
| 608 _done(error); | 623 _done(error); |
| 609 }, | 624 }, |
| 610 cancelOnError: true); | 625 cancelOnError: true); |
| 626 // Pause the first request. |
| 627 if (_controller == null) _subscription.pause(); |
| 611 _ensureController(); | 628 _ensureController(); |
| 612 return _completer.future; | 629 return _completer.future; |
| 613 } | 630 } |
| 614 | 631 |
| 615 Future close() { | 632 Future close() { |
| 616 Future closeOutbound() { | 633 Future closeOutbound() { |
| 617 if (_socketError) return new Future.value(_outbound); | 634 if (_socketError) return new Future.value(_outbound); |
| 618 return _outbound._close().then((_) => _outbound); | 635 return _outbound._close().then((_) => _outbound); |
| 619 } | 636 } |
| 620 if (_controller == null) return closeOutbound(); | 637 if (_controller == null) return closeOutbound(); |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 675 } | 692 } |
| 676 | 693 |
| 677 String get reasonPhrase => _findReasonPhrase(statusCode); | 694 String get reasonPhrase => _findReasonPhrase(statusCode); |
| 678 void set reasonPhrase(String reasonPhrase) { | 695 void set reasonPhrase(String reasonPhrase) { |
| 679 if (_headersWritten) throw new StateError("Header already sent"); | 696 if (_headersWritten) throw new StateError("Header already sent"); |
| 680 _reasonPhrase = reasonPhrase; | 697 _reasonPhrase = reasonPhrase; |
| 681 } | 698 } |
| 682 | 699 |
| 683 Future<Socket> detachSocket() { | 700 Future<Socket> detachSocket() { |
| 684 if (_headersWritten) throw new StateError("Headers already sent"); | 701 if (_headersWritten) throw new StateError("Headers already sent"); |
| 685 _writeHeaders(); | |
| 686 var future = _httpRequest._httpConnection.detachSocket(); | 702 var future = _httpRequest._httpConnection.detachSocket(); |
| 703 _writeHeaders(drainRequest: false).then((_) => close()); |
| 687 // Close connection so the socket is 'free'. | 704 // Close connection so the socket is 'free'. |
| 688 close(); | 705 close(); |
| 689 done.catchError((_) { | 706 done.catchError((_) { |
| 690 // Catch any error on done, as they automatically will be | 707 // Catch any error on done, as they automatically will be |
| 691 // propagated to the websocket. | 708 // propagated to the websocket. |
| 692 }); | 709 }); |
| 693 return future; | 710 return future; |
| 694 } | 711 } |
| 695 | 712 |
| 696 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; | 713 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; |
| (...skipping 174 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 871 HttpConnectionInfo get connectionInfo => _httpClientConnection.connectionInfo; | 888 HttpConnectionInfo get connectionInfo => _httpClientConnection.connectionInfo; |
| 872 | 889 |
| 873 void _onIncoming(_HttpIncoming incoming) { | 890 void _onIncoming(_HttpIncoming incoming) { |
| 874 var response = new _HttpClientResponse(incoming, | 891 var response = new _HttpClientResponse(incoming, |
| 875 this, | 892 this, |
| 876 _httpClient); | 893 _httpClient); |
| 877 Future<HttpClientResponse> future; | 894 Future<HttpClientResponse> future; |
| 878 if (followRedirects && response.isRedirect) { | 895 if (followRedirects && response.isRedirect) { |
| 879 if (response.redirects.length < maxRedirects) { | 896 if (response.redirects.length < maxRedirects) { |
| 880 // Redirect and drain response. | 897 // Redirect and drain response. |
| 881 future = response.fold(null, (x, y) {}) | 898 future = response.drain() |
| 882 .then((_) => response.redirect()); | 899 .then((_) => response.redirect()); |
| 883 } else { | 900 } else { |
| 884 // End with exception, too many redirects. | 901 // End with exception, too many redirects. |
| 885 future = response.fold(null, (x, y) {}) | 902 future = response.drain() |
| 886 .then((_) => new Future.error( | 903 .then((_) => new Future.error( |
| 887 new RedirectLimitExceededException(response.redirects))); | 904 new RedirectLimitExceededException(response.redirects))); |
| 888 } | 905 } |
| 889 } else if (response._shouldAuthenticateProxy) { | 906 } else if (response._shouldAuthenticateProxy) { |
| 890 future = response._authenticate(true); | 907 future = response._authenticate(true); |
| 891 } else if (response._shouldAuthenticate) { | 908 } else if (response._shouldAuthenticate) { |
| 892 future = response._authenticate(false); | 909 future = response._authenticate(false); |
| 893 } else { | 910 } else { |
| 894 future = new Future<HttpClientResponse>.value(response); | 911 future = new Future<HttpClientResponse>.value(response); |
| 895 } | 912 } |
| (...skipping 1423 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 2319 | 2336 |
| 2320 | 2337 |
| 2321 class _RedirectInfo implements RedirectInfo { | 2338 class _RedirectInfo implements RedirectInfo { |
| 2322 const _RedirectInfo(int this.statusCode, | 2339 const _RedirectInfo(int this.statusCode, |
| 2323 String this.method, | 2340 String this.method, |
| 2324 Uri this.location); | 2341 Uri this.location); |
| 2325 final int statusCode; | 2342 final int statusCode; |
| 2326 final String method; | 2343 final String method; |
| 2327 final Uri location; | 2344 final Uri location; |
| 2328 } | 2345 } |
| OLD | NEW |