Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(42)

Side by Side Diff: sdk/lib/io/http_impl.dart

Issue 185543004: Merge all http-outgoing transfomers into _HttpOutgoing, including simpler(better) buffering. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 const int _HEADERS_BUFFER_SIZE = 8 * 1024; 7 const int _OUTGOING_BUFFER_SIZE = 8 * 1024;
8 8
9 class _HttpIncoming extends Stream<List<int>> { 9 class _HttpIncoming extends Stream<List<int>> {
10 final int _transferLength; 10 final int _transferLength;
11 final Completer _dataCompleter = new Completer(); 11 final Completer _dataCompleter = new Completer();
12 Stream<List<int>> _stream; 12 Stream<List<int>> _stream;
13 13
14 bool fullBodyRead = false; 14 bool fullBodyRead = false;
15 15
16 // Common properties. 16 // Common properties.
17 final _HttpHeaders headers; 17 final _HttpHeaders headers;
(...skipping 381 matching lines...) Expand 10 before | Expand all | Expand 10 after
399 return retry(); 399 return retry();
400 } else { 400 } else {
401 // No credentials available, complete with original response. 401 // No credentials available, complete with original response.
402 return this; 402 return this;
403 } 403 }
404 }); 404 });
405 } 405 }
406 } 406 }
407 407
408 408
409 abstract class _HttpOutboundMessage<T> implements IOSink { 409 abstract class _HttpOutboundMessage<T> extends _IOSinkImpl {
410 // Used to mark when the body should be written. This is used for HEAD 410 // Used to mark when the body should be written. This is used for HEAD
411 // requests and in error handling. 411 // requests and in error handling.
412 bool _ignoreBody = false; 412 bool _ignoreBody = false;
413 bool _headersWritten = false; 413 bool _headersWritten = false;
414 bool _asGZip = false;
415 414
416 IOSink _headersSink; 415 final Uri _uri;
417 IOSink _dataSink;
418
419 final _HttpOutgoing _outgoing; 416 final _HttpOutgoing _outgoing;
420 final Uri _uri;
421 417
422 final _HttpHeaders headers; 418 final _HttpHeaders headers;
423 419
424 _HttpOutboundMessage(this._uri, 420 _HttpOutboundMessage(this._uri,
425 String protocolVersion, 421 String protocolVersion,
426 _HttpOutgoing outgoing) 422 this._outgoing)
427 : _outgoing = outgoing, 423 : super(new _HttpOutboundConsumer(), null),
428 _headersSink = new IOSink(outgoing, encoding: ASCII),
429 headers = new _HttpHeaders(protocolVersion) { 424 headers = new _HttpHeaders(protocolVersion) {
430 _dataSink = new IOSink(new _HttpOutboundConsumer(this)); 425 _outgoing.outbound = this;
426 (_target as _HttpOutboundConsumer).outbound = this;
Lasse Reichstein Nielsen 2014/03/03 11:37:23 Consider just doing an assignment to a _HttpOutbou
Anders Johnsen 2014/03/03 19:43:14 I see. However, this is planned to go away, so I'l
427 _encodingMutable = false;
431 } 428 }
432 429
433 int get contentLength => headers.contentLength; 430 int get contentLength => headers.contentLength;
434 void set contentLength(int contentLength) { 431 void set contentLength(int contentLength) {
435 headers.contentLength = contentLength; 432 headers.contentLength = contentLength;
436 } 433 }
437 434
438 bool get persistentConnection => headers.persistentConnection; 435 bool get persistentConnection => headers.persistentConnection;
439 void set persistentConnection(bool p) { 436 void set persistentConnection(bool p) {
440 headers.persistentConnection = p; 437 headers.persistentConnection = p;
441 } 438 }
442 439
443 Encoding get encoding { 440 Encoding get encoding {
444 var charset; 441 var charset;
445 if (headers.contentType != null && headers.contentType.charset != null) { 442 if (headers.contentType != null && headers.contentType.charset != null) {
446 charset = headers.contentType.charset; 443 charset = headers.contentType.charset;
447 } else { 444 } else {
448 charset = "iso-8859-1"; 445 charset = "iso-8859-1";
449 } 446 }
450 return Encoding.getByName(charset); 447 return Encoding.getByName(charset);
451 } 448 }
452 449
453 void set encoding(Encoding value) {
454 throw new StateError("IOSink encoding is not mutable");
455 }
456
457 void write(Object obj) {
458 if (!_headersWritten) _dataSink.encoding = encoding;
459 _dataSink.write(obj);
460 }
461
462 void writeAll(Iterable objects, [String separator = ""]) {
463 if (!_headersWritten) _dataSink.encoding = encoding;
464 _dataSink.writeAll(objects, separator);
465 }
466
467 void writeln([Object obj = ""]) {
468 if (!_headersWritten) _dataSink.encoding = encoding;
469 _dataSink.writeln(obj);
470 }
471
472 void writeCharCode(int charCode) {
473 if (!_headersWritten) _dataSink.encoding = encoding;
474 _dataSink.writeCharCode(charCode);
475 }
476
477 void add(List<int> data) { 450 void add(List<int> data) {
478 if (data.length == 0) return; 451 if (data.length == 0) return;
479 _dataSink.add(data); 452 super.add(data);
480 } 453 }
481 454
482 void addError(error, [StackTrace stackTrace]) => 455 Future _writeHeaders({bool drainRequest: true,
483 _dataSink.addError(error, stackTrace); 456 bool setOutgoing: true}) {
484 457 // TODO(ajohnsen): Avoid excessive futures in this method.
485 Future<T> addStream(Stream<List<int>> stream) => _dataSink.addStream(stream); 458 write() {
486
487 Future flush() => _dataSink.flush();
488
489 Future close() => _dataSink.close();
490
491 Future<T> get done => _dataSink.done;
492
493 Future _writeHeaders({bool drainRequest: true}) {
494 void write() {
495 try { 459 try {
496 _writeHeader(); 460 _writeHeader();
497 } catch (error) { 461 } catch (error, s) {
498 // Headers too large. 462 // Headers too large.
499 throw new HttpException( 463 throw new HttpException(
500 "Headers size exceeded the of '$_HEADERS_BUFFER_SIZE' bytes"); 464 "Headers size exceeded the of '$_OUTGOING_BUFFER_SIZE'"
465 " bytes");
501 } 466 }
467 return this;
502 } 468 }
503 if (_headersWritten) return new Future.value(); 469 if (_headersWritten) return new Future.value(this);
504 _headersWritten = true; 470 _headersWritten = true;
505 _dataSink.encoding = encoding; 471 Future drainFuture;
506 bool isServerSide = this is _HttpResponse; 472 bool isServerSide = this is _HttpResponse;
473 bool gzip = false;
507 if (isServerSide) { 474 if (isServerSide) {
508 var response = this; 475 var response = this;
509 if (headers.chunkedTransferEncoding) { 476 if (headers.chunkedTransferEncoding) {
510 List acceptEncodings = 477 List acceptEncodings =
511 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; 478 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING];
512 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; 479 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING];
513 if (acceptEncodings != null && 480 if (acceptEncodings != null &&
514 acceptEncodings 481 acceptEncodings
515 .expand((list) => list.split(",")) 482 .expand((list) => list.split(","))
516 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && 483 .any((encoding) => encoding.trim().toLowerCase() == "gzip") &&
517 contentEncoding == null) { 484 contentEncoding == null) {
518 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); 485 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip");
519 _asGZip = true; 486 gzip = true;
520 } 487 }
521 } 488 }
522 if (drainRequest && !response._httpRequest._incoming.hasSubscriber) { 489 if (drainRequest && !response._httpRequest._incoming.hasSubscriber) {
523 return response._httpRequest.drain() 490 drainFuture = response._httpRequest.drain().catchError((_) {});
524 // TODO(ajohnsen): Timeout on drain?
525 .catchError((_) {}) // Ignore errors.
526 .then((_) => write());
527 } 491 }
492 } else {
493 drainRequest = false;
494 }
495 if (_ignoreBody) {
496 return new Future.sync(write).then((_) => _outgoing.close());
497 }
498 if (setOutgoing) {
499 int contentLength = headers.contentLength;
500 if (headers.chunkedTransferEncoding) {
501 _outgoing.chunked = true;
502 if (gzip) _outgoing.gzip = true;
503 } else if (contentLength >= 0) {
504 _outgoing.contentLength = contentLength;
505 }
506 }
507 if (drainFuture != null) {
508 return drainFuture.then((_) => write());
528 } 509 }
529 return new Future.sync(write); 510 return new Future.sync(write);
530 } 511 }
531 512
532 Future _addStream(Stream<List<int>> stream) { 513 Future _addStream(Stream<List<int>> stream) {
533 return _writeHeaders() 514 // TODO(ajohnsen): Merge into _HttpOutgoing.
534 .then((_) { 515 if (_ignoreBody) {
535 int contentLength = headers.contentLength; 516 stream.drain().catchError((_) {});
Lasse Reichstein Nielsen 2014/03/03 11:37:23 The "drain" method just ignores input and waits fo
Anders Johnsen 2014/03/03 19:43:14 No. The remote user may see that as write failing,
536 if (_ignoreBody) { 517 return _writeHeaders();
537 stream.drain().catchError((_) {}); 518 }
538 return _headersSink.close(); 519 if (_headersWritten) {
539 } 520 return _outgoing.addStream(stream);
540 stream = stream.transform(const _BufferTransformer()); 521 } else {
541 if (headers.chunkedTransferEncoding) { 522 var completer = new Completer.sync();
542 if (_asGZip) { 523 var future = _outgoing.addStream(stream, completer.future);
543 stream = stream.transform(GZIP.encoder); 524 _writeHeaders().then(completer.complete);
544 } 525 return future;
545 stream = stream.transform(const _ChunkedTransformer()); 526 }
546 } else if (contentLength >= 0) {
547 stream = stream.transform(
548 new _ContentLengthValidator(contentLength, _uri));
549 }
550 return _headersSink.addStream(stream);
551 });
552 } 527 }
553 528
554 Future _close() { 529 Future _close() {
530 // TODO(ajohnsen): Merge into _HttpOutgoing.
555 if (!_headersWritten) { 531 if (!_headersWritten) {
556 if (!_ignoreBody && headers.contentLength == -1) { 532 if (!_ignoreBody && headers.contentLength == -1) {
557 // If no body was written, _ignoreBody is false (it's not a HEAD 533 // If no body was written, _ignoreBody is false (it's not a HEAD
558 // request) and the content-length is unspecified, set contentLength to 534 // request) and the content-length is unspecified, set contentLength to
559 // 0. 535 // 0.
560 headers.chunkedTransferEncoding = false; 536 headers.chunkedTransferEncoding = false;
561 headers.contentLength = 0; 537 headers.contentLength = 0;
562 } else if (!_ignoreBody && headers.contentLength > 0) { 538 } else if (!_ignoreBody && headers.contentLength > 0) {
563 _headersSink.addError(new HttpException( 539 return _outgoing.addStream(
564 "No content while contentLength was specified to be greater " 540 new Stream.fromFuture(new Future.error(new HttpException(
565 "than 0: ${headers.contentLength}.", 541 "No content while contentLength was specified to be greater "
Lasse Reichstein Nielsen 2014/03/03 11:37:23 while -> and Or perhaps "even though".
Anders Johnsen 2014/03/03 19:43:14 Done.
566 uri: _uri)); 542 "than 0: ${headers.contentLength}.",
567 return _headersSink.done; 543 uri: _uri))));
568 } 544 }
569 } 545 }
570 return _writeHeaders().whenComplete(_headersSink.close); 546 return _writeHeaders().whenComplete(_outgoing.close);
571 } 547 }
572 548
573 void _writeHeader(); 549 void _writeHeader();
574 } 550 }
575 551
576 552
577 class _HttpOutboundConsumer implements StreamConsumer { 553 class _HttpOutboundConsumer implements StreamConsumer {
578 final _HttpOutboundMessage _outbound; 554 // TODO(ajohnsen): Once _addStream and _close is merged into _HttpOutgoing,
579 StreamController _controller; 555 // this class can be removed.
580 StreamSubscription _subscription; 556 _HttpOutboundMessage outbound;
581 Completer _closeCompleter = new Completer(); 557 _HttpOutboundConsumer();
582 Completer _completer;
583 bool _socketError = false;
584 558
585 _HttpOutboundConsumer(this._outbound); 559 Future addStream(var stream) => outbound._addStream(stream);
586 560 Future close() => outbound._close();
587 void _cancel() {
588 if (_subscription != null) {
589 StreamSubscription subscription = _subscription;
590 _subscription = null;
591 subscription.cancel();
592 }
593 }
594
595 bool _ignoreError(error)
596 => (error is SocketException || error is TlsException) &&
597 _outbound is HttpResponse;
598
599 _ensureController() {
600 if (_controller != null) return;
601 _controller = new StreamController(sync: true,
602 onPause: () => _subscription.pause(),
603 onResume: () => _subscription.resume(),
604 onListen: () => _subscription.resume(),
605 onCancel: _cancel);
606 _outbound._addStream(_controller.stream)
607 .then((_) {
608 _cancel();
609 _done();
610 _closeCompleter.complete(_outbound);
611 },
612 onError: (error, [StackTrace stackTrace]) {
613 _socketError = true;
614 if (_ignoreError(error)) {
615 _cancel();
616 _done();
617 _closeCompleter.complete(_outbound);
618 } else {
619 if (!_done(error)) {
620 _closeCompleter.completeError(error, stackTrace);
621 }
622 }
623 });
624 }
625
626 bool _done([error, StackTrace stackTrace]) {
627 if (_completer == null) return false;
628 if (error != null) {
629 _completer.completeError(error, stackTrace);
630 } else {
631 _completer.complete(_outbound);
632 }
633 _completer = null;
634 return true;
635 }
636
637 Future addStream(var stream) {
638 // If we saw a socket error subscribe and then cancel, to ignore any data
639 // on the stream.
640 if (_socketError) {
641 stream.listen(null).cancel();
642 return new Future.value(_outbound);
643 }
644 _completer = new Completer();
645 _subscription = stream.listen(
646 (data) => _controller.add(data),
647 onDone: _done,
648 onError: (e, s) => _controller.addError(e, s),
649 cancelOnError: true);
650 // Pause the first request.
651 if (_controller == null) _subscription.pause();
652 _ensureController();
653 return _completer.future;
654 }
655
656 Future close() {
657 Future closeOutbound() {
658 if (_socketError) return new Future.value(_outbound);
659 return _outbound._close()
660 .catchError((_) {}, test: _ignoreError)
661 .then((_) => _outbound);
662 }
663 if (_controller == null) return closeOutbound();
664 _controller.close();
665 return _closeCompleter.future.then((_) => closeOutbound());
666 }
667 } 561 }
668 562
669 563
670 class _BufferTransformerSink implements EventSink<List<int>> {
671 static const int MIN_CHUNK_SIZE = 4 * 1024;
672 static const int MAX_BUFFER_SIZE = 16 * 1024;
673
674 final BytesBuilder _builder = new BytesBuilder();
675 final EventSink<List<int>> _outSink;
676
677 _BufferTransformerSink(this._outSink);
678
679 void add(List<int> data) {
680 // TODO(ajohnsen): Use timeout?
681 if (data.length == 0) return;
682 if (data.length >= MIN_CHUNK_SIZE) {
683 flush();
684 _outSink.add(data);
685 } else {
686 _builder.add(data);
687 if (_builder.length >= MAX_BUFFER_SIZE) {
688 flush();
689 }
690 }
691 }
692
693 void addError(Object error, [StackTrace stackTrace]) {
694 _outSink.addError(error, stackTrace);
695 }
696
697 void close() {
698 flush();
699 _outSink.close();
700 }
701
702 void flush() {
703 if (_builder.length > 0) {
704 // takeBytes will clear the BytesBuilder.
705 _outSink.add(_builder.takeBytes());
706 }
707 }
708 }
709
710 class _BufferTransformer implements StreamTransformer<List<int>, List<int>> {
711 const _BufferTransformer();
712
713 Stream<List<int>> bind(Stream<List<int>> stream) {
714 return new Stream<List<int>>.eventTransformed(
715 stream,
716 (EventSink outSink) => new _BufferTransformerSink(outSink));
717 }
718 }
719
720
721 class _HttpResponse extends _HttpOutboundMessage<HttpResponse> 564 class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
722 implements HttpResponse { 565 implements HttpResponse {
723 int _statusCode = 200; 566 int _statusCode = 200;
724 String _reasonPhrase; 567 String _reasonPhrase;
725 List<Cookie> _cookies; 568 List<Cookie> _cookies;
726 _HttpRequest _httpRequest; 569 _HttpRequest _httpRequest;
727 Duration _deadline; 570 Duration _deadline;
728 Timer _deadlineTimer; 571 Timer _deadlineTimer;
729 572
730 _HttpResponse(Uri uri, 573 _HttpResponse(Uri uri,
(...skipping 25 matching lines...) Expand all
756 if (_headersWritten) throw new StateError("Header already sent"); 599 if (_headersWritten) throw new StateError("Header already sent");
757 statusCode = status; 600 statusCode = status;
758 headers.set("location", location.toString()); 601 headers.set("location", location.toString());
759 return close(); 602 return close();
760 } 603 }
761 604
762 Future<Socket> detachSocket() { 605 Future<Socket> detachSocket() {
763 if (_headersWritten) throw new StateError("Headers already sent"); 606 if (_headersWritten) throw new StateError("Headers already sent");
764 deadline = null; // Be sure to stop any deadline. 607 deadline = null; // Be sure to stop any deadline.
765 var future = _httpRequest._httpConnection.detachSocket(); 608 var future = _httpRequest._httpConnection.detachSocket();
766 _writeHeaders(drainRequest: false).then((_) => close()); 609 _writeHeaders(drainRequest: false,
610 setOutgoing: false).then((_) => close());
767 // Close connection so the socket is 'free'. 611 // Close connection so the socket is 'free'.
768 close(); 612 close();
769 done.catchError((_) { 613 done.catchError((_) {
770 // Catch any error on done, as they automatically will be 614 // Catch any error on done, as they automatically will be
771 // propagated to the websocket. 615 // propagated to the websocket.
772 }); 616 });
773 return future; 617 return future;
774 } 618 }
775 619
776 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; 620 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo;
777 621
778 Duration get deadline => _deadline; 622 Duration get deadline => _deadline;
779 623
780 void set deadline(Duration d) { 624 void set deadline(Duration d) {
781 if (_deadlineTimer != null) _deadlineTimer.cancel(); 625 if (_deadlineTimer != null) _deadlineTimer.cancel();
782 _deadline = d; 626 _deadline = d;
783 627
784 if (_deadline == null) return; 628 if (_deadline == null) return;
785 _deadlineTimer = new Timer(_deadline, () { 629 _deadlineTimer = new Timer(_deadline, () {
630 _outgoing._socketError = true;
786 _outgoing.socket.destroy(); 631 _outgoing.socket.destroy();
787 }); 632 });
788 } 633 }
789 634
790 void _writeHeader() { 635 void _writeHeader() {
791 Uint8List buffer = _httpRequest._httpConnection._headersBuffer; 636 Uint8List buffer = new Uint8List(_OUTGOING_BUFFER_SIZE);
792 int offset = 0; 637 int offset = 0;
793 638
794 void write(List<int> bytes) { 639 void write(List<int> bytes) {
795 int len = bytes.length; 640 int len = bytes.length;
796 for (int i = 0; i < len; i++) { 641 for (int i = 0; i < len; i++) {
797 buffer[offset + i] = bytes[i]; 642 buffer[offset + i] = bytes[i];
798 } 643 }
799 offset += len; 644 offset += len;
800 } 645 }
801 646
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
840 headers.add(HttpHeaders.SET_COOKIE, cookie); 685 headers.add(HttpHeaders.SET_COOKIE, cookie);
841 }); 686 });
842 } 687 }
843 688
844 headers._finalize(); 689 headers._finalize();
845 690
846 // Write headers. 691 // Write headers.
847 offset = headers._write(buffer, offset); 692 offset = headers._write(buffer, offset);
848 buffer[offset++] = _CharCode.CR; 693 buffer[offset++] = _CharCode.CR;
849 buffer[offset++] = _CharCode.LF; 694 buffer[offset++] = _CharCode.LF;
850 _headersSink.add(new Uint8List.view(buffer.buffer, 0, offset)); 695 _outgoing.setHeader(buffer, offset);
851 } 696 }
852 697
853 String _findReasonPhrase(int statusCode) { 698 String _findReasonPhrase(int statusCode) {
854 if (_reasonPhrase != null) { 699 if (_reasonPhrase != null) {
855 return _reasonPhrase; 700 return _reasonPhrase;
856 } 701 }
857 702
858 switch (statusCode) { 703 switch (statusCode) {
859 case HttpStatus.CONTINUE: return "Continue"; 704 case HttpStatus.CONTINUE: return "Continue";
860 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols"; 705 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols";
(...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after
1029 if (_httpClientConnection._proxyTunnel) { 874 if (_httpClientConnection._proxyTunnel) {
1030 return uriStartingFromPath(); 875 return uriStartingFromPath();
1031 } else { 876 } else {
1032 return uri.toString(); 877 return uri.toString();
1033 } 878 }
1034 } 879 }
1035 } 880 }
1036 } 881 }
1037 882
1038 void _writeHeader() { 883 void _writeHeader() {
1039 Uint8List buffer = _httpClientConnection._headersBuffer; 884 Uint8List buffer = new Uint8List(_OUTGOING_BUFFER_SIZE);
1040 int offset = 0; 885 int offset = 0;
1041 886
1042 void write(List<int> bytes) { 887 void write(List<int> bytes) {
1043 int len = bytes.length; 888 int len = bytes.length;
1044 for (int i = 0; i < len; i++) { 889 for (int i = 0; i < len; i++) {
1045 buffer[offset + i] = bytes[i]; 890 buffer[offset + i] = bytes[i];
1046 } 891 }
1047 offset += len; 892 offset += len;
1048 } 893 }
1049 894
(...skipping 17 matching lines...) Expand all
1067 } 912 }
1068 headers.add(HttpHeaders.COOKIE, sb.toString()); 913 headers.add(HttpHeaders.COOKIE, sb.toString());
1069 } 914 }
1070 915
1071 headers._finalize(); 916 headers._finalize();
1072 917
1073 // Write headers. 918 // Write headers.
1074 offset = headers._write(buffer, offset); 919 offset = headers._write(buffer, offset);
1075 buffer[offset++] = _CharCode.CR; 920 buffer[offset++] = _CharCode.CR;
1076 buffer[offset++] = _CharCode.LF; 921 buffer[offset++] = _CharCode.LF;
1077 _headersSink.add(new Uint8List.view(buffer.buffer, 0, offset)); 922 _outgoing.setHeader(buffer, offset);
1078 } 923 }
1079 } 924 }
1080 925
1081 926 // Used by _HttpOutgoing as a target of a chunked converter for gzip
1082 class _ChunkedTransformerSink implements EventSink<List<int>> { 927 // compression.
1083 928 class _HttpGZipSink extends ByteConversionSink {
1084 int _pendingFooter = 0; 929 final Function _consume;
1085 final EventSink<List<int>> _outSink; 930 _HttpGZipSink(this._consume);
1086 931
1087 _ChunkedTransformerSink(this._outSink); 932 void add(List<int> chunk) {
1088 933 _consume(chunk);
1089 void add(List<int> data) { 934 }
1090 _outSink.add(_chunkHeader(data.length)); 935
1091 if (data.length > 0) _outSink.add(data); 936 void addSlice(Uint8List chunk, int start, int end, bool isLast) {
1092 _pendingFooter = 2; 937 _consume(new Uint8List.view(chunk.buffer, start, end - start));
1093 } 938 }
1094 939
1095 void addError(Object error, [StackTrace stackTrace]) { 940 void close() {}
1096 _outSink.addError(error, stackTrace); 941 }
1097 } 942
1098 943
1099 void close() { 944 // The _HttpOutgoing handles all of the following:
1100 add(const []); 945 // - Buffering
1101 _outSink.close(); 946 // - GZip compressionm
947 // - Content-Length validation.
948 // - Errors.
949 //
950 // Most notable is the GZip compression, that uses a double-buffering system,
951 // one before gzip (_gzipBuffer) and one after (_buffer).
952 class _HttpOutgoing
953 implements StreamConsumer<List<int>> {
954 static const List<int> _footerAndChunk0Length =
955 const [_CharCode.CR, _CharCode.LF, 0x30, _CharCode.CR, _CharCode.LF,
956 _CharCode.CR, _CharCode.LF];
957
958 static const List<int> _chunk0Length =
959 const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF];
960
961 final Completer _doneCompleter = new Completer();
962 final Socket socket;
963
964 Uint8List _buffer;
965 int _length = 0;
966
967 Future _closeFuture;
968
969 bool chunked = false;
970 int _pendingChunkedFooter = 0;
971
972 int contentLength;
973 int _bytesWritten = 0;
974
975 bool _gzip = false;
976 ByteConversionSink _gzipSink;
977 // _gzipAdd is set iff the sink is being added to. It's used to specify where
978 // gzipped data should be taken (sometimes a controller, sometimes a socket).
979 Function _gzipAdd;
980 Uint8List _gzipBuffer;
981 int _gzipBufferLength = 0;
982
983 bool _socketError = false;
984
985 _HttpOutboundMessage outbound;
986
987 bool _ignoreError(error)
988 => (error is SocketException || error is TlsException) &&
989 outbound is HttpResponse;
990
991 _HttpOutgoing(this.socket);
992
993 Future addStream(Stream<List<int>> stream, [Future pauseFuture]) {
994 if (_socketError) {
995 stream.listen(null).cancel();
996 return new Future.value(outbound);
997 }
998 var sub;
999 var controller;
1000 // Use new stream so we are able to pause (see below listen). The
1001 // alternative is to use stream.extand, but that won't give us a way of
1002 // pausing.
1003 controller = new StreamController(
1004 onPause: () => sub.pause(),
1005 onResume: () => sub.resume(),
1006 sync: true);
1007 sub = stream.listen(
1008 (data) {
1009 if (_socketError) return;
1010 if (data.length == 0) return;
1011 if (chunked) {
1012 if (_gzip) {
1013 _gzipAdd = controller.add;
1014 _addGZipChunk(data, _gzipSink.add);
1015 _gzipAdd = null;
1016 return;
1017 }
1018 _addChunk(_chunkHeader(data.length), controller.add);
1019 _pendingChunkedFooter = 2;
1020 } else {
1021 if (contentLength != null) {
1022 _bytesWritten += data.length;
1023 if (_bytesWritten > contentLength) {
1024 controller.addError(new HttpException(
1025 "Content size exceeds specified contentLength. "
1026 "$_bytesWritten bytes written while expected "
1027 "$contentLength. "
1028 "[${new String.fromCharCodes(data)}]"));
1029 return;
1030 }
1031 }
1032 }
1033 _addChunk(data, controller.add);
1034 },
1035 onError: controller.addError,
1036 onDone: controller.close,
1037 cancelOnError: true);
Lasse Reichstein Nielsen 2014/03/03 11:37:23 If you want to avoid extra closures, you can swap
Anders Johnsen 2014/03/03 19:43:14 I'm fine by doing it like this. Also, this will no
1038
1039 // If incoming is being drained, the pauseFuture is != null. Pause output
Lasse Reichstein Nielsen 2014/03/03 11:37:23 If -> while? Is this the only case where pauseFutu
Anders Johnsen 2014/03/03 19:43:14 Done. No, it can be it in many cases (one of the r
1040 // until it's drained.
1041 if (pauseFuture != null) {
1042 sub.pause(pauseFuture);
1043 }
1044
1045 return socket.addStream(controller.stream)
1046 .then((_) {
1047 return outbound;
1048 }, onError: (error) {
1049 // Be sure to close it in case of an error.
1050 if (_gzip) _gzipSink.close();
1051 _socketError = true;
1052 _doneCompleter.completeError(error);
1053 if (_ignoreError(error)) {
1054 return outbound;
1055 } else {
1056 throw error;
1057 }
1058 });
1059 }
1060
1061 Future close() {
1062 // If we was already closed, return that future.
Lasse Reichstein Nielsen 2014/03/03 11:37:23 was -> are
Anders Johnsen 2014/03/03 19:43:14 Done.
1063 if (_closeFuture != null) return _closeFuture;
1064 // If we earlier saw an error, return immidiate. The internals are already
Lasse Reichstein Nielsen 2014/03/03 11:37:23 if we have seen an error, return immediately. inte
Anders Johnsen 2014/03/03 19:43:14 Done.
1065 // closed correctly.
1066 if (_socketError) return new Future.value(outbound);
1067 // If contentLength was specified, validate it.
1068 if (contentLength != null) {
1069 if (_bytesWritten < contentLength) {
1070 var error = new HttpException(
1071 "Content size below specified contentLength. "
1072 " $_bytesWritten bytes written while expected "
Lasse Reichstein Nielsen 2014/03/03 11:37:23 while -> but.
Anders Johnsen 2014/03/03 19:43:14 Done.
1073 "$contentLength.");
1074 _doneCompleter.completeError(error);
1075 return _closeFuture = new Future.error(error);
1076 }
1077 }
1078 // In case of chunked encoding (and gzip), handle remaining gzip data and
1079 // append the 'footer' for chunked encoding.
1080 if (chunked) {
1081 if (_gzip) {
1082 _gzipAdd = socket.add;
1083 if (_gzipBufferLength > 0) {
1084 _gzipSink.add(new Uint8List.view(
1085 _gzipBuffer.buffer, 0, _gzipBufferLength));
1086 }
1087 _gzipBuffer = null;
1088 _gzipSink.close();
1089 _gzipAdd = null;
1090 }
1091 _addChunk(_chunkHeader(0), socket.add);
1092 }
1093 // Add any remaining data in the buffer.
1094 if (_length > 0) {
1095 socket.add(new Uint8List.view(_buffer.buffer, 0, _length));
1096 }
1097 // Clear references, for better GC.
1098 _buffer = null;
1099 // And finally flush it. As we support keep-alive, never close it from here.
1100 // Once the socket is flushed, we'll be able to reuse it (signaled by the
1101 // 'done' future).
1102 return _closeFuture = socket.flush()
1103 .then((_) {
1104 _doneCompleter.complete(socket);
1105 return outbound;
1106 }, onError: (error) {
1107 _doneCompleter.completeError(error);
1108 if (_ignoreError(error)) {
1109 return outbound;
1110 } else {
1111 throw error;
1112 }
1113 });
1114 }
1115
1116 Future get done => _doneCompleter.future;
1117
1118 void setHeader(List<int> data, int length) {
1119 assert(_length == 0);
1120 assert(data.length == _OUTGOING_BUFFER_SIZE);
1121 _buffer = data;
1122 _length = length;
1123 }
1124
1125 void set gzip(bool value) {
1126 _gzip = value;
1127 if (_gzip) {
1128 _gzipBuffer = new Uint8List(_OUTGOING_BUFFER_SIZE);
1129 assert(_gzipSink == null);
1130 _gzipSink = new ZLibEncoder(gzip: true)
1131 .startChunkedConversion(
1132 new _HttpGZipSink((data) {
1133 // We are closing down prematurely, due to an error. Discard.
1134 if (_gzipAdd == null) return;
1135 _addChunk(_chunkHeader(data.length), _gzipAdd);
1136 _pendingChunkedFooter = 2;
1137 _addChunk(data, _gzipAdd);
1138 }));
1139 }
1140 }
1141
1142 void _addGZipChunk(chunk, void add(List<int> data)) {
1143 if (chunk.length > _gzipBuffer.length - _gzipBufferLength) {
1144 add(new Uint8List.view(
1145 _gzipBuffer.buffer, 0, _gzipBufferLength));
1146 _gzipBuffer = new Uint8List(_OUTGOING_BUFFER_SIZE);
1147 _gzipBufferLength = 0;
1148 }
1149 if (chunk.length > _OUTGOING_BUFFER_SIZE) {
1150 add(chunk);
1151 } else {
1152 _gzipBuffer.setRange(_gzipBufferLength,
1153 _gzipBufferLength + chunk.length,
1154 chunk);
1155 _gzipBufferLength += chunk.length;
1156 }
1157 }
1158
1159 void _addChunk(chunk, void add(List<int> data)) {
1160 if (chunk.length > _buffer.length - _length) {
1161 add(new Uint8List.view(_buffer.buffer, 0, _length));
1162 _buffer = new Uint8List(_OUTGOING_BUFFER_SIZE);
1163 _length = 0;
1164 }
1165 if (chunk.length > _OUTGOING_BUFFER_SIZE) {
1166 add(chunk);
1167 } else {
1168 _buffer.setRange(_length, _length + chunk.length, chunk);
1169 _length += chunk.length;
1170 }
1102 } 1171 }
1103 1172
1104 List<int> _chunkHeader(int length) { 1173 List<int> _chunkHeader(int length) {
1105 const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 1174 const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37,
1106 0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46]; 1175 0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46];
1107 if (length == 0) { 1176 if (length == 0) {
1108 if (_pendingFooter == 2) return _footerAndChunk0Length; 1177 if (_pendingChunkedFooter == 2) return _footerAndChunk0Length;
1109 return _chunk0Length; 1178 return _chunk0Length;
1110 } 1179 }
1111 int size = _pendingFooter; 1180 int size = _pendingChunkedFooter;
1112 int len = length; 1181 int len = length;
1113 // Compute a fast integer version of (log(length + 1) / log(16)).ceil(). 1182 // Compute a fast integer version of (log(length + 1) / log(16)).ceil().
1114 while (len > 0) { 1183 while (len > 0) {
1115 size++; 1184 size++;
1116 len >>= 4; 1185 len >>= 4;
1117 } 1186 }
1118 var footerAndHeader = new Uint8List(size + 2); 1187 var footerAndHeader = new Uint8List(size + 2);
1119 if (_pendingFooter == 2) { 1188 if (_pendingChunkedFooter == 2) {
1120 footerAndHeader[0] = _CharCode.CR; 1189 footerAndHeader[0] = _CharCode.CR;
1121 footerAndHeader[1] = _CharCode.LF; 1190 footerAndHeader[1] = _CharCode.LF;
1122 } 1191 }
1123 int index = size; 1192 int index = size;
1124 while (index > _pendingFooter) { 1193 while (index > _pendingChunkedFooter) {
1125 footerAndHeader[--index] = hexDigits[length & 15]; 1194 footerAndHeader[--index] = hexDigits[length & 15];
1126 length = length >> 4; 1195 length = length >> 4;
1127 } 1196 }
1128 footerAndHeader[size + 0] = _CharCode.CR; 1197 footerAndHeader[size + 0] = _CharCode.CR;
1129 footerAndHeader[size + 1] = _CharCode.LF; 1198 footerAndHeader[size + 1] = _CharCode.LF;
1130 return footerAndHeader; 1199 return footerAndHeader;
1131 } 1200 }
1132
1133 static List<int> get _footerAndChunk0Length => new Uint8List.fromList(
1134 const [_CharCode.CR, _CharCode.LF, 0x30, _CharCode.CR, _CharCode.LF,
1135 _CharCode.CR, _CharCode.LF]);
1136
1137 static List<int> get _chunk0Length => new Uint8List.fromList(
1138 const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF]);
1139 }
1140
1141 // Transformer that transforms data to HTTP Chunked Encoding.
1142 class _ChunkedTransformer implements StreamTransformer<List<int>, List<int>> {
1143 const _ChunkedTransformer();
1144
1145 Stream<List<int>> bind(Stream<List<int>> stream) {
1146 return new Stream<List<int>>.eventTransformed(
1147 stream,
1148 (EventSink<List<int>> sink) => new _ChunkedTransformerSink(sink));
1149 }
1150 }
1151
1152 // Transformer that validates the content length.
1153 class _ContentLengthValidator
1154 implements StreamTransformer<List<int>, List<int>>, EventSink<List<int>> {
1155 final int expectedContentLength;
1156 final Uri uri;
1157 int _bytesWritten = 0;
1158
1159 EventSink<List<int>> _outSink;
1160
1161 _ContentLengthValidator(this.expectedContentLength, this.uri);
1162
1163 Stream<List<int>> bind(Stream<List<int>> stream) {
1164 return new Stream.eventTransformed(
1165 stream,
1166 (EventSink sink) {
1167 if (_outSink != null) {
1168 throw new StateError("Validator transformer already used");
1169 }
1170 _outSink = sink;
1171 return this;
1172 });
1173 }
1174
1175 void add(List<int> data) {
1176 _bytesWritten += data.length;
1177 if (_bytesWritten > expectedContentLength) {
1178 _outSink.addError(new HttpException(
1179 "Content size exceeds specified contentLength. "
1180 "$_bytesWritten bytes written while expected "
1181 "$expectedContentLength. "
1182 "[${new String.fromCharCodes(data)}]",
1183 uri: uri));
1184 _outSink.close();
1185 } else {
1186 _outSink.add(data);
1187 }
1188 }
1189
1190 void addError(Object error, [StackTrace stackTrace]) {
1191 _outSink.addError(error, stackTrace);
1192 }
1193
1194 void close() {
1195 if (_bytesWritten < expectedContentLength) {
1196 _outSink.addError(new HttpException(
1197 "Content size below specified contentLength. "
1198 " $_bytesWritten bytes written while expected "
1199 "$expectedContentLength.",
1200 uri: uri));
1201 }
1202 _outSink.close();
1203 }
1204 }
1205
1206
1207 // Extends StreamConsumer as this is an internal type, only used to pipe to.
1208 class _HttpOutgoing implements StreamConsumer<List<int>> {
1209 final Completer _doneCompleter = new Completer();
1210 final Socket socket;
1211
1212 _HttpOutgoing(this.socket);
1213
1214 Future addStream(Stream<List<int>> stream) {
1215 return socket.addStream(stream)
1216 .catchError((error) {
1217 _doneCompleter.completeError(error);
1218 throw error;
1219 });
1220 }
1221
1222 Future close() {
1223 _doneCompleter.complete(socket);
1224 return new Future.value();
1225 }
1226
1227 Future get done => _doneCompleter.future;
1228 } 1201 }
1229 1202
1230 class _HttpClientConnection { 1203 class _HttpClientConnection {
1231 final String key; 1204 final String key;
1232 final Socket _socket; 1205 final Socket _socket;
1233 final bool _proxyTunnel; 1206 final bool _proxyTunnel;
1234 final _HttpParser _httpParser; 1207 final _HttpParser _httpParser;
1235 StreamSubscription _subscription; 1208 StreamSubscription _subscription;
1236 final _HttpClient _httpClient; 1209 final _HttpClient _httpClient;
1237 bool _dispose = false; 1210 bool _dispose = false;
1238 Timer _idleTimer; 1211 Timer _idleTimer;
1239 bool closed = false; 1212 bool closed = false;
1240 Uri _currentUri; 1213 Uri _currentUri;
1241 final Uint8List _headersBuffer = new Uint8List(_HEADERS_BUFFER_SIZE);
1242 1214
1243 Completer<_HttpIncoming> _nextResponseCompleter; 1215 Completer<_HttpIncoming> _nextResponseCompleter;
1244 Future _streamFuture; 1216 Future _streamFuture;
1245 1217
1246 _HttpClientConnection(this.key, this._socket, this._httpClient, 1218 _HttpClientConnection(this.key, this._socket, this._httpClient,
1247 [this._proxyTunnel = false]) 1219 [this._proxyTunnel = false])
1248 : _httpParser = new _HttpParser.responseParser() { 1220 : _httpParser = new _HttpParser.responseParser() {
1249 _httpParser.listenToStream(_socket); 1221 _httpParser.listenToStream(_socket);
1250 1222
1251 // Set up handlers on the parser here, so we are sure to get 'onDone' from 1223 // Set up handlers on the parser here, so we are sure to get 'onDone' from
(...skipping 642 matching lines...) Expand 10 before | Expand all | Expand 10 after
1894 static const _IDLE = 1; 1866 static const _IDLE = 1;
1895 static const _CLOSING = 2; 1867 static const _CLOSING = 2;
1896 static const _DETACHED = 3; 1868 static const _DETACHED = 3;
1897 1869
1898 final Socket _socket; 1870 final Socket _socket;
1899 final _HttpServer _httpServer; 1871 final _HttpServer _httpServer;
1900 final _HttpParser _httpParser; 1872 final _HttpParser _httpParser;
1901 int _state = _IDLE; 1873 int _state = _IDLE;
1902 StreamSubscription _subscription; 1874 StreamSubscription _subscription;
1903 Timer _idleTimer; 1875 Timer _idleTimer;
1904 final Uint8List _headersBuffer = new Uint8List(_HEADERS_BUFFER_SIZE);
1905 bool _idleMark = false; 1876 bool _idleMark = false;
1906 Future _streamFuture; 1877 Future _streamFuture;
1907 1878
1908 _HttpConnection(this._socket, this._httpServer) 1879 _HttpConnection(this._socket, this._httpServer)
1909 : _httpParser = new _HttpParser.requestParser() { 1880 : _httpParser = new _HttpParser.requestParser() {
1910 _httpParser.listenToStream(_socket); 1881 _httpParser.listenToStream(_socket);
1911 _subscription = _httpParser.listen( 1882 _subscription = _httpParser.listen(
1912 (incoming) { 1883 (incoming) {
1913 _httpServer._markActive(this); 1884 _httpServer._markActive(this);
1914 // If the incoming was closed, close the connection. 1885 // If the incoming was closed, close the connection.
(...skipping 682 matching lines...) Expand 10 before | Expand all | Expand 10 after
2597 const _RedirectInfo(this.statusCode, this.method, this.location); 2568 const _RedirectInfo(this.statusCode, this.method, this.location);
2598 } 2569 }
2599 2570
2600 String _getHttpVersion() { 2571 String _getHttpVersion() {
2601 var version = Platform.version; 2572 var version = Platform.version;
2602 // Only include major and minor version numbers. 2573 // Only include major and minor version numbers.
2603 int index = version.indexOf('.', version.indexOf('.') + 1); 2574 int index = version.indexOf('.', version.indexOf('.') + 1);
2604 version = version.substring(0, index); 2575 version = version.substring(0, index);
2605 return 'Dart/$version (dart:io)'; 2576 return 'Dart/$version (dart:io)';
2606 } 2577 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698