Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(153)

Side by Side Diff: sdk/lib/io/http_impl.dart

Issue 25354003: Redo StreamTransformers so they work with Stack traces. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/convert/converter.dart ('k') | sdk/lib/io/websocket_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 class _HttpIncoming extends Stream<List<int>> { 7 class _HttpIncoming extends Stream<List<int>> {
8 final int _transferLength; 8 final int _transferLength;
9 final Completer _dataCompleter = new Completer(); 9 final Completer _dataCompleter = new Completer();
10 Stream<List<int>> _stream; 10 Stream<List<int>> _stream;
(...skipping 495 matching lines...) Expand 10 before | Expand all | Expand 10 after
506 } 506 }
507 507
508 Future _addStream(Stream<List<int>> stream) { 508 Future _addStream(Stream<List<int>> stream) {
509 return _writeHeaders() 509 return _writeHeaders()
510 .then((_) { 510 .then((_) {
511 int contentLength = headers.contentLength; 511 int contentLength = headers.contentLength;
512 if (_ignoreBody) { 512 if (_ignoreBody) {
513 stream.drain().catchError((_) {}); 513 stream.drain().catchError((_) {});
514 return _headersSink.close(); 514 return _headersSink.close();
515 } 515 }
516 stream = stream.transform(new _BufferTransformer()); 516 stream = stream.transform(const _BufferTransformer());
517 if (headers.chunkedTransferEncoding) { 517 if (headers.chunkedTransferEncoding) {
518 if (_asGZip) { 518 if (_asGZip) {
519 stream = stream.transform(GZIP.encoder); 519 stream = stream.transform(GZIP.encoder);
520 } 520 }
521 stream = stream.transform(new _ChunkedTransformer()); 521 stream = stream.transform(const _ChunkedTransformer());
522 } else if (contentLength >= 0) { 522 } else if (contentLength >= 0) {
523 stream = stream.transform( 523 stream = stream.transform(
524 new _ContentLengthValidator(contentLength, _uri)); 524 new _ContentLengthValidator(contentLength, _uri));
525 } 525 }
526 return _headersSink.addStream(stream); 526 return _headersSink.addStream(stream);
527 }); 527 });
528 } 528 }
529 529
530 Future _close() { 530 Future _close() {
531 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and 531 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after
639 .catchError((_) {}, test: _ignoreError) 639 .catchError((_) {}, test: _ignoreError)
640 .then((_) => _outbound); 640 .then((_) => _outbound);
641 } 641 }
642 if (_controller == null) return closeOutbound(); 642 if (_controller == null) return closeOutbound();
643 _controller.close(); 643 _controller.close();
644 return _closeCompleter.future.then((_) => closeOutbound()); 644 return _closeCompleter.future.then((_) => closeOutbound());
645 } 645 }
646 } 646 }
647 647
648 648
649 class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> { 649 class _BufferTransformerSink implements EventSink<List<int>> {
650 static const int MIN_CHUNK_SIZE = 4 * 1024; 650 static const int MIN_CHUNK_SIZE = 4 * 1024;
651 static const int MAX_BUFFER_SIZE = 16 * 1024; 651 static const int MAX_BUFFER_SIZE = 16 * 1024;
652 652
653 final BytesBuilder _builder = new BytesBuilder(); 653 final BytesBuilder _builder = new BytesBuilder();
654 final EventSink<List<int>> _outSink;
654 655
655 void handleData(List<int> data, EventSink<List<int>> sink) { 656 _BufferTransformerSink(this._outSink);
657
658 void add(List<int> data) {
656 // TODO(ajohnsen): Use timeout? 659 // TODO(ajohnsen): Use timeout?
657 if (data.length == 0) return; 660 if (data.length == 0) return;
658 if (data.length >= MIN_CHUNK_SIZE) { 661 if (data.length >= MIN_CHUNK_SIZE) {
659 flush(sink); 662 flush();
660 sink.add(data); 663 _outSink.add(data);
661 } else { 664 } else {
662 _builder.add(data); 665 _builder.add(data);
663 if (_builder.length >= MAX_BUFFER_SIZE) { 666 if (_builder.length >= MAX_BUFFER_SIZE) {
664 flush(sink); 667 flush();
665 } 668 }
666 } 669 }
667 } 670 }
668 671
669 void handleDone(EventSink<List<int>> sink) { 672 void addError(Object error, [StackTrace stackTrace]) {
670 flush(sink); 673 _outSink.addError(error, stackTrace);
671 sink.close();
672 } 674 }
673 675
674 void flush(EventSink<List<int>> sink) { 676 void close() {
677 flush();
678 _outSink.close();
679 }
680
681 void flush() {
675 if (_builder.length > 0) { 682 if (_builder.length > 0) {
676 // takeBytes will clear the BytesBuilder. 683 // takeBytes will clear the BytesBuilder.
677 sink.add(_builder.takeBytes()); 684 _outSink.add(_builder.takeBytes());
678 } 685 }
679 } 686 }
680 } 687 }
681 688
689 class _BufferTransformer implements StreamTransformer<List<int>, List<int>> {
690 const _BufferTransformer();
691
692 Stream<List<int>> bind(Stream<List<int>> stream) {
693 return new Stream<List<int>>.eventTransformed(
694 stream,
695 (EventSink outSink) => new _BufferTransformerSink(outSink));
696 }
697 }
698
682 699
683 class _HttpResponse extends _HttpOutboundMessage<HttpResponse> 700 class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
684 implements HttpResponse { 701 implements HttpResponse {
685 int statusCode = 200; 702 int statusCode = 200;
686 String _reasonPhrase; 703 String _reasonPhrase;
687 List<Cookie> _cookies; 704 List<Cookie> _cookies;
688 _HttpRequest _httpRequest; 705 _HttpRequest _httpRequest;
689 Duration _deadline; 706 Duration _deadline;
690 Timer _deadlineTimer; 707 Timer _deadlineTimer;
691 708
(...skipping 326 matching lines...) Expand 10 before | Expand all | Expand 10 after
1018 headers._finalize(); 1035 headers._finalize();
1019 1036
1020 // Write headers. 1037 // Write headers.
1021 headers._write(builder); 1038 headers._write(builder);
1022 writeCRLF(); 1039 writeCRLF();
1023 _headersSink.add(builder.takeBytes()); 1040 _headersSink.add(builder.takeBytes());
1024 } 1041 }
1025 } 1042 }
1026 1043
1027 1044
1028 // Transformer that transforms data to HTTP Chunked Encoding. 1045 class _ChunkedTransformerSink implements EventSink<List<int>> {
1029 class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> { 1046
1030 int _pendingFooter = 0; 1047 int _pendingFooter = 0;
1048 final EventSink<List<int>> _outSink;
1031 1049
1032 void handleData(List<int> data, EventSink<List<int>> sink) { 1050 _ChunkedTransformerSink(this._outSink);
1033 sink.add(_chunkHeader(data.length)); 1051
1034 if (data.length > 0) sink.add(data); 1052 void add(List<int> data) {
1053 _outSink.add(_chunkHeader(data.length));
1054 if (data.length > 0) _outSink.add(data);
1035 _pendingFooter = 2; 1055 _pendingFooter = 2;
1036 } 1056 }
1037 1057
1038 void handleDone(EventSink<List<int>> sink) { 1058 void addError(Object error, [StackTrace stackTrace]) {
1039 handleData(const [], sink); 1059 _outSink.addError(error, stackTrace);
1040 sink.close(); 1060 }
1061
1062 void close() {
1063 add(const []);
1064 _outSink.close();
1041 } 1065 }
1042 1066
1043 List<int> _chunkHeader(int length) { 1067 List<int> _chunkHeader(int length) {
1044 const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 1068 const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37,
1045 0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46]; 1069 0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46];
1046 if (length == 0) { 1070 if (length == 0) {
1047 if (_pendingFooter == 2) return _footerAndChunk0Length; 1071 if (_pendingFooter == 2) return _footerAndChunk0Length;
1048 return _chunk0Length; 1072 return _chunk0Length;
1049 } 1073 }
1050 int size = _pendingFooter; 1074 int size = _pendingFooter;
(...skipping 19 matching lines...) Expand all
1070 } 1094 }
1071 1095
1072 static List<int> get _footerAndChunk0Length => new Uint8List.fromList( 1096 static List<int> get _footerAndChunk0Length => new Uint8List.fromList(
1073 const [_CharCode.CR, _CharCode.LF, 0x30, _CharCode.CR, _CharCode.LF, 1097 const [_CharCode.CR, _CharCode.LF, 0x30, _CharCode.CR, _CharCode.LF,
1074 _CharCode.CR, _CharCode.LF]); 1098 _CharCode.CR, _CharCode.LF]);
1075 1099
1076 static List<int> get _chunk0Length => new Uint8List.fromList( 1100 static List<int> get _chunk0Length => new Uint8List.fromList(
1077 const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF]); 1101 const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF]);
1078 } 1102 }
1079 1103
1104 // Transformer that transforms data to HTTP Chunked Encoding.
1105 class _ChunkedTransformer implements StreamTransformer<List<int>, List<int>> {
1106 const _ChunkedTransformer();
1107
1108 Stream<List<int>> bind(Stream<List<int>> stream) {
1109 return new Stream<List<int>>.eventTransformed(
1110 stream,
1111 (EventSink<List<int>> sink) => new _ChunkedTransformerSink(sink));
1112 }
1113 }
1080 1114
1081 // Transformer that validates the content length. 1115 // Transformer that validates the content length.
1082 class _ContentLengthValidator 1116 class _ContentLengthValidator
1083 extends StreamEventTransformer<List<int>, List<int>> { 1117 implements StreamTransformer<List<int>, List<int>>, EventSink<List<int>> {
1084 final int expectedContentLength; 1118 final int expectedContentLength;
1085 final Uri uri; 1119 final Uri uri;
1086 int _bytesWritten = 0; 1120 int _bytesWritten = 0;
1087 1121
1122 EventSink<List<int>> _outSink;
1123
1088 _ContentLengthValidator(int this.expectedContentLength, Uri this.uri); 1124 _ContentLengthValidator(int this.expectedContentLength, Uri this.uri);
1089 1125
1090 void handleData(List<int> data, EventSink<List<int>> sink) { 1126 Stream<List<int>> bind(Stream<List<int>> stream) {
1127 return new Stream.eventTransformed(
1128 stream,
1129 (EventSink sink) {
1130 if (_outSink != null) {
1131 throw new StateError("Validator transformer already used");
1132 }
1133 _outSink = sink;
1134 return this;
1135 });
1136 }
1137
1138 void add(List<int> data) {
1091 _bytesWritten += data.length; 1139 _bytesWritten += data.length;
1092 if (_bytesWritten > expectedContentLength) { 1140 if (_bytesWritten > expectedContentLength) {
1093 sink.addError(new HttpException( 1141 _outSink.addError(new HttpException(
1094 "Content size exceeds specified contentLength. " 1142 "Content size exceeds specified contentLength. "
1095 "$_bytesWritten bytes written while expected " 1143 "$_bytesWritten bytes written while expected "
1096 "$expectedContentLength. " 1144 "$expectedContentLength. "
1097 "[${new String.fromCharCodes(data)}]", 1145 "[${new String.fromCharCodes(data)}]",
1098 uri: uri)); 1146 uri: uri));
1099 sink.close(); 1147 _outSink.close();
1100 } else { 1148 } else {
1101 sink.add(data); 1149 _outSink.add(data);
1102 } 1150 }
1103 } 1151 }
1104 1152
1105 void handleDone(EventSink<List<int>> sink) { 1153 void addError(Object error, [StackTrace stackTrace]) {
1154 _outSink.addError(error, stackTrace);
1155 }
1156
1157 void close() {
1106 if (_bytesWritten < expectedContentLength) { 1158 if (_bytesWritten < expectedContentLength) {
1107 sink.addError(new HttpException( 1159 _outSink.addError(new HttpException(
1108 "Content size below specified contentLength. " 1160 "Content size below specified contentLength. "
1109 " $_bytesWritten bytes written while expected " 1161 " $_bytesWritten bytes written while expected "
1110 "$expectedContentLength.", 1162 "$expectedContentLength.",
1111 uri: uri)); 1163 uri: uri));
1112 } 1164 }
1113 sink.close(); 1165 _outSink.close();
1114 } 1166 }
1115 } 1167 }
1116 1168
1117 1169
1118 // Extends StreamConsumer as this is an internal type, only used to pipe to. 1170 // Extends StreamConsumer as this is an internal type, only used to pipe to.
1119 class _HttpOutgoing implements StreamConsumer<List<int>> { 1171 class _HttpOutgoing implements StreamConsumer<List<int>> {
1120 final Completer _doneCompleter = new Completer(); 1172 final Completer _doneCompleter = new Completer();
1121 final Socket socket; 1173 final Socket socket;
1122 1174
1123 _HttpOutgoing(Socket this.socket); 1175 _HttpOutgoing(Socket this.socket);
(...skipping 1318 matching lines...) Expand 10 before | Expand all | Expand 10 after
2442 final Uri location; 2494 final Uri location;
2443 } 2495 }
2444 2496
2445 String _getHttpVersion() { 2497 String _getHttpVersion() {
2446 var version = Platform.version; 2498 var version = Platform.version;
2447 // Only include major and minor version numbers. 2499 // Only include major and minor version numbers.
2448 int index = version.indexOf('.', version.indexOf('.') + 1); 2500 int index = version.indexOf('.', version.indexOf('.') + 1);
2449 version = version.substring(0, index); 2501 version = version.substring(0, index);
2450 return 'Dart/$version (dart:io)'; 2502 return 'Dart/$version (dart:io)';
2451 } 2503 }
OLDNEW
« no previous file with comments | « sdk/lib/convert/converter.dart ('k') | sdk/lib/io/websocket_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698