OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 const int _HEADERS_BUFFER_SIZE = 8 * 1024; | 7 const int _OUTGOING_BUFFER_SIZE = 8 * 1024; |
8 | 8 |
9 class _HttpIncoming extends Stream<List<int>> { | 9 class _HttpIncoming extends Stream<List<int>> { |
10 final int _transferLength; | 10 final int _transferLength; |
11 final Completer _dataCompleter = new Completer(); | 11 final Completer _dataCompleter = new Completer(); |
12 Stream<List<int>> _stream; | 12 Stream<List<int>> _stream; |
13 | 13 |
14 bool fullBodyRead = false; | 14 bool fullBodyRead = false; |
15 | 15 |
16 // Common properties. | 16 // Common properties. |
17 final _HttpHeaders headers; | 17 final _HttpHeaders headers; |
(...skipping 381 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
399 return retry(); | 399 return retry(); |
400 } else { | 400 } else { |
401 // No credentials available, complete with original response. | 401 // No credentials available, complete with original response. |
402 return this; | 402 return this; |
403 } | 403 } |
404 }); | 404 }); |
405 } | 405 } |
406 } | 406 } |
407 | 407 |
408 | 408 |
409 abstract class _HttpOutboundMessage<T> implements IOSink { | 409 abstract class _HttpOutboundMessage<T> extends _IOSinkImpl { |
410 // Used to mark when the body should be written. This is used for HEAD | 410 // Used to mark when the body should be written. This is used for HEAD |
411 // requests and in error handling. | 411 // requests and in error handling. |
412 bool _ignoreBody = false; | 412 bool _ignoreBody = false; |
413 bool _headersWritten = false; | 413 bool _headersWritten = false; |
414 bool _asGZip = false; | |
415 | 414 |
416 IOSink _headersSink; | 415 final Uri _uri; |
417 IOSink _dataSink; | |
418 | |
419 final _HttpOutgoing _outgoing; | 416 final _HttpOutgoing _outgoing; |
420 final Uri _uri; | |
421 | 417 |
422 final _HttpHeaders headers; | 418 final _HttpHeaders headers; |
423 | 419 |
424 _HttpOutboundMessage(this._uri, | 420 _HttpOutboundMessage(this._uri, |
425 String protocolVersion, | 421 String protocolVersion, |
426 _HttpOutgoing outgoing) | 422 this._outgoing) |
427 : _outgoing = outgoing, | 423 : super(new _HttpOutboundConsumer(), null), |
428 _headersSink = new IOSink(outgoing, encoding: ASCII), | |
429 headers = new _HttpHeaders(protocolVersion) { | 424 headers = new _HttpHeaders(protocolVersion) { |
430 _dataSink = new IOSink(new _HttpOutboundConsumer(this)); | 425 _outgoing.outbound = this; |
426 (_target as _HttpOutboundConsumer).outbound = this; | |
427 _encodingMutable = false; | |
431 } | 428 } |
432 | 429 |
433 int get contentLength => headers.contentLength; | 430 int get contentLength => headers.contentLength; |
434 void set contentLength(int contentLength) { | 431 void set contentLength(int contentLength) { |
435 headers.contentLength = contentLength; | 432 headers.contentLength = contentLength; |
436 } | 433 } |
437 | 434 |
438 bool get persistentConnection => headers.persistentConnection; | 435 bool get persistentConnection => headers.persistentConnection; |
439 void set persistentConnection(bool p) { | 436 void set persistentConnection(bool p) { |
440 headers.persistentConnection = p; | 437 headers.persistentConnection = p; |
441 } | 438 } |
442 | 439 |
440 void write(Object obj) { | |
441 if (!_headersWritten) _encoding = encoding; | |
442 super.write(obj); | |
443 } | |
444 | |
443 Encoding get encoding { | 445 Encoding get encoding { |
444 var charset; | 446 var charset; |
445 if (headers.contentType != null && headers.contentType.charset != null) { | 447 if (headers.contentType != null && headers.contentType.charset != null) { |
446 charset = headers.contentType.charset; | 448 charset = headers.contentType.charset; |
447 } else { | 449 } else { |
448 charset = "iso-8859-1"; | 450 charset = "iso-8859-1"; |
449 } | 451 } |
450 return Encoding.getByName(charset); | 452 return Encoding.getByName(charset); |
451 } | 453 } |
452 | 454 |
453 void set encoding(Encoding value) { | |
454 throw new StateError("IOSink encoding is not mutable"); | |
455 } | |
456 | |
457 void write(Object obj) { | |
458 if (!_headersWritten) _dataSink.encoding = encoding; | |
459 _dataSink.write(obj); | |
460 } | |
461 | |
462 void writeAll(Iterable objects, [String separator = ""]) { | |
463 if (!_headersWritten) _dataSink.encoding = encoding; | |
464 _dataSink.writeAll(objects, separator); | |
465 } | |
466 | |
467 void writeln([Object obj = ""]) { | |
468 if (!_headersWritten) _dataSink.encoding = encoding; | |
469 _dataSink.writeln(obj); | |
470 } | |
471 | |
472 void writeCharCode(int charCode) { | |
473 if (!_headersWritten) _dataSink.encoding = encoding; | |
474 _dataSink.writeCharCode(charCode); | |
475 } | |
476 | |
477 void add(List<int> data) { | 455 void add(List<int> data) { |
478 if (data.length == 0) return; | 456 if (data.length == 0) return; |
479 _dataSink.add(data); | 457 super.add(data); |
480 } | 458 } |
481 | 459 |
482 void addError(error, [StackTrace stackTrace]) => | 460 Future _writeHeaders({bool drainRequest: true, |
483 _dataSink.addError(error, stackTrace); | 461 bool setOutgoing: true}) { |
484 | 462 // TODO(ajohnsen): Avoid excessive futures in this method. |
485 Future<T> addStream(Stream<List<int>> stream) => _dataSink.addStream(stream); | 463 write() { |
486 | |
487 Future flush() => _dataSink.flush(); | |
488 | |
489 Future close() => _dataSink.close(); | |
490 | |
491 Future<T> get done => _dataSink.done; | |
492 | |
493 Future _writeHeaders({bool drainRequest: true}) { | |
494 void write() { | |
495 try { | 464 try { |
496 _writeHeader(); | 465 _writeHeader(); |
497 } catch (error) { | 466 } catch (error, s) { |
Søren Gjesse
2014/03/04 09:29:15
s is not used.
Anders Johnsen
2014/03/04 13:54:16
Done.
| |
498 // Headers too large. | 467 // Headers too large. |
499 throw new HttpException( | 468 throw new HttpException( |
500 "Headers size exceeded the of '$_HEADERS_BUFFER_SIZE' bytes"); | 469 "Headers size exceeded the of '$_OUTGOING_BUFFER_SIZE'" |
470 " bytes"); | |
501 } | 471 } |
472 return this; | |
502 } | 473 } |
503 if (_headersWritten) return new Future.value(); | 474 if (_headersWritten) return new Future.value(this); |
504 _headersWritten = true; | 475 _headersWritten = true; |
505 _dataSink.encoding = encoding; | 476 _encoding = encoding; |
477 Future drainFuture; | |
506 bool isServerSide = this is _HttpResponse; | 478 bool isServerSide = this is _HttpResponse; |
479 bool gzip = false; | |
507 if (isServerSide) { | 480 if (isServerSide) { |
508 var response = this; | 481 var response = this; |
509 if (headers.chunkedTransferEncoding) { | 482 if (headers.chunkedTransferEncoding) { |
510 List acceptEncodings = | 483 List acceptEncodings = |
511 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; | 484 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; |
512 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; | 485 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; |
513 if (acceptEncodings != null && | 486 if (acceptEncodings != null && |
514 acceptEncodings | 487 acceptEncodings |
515 .expand((list) => list.split(",")) | 488 .expand((list) => list.split(",")) |
516 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && | 489 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && |
517 contentEncoding == null) { | 490 contentEncoding == null) { |
518 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); | 491 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); |
519 _asGZip = true; | 492 gzip = true; |
520 } | 493 } |
521 } | 494 } |
522 if (drainRequest && !response._httpRequest._incoming.hasSubscriber) { | 495 if (drainRequest && !response._httpRequest._incoming.hasSubscriber) { |
523 return response._httpRequest.drain() | 496 drainFuture = response._httpRequest.drain().catchError((_) {}); |
524 // TODO(ajohnsen): Timeout on drain? | |
525 .catchError((_) {}) // Ignore errors. | |
526 .then((_) => write()); | |
527 } | 497 } |
498 } else { | |
499 drainRequest = false; | |
500 } | |
501 if (_ignoreBody) { | |
502 return new Future.sync(write).then((_) => _outgoing.close()); | |
503 } | |
504 if (setOutgoing) { | |
505 int contentLength = headers.contentLength; | |
506 if (headers.chunkedTransferEncoding) { | |
507 _outgoing.chunked = true; | |
508 if (gzip) _outgoing.gzip = true; | |
509 } else if (contentLength >= 0) { | |
510 _outgoing.contentLength = contentLength; | |
511 } | |
512 } | |
513 if (drainFuture != null) { | |
514 return drainFuture.then((_) => write()); | |
528 } | 515 } |
529 return new Future.sync(write); | 516 return new Future.sync(write); |
530 } | 517 } |
531 | 518 |
532 Future _addStream(Stream<List<int>> stream) { | 519 Future _addStream(Stream<List<int>> stream) { |
533 return _writeHeaders() | 520 // TODO(ajohnsen): Merge into _HttpOutgoing. |
534 .then((_) { | 521 if (_ignoreBody) { |
535 int contentLength = headers.contentLength; | 522 stream.drain().catchError((_) {}); |
536 if (_ignoreBody) { | 523 return _writeHeaders(); |
537 stream.drain().catchError((_) {}); | 524 } |
538 return _headersSink.close(); | 525 if (_headersWritten) { |
539 } | 526 return _outgoing.addStream(stream); |
540 stream = stream.transform(const _BufferTransformer()); | 527 } else { |
541 if (headers.chunkedTransferEncoding) { | 528 var completer = new Completer.sync(); |
542 if (_asGZip) { | 529 var future = _outgoing.addStream(stream, completer.future); |
543 stream = stream.transform(GZIP.encoder); | 530 _writeHeaders().then(completer.complete); |
544 } | 531 return future; |
545 stream = stream.transform(const _ChunkedTransformer()); | 532 } |
546 } else if (contentLength >= 0) { | |
547 stream = stream.transform( | |
548 new _ContentLengthValidator(contentLength, _uri)); | |
549 } | |
550 return _headersSink.addStream(stream); | |
551 }); | |
552 } | 533 } |
553 | 534 |
554 Future _close() { | 535 Future _close() { |
536 // TODO(ajohnsen): Merge into _HttpOutgoing. | |
555 if (!_headersWritten) { | 537 if (!_headersWritten) { |
556 if (!_ignoreBody && headers.contentLength == -1) { | 538 if (!_ignoreBody && headers.contentLength == -1) { |
557 // If no body was written, _ignoreBody is false (it's not a HEAD | 539 // If no body was written, _ignoreBody is false (it's not a HEAD |
558 // request) and the content-length is unspecified, set contentLength to | 540 // request) and the content-length is unspecified, set contentLength to |
559 // 0. | 541 // 0. |
560 headers.chunkedTransferEncoding = false; | 542 headers.chunkedTransferEncoding = false; |
561 headers.contentLength = 0; | 543 headers.contentLength = 0; |
562 } else if (!_ignoreBody && headers.contentLength > 0) { | 544 } else if (!_ignoreBody && headers.contentLength > 0) { |
563 _headersSink.addError(new HttpException( | 545 return _outgoing.addStream( |
564 "No content while contentLength was specified to be greater " | 546 new Stream.fromFuture(new Future.error(new HttpException( |
565 "than 0: ${headers.contentLength}.", | 547 "No content even though contentLength was specified to be " |
566 uri: _uri)); | 548 "greater than 0: ${headers.contentLength}.", |
567 return _headersSink.done; | 549 uri: _uri)))); |
568 } | 550 } |
569 } | 551 } |
570 return _writeHeaders().whenComplete(_headersSink.close); | 552 return _writeHeaders().whenComplete(_outgoing.close); |
571 } | 553 } |
572 | 554 |
573 void _writeHeader(); | 555 void _writeHeader(); |
574 } | 556 } |
575 | 557 |
576 | 558 |
577 class _HttpOutboundConsumer implements StreamConsumer { | 559 class _HttpOutboundConsumer implements StreamConsumer { |
578 final _HttpOutboundMessage _outbound; | 560 // TODO(ajohnsen): Once _addStream and _close is merged into _HttpOutgoing, |
579 StreamController _controller; | 561 // this class can be removed. |
580 StreamSubscription _subscription; | 562 _HttpOutboundMessage outbound; |
581 Completer _closeCompleter = new Completer(); | 563 _HttpOutboundConsumer(); |
582 Completer _completer; | |
583 bool _socketError = false; | |
584 | 564 |
585 _HttpOutboundConsumer(this._outbound); | 565 Future addStream(var stream) => outbound._addStream(stream); |
586 | 566 Future close() => outbound._close(); |
587 void _cancel() { | |
588 if (_subscription != null) { | |
589 StreamSubscription subscription = _subscription; | |
590 _subscription = null; | |
591 subscription.cancel(); | |
592 } | |
593 } | |
594 | |
595 bool _ignoreError(error) | |
596 => (error is SocketException || error is TlsException) && | |
597 _outbound is HttpResponse; | |
598 | |
599 _ensureController() { | |
600 if (_controller != null) return; | |
601 _controller = new StreamController(sync: true, | |
602 onPause: () => _subscription.pause(), | |
603 onResume: () => _subscription.resume(), | |
604 onListen: () => _subscription.resume(), | |
605 onCancel: _cancel); | |
606 _outbound._addStream(_controller.stream) | |
607 .then((_) { | |
608 _cancel(); | |
609 _done(); | |
610 _closeCompleter.complete(_outbound); | |
611 }, | |
612 onError: (error, [StackTrace stackTrace]) { | |
613 _socketError = true; | |
614 if (_ignoreError(error)) { | |
615 _cancel(); | |
616 _done(); | |
617 _closeCompleter.complete(_outbound); | |
618 } else { | |
619 if (!_done(error)) { | |
620 _closeCompleter.completeError(error, stackTrace); | |
621 } | |
622 } | |
623 }); | |
624 } | |
625 | |
626 bool _done([error, StackTrace stackTrace]) { | |
627 if (_completer == null) return false; | |
628 if (error != null) { | |
629 _completer.completeError(error, stackTrace); | |
630 } else { | |
631 _completer.complete(_outbound); | |
632 } | |
633 _completer = null; | |
634 return true; | |
635 } | |
636 | |
637 Future addStream(var stream) { | |
638 // If we saw a socket error subscribe and then cancel, to ignore any data | |
639 // on the stream. | |
640 if (_socketError) { | |
641 stream.listen(null).cancel(); | |
642 return new Future.value(_outbound); | |
643 } | |
644 _completer = new Completer(); | |
645 _subscription = stream.listen( | |
646 (data) => _controller.add(data), | |
647 onDone: _done, | |
648 onError: (e, s) => _controller.addError(e, s), | |
649 cancelOnError: true); | |
650 // Pause the first request. | |
651 if (_controller == null) _subscription.pause(); | |
652 _ensureController(); | |
653 return _completer.future; | |
654 } | |
655 | |
656 Future close() { | |
657 Future closeOutbound() { | |
658 if (_socketError) return new Future.value(_outbound); | |
659 return _outbound._close() | |
660 .catchError((_) {}, test: _ignoreError) | |
661 .then((_) => _outbound); | |
662 } | |
663 if (_controller == null) return closeOutbound(); | |
664 _controller.close(); | |
665 return _closeCompleter.future.then((_) => closeOutbound()); | |
666 } | |
667 } | 567 } |
668 | 568 |
669 | 569 |
670 class _BufferTransformerSink implements EventSink<List<int>> { | |
671 static const int MIN_CHUNK_SIZE = 4 * 1024; | |
672 static const int MAX_BUFFER_SIZE = 16 * 1024; | |
673 | |
674 final BytesBuilder _builder = new BytesBuilder(); | |
675 final EventSink<List<int>> _outSink; | |
676 | |
677 _BufferTransformerSink(this._outSink); | |
678 | |
679 void add(List<int> data) { | |
680 // TODO(ajohnsen): Use timeout? | |
681 if (data.length == 0) return; | |
682 if (data.length >= MIN_CHUNK_SIZE) { | |
683 flush(); | |
684 _outSink.add(data); | |
685 } else { | |
686 _builder.add(data); | |
687 if (_builder.length >= MAX_BUFFER_SIZE) { | |
688 flush(); | |
689 } | |
690 } | |
691 } | |
692 | |
693 void addError(Object error, [StackTrace stackTrace]) { | |
694 _outSink.addError(error, stackTrace); | |
695 } | |
696 | |
697 void close() { | |
698 flush(); | |
699 _outSink.close(); | |
700 } | |
701 | |
702 void flush() { | |
703 if (_builder.length > 0) { | |
704 // takeBytes will clear the BytesBuilder. | |
705 _outSink.add(_builder.takeBytes()); | |
706 } | |
707 } | |
708 } | |
709 | |
710 class _BufferTransformer implements StreamTransformer<List<int>, List<int>> { | |
711 const _BufferTransformer(); | |
712 | |
713 Stream<List<int>> bind(Stream<List<int>> stream) { | |
714 return new Stream<List<int>>.eventTransformed( | |
715 stream, | |
716 (EventSink outSink) => new _BufferTransformerSink(outSink)); | |
717 } | |
718 } | |
719 | |
720 | |
721 class _HttpResponse extends _HttpOutboundMessage<HttpResponse> | 570 class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
722 implements HttpResponse { | 571 implements HttpResponse { |
723 int _statusCode = 200; | 572 int _statusCode = 200; |
724 String _reasonPhrase; | 573 String _reasonPhrase; |
725 List<Cookie> _cookies; | 574 List<Cookie> _cookies; |
726 _HttpRequest _httpRequest; | 575 _HttpRequest _httpRequest; |
727 Duration _deadline; | 576 Duration _deadline; |
728 Timer _deadlineTimer; | 577 Timer _deadlineTimer; |
729 | 578 |
730 _HttpResponse(Uri uri, | 579 _HttpResponse(Uri uri, |
(...skipping 25 matching lines...) Expand all Loading... | |
756 if (_headersWritten) throw new StateError("Header already sent"); | 605 if (_headersWritten) throw new StateError("Header already sent"); |
757 statusCode = status; | 606 statusCode = status; |
758 headers.set("location", location.toString()); | 607 headers.set("location", location.toString()); |
759 return close(); | 608 return close(); |
760 } | 609 } |
761 | 610 |
762 Future<Socket> detachSocket() { | 611 Future<Socket> detachSocket() { |
763 if (_headersWritten) throw new StateError("Headers already sent"); | 612 if (_headersWritten) throw new StateError("Headers already sent"); |
764 deadline = null; // Be sure to stop any deadline. | 613 deadline = null; // Be sure to stop any deadline. |
765 var future = _httpRequest._httpConnection.detachSocket(); | 614 var future = _httpRequest._httpConnection.detachSocket(); |
766 _writeHeaders(drainRequest: false).then((_) => close()); | 615 _writeHeaders(drainRequest: false, |
616 setOutgoing: false).then((_) => close()); | |
767 // Close connection so the socket is 'free'. | 617 // Close connection so the socket is 'free'. |
768 close(); | 618 close(); |
769 done.catchError((_) { | 619 done.catchError((_) { |
770 // Catch any error on done, as they automatically will be | 620 // Catch any error on done, as they automatically will be |
771 // propagated to the websocket. | 621 // propagated to the websocket. |
772 }); | 622 }); |
773 return future; | 623 return future; |
774 } | 624 } |
775 | 625 |
776 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; | 626 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; |
777 | 627 |
778 Duration get deadline => _deadline; | 628 Duration get deadline => _deadline; |
779 | 629 |
780 void set deadline(Duration d) { | 630 void set deadline(Duration d) { |
781 if (_deadlineTimer != null) _deadlineTimer.cancel(); | 631 if (_deadlineTimer != null) _deadlineTimer.cancel(); |
782 _deadline = d; | 632 _deadline = d; |
783 | 633 |
784 if (_deadline == null) return; | 634 if (_deadline == null) return; |
785 _deadlineTimer = new Timer(_deadline, () { | 635 _deadlineTimer = new Timer(_deadline, () { |
636 _outgoing._socketError = true; | |
786 _outgoing.socket.destroy(); | 637 _outgoing.socket.destroy(); |
787 }); | 638 }); |
788 } | 639 } |
789 | 640 |
790 void _writeHeader() { | 641 void _writeHeader() { |
791 Uint8List buffer = _httpRequest._httpConnection._headersBuffer; | 642 Uint8List buffer = new Uint8List(_OUTGOING_BUFFER_SIZE); |
792 int offset = 0; | 643 int offset = 0; |
793 | 644 |
794 void write(List<int> bytes) { | 645 void write(List<int> bytes) { |
795 int len = bytes.length; | 646 int len = bytes.length; |
796 for (int i = 0; i < len; i++) { | 647 for (int i = 0; i < len; i++) { |
797 buffer[offset + i] = bytes[i]; | 648 buffer[offset + i] = bytes[i]; |
798 } | 649 } |
799 offset += len; | 650 offset += len; |
800 } | 651 } |
801 | 652 |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
840 headers.add(HttpHeaders.SET_COOKIE, cookie); | 691 headers.add(HttpHeaders.SET_COOKIE, cookie); |
841 }); | 692 }); |
842 } | 693 } |
843 | 694 |
844 headers._finalize(); | 695 headers._finalize(); |
845 | 696 |
846 // Write headers. | 697 // Write headers. |
847 offset = headers._write(buffer, offset); | 698 offset = headers._write(buffer, offset); |
848 buffer[offset++] = _CharCode.CR; | 699 buffer[offset++] = _CharCode.CR; |
849 buffer[offset++] = _CharCode.LF; | 700 buffer[offset++] = _CharCode.LF; |
850 _headersSink.add(new Uint8List.view(buffer.buffer, 0, offset)); | 701 _outgoing.setHeader(buffer, offset); |
851 } | 702 } |
852 | 703 |
853 String _findReasonPhrase(int statusCode) { | 704 String _findReasonPhrase(int statusCode) { |
854 if (_reasonPhrase != null) { | 705 if (_reasonPhrase != null) { |
855 return _reasonPhrase; | 706 return _reasonPhrase; |
856 } | 707 } |
857 | 708 |
858 switch (statusCode) { | 709 switch (statusCode) { |
859 case HttpStatus.CONTINUE: return "Continue"; | 710 case HttpStatus.CONTINUE: return "Continue"; |
860 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols"; | 711 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols"; |
(...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1029 if (_httpClientConnection._proxyTunnel) { | 880 if (_httpClientConnection._proxyTunnel) { |
1030 return uriStartingFromPath(); | 881 return uriStartingFromPath(); |
1031 } else { | 882 } else { |
1032 return uri.toString(); | 883 return uri.toString(); |
1033 } | 884 } |
1034 } | 885 } |
1035 } | 886 } |
1036 } | 887 } |
1037 | 888 |
1038 void _writeHeader() { | 889 void _writeHeader() { |
1039 Uint8List buffer = _httpClientConnection._headersBuffer; | 890 Uint8List buffer = new Uint8List(_OUTGOING_BUFFER_SIZE); |
1040 int offset = 0; | 891 int offset = 0; |
1041 | 892 |
1042 void write(List<int> bytes) { | 893 void write(List<int> bytes) { |
1043 int len = bytes.length; | 894 int len = bytes.length; |
1044 for (int i = 0; i < len; i++) { | 895 for (int i = 0; i < len; i++) { |
1045 buffer[offset + i] = bytes[i]; | 896 buffer[offset + i] = bytes[i]; |
1046 } | 897 } |
1047 offset += len; | 898 offset += len; |
1048 } | 899 } |
1049 | 900 |
(...skipping 17 matching lines...) Expand all Loading... | |
1067 } | 918 } |
1068 headers.add(HttpHeaders.COOKIE, sb.toString()); | 919 headers.add(HttpHeaders.COOKIE, sb.toString()); |
1069 } | 920 } |
1070 | 921 |
1071 headers._finalize(); | 922 headers._finalize(); |
1072 | 923 |
1073 // Write headers. | 924 // Write headers. |
1074 offset = headers._write(buffer, offset); | 925 offset = headers._write(buffer, offset); |
1075 buffer[offset++] = _CharCode.CR; | 926 buffer[offset++] = _CharCode.CR; |
1076 buffer[offset++] = _CharCode.LF; | 927 buffer[offset++] = _CharCode.LF; |
1077 _headersSink.add(new Uint8List.view(buffer.buffer, 0, offset)); | 928 _outgoing.setHeader(buffer, offset); |
1078 } | 929 } |
1079 } | 930 } |
1080 | 931 |
1081 | 932 // Used by _HttpOutgoing as a target of a chunked converter for gzip |
1082 class _ChunkedTransformerSink implements EventSink<List<int>> { | 933 // compression. |
1083 | 934 class _HttpGZipSink extends ByteConversionSink { |
1084 int _pendingFooter = 0; | 935 final Function _consume; |
1085 final EventSink<List<int>> _outSink; | 936 _HttpGZipSink(this._consume); |
1086 | 937 |
1087 _ChunkedTransformerSink(this._outSink); | 938 void add(List<int> chunk) { |
1088 | 939 _consume(chunk); |
1089 void add(List<int> data) { | 940 } |
1090 _outSink.add(_chunkHeader(data.length)); | 941 |
1091 if (data.length > 0) _outSink.add(data); | 942 void addSlice(Uint8List chunk, int start, int end, bool isLast) { |
1092 _pendingFooter = 2; | 943 _consume(new Uint8List.view(chunk.buffer, start, end - start)); |
1093 } | 944 } |
1094 | 945 |
1095 void addError(Object error, [StackTrace stackTrace]) { | 946 void close() {} |
1096 _outSink.addError(error, stackTrace); | 947 } |
1097 } | 948 |
1098 | 949 |
1099 void close() { | 950 // The _HttpOutgoing handles all of the following: |
1100 add(const []); | 951 // - Buffering |
1101 _outSink.close(); | 952 // - GZip compressionm |
953 // - Content-Length validation. | |
954 // - Errors. | |
955 // | |
956 // Most notable is the GZip compression, that uses a double-buffering system, | |
957 // one before gzip (_gzipBuffer) and one after (_buffer). | |
958 class _HttpOutgoing | |
959 implements StreamConsumer<List<int>> { | |
960 static const List<int> _footerAndChunk0Length = | |
961 const [_CharCode.CR, _CharCode.LF, 0x30, _CharCode.CR, _CharCode.LF, | |
962 _CharCode.CR, _CharCode.LF]; | |
963 | |
964 static const List<int> _chunk0Length = | |
965 const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF]; | |
966 | |
967 final Completer _doneCompleter = new Completer(); | |
968 final Socket socket; | |
969 | |
970 Uint8List _buffer; | |
971 int _length = 0; | |
972 | |
973 Future _closeFuture; | |
974 | |
975 bool chunked = false; | |
976 int _pendingChunkedFooter = 0; | |
977 | |
978 int contentLength; | |
979 int _bytesWritten = 0; | |
980 | |
981 bool _gzip = false; | |
982 ByteConversionSink _gzipSink; | |
983 // _gzipAdd is set iff the sink is being added to. It's used to specify where | |
984 // gzipped data should be taken (sometimes a controller, sometimes a socket). | |
985 Function _gzipAdd; | |
986 Uint8List _gzipBuffer; | |
987 int _gzipBufferLength = 0; | |
988 | |
989 bool _socketError = false; | |
990 | |
991 _HttpOutboundMessage outbound; | |
992 | |
993 bool _ignoreError(error) | |
994 => (error is SocketException || error is TlsException) && | |
995 outbound is HttpResponse; | |
996 | |
997 _HttpOutgoing(this.socket); | |
998 | |
999 Future addStream(Stream<List<int>> stream, [Future pauseFuture]) { | |
1000 if (_socketError) { | |
1001 stream.listen(null).cancel(); | |
1002 return new Future.value(outbound); | |
1003 } | |
1004 var sub; | |
1005 var controller; | |
1006 // Use new stream so we are able to pause (see below listen). The | |
1007 // alternative is to use stream.extand, but that won't give us a way of | |
1008 // pausing. | |
1009 controller = new StreamController( | |
1010 onPause: () => sub.pause(), | |
1011 onResume: () => sub.resume(), | |
1012 sync: true); | |
1013 sub = stream.listen( | |
1014 (data) { | |
1015 if (_socketError) return; | |
1016 if (data.length == 0) return; | |
1017 if (chunked) { | |
1018 if (_gzip) { | |
1019 _gzipAdd = controller.add; | |
1020 _addGZipChunk(data, _gzipSink.add); | |
1021 _gzipAdd = null; | |
1022 return; | |
1023 } | |
1024 _addChunk(_chunkHeader(data.length), controller.add); | |
1025 _pendingChunkedFooter = 2; | |
1026 } else { | |
1027 if (contentLength != null) { | |
1028 _bytesWritten += data.length; | |
1029 if (_bytesWritten > contentLength) { | |
1030 controller.addError(new HttpException( | |
1031 "Content size exceeds specified contentLength. " | |
1032 "$_bytesWritten bytes written while expected " | |
1033 "$contentLength. " | |
1034 "[${new String.fromCharCodes(data)}]")); | |
1035 return; | |
1036 } | |
1037 } | |
1038 } | |
1039 _addChunk(data, controller.add); | |
Søren Gjesse
2014/03/04 09:29:15
Please moving this up to the two placed where it f
Anders Johnsen
2014/03/04 13:54:16
Done.
| |
1040 }, | |
1041 onError: controller.addError, | |
1042 onDone: controller.close, | |
1043 cancelOnError: true); | |
1044 | |
1045 // While incoming is being drained, the pauseFuture is non-null. Pause | |
1046 // output until it's drained. | |
1047 if (pauseFuture != null) { | |
1048 sub.pause(pauseFuture); | |
1049 } | |
1050 | |
1051 return socket.addStream(controller.stream) | |
1052 .then((_) { | |
1053 return outbound; | |
1054 }, onError: (error) { | |
1055 // Be sure to close it in case of an error. | |
1056 if (_gzip) _gzipSink.close(); | |
1057 _socketError = true; | |
1058 _doneCompleter.completeError(error); | |
1059 if (_ignoreError(error)) { | |
1060 return outbound; | |
1061 } else { | |
1062 throw error; | |
1063 } | |
1064 }); | |
1065 } | |
1066 | |
1067 Future close() { | |
1068 // If we are already closed, return that future. | |
1069 if (_closeFuture != null) return _closeFuture; | |
1070 // If we earlier saw an error, return immidiate. The notification to | |
1071 // _Http*Connection is already done. | |
1072 if (_socketError) return new Future.value(outbound); | |
1073 // If contentLength was specified, validate it. | |
1074 if (contentLength != null) { | |
1075 if (_bytesWritten < contentLength) { | |
1076 var error = new HttpException( | |
1077 "Content size below specified contentLength. " | |
1078 " $_bytesWritten bytes written but expected " | |
1079 "$contentLength."); | |
1080 _doneCompleter.completeError(error); | |
1081 return _closeFuture = new Future.error(error); | |
1082 } | |
1083 } | |
1084 // In case of chunked encoding (and gzip), handle remaining gzip data and | |
1085 // append the 'footer' for chunked encoding. | |
1086 if (chunked) { | |
1087 if (_gzip) { | |
1088 _gzipAdd = socket.add; | |
1089 if (_gzipBufferLength > 0) { | |
1090 _gzipSink.add(new Uint8List.view( | |
1091 _gzipBuffer.buffer, 0, _gzipBufferLength)); | |
1092 } | |
1093 _gzipBuffer = null; | |
1094 _gzipSink.close(); | |
1095 _gzipAdd = null; | |
1096 } | |
1097 _addChunk(_chunkHeader(0), socket.add); | |
1098 } | |
1099 // Add any remaining data in the buffer. | |
1100 if (_length > 0) { | |
1101 socket.add(new Uint8List.view(_buffer.buffer, 0, _length)); | |
1102 } | |
1103 // Clear references, for better GC. | |
1104 _buffer = null; | |
1105 // And finally flush it. As we support keep-alive, never close it from here. | |
1106 // Once the socket is flushed, we'll be able to reuse it (signaled by the | |
1107 // 'done' future). | |
1108 return _closeFuture = socket.flush() | |
1109 .then((_) { | |
1110 _doneCompleter.complete(socket); | |
1111 return outbound; | |
1112 }, onError: (error) { | |
1113 _doneCompleter.completeError(error); | |
1114 if (_ignoreError(error)) { | |
1115 return outbound; | |
1116 } else { | |
1117 throw error; | |
1118 } | |
1119 }); | |
1120 } | |
1121 | |
1122 Future get done => _doneCompleter.future; | |
1123 | |
1124 void setHeader(List<int> data, int length) { | |
1125 assert(_length == 0); | |
1126 assert(data.length == _OUTGOING_BUFFER_SIZE); | |
1127 _buffer = data; | |
1128 _length = length; | |
1129 } | |
1130 | |
1131 void set gzip(bool value) { | |
1132 _gzip = value; | |
1133 if (_gzip) { | |
1134 _gzipBuffer = new Uint8List(_OUTGOING_BUFFER_SIZE); | |
1135 assert(_gzipSink == null); | |
1136 _gzipSink = new ZLibEncoder(gzip: true) | |
1137 .startChunkedConversion( | |
1138 new _HttpGZipSink((data) { | |
1139 // We are closing down prematurely, due to an error. Discard. | |
1140 if (_gzipAdd == null) return; | |
1141 _addChunk(_chunkHeader(data.length), _gzipAdd); | |
1142 _pendingChunkedFooter = 2; | |
1143 _addChunk(data, _gzipAdd); | |
1144 })); | |
1145 } | |
1146 } | |
1147 | |
1148 void _addGZipChunk(chunk, void add(List<int> data)) { | |
1149 if (chunk.length > _gzipBuffer.length - _gzipBufferLength) { | |
1150 add(new Uint8List.view( | |
1151 _gzipBuffer.buffer, 0, _gzipBufferLength)); | |
1152 _gzipBuffer = new Uint8List(_OUTGOING_BUFFER_SIZE); | |
1153 _gzipBufferLength = 0; | |
1154 } | |
1155 if (chunk.length > _OUTGOING_BUFFER_SIZE) { | |
1156 add(chunk); | |
1157 } else { | |
1158 _gzipBuffer.setRange(_gzipBufferLength, | |
1159 _gzipBufferLength + chunk.length, | |
1160 chunk); | |
1161 _gzipBufferLength += chunk.length; | |
1162 } | |
1163 } | |
1164 | |
1165 void _addChunk(chunk, void add(List<int> data)) { | |
1166 if (chunk.length > _buffer.length - _length) { | |
1167 add(new Uint8List.view(_buffer.buffer, 0, _length)); | |
1168 _buffer = new Uint8List(_OUTGOING_BUFFER_SIZE); | |
1169 _length = 0; | |
1170 } | |
1171 if (chunk.length > _OUTGOING_BUFFER_SIZE) { | |
1172 add(chunk); | |
1173 } else { | |
1174 _buffer.setRange(_length, _length + chunk.length, chunk); | |
1175 _length += chunk.length; | |
1176 } | |
1102 } | 1177 } |
1103 | 1178 |
1104 List<int> _chunkHeader(int length) { | 1179 List<int> _chunkHeader(int length) { |
1105 const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, | 1180 const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, |
1106 0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46]; | 1181 0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46]; |
1107 if (length == 0) { | 1182 if (length == 0) { |
1108 if (_pendingFooter == 2) return _footerAndChunk0Length; | 1183 if (_pendingChunkedFooter == 2) return _footerAndChunk0Length; |
1109 return _chunk0Length; | 1184 return _chunk0Length; |
1110 } | 1185 } |
1111 int size = _pendingFooter; | 1186 int size = _pendingChunkedFooter; |
1112 int len = length; | 1187 int len = length; |
1113 // Compute a fast integer version of (log(length + 1) / log(16)).ceil(). | 1188 // Compute a fast integer version of (log(length + 1) / log(16)).ceil(). |
1114 while (len > 0) { | 1189 while (len > 0) { |
1115 size++; | 1190 size++; |
1116 len >>= 4; | 1191 len >>= 4; |
1117 } | 1192 } |
1118 var footerAndHeader = new Uint8List(size + 2); | 1193 var footerAndHeader = new Uint8List(size + 2); |
1119 if (_pendingFooter == 2) { | 1194 if (_pendingChunkedFooter == 2) { |
1120 footerAndHeader[0] = _CharCode.CR; | 1195 footerAndHeader[0] = _CharCode.CR; |
1121 footerAndHeader[1] = _CharCode.LF; | 1196 footerAndHeader[1] = _CharCode.LF; |
1122 } | 1197 } |
1123 int index = size; | 1198 int index = size; |
1124 while (index > _pendingFooter) { | 1199 while (index > _pendingChunkedFooter) { |
1125 footerAndHeader[--index] = hexDigits[length & 15]; | 1200 footerAndHeader[--index] = hexDigits[length & 15]; |
1126 length = length >> 4; | 1201 length = length >> 4; |
1127 } | 1202 } |
1128 footerAndHeader[size + 0] = _CharCode.CR; | 1203 footerAndHeader[size + 0] = _CharCode.CR; |
1129 footerAndHeader[size + 1] = _CharCode.LF; | 1204 footerAndHeader[size + 1] = _CharCode.LF; |
1130 return footerAndHeader; | 1205 return footerAndHeader; |
1131 } | 1206 } |
1132 | |
1133 static List<int> get _footerAndChunk0Length => new Uint8List.fromList( | |
1134 const [_CharCode.CR, _CharCode.LF, 0x30, _CharCode.CR, _CharCode.LF, | |
1135 _CharCode.CR, _CharCode.LF]); | |
1136 | |
1137 static List<int> get _chunk0Length => new Uint8List.fromList( | |
1138 const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF]); | |
1139 } | |
1140 | |
1141 // Transformer that transforms data to HTTP Chunked Encoding. | |
1142 class _ChunkedTransformer implements StreamTransformer<List<int>, List<int>> { | |
1143 const _ChunkedTransformer(); | |
1144 | |
1145 Stream<List<int>> bind(Stream<List<int>> stream) { | |
1146 return new Stream<List<int>>.eventTransformed( | |
1147 stream, | |
1148 (EventSink<List<int>> sink) => new _ChunkedTransformerSink(sink)); | |
1149 } | |
1150 } | |
1151 | |
1152 // Transformer that validates the content length. | |
1153 class _ContentLengthValidator | |
1154 implements StreamTransformer<List<int>, List<int>>, EventSink<List<int>> { | |
1155 final int expectedContentLength; | |
1156 final Uri uri; | |
1157 int _bytesWritten = 0; | |
1158 | |
1159 EventSink<List<int>> _outSink; | |
1160 | |
1161 _ContentLengthValidator(this.expectedContentLength, this.uri); | |
1162 | |
1163 Stream<List<int>> bind(Stream<List<int>> stream) { | |
1164 return new Stream.eventTransformed( | |
1165 stream, | |
1166 (EventSink sink) { | |
1167 if (_outSink != null) { | |
1168 throw new StateError("Validator transformer already used"); | |
1169 } | |
1170 _outSink = sink; | |
1171 return this; | |
1172 }); | |
1173 } | |
1174 | |
1175 void add(List<int> data) { | |
1176 _bytesWritten += data.length; | |
1177 if (_bytesWritten > expectedContentLength) { | |
1178 _outSink.addError(new HttpException( | |
1179 "Content size exceeds specified contentLength. " | |
1180 "$_bytesWritten bytes written while expected " | |
1181 "$expectedContentLength. " | |
1182 "[${new String.fromCharCodes(data)}]", | |
1183 uri: uri)); | |
1184 _outSink.close(); | |
1185 } else { | |
1186 _outSink.add(data); | |
1187 } | |
1188 } | |
1189 | |
1190 void addError(Object error, [StackTrace stackTrace]) { | |
1191 _outSink.addError(error, stackTrace); | |
1192 } | |
1193 | |
1194 void close() { | |
1195 if (_bytesWritten < expectedContentLength) { | |
1196 _outSink.addError(new HttpException( | |
1197 "Content size below specified contentLength. " | |
1198 " $_bytesWritten bytes written while expected " | |
1199 "$expectedContentLength.", | |
1200 uri: uri)); | |
1201 } | |
1202 _outSink.close(); | |
1203 } | |
1204 } | |
1205 | |
1206 | |
1207 // Extends StreamConsumer as this is an internal type, only used to pipe to. | |
1208 class _HttpOutgoing implements StreamConsumer<List<int>> { | |
1209 final Completer _doneCompleter = new Completer(); | |
1210 final Socket socket; | |
1211 | |
1212 _HttpOutgoing(this.socket); | |
1213 | |
1214 Future addStream(Stream<List<int>> stream) { | |
1215 return socket.addStream(stream) | |
1216 .catchError((error) { | |
1217 _doneCompleter.completeError(error); | |
1218 throw error; | |
1219 }); | |
1220 } | |
1221 | |
1222 Future close() { | |
1223 _doneCompleter.complete(socket); | |
1224 return new Future.value(); | |
1225 } | |
1226 | |
1227 Future get done => _doneCompleter.future; | |
1228 } | 1207 } |
1229 | 1208 |
1230 class _HttpClientConnection { | 1209 class _HttpClientConnection { |
1231 final String key; | 1210 final String key; |
1232 final Socket _socket; | 1211 final Socket _socket; |
1233 final bool _proxyTunnel; | 1212 final bool _proxyTunnel; |
1234 final _HttpParser _httpParser; | 1213 final _HttpParser _httpParser; |
1235 StreamSubscription _subscription; | 1214 StreamSubscription _subscription; |
1236 final _HttpClient _httpClient; | 1215 final _HttpClient _httpClient; |
1237 bool _dispose = false; | 1216 bool _dispose = false; |
1238 Timer _idleTimer; | 1217 Timer _idleTimer; |
1239 bool closed = false; | 1218 bool closed = false; |
1240 Uri _currentUri; | 1219 Uri _currentUri; |
1241 final Uint8List _headersBuffer = new Uint8List(_HEADERS_BUFFER_SIZE); | |
1242 | 1220 |
1243 Completer<_HttpIncoming> _nextResponseCompleter; | 1221 Completer<_HttpIncoming> _nextResponseCompleter; |
1244 Future _streamFuture; | 1222 Future _streamFuture; |
1245 | 1223 |
1246 _HttpClientConnection(this.key, this._socket, this._httpClient, | 1224 _HttpClientConnection(this.key, this._socket, this._httpClient, |
1247 [this._proxyTunnel = false]) | 1225 [this._proxyTunnel = false]) |
1248 : _httpParser = new _HttpParser.responseParser() { | 1226 : _httpParser = new _HttpParser.responseParser() { |
1249 _httpParser.listenToStream(_socket); | 1227 _httpParser.listenToStream(_socket); |
1250 | 1228 |
1251 // Set up handlers on the parser here, so we are sure to get 'onDone' from | 1229 // Set up handlers on the parser here, so we are sure to get 'onDone' from |
(...skipping 642 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1894 static const _IDLE = 1; | 1872 static const _IDLE = 1; |
1895 static const _CLOSING = 2; | 1873 static const _CLOSING = 2; |
1896 static const _DETACHED = 3; | 1874 static const _DETACHED = 3; |
1897 | 1875 |
1898 final Socket _socket; | 1876 final Socket _socket; |
1899 final _HttpServer _httpServer; | 1877 final _HttpServer _httpServer; |
1900 final _HttpParser _httpParser; | 1878 final _HttpParser _httpParser; |
1901 int _state = _IDLE; | 1879 int _state = _IDLE; |
1902 StreamSubscription _subscription; | 1880 StreamSubscription _subscription; |
1903 Timer _idleTimer; | 1881 Timer _idleTimer; |
1904 final Uint8List _headersBuffer = new Uint8List(_HEADERS_BUFFER_SIZE); | |
1905 bool _idleMark = false; | 1882 bool _idleMark = false; |
1906 Future _streamFuture; | 1883 Future _streamFuture; |
1907 | 1884 |
1908 _HttpConnection(this._socket, this._httpServer) | 1885 _HttpConnection(this._socket, this._httpServer) |
1909 : _httpParser = new _HttpParser.requestParser() { | 1886 : _httpParser = new _HttpParser.requestParser() { |
1910 _httpParser.listenToStream(_socket); | 1887 _httpParser.listenToStream(_socket); |
1911 _subscription = _httpParser.listen( | 1888 _subscription = _httpParser.listen( |
1912 (incoming) { | 1889 (incoming) { |
1913 _httpServer._markActive(this); | 1890 _httpServer._markActive(this); |
1914 // If the incoming was closed, close the connection. | 1891 // If the incoming was closed, close the connection. |
(...skipping 682 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
2597 const _RedirectInfo(this.statusCode, this.method, this.location); | 2574 const _RedirectInfo(this.statusCode, this.method, this.location); |
2598 } | 2575 } |
2599 | 2576 |
2600 String _getHttpVersion() { | 2577 String _getHttpVersion() { |
2601 var version = Platform.version; | 2578 var version = Platform.version; |
2602 // Only include major and minor version numbers. | 2579 // Only include major and minor version numbers. |
2603 int index = version.indexOf('.', version.indexOf('.') + 1); | 2580 int index = version.indexOf('.', version.indexOf('.') + 1); |
2604 version = version.substring(0, index); | 2581 version = version.substring(0, index); |
2605 return 'Dart/$version (dart:io)'; | 2582 return 'Dart/$version (dart:io)'; |
2606 } | 2583 } |
OLD | NEW |