OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 class _HttpIncoming extends Stream<List<int>> { | 7 class _HttpIncoming extends Stream<List<int>> { |
8 final int _transferLength; | 8 final int _transferLength; |
9 final Completer _dataCompleter = new Completer(); | 9 final Completer _dataCompleter = new Completer(); |
10 Stream<List<int>> _stream; | 10 Stream<List<int>> _stream; |
11 | 11 |
12 bool fullBodyRead = false; | 12 bool fullBodyRead = false; |
13 | 13 |
14 // Common properties. | 14 // Common properties. |
15 final _HttpHeaders headers; | 15 final _HttpHeaders headers; |
16 bool upgraded = false; | 16 bool upgraded = false; |
17 | 17 |
18 // ClientResponse properties. | 18 // ClientResponse properties. |
19 int statusCode; | 19 int statusCode; |
20 String reasonPhrase; | 20 String reasonPhrase; |
21 | 21 |
22 // Request properties. | 22 // Request properties. |
23 String method; | 23 String method; |
24 Uri uri; | 24 Uri uri; |
25 | 25 |
| 26 bool hasSubscriber = false; |
| 27 |
26 // The transfer length if the length of the message body as it | 28 // The transfer length if the length of the message body as it |
27 // appears in the message (RFC 2616 section 4.4). This can be -1 if | 29 // appears in the message (RFC 2616 section 4.4). This can be -1 if |
28 // the length of the massage body is not known due to transfer | 30 // the length of the massage body is not known due to transfer |
29 // codings. | 31 // codings. |
30 int get transferLength => _transferLength; | 32 int get transferLength => _transferLength; |
31 | 33 |
32 _HttpIncoming(_HttpHeaders this.headers, | 34 _HttpIncoming(_HttpHeaders this.headers, |
33 int this._transferLength, | 35 int this._transferLength, |
34 Stream<List<int>> this._stream) { | 36 Stream<List<int>> this._stream) { |
35 } | 37 } |
36 | 38 |
37 StreamSubscription<List<int>> listen(void onData(List<int> event), | 39 StreamSubscription<List<int>> listen(void onData(List<int> event), |
38 {void onError(error), | 40 {void onError(error), |
39 void onDone(), | 41 void onDone(), |
40 bool cancelOnError}) { | 42 bool cancelOnError}) { |
| 43 hasSubscriber = true; |
41 return _stream.listen(onData, | 44 return _stream.listen(onData, |
42 onError: onError, | 45 onError: onError, |
43 onDone: onDone, | 46 onDone: onDone, |
44 cancelOnError: cancelOnError); | 47 cancelOnError: cancelOnError); |
45 } | 48 } |
46 | 49 |
47 // Is completed once all data have been received. | 50 // Is completed once all data have been received. |
48 Future get dataDone => _dataCompleter.future; | 51 Future get dataDone => _dataCompleter.future; |
49 | 52 |
50 void close(bool closing) { | 53 void close(bool closing) { |
51 fullBodyRead = true; | 54 fullBodyRead = true; |
| 55 hasSubscriber = true; |
52 _dataCompleter.complete(closing); | 56 _dataCompleter.complete(closing); |
53 } | 57 } |
54 } | 58 } |
55 | 59 |
56 abstract class _HttpInboundMessage extends Stream<List<int>> { | 60 abstract class _HttpInboundMessage extends Stream<List<int>> { |
57 final _HttpIncoming _incoming; | 61 final _HttpIncoming _incoming; |
58 List<Cookie> _cookies; | 62 List<Cookie> _cookies; |
59 | 63 |
60 _HttpInboundMessage(_HttpIncoming this._incoming); | 64 _HttpInboundMessage(_HttpIncoming this._incoming); |
61 | 65 |
(...skipping 205 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
267 bool get _shouldAuthenticate { | 271 bool get _shouldAuthenticate { |
268 // Only try to authenticate if there is a challenge in the response. | 272 // Only try to authenticate if there is a challenge in the response. |
269 List<String> challenge = headers[HttpHeaders.WWW_AUTHENTICATE]; | 273 List<String> challenge = headers[HttpHeaders.WWW_AUTHENTICATE]; |
270 return statusCode == HttpStatus.UNAUTHORIZED && | 274 return statusCode == HttpStatus.UNAUTHORIZED && |
271 challenge != null && challenge.length == 1; | 275 challenge != null && challenge.length == 1; |
272 } | 276 } |
273 | 277 |
274 Future<HttpClientResponse> _authenticate(bool proxyAuth) { | 278 Future<HttpClientResponse> _authenticate(bool proxyAuth) { |
275 Future<HttpClientResponse> retry() { | 279 Future<HttpClientResponse> retry() { |
276 // Drain body and retry. | 280 // Drain body and retry. |
277 return fold(null, (x, y) {}).then((_) { | 281 return drain().then((_) { |
278 return _httpClient._openUrlFromRequest(_httpRequest.method, | 282 return _httpClient._openUrlFromRequest(_httpRequest.method, |
279 _httpRequest.uri, | 283 _httpRequest.uri, |
280 _httpRequest) | 284 _httpRequest) |
281 .then((request) => request.close()); | 285 .then((request) => request.close()); |
282 }); | 286 }); |
283 } | 287 } |
284 | 288 |
285 List<String> authChallenge() { | 289 List<String> authChallenge() { |
286 if (proxyAuth) { | 290 if (proxyAuth) { |
287 return headers[HttpHeaders.PROXY_AUTHENTICATE]; | 291 return headers[HttpHeaders.PROXY_AUTHENTICATE]; |
(...skipping 175 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
463 Future<T> addStream(Stream<List<int>> stream) { | 467 Future<T> addStream(Stream<List<int>> stream) { |
464 return _dataSink.addStream(stream); | 468 return _dataSink.addStream(stream); |
465 } | 469 } |
466 | 470 |
467 Future close() { | 471 Future close() { |
468 return _dataSink.close(); | 472 return _dataSink.close(); |
469 } | 473 } |
470 | 474 |
471 Future<T> get done => _dataSink.done; | 475 Future<T> get done => _dataSink.done; |
472 | 476 |
473 void _writeHeaders() { | 477 Future _writeHeaders({drainRequest: true}) { |
474 if (_headersWritten) return; | 478 if (_headersWritten) return new Future.value(); |
475 _headersWritten = true; | 479 _headersWritten = true; |
476 headers._synchronize(); // Be sure the 'chunked' option is updated. | 480 headers._synchronize(); // Be sure the 'chunked' option is updated. |
477 bool isServerSide = this is _HttpResponse; | 481 bool isServerSide = this is _HttpResponse; |
478 if (isServerSide && headers.chunkedTransferEncoding) { | 482 if (isServerSide) { |
479 var response = this; | 483 var response = this; |
480 List acceptEncodings = | 484 if (headers.chunkedTransferEncoding) { |
481 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; | 485 List acceptEncodings = |
482 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; | 486 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; |
483 if (acceptEncodings != null && | 487 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; |
484 acceptEncodings | 488 if (acceptEncodings != null && |
485 .expand((list) => list.split(",")) | 489 acceptEncodings |
486 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && | 490 .expand((list) => list.split(",")) |
487 contentEncoding == null) { | 491 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && |
488 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); | 492 contentEncoding == null) { |
489 _asGZip = true; | 493 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); |
| 494 _asGZip = true; |
| 495 } |
| 496 } |
| 497 if (drainRequest && !response._httpRequest._incoming.hasSubscriber) { |
| 498 return response._httpRequest.drain() |
| 499 // TODO(ajohnsen): Timeout on drain? |
| 500 .catchError((_) {}) // Ignore errors. |
| 501 .then((_) => _writeHeader()); |
490 } | 502 } |
491 } | 503 } |
492 _writeHeader(); | 504 return new Future.sync(_writeHeader); |
493 } | 505 } |
494 | 506 |
495 Future _addStream(Stream<List<int>> stream) { | 507 Future _addStream(Stream<List<int>> stream) { |
496 _writeHeaders(); | 508 return _writeHeaders() |
497 int contentLength = headers.contentLength; | 509 .then((_) { |
498 if (_ignoreBody) { | 510 int contentLength = headers.contentLength; |
499 stream.fold(null, (x, y) {}).catchError((_) {}); | 511 if (_ignoreBody) { |
500 return _headersSink.close(); | 512 stream.drain().catchError((_) {}); |
501 } | 513 return _headersSink.close(); |
502 stream = stream.transform(new _BufferTransformer()); | 514 } |
503 if (headers.chunkedTransferEncoding) { | 515 stream = stream.transform(new _BufferTransformer()); |
504 if (_asGZip) { | 516 if (headers.chunkedTransferEncoding) { |
505 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); | 517 if (_asGZip) { |
506 } | 518 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); |
507 stream = stream.transform(new _ChunkedTransformer()); | 519 } |
508 } else if (contentLength >= 0) { | 520 stream = stream.transform(new _ChunkedTransformer()); |
509 stream = stream.transform(new _ContentLengthValidator(contentLength)); | 521 } else if (contentLength >= 0) { |
510 } | 522 stream = stream.transform( |
511 return _headersSink.addStream(stream); | 523 new _ContentLengthValidator(contentLength)); |
| 524 } |
| 525 return _headersSink.addStream(stream); |
| 526 }); |
512 } | 527 } |
513 | 528 |
514 Future _close() { | 529 Future _close() { |
515 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and | 530 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and |
516 // persistentConnection is not guaranteed to be in sync. | 531 // persistentConnection is not guaranteed to be in sync. |
517 if (!_headersWritten) { | 532 if (!_headersWritten) { |
518 if (!_ignoreBody && headers.contentLength == -1) { | 533 if (!_ignoreBody && headers.contentLength == -1) { |
519 // If no body was written, _ignoreBody is false (it's not a HEAD | 534 // If no body was written, _ignoreBody is false (it's not a HEAD |
520 // request) and the content-length is unspecified, set contentLength to | 535 // request) and the content-length is unspecified, set contentLength to |
521 // 0. | 536 // 0. |
522 headers.chunkedTransferEncoding = false; | 537 headers.chunkedTransferEncoding = false; |
523 headers.contentLength = 0; | 538 headers.contentLength = 0; |
524 } else if (!_ignoreBody && headers.contentLength > 0) { | 539 } else if (!_ignoreBody && headers.contentLength > 0) { |
525 _headersSink.close().catchError((_) {}); | 540 _headersSink.close().catchError((_) {}); |
526 return new Future.error(new HttpException( | 541 return new Future.error(new HttpException( |
527 "No content while contentLength was specified to be greater " | 542 "No content while contentLength was specified to be greater " |
528 " than 0: ${headers.contentLength}.")); | 543 " than 0: ${headers.contentLength}.")); |
529 } | 544 } |
530 } | 545 } |
531 _writeHeaders(); | 546 return _writeHeaders().then((_) => _headersSink.close()); |
532 return _headersSink.close(); | |
533 } | 547 } |
534 | 548 |
535 void _writeHeader(); // TODO(ajohnsen): Better name. | 549 void _writeHeader(); // TODO(ajohnsen): Better name. |
536 } | 550 } |
537 | 551 |
538 | 552 |
539 class _HttpOutboundConsumer implements StreamConsumer { | 553 class _HttpOutboundConsumer implements StreamConsumer { |
540 final _HttpOutboundMessage _outbound; | 554 final _HttpOutboundMessage _outbound; |
541 StreamController _controller; | 555 StreamController _controller; |
542 StreamSubscription _subscription; | 556 StreamSubscription _subscription; |
543 Completer _closeCompleter = new Completer(); | 557 Completer _closeCompleter = new Completer(); |
544 Completer _completer; | 558 Completer _completer; |
545 bool _socketError = false; | 559 bool _socketError = false; |
546 | 560 |
547 _HttpOutboundConsumer(_HttpOutboundMessage this._outbound); | 561 _HttpOutboundConsumer(_HttpOutboundMessage this._outbound); |
548 | 562 |
549 void _cancel() { | 563 void _cancel() { |
550 if (_subscription != null) { | 564 if (_subscription != null) { |
551 _subscription.cancel(); | 565 _subscription.cancel(); |
552 } | 566 } |
553 } | 567 } |
554 | 568 |
555 _ensureController() { | 569 _ensureController() { |
556 if (_controller != null) return; | 570 if (_controller != null) return; |
557 _controller = new StreamController(onPause: () => _subscription.pause(), | 571 _controller = new StreamController(onPause: () => _subscription.pause(), |
558 onResume: () => _subscription.resume(), | 572 onResume: () => _subscription.resume(), |
| 573 onListen: () => _subscription.resume(), |
559 onCancel: _cancel); | 574 onCancel: _cancel); |
560 _outbound._addStream(_controller.stream) | 575 _outbound._addStream(_controller.stream) |
561 .then((_) { | 576 .then((_) { |
562 _cancel(); | 577 _cancel(); |
563 _done(); | 578 _done(); |
564 _closeCompleter.complete(_outbound); | 579 _closeCompleter.complete(_outbound); |
565 }, | 580 }, |
566 onError: (error) { | 581 onError: (error) { |
567 _socketError = true; | 582 _socketError = true; |
568 if (error is SocketIOException && | 583 if (error is SocketIOException && |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
601 (data) { | 616 (data) { |
602 _controller.add(data); | 617 _controller.add(data); |
603 }, | 618 }, |
604 onDone: () { | 619 onDone: () { |
605 _done(); | 620 _done(); |
606 }, | 621 }, |
607 onError: (error) { | 622 onError: (error) { |
608 _done(error); | 623 _done(error); |
609 }, | 624 }, |
610 cancelOnError: true); | 625 cancelOnError: true); |
| 626 // Pause the first request. |
| 627 if (_controller == null) _subscription.pause(); |
611 _ensureController(); | 628 _ensureController(); |
612 return _completer.future; | 629 return _completer.future; |
613 } | 630 } |
614 | 631 |
615 Future close() { | 632 Future close() { |
616 Future closeOutbound() { | 633 Future closeOutbound() { |
617 if (_socketError) return new Future.value(_outbound); | 634 if (_socketError) return new Future.value(_outbound); |
618 return _outbound._close().then((_) => _outbound); | 635 return _outbound._close().then((_) => _outbound); |
619 } | 636 } |
620 if (_controller == null) return closeOutbound(); | 637 if (_controller == null) return closeOutbound(); |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
675 } | 692 } |
676 | 693 |
677 String get reasonPhrase => _findReasonPhrase(statusCode); | 694 String get reasonPhrase => _findReasonPhrase(statusCode); |
678 void set reasonPhrase(String reasonPhrase) { | 695 void set reasonPhrase(String reasonPhrase) { |
679 if (_headersWritten) throw new StateError("Header already sent"); | 696 if (_headersWritten) throw new StateError("Header already sent"); |
680 _reasonPhrase = reasonPhrase; | 697 _reasonPhrase = reasonPhrase; |
681 } | 698 } |
682 | 699 |
683 Future<Socket> detachSocket() { | 700 Future<Socket> detachSocket() { |
684 if (_headersWritten) throw new StateError("Headers already sent"); | 701 if (_headersWritten) throw new StateError("Headers already sent"); |
685 _writeHeaders(); | |
686 var future = _httpRequest._httpConnection.detachSocket(); | 702 var future = _httpRequest._httpConnection.detachSocket(); |
| 703 _writeHeaders(drainRequest: false).then((_) => close()); |
687 // Close connection so the socket is 'free'. | 704 // Close connection so the socket is 'free'. |
688 close(); | 705 close(); |
689 done.catchError((_) { | 706 done.catchError((_) { |
690 // Catch any error on done, as they automatically will be | 707 // Catch any error on done, as they automatically will be |
691 // propagated to the websocket. | 708 // propagated to the websocket. |
692 }); | 709 }); |
693 return future; | 710 return future; |
694 } | 711 } |
695 | 712 |
696 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; | 713 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; |
(...skipping 174 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
871 HttpConnectionInfo get connectionInfo => _httpClientConnection.connectionInfo; | 888 HttpConnectionInfo get connectionInfo => _httpClientConnection.connectionInfo; |
872 | 889 |
873 void _onIncoming(_HttpIncoming incoming) { | 890 void _onIncoming(_HttpIncoming incoming) { |
874 var response = new _HttpClientResponse(incoming, | 891 var response = new _HttpClientResponse(incoming, |
875 this, | 892 this, |
876 _httpClient); | 893 _httpClient); |
877 Future<HttpClientResponse> future; | 894 Future<HttpClientResponse> future; |
878 if (followRedirects && response.isRedirect) { | 895 if (followRedirects && response.isRedirect) { |
879 if (response.redirects.length < maxRedirects) { | 896 if (response.redirects.length < maxRedirects) { |
880 // Redirect and drain response. | 897 // Redirect and drain response. |
881 future = response.fold(null, (x, y) {}) | 898 future = response.drain() |
882 .then((_) => response.redirect()); | 899 .then((_) => response.redirect()); |
883 } else { | 900 } else { |
884 // End with exception, too many redirects. | 901 // End with exception, too many redirects. |
885 future = response.fold(null, (x, y) {}) | 902 future = response.drain() |
886 .then((_) => new Future.error( | 903 .then((_) => new Future.error( |
887 new RedirectLimitExceededException(response.redirects))); | 904 new RedirectLimitExceededException(response.redirects))); |
888 } | 905 } |
889 } else if (response._shouldAuthenticateProxy) { | 906 } else if (response._shouldAuthenticateProxy) { |
890 future = response._authenticate(true); | 907 future = response._authenticate(true); |
891 } else if (response._shouldAuthenticate) { | 908 } else if (response._shouldAuthenticate) { |
892 future = response._authenticate(false); | 909 future = response._authenticate(false); |
893 } else { | 910 } else { |
894 future = new Future<HttpClientResponse>.value(response); | 911 future = new Future<HttpClientResponse>.value(response); |
895 } | 912 } |
(...skipping 1423 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
2319 | 2336 |
2320 | 2337 |
2321 class _RedirectInfo implements RedirectInfo { | 2338 class _RedirectInfo implements RedirectInfo { |
2322 const _RedirectInfo(int this.statusCode, | 2339 const _RedirectInfo(int this.statusCode, |
2323 String this.method, | 2340 String this.method, |
2324 Uri this.location); | 2341 Uri this.location); |
2325 final int statusCode; | 2342 final int statusCode; |
2326 final String method; | 2343 final String method; |
2327 final Uri location; | 2344 final Uri location; |
2328 } | 2345 } |
OLD | NEW |