Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(188)

Side by Side Diff: sdk/lib/io/http_parser.dart

Issue 12316036: Merge IO v2 branch to bleeding edge (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebased to r18818 Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/io/http_impl.dart ('k') | sdk/lib/io/http_session.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 // Global constants. 7 // Global constants.
8 class _Const { 8 class _Const {
9 // Bytes for "HTTP". 9 // Bytes for "HTTP".
10 static const HTTP = const [72, 84, 84, 80]; 10 static const HTTP = const [72, 84, 84, 80];
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
67 static const int CHUNK_SIZE_STARTING_CR = 17; 67 static const int CHUNK_SIZE_STARTING_CR = 17;
68 static const int CHUNK_SIZE_STARTING_LF = 18; 68 static const int CHUNK_SIZE_STARTING_LF = 18;
69 static const int CHUNK_SIZE = 19; 69 static const int CHUNK_SIZE = 19;
70 static const int CHUNK_SIZE_EXTENSION = 20; 70 static const int CHUNK_SIZE_EXTENSION = 20;
71 static const int CHUNK_SIZE_ENDING = 21; 71 static const int CHUNK_SIZE_ENDING = 21;
72 static const int CHUNKED_BODY_DONE_CR = 22; 72 static const int CHUNKED_BODY_DONE_CR = 22;
73 static const int CHUNKED_BODY_DONE_LF = 23; 73 static const int CHUNKED_BODY_DONE_LF = 23;
74 static const int BODY = 24; 74 static const int BODY = 24;
75 static const int CLOSED = 25; 75 static const int CLOSED = 25;
76 static const int UPGRADED = 26; 76 static const int UPGRADED = 26;
77 static const int CANCELED = 27; 77 static const int FAILURE = 27;
78 static const int FAILURE = 28;
79 78
80 static const int FIRST_BODY_STATE = CHUNK_SIZE_STARTING_CR; 79 static const int FIRST_BODY_STATE = CHUNK_SIZE_STARTING_CR;
81 static const int FIRST_PARSE_STOP_STATE = CLOSED;
82 } 80 }
83 81
84 // HTTP version of the request or response being parsed. 82 // HTTP version of the request or response being parsed.
85 class _HttpVersion { 83 class _HttpVersion {
86 static const int UNDETERMINED = 0; 84 static const int UNDETERMINED = 0;
87 static const int HTTP10 = 1; 85 static const int HTTP10 = 1;
88 static const int HTTP11 = 2; 86 static const int HTTP11 = 2;
89 } 87 }
90 88
91 // States of the HTTP parser state machine. 89 // States of the HTTP parser state machine.
92 class _MessageType { 90 class _MessageType {
93 static const int UNDETERMINED = 0; 91 static const int UNDETERMINED = 0;
94 static const int REQUEST = 1; 92 static const int REQUEST = 1;
95 static const int RESPONSE = 0; 93 static const int RESPONSE = 0;
96 } 94 }
97 95
96 class _HttpDetachedIncoming extends Stream<List<int>> {
97 StreamController<List<int>> controller;
98 final StreamSubscription subscription;
99
100 List<int> carryOverData;
101 bool paused;
102
103 Completer resumeCompleter;
104
105 _HttpDetachedIncoming(StreamSubscription this.subscription,
106 List<int> this.carryOverData,
107 Completer oldResumeCompleter) {
108 controller = new StreamController<List<int>>(
109 onSubscriptionStateChange: onSubscriptionStateChange,
110 onPauseStateChange: onPauseStateChange);
111 pause();
112 if (oldResumeCompleter != null) oldResumeCompleter.complete();
113 subscription.resume();
114 subscription.onData(controller.add);
115 subscription.onDone(controller.close);
116 subscription.onError(controller.signalError);
117 }
118
119 StreamSubscription<List<int>> listen(void onData(List<int> event),
120 {void onError(AsyncError error),
121 void onDone(),
122 bool unsubscribeOnError}) {
123 return controller.stream.listen(
124 onData,
125 onError: onError,
126 onDone: onDone,
127 unsubscribeOnError: unsubscribeOnError);
128 }
129
130 void resume() {
131 paused = false;
132 if (carryOverData != null) {
133 var data = carryOverData;
134 carryOverData = null;
135 controller.add(data);
136 // If the consumer pauses again after the carry-over data, we'll not
137 // continue our subscriber until the next resume.
138 if (paused) return;
139 }
140 if (resumeCompleter != null) {
141 resumeCompleter.complete();
142 resumeCompleter = null;
143 }
144 }
145
146 void pause() {
147 paused = true;
148 if (resumeCompleter == null) {
149 resumeCompleter = new Completer();
150 subscription.pause(resumeCompleter.future);
151 }
152 }
153
154 void onPauseStateChange() {
155 if (controller.isPaused) {
156 pause();
157 } else {
158 resume();
159 }
160 }
161
162 void onSubscriptionStateChange() {
163 if (controller.hasSubscribers) {
164 resume();
165 } else {
166 subscription.cancel();
167 }
168 }
169 }
170
98 171
99 /** 172 /**
100 * HTTP parser which parses the HTTP stream as data is supplied 173 * HTTP parser which parses the data stream given to [consume].
101 * through the [:streamData:] and [:streamDone:] methods. As the
102 * data is parsed the following callbacks are called:
103 * 174 *
104 * [:requestStart:] 175 * If an HTTP parser error occours, the parser will signal an error to either
105 * [:responseStart:] 176 * the current _HttpIncoming or the _parser itself.
106 * [:dataReceived:]
107 * [:dataEnd:]
108 * [:closed:]
109 * [:error:]
110 *
111 * If an HTTP parser error occours it is possible to get an exception
112 * thrown from the [:streamData:] and [:streamDone:] methods if
113 * the error callback is not set.
114 * 177 *
115 * The connection upgrades (e.g. switching from HTTP/1.1 to the 178 * The connection upgrades (e.g. switching from HTTP/1.1 to the
116 * WebSocket protocol) is handled in a special way. If connection 179 * WebSocket protocol) is handled in a special way. If connection
117 * upgrade is specified in the headers, then on the callback to 180 * upgrade is specified in the headers, then on the callback to
118 * [:responseStart:] the [:upgrade:] property on the [:HttpParser:] 181 * [:responseStart:] the [:upgrade:] property on the [:HttpParser:]
119 * object will be [:true:] indicating that from now on the protocol is 182 * object will be [:true:] indicating that from now on the protocol is
120 * not HTTP anymore and no more callbacks will happen, that is 183 * not HTTP anymore and no more callbacks will happen, that is
121 * [:dataReceived:] and [:dataEnd:] are not called in this case as 184 * [:dataReceived:] and [:dataEnd:] are not called in this case as
122 * there is no more HTTP data. After the upgrade the method 185 * there is no more HTTP data. After the upgrade the method
123 * [:readUnparsedData:] can be used to read any remaining bytes in the 186 * [:readUnparsedData:] can be used to read any remaining bytes in the
124 * HTTP parser which are part of the protocol the connection is 187 * HTTP parser which are part of the protocol the connection is
125 * upgrading to. These bytes cannot be processed by the HTTP parser 188 * upgrading to. These bytes cannot be processed by the HTTP parser
126 * and should be handled according to whatever protocol is being 189 * and should be handled according to whatever protocol is being
127 * upgraded to. 190 * upgraded to.
128 */ 191 */
129 class _HttpParser { 192 class _HttpParser
130 _HttpParser.requestParser() { 193 extends Stream<_HttpIncoming>
131 _requestParser = true; 194 implements StreamConsumer<List<int>, _HttpParser> {
195
196 factory _HttpParser.requestParser() {
197 return new _HttpParser._(true);
198 }
199
200 factory _HttpParser.responseParser() {
201 return new _HttpParser._(false);
202 }
203
204 _HttpParser._(this._requestParser) {
205 _controller = new StreamController<_HttpIncoming>(
206 onSubscriptionStateChange: _updateParsePauseState,
207 onPauseStateChange: _updateParsePauseState);
132 _reset(); 208 _reset();
133 } 209 }
134 _HttpParser.responseParser() { 210
135 _requestParser = false; 211
136 _reset(); 212 StreamSubscription<_HttpIncoming> listen(void onData(List<int> event),
213 {void onError(AsyncError error),
214 void onDone(),
215 bool unsubscribeOnError}) {
216 return _controller.stream.listen(onData,
217 onError: onError,
218 onDone: onDone,
219 unsubscribeOnError: unsubscribeOnError);
220 }
221
222 Future<_HttpParser> consume(Stream<List<int>> stream) {
223 // Listen to the stream and handle data accordingly. When a
224 // _HttpIncoming is created, _dataPause, _dataResume, _dataDone is
225 // given to provide a way of controlling the parser.
226 // TODO(ajohnsen): Remove _dataPause, _dataResume and _dataDone and clean up
227 // how the _HttpIncoming signals the parser.
228 var completer = new Completer();
229 _socketSubscription = stream.listen(
230 _onData,
231 onError: _onError,
232 onDone: () {
233 _onDone();
234 completer.complete(this);
235 });
236 return completer.future;
137 } 237 }
138 238
139 // From RFC 2616. 239 // From RFC 2616.
140 // generic-message = start-line 240 // generic-message = start-line
141 // *(message-header CRLF) 241 // *(message-header CRLF)
142 // CRLF 242 // CRLF
143 // [ message-body ] 243 // [ message-body ]
144 // start-line = Request-Line | Status-Line 244 // start-line = Request-Line | Status-Line
145 // Request-Line = Method SP Request-URI SP HTTP-Version CRLF 245 // Request-Line = Method SP Request-URI SP HTTP-Version CRLF
146 // Status-Line = HTTP-Version SP Status-Code SP Reason-Phrase CRLF 246 // Status-Line = HTTP-Version SP Status-Code SP Reason-Phrase CRLF
147 // message-header = field-name ":" [ field-value ] 247 // message-header = field-name ":" [ field-value ]
148 void _parse() { 248 void _parse() {
249 assert(!_parserCalled);
250 _parserCalled = true;
149 try { 251 try {
150 if (_state == _State.CLOSED) { 252 if (_state == _State.CLOSED) {
151 throw new HttpParserException("Data on closed connection"); 253 throw new HttpParserException("Data on closed connection");
152 } 254 }
153 if (_state == _State.UPGRADED) {
154 throw new HttpParserException("Data on upgraded connection");
155 }
156 if (_state == _State.FAILURE) { 255 if (_state == _State.FAILURE) {
157 throw new HttpParserException("Data on failed connection"); 256 throw new HttpParserException("Data on failed connection");
158 } 257 }
159 if (_state == _State.CANCELED) {
160 throw new HttpParserException("Data on canceled connection");
161 }
162 while (_buffer != null && 258 while (_buffer != null &&
163 _index < _lastIndex && 259 _index < _buffer.length &&
164 _state <= _State.FIRST_PARSE_STOP_STATE) { 260 _state != _State.FAILURE &&
261 _state != _State.UPGRADED) {
262 if (_paused) {
263 _parserCalled = false;
264 return;
265 }
165 int byte = _buffer[_index++]; 266 int byte = _buffer[_index++];
166 switch (_state) { 267 switch (_state) {
167 case _State.START: 268 case _State.START:
168 if (byte == _Const.HTTP[0]) { 269 if (byte == _Const.HTTP[0]) {
169 // Start parsing method or HTTP version. 270 // Start parsing method or HTTP version.
170 _httpVersionIndex = 1; 271 _httpVersionIndex = 1;
171 _state = _State.METHOD_OR_RESPONSE_HTTP_VERSION; 272 _state = _State.METHOD_OR_RESPONSE_HTTP_VERSION;
172 } else { 273 } else {
173 // Start parsing method. 274 // Start parsing method.
174 if (!_isTokenChar(byte)) { 275 if (!_isTokenChar(byte)) {
(...skipping 146 matching lines...) Expand 10 before | Expand all | Expand 10 after
321 if (byte == _CharCode.CR || byte == _CharCode.LF) { 422 if (byte == _CharCode.CR || byte == _CharCode.LF) {
322 throw new HttpParserException("Invalid response reason phrase"); 423 throw new HttpParserException("Invalid response reason phrase");
323 } 424 }
324 _uri_or_reason_phrase.add(byte); 425 _uri_or_reason_phrase.add(byte);
325 } 426 }
326 break; 427 break;
327 428
328 case _State.RESPONSE_LINE_ENDING: 429 case _State.RESPONSE_LINE_ENDING:
329 _expect(byte, _CharCode.LF); 430 _expect(byte, _CharCode.LF);
330 _messageType == _MessageType.RESPONSE; 431 _messageType == _MessageType.RESPONSE;
331 _statusCode = int.parse( 432 _statusCode = int.parse(
332 new String.fromCharCodes(_method_or_status_code)); 433 new String.fromCharCodes(_method_or_status_code));
333 if (_statusCode < 100 || _statusCode > 599) { 434 if (_statusCode < 100 || _statusCode > 599) {
334 throw new HttpParserException("Invalid response status code"); 435 throw new HttpParserException("Invalid response status code");
436 } else {
437 // Check whether this response will never have a body.
438 _noMessageBody = _statusCode <= 199 || _statusCode == 204 ||
439 _statusCode == 304;
335 } 440 }
336 _state = _State.HEADER_START; 441 _state = _State.HEADER_START;
337 break; 442 break;
338 443
339 case _State.HEADER_START: 444 case _State.HEADER_START:
445 _headers = new _HttpHeaders(version);
340 if (byte == _CharCode.CR) { 446 if (byte == _CharCode.CR) {
341 _state = _State.HEADER_ENDING; 447 _state = _State.HEADER_ENDING;
342 } else { 448 } else {
343 // Start of new header field. 449 // Start of new header field.
344 _headerField.add(_toLowerCase(byte)); 450 _headerField.add(_toLowerCase(byte));
345 _state = _State.HEADER_FIELD; 451 _state = _State.HEADER_FIELD;
346 } 452 }
347 break; 453 break;
348 454
349 case _State.HEADER_FIELD: 455 case _State.HEADER_FIELD:
(...skipping 29 matching lines...) Expand all
379 _expect(byte, _CharCode.LF); 485 _expect(byte, _CharCode.LF);
380 _state = _State.HEADER_VALUE_FOLD_OR_END; 486 _state = _State.HEADER_VALUE_FOLD_OR_END;
381 break; 487 break;
382 488
383 case _State.HEADER_VALUE_FOLD_OR_END: 489 case _State.HEADER_VALUE_FOLD_OR_END:
384 if (byte == _CharCode.SP || byte == _CharCode.HT) { 490 if (byte == _CharCode.SP || byte == _CharCode.HT) {
385 _state = _State.HEADER_VALUE_START; 491 _state = _State.HEADER_VALUE_START;
386 } else { 492 } else {
387 String headerField = new String.fromCharCodes(_headerField); 493 String headerField = new String.fromCharCodes(_headerField);
388 String headerValue = new String.fromCharCodes(_headerValue); 494 String headerValue = new String.fromCharCodes(_headerValue);
389 bool reportHeader = true; 495 if (headerField == "transfer-encoding" &&
496 headerValue.toLowerCase() == "chunked") {
497 _chunked = true;
498 }
390 if (headerField == "connection") { 499 if (headerField == "connection") {
391 List<String> tokens = _tokenizeFieldValue(headerValue); 500 List<String> tokens = _tokenizeFieldValue(headerValue);
392 for (int i = 0; i < tokens.length; i++) { 501 for (int i = 0; i < tokens.length; i++) {
393 String token = tokens[i].toLowerCase(); 502 if (tokens[i].toLowerCase() == "upgrade") {
394 if (token == "keep-alive") {
395 _persistentConnection = true;
396 } else if (token == "close") {
397 _persistentConnection = false;
398 } else if (token == "upgrade") {
399 _connectionUpgrade = true; 503 _connectionUpgrade = true;
400 } 504 }
401 _headers.add(headerField, token); 505 _headers.add(headerField, tokens[i]);
402
403 } 506 }
404 reportHeader = false; 507 } else {
405 } else if (headerField == "transfer-encoding" &&
406 headerValue.toLowerCase() == "chunked") {
407 _chunked = true;
408 }
409 if (reportHeader) {
410 _headers.add(headerField, headerValue); 508 _headers.add(headerField, headerValue);
411 } 509 }
412 _headerField.clear(); 510 _headerField.clear();
413 _headerValue.clear(); 511 _headerValue.clear();
414 512
415 if (byte == _CharCode.CR) { 513 if (byte == _CharCode.CR) {
416 _state = _State.HEADER_ENDING; 514 _state = _State.HEADER_ENDING;
417 } else { 515 } else {
418 // Start of new header field. 516 // Start of new header field.
419 _headerField.add(_toLowerCase(byte)); 517 _headerField.add(_toLowerCase(byte));
420 _state = _State.HEADER_FIELD; 518 _state = _State.HEADER_FIELD;
421 } 519 }
422 } 520 }
423 break; 521 break;
424 522
425 case _State.HEADER_ENDING: 523 case _State.HEADER_ENDING:
426 _expect(byte, _CharCode.LF); 524 _expect(byte, _CharCode.LF);
427 _headers._mutable = false; 525 _headers._mutable = false;
428 526
429 _contentLength = _headers.contentLength; 527 _transferLength = _headers.contentLength;
430 // Ignore the Content-Length header if Transfer-Encoding 528 // Ignore the Content-Length header if Transfer-Encoding
431 // is chunked (RFC 2616 section 4.4) 529 // is chunked (RFC 2616 section 4.4)
432 if (_chunked) _contentLength = -1; 530 if (_chunked) _transferLength = -1;
433 531
434 // If a request message has neither Content-Length nor 532 // If a request message has neither Content-Length nor
435 // Transfer-Encoding the message must not have a body (RFC 533 // Transfer-Encoding the message must not have a body (RFC
436 // 2616 section 4.3). 534 // 2616 section 4.3).
437 if (_messageType == _MessageType.REQUEST && 535 if (_messageType == _MessageType.REQUEST &&
438 _contentLength < 0 && 536 _transferLength < 0 &&
439 _chunked == false) { 537 _chunked == false) {
440 _contentLength = 0; 538 _transferLength = 0;
441 } 539 }
442 if (_connectionUpgrade) { 540 if (_connectionUpgrade) {
443 _state = _State.UPGRADED; 541 _state = _State.UPGRADED;
542 _transferLength = 0;
444 } 543 }
445 var noBody; 544 _createIncoming(_transferLength);
446 if (_requestParser) { 545 if (_requestParser) {
447 noBody = _contentLength == 0; 546 _incoming.method =
448 requestStart(new String.fromCharCodes(_method_or_status_code), 547 new String.fromCharCodes(_method_or_status_code);
449 new String.fromCharCodes(_uri_or_reason_phrase), 548 _incoming.uri =
450 version, 549 Uri.parse(
451 _headers, 550 new String.fromCharCodes(_uri_or_reason_phrase));
452 !noBody);
453 } else { 551 } else {
454 // Check whether this response will never have a body. 552 _incoming.statusCode = _statusCode;
455 noBody = _contentLength == 0 || 553 _incoming.reasonPhrase =
456 _statusCode <= 199 || 554 new String.fromCharCodes(_uri_or_reason_phrase);
457 _statusCode == HttpStatus.NO_CONTENT ||
458 _statusCode == HttpStatus.NOT_MODIFIED ||
459 _responseToMethod == "HEAD";
460 responseStart(_statusCode,
461 new String.fromCharCodes(_uri_or_reason_phrase),
462 version,
463 _headers,
464 !noBody);
465 } 555 }
466 _method_or_status_code.clear(); 556 _method_or_status_code.clear();
467 _uri_or_reason_phrase.clear(); 557 _uri_or_reason_phrase.clear();
468 if (_state == _State.CANCELED) continue; 558 if (_connectionUpgrade) {
469 if (!_connectionUpgrade) { 559 _incoming.upgraded = true;
470 if (noBody) { 560 _controller.add(_incoming);
471 _bodyEnd(); 561 break;
472 _reset();
473 } else if (_chunked) {
474 _state = _State.CHUNK_SIZE;
475 _remainingContent = 0;
476 } else if (_contentLength > 0) {
477 _remainingContent = _contentLength;
478 _state = _State.BODY;
479 } else {
480 // Neither chunked nor content length. End of body
481 // indicated by close.
482 _state = _State.BODY;
483 }
484 } 562 }
563 if (_chunked) {
564 _state = _State.CHUNK_SIZE;
565 _remainingContent = 0;
566 } else if (_transferLength == 0 ||
567 (_messageType == _MessageType.RESPONSE &&
568 (_noMessageBody || _responseToMethod == "HEAD"))) {
569 _state = _State.START;
570 var tmp = _incoming;
571 _closeIncoming();
572 _controller.add(tmp);
573 break;
574 } else if (_transferLength > 0) {
575 _remainingContent = _transferLength;
576 _state = _State.BODY;
577 } else {
578 // Neither chunked nor content length. End of body
579 // indicated by close.
580 _state = _State.BODY;
581 }
582 _controller.add(_incoming);
485 break; 583 break;
486 584
487 case _State.CHUNK_SIZE_STARTING_CR: 585 case _State.CHUNK_SIZE_STARTING_CR:
488 _expect(byte, _CharCode.CR); 586 _expect(byte, _CharCode.CR);
489 _state = _State.CHUNK_SIZE_STARTING_LF; 587 _state = _State.CHUNK_SIZE_STARTING_LF;
490 break; 588 break;
491 589
492 case _State.CHUNK_SIZE_STARTING_LF: 590 case _State.CHUNK_SIZE_STARTING_LF:
493 _expect(byte, _CharCode.LF); 591 _expect(byte, _CharCode.LF);
494 _state = _State.CHUNK_SIZE; 592 _state = _State.CHUNK_SIZE;
(...skipping 25 matching lines...) Expand all
520 } 618 }
521 break; 619 break;
522 620
523 case _State.CHUNKED_BODY_DONE_CR: 621 case _State.CHUNKED_BODY_DONE_CR:
524 _expect(byte, _CharCode.CR); 622 _expect(byte, _CharCode.CR);
525 _state = _State.CHUNKED_BODY_DONE_LF; 623 _state = _State.CHUNKED_BODY_DONE_LF;
526 break; 624 break;
527 625
528 case _State.CHUNKED_BODY_DONE_LF: 626 case _State.CHUNKED_BODY_DONE_LF:
529 _expect(byte, _CharCode.LF); 627 _expect(byte, _CharCode.LF);
530 _bodyEnd(); 628 _state = _State.START;
531 if (_state == _State.CANCELED) continue; 629 _closeIncoming();
532 _reset();
533 break; 630 break;
534 631
535 case _State.BODY: 632 case _State.BODY:
536 // The body is not handled one byte at a time but in blocks. 633 // The body is not handled one byte at a time but in blocks.
537 _index--; 634 _index--;
538 int dataAvailable = _lastIndex - _index; 635 int dataAvailable = _buffer.length - _index;
539 List<int> data; 636 List<int> data;
540 if (_remainingContent == null || 637 if (_remainingContent == null ||
541 dataAvailable <= _remainingContent) { 638 dataAvailable <= _remainingContent) {
542 data = new Uint8List(dataAvailable); 639 data = new Uint8List(dataAvailable);
543 data.setRange(0, dataAvailable, _buffer, _index); 640 data.setRange(0, dataAvailable, _buffer, _index);
544 } else { 641 } else {
545 data = new Uint8List(_remainingContent); 642 data = new Uint8List(_remainingContent);
546 data.setRange(0, _remainingContent, _buffer, _index); 643 data.setRange(0, _remainingContent, _buffer, _index);
547 } 644 }
548 645
549 dataReceived(data); 646 _bodyController.add(data);
550 if (_state == _State.CANCELED) continue;
551 if (_remainingContent != null) { 647 if (_remainingContent != null) {
552 _remainingContent -= data.length; 648 _remainingContent -= data.length;
553 } 649 }
554 _index += data.length; 650 _index += data.length;
555 if (_remainingContent == 0) { 651 if (_remainingContent == 0) {
556 if (!_chunked) { 652 if (!_chunked) {
557 _bodyEnd(); 653 _state = _State.START;
558 if (_state == _State.CANCELED) continue; 654 _closeIncoming();
559 _reset();
560 } else { 655 } else {
561 _state = _State.CHUNK_SIZE_STARTING_CR; 656 _state = _State.CHUNK_SIZE_STARTING_CR;
562 } 657 }
563 } 658 }
564 break; 659 break;
565 660
566 case _State.FAILURE: 661 case _State.FAILURE:
567 // Should be unreachable. 662 // Should be unreachable.
568 assert(false); 663 assert(false);
569 break; 664 break;
570 665
571 default: 666 default:
572 // Should be unreachable. 667 // Should be unreachable.
573 assert(false); 668 assert(false);
574 break; 669 break;
575 } 670 }
576 } 671 }
577 } catch (e) { 672 } catch (e, s) {
578 _state = _State.FAILURE; 673 _state = _State.FAILURE;
579 error(e); 674 error(new AsyncError(e, s));
580 } 675 }
581 676
582 // If all data is parsed or not needed due to failure there is no 677 _parserCalled = false;
583 // need to hold on to the buffer. 678 if (_buffer != null && _index == _buffer.length) {
584 if (_state != _State.UPGRADED) _releaseBuffer(); 679 // If all data is parsed release the buffer and resume receiving
680 // data.
681 _releaseBuffer();
682 if (_state != _State.UPGRADED && _state != _State.FAILURE) {
683 _socketSubscription.resume();
684 }
685 }
585 } 686 }
586 687
587 void streamData(List<int> buffer) { 688 void _onData(List<int> buffer) {
689 _socketSubscription.pause();
588 assert(_buffer == null); 690 assert(_buffer == null);
589 _buffer = buffer; 691 _buffer = buffer;
590 _index = 0; 692 _index = 0;
591 _lastIndex = buffer.length;
592 _parse(); 693 _parse();
593 } 694 }
594 695
595 void streamDone() { 696 void _onDone() {
596 String type() => _requestParser ? "request" : "response"; 697 // onDone cancles the subscription.
698 _socketSubscription = null;
699 if (_state == _State.CLOSED || _state == _State.FAILURE) return;
597 700
701 if (_incoming != null) {
702 if (_state != _State.UPGRADED &&
703 !(_state == _State.START && !_requestParser) &&
704 !(_state == _State.BODY && !_chunked && _transferLength == -1)) {
705 _bodyController.signalError(
706 new AsyncError(
707 new HttpParserException(
708 "Connection closed while receiving data")));
709 }
710 _closeIncoming();
711 _controller.close();
712 return;
713 }
598 // If the connection is idle the HTTP stream is closed. 714 // If the connection is idle the HTTP stream is closed.
599 if (_state == _State.START) { 715 if (_state == _State.START) {
600 if (_requestParser) { 716 if (!_requestParser) {
601 closed();
602 } else {
603 error( 717 error(
604 new HttpParserException( 718 new AsyncError(
605 "Connection closed before full ${type()} header was received")); 719 new HttpParserException(
720 "Connection closed before full header was received")));
606 } 721 }
722 _controller.close();
607 return; 723 return;
608 } 724 }
609 725
726 if (_state == _State.UPGRADED) {
727 _controller.close();
728 return;
729 }
730
610 if (_state < _State.FIRST_BODY_STATE) { 731 if (_state < _State.FIRST_BODY_STATE) {
611 _state = _State.FAILURE; 732 _state = _State.FAILURE;
612 // Report the error through the error callback if any. Otherwise 733 // Report the error through the error callback if any. Otherwise
613 // throw the error. 734 // throw the error.
614 error( 735 error(
615 new HttpParserException( 736 new AsyncError(
616 "Connection closed before full ${type()} header was received")); 737 new HttpParserException(
738 "Connection closed before full header was received")));
739 _controller.close();
617 return; 740 return;
618 } 741 }
619 742
620 if (!_chunked && _contentLength == -1) { 743 if (!_chunked && _transferLength == -1) {
621 dataEnd(true);
622 _state = _State.CLOSED; 744 _state = _State.CLOSED;
623 closed();
624 } else { 745 } else {
625 _state = _State.FAILURE; 746 _state = _State.FAILURE;
626 // Report the error through the error callback if any. Otherwise 747 // Report the error through the error callback if any. Otherwise
627 // throw the error. 748 // throw the error.
628 error( 749 error(
629 new HttpParserException( 750 new AsyncError(
630 "Connection closed before full ${type()} body was received")); 751 new HttpParserException(
752 "Connection closed before full body was received")));
631 } 753 }
754 _controller.close();
632 } 755 }
633 756
634 void streamError(e) { 757 void _onError(e) {
635 error(e); 758 _controller.signalError(e);
636 } 759 }
637 760
638 String get version { 761 String get version {
639 switch (_httpVersion) { 762 switch (_httpVersion) {
640 case _HttpVersion.HTTP10: 763 case _HttpVersion.HTTP10:
641 return "1.0"; 764 return "1.0";
642 case _HttpVersion.HTTP11: 765 case _HttpVersion.HTTP11:
643 return "1.1"; 766 return "1.1";
644 } 767 }
645 return null; 768 return null;
646 } 769 }
647 770
648 void cancel() {
649 _state = _State.CANCELED;
650 }
651
652 void restart() {
653 _reset();
654 }
655
656 int get messageType => _messageType; 771 int get messageType => _messageType;
657 int get contentLength => _contentLength; 772 int get transferLength => _transferLength;
658 bool get upgrade => _connectionUpgrade && _state == _State.UPGRADED; 773 bool get upgrade => _connectionUpgrade && _state == _State.UPGRADED;
659 bool get persistentConnection => _persistentConnection; 774 bool get persistentConnection => _persistentConnection;
660 775
661 void set responseToMethod(String method) { _responseToMethod = method; } 776 void set responseToMethod(String method) { _responseToMethod = method; }
662 777
778 _HttpDetachedIncoming detachIncoming() {
779 var completer = _pauseCompleter;
780 _pauseCompleter = null;
781 return new _HttpDetachedIncoming(_socketSubscription,
782 readUnparsedData(),
783 completer);
784 }
785
663 List<int> readUnparsedData() { 786 List<int> readUnparsedData() {
664 if (_buffer == null) return []; 787 if (_buffer == null) return null;
665 if (_index == _lastIndex) return []; 788 if (_index == _buffer.length) return null;
666 var result = _buffer.getRange(_index, _lastIndex - _index); 789 var result = _buffer.getRange(_index, _buffer.length - _index);
667 _releaseBuffer(); 790 _releaseBuffer();
668 return result; 791 return result;
669 } 792 }
670 793
671 void _bodyEnd() {
672 dataEnd(_messageType == _MessageType.RESPONSE && !_persistentConnection);
673 }
674
675 _reset() { 794 _reset() {
795 if (_state == _State.UPGRADED) return;
676 _state = _State.START; 796 _state = _State.START;
677 _messageType = _MessageType.UNDETERMINED; 797 _messageType = _MessageType.UNDETERMINED;
678 _headerField = new List(); 798 _headerField = new List();
679 _headerValue = new List(); 799 _headerValue = new List();
680 _method_or_status_code = new List(); 800 _method_or_status_code = new List();
681 _uri_or_reason_phrase = new List(); 801 _uri_or_reason_phrase = new List();
682 802
683 _httpVersion = _HttpVersion.UNDETERMINED; 803 _httpVersion = _HttpVersion.UNDETERMINED;
684 _contentLength = -1; 804 _transferLength = -1;
685 _persistentConnection = false; 805 _persistentConnection = false;
686 _connectionUpgrade = false; 806 _connectionUpgrade = false;
687 _chunked = false; 807 _chunked = false;
688 808
809 _noMessageBody = false;
689 _responseToMethod = null; 810 _responseToMethod = null;
690 _remainingContent = null; 811 _remainingContent = null;
691 812
692 _headers = new _HttpHeaders(); 813 _headers = null;
693 } 814 }
694 815
695 _releaseBuffer() { 816 _releaseBuffer() {
696 _buffer = null; 817 _buffer = null;
697 _index = null; 818 _index = null;
698 _lastIndex = null;
699 } 819 }
700 820
701 bool _isTokenChar(int byte) { 821 bool _isTokenChar(int byte) {
702 return byte > 31 && byte < 128 && _Const.SEPARATORS.indexOf(byte) == -1; 822 return byte > 31 && byte < 128 && _Const.SEPARATORS.indexOf(byte) == -1;
703 } 823 }
704 824
705 List<String> _tokenizeFieldValue(String headerValue) { 825 List<String> _tokenizeFieldValue(String headerValue) {
706 List<String> tokens = new List<String>(); 826 List<String> tokens = new List<String>();
707 int start = 0; 827 int start = 0;
708 int index = 0; 828 int index = 0;
(...skipping 28 matching lines...) Expand all
737 return byte - 0x30; // 0 - 9 857 return byte - 0x30; // 0 - 9
738 } else if (0x41 <= byte && byte <= 0x46) { 858 } else if (0x41 <= byte && byte <= 0x46) {
739 return byte - 0x41 + 10; // A - F 859 return byte - 0x41 + 10; // A - F
740 } else if (0x61 <= byte && byte <= 0x66) { 860 } else if (0x61 <= byte && byte <= 0x66) {
741 return byte - 0x61 + 10; // a - f 861 return byte - 0x61 + 10; // a - f
742 } else { 862 } else {
743 throw new HttpParserException("Failed to parse HTTP"); 863 throw new HttpParserException("Failed to parse HTTP");
744 } 864 }
745 } 865 }
746 866
867 void _createIncoming(int transferLength) {
868 assert(_incoming == null);
869 assert(_bodyController == null);
870 _bodyController = new StreamController<List<int>>(
871 onSubscriptionStateChange: _updateParsePauseState,
872 onPauseStateChange: _updateParsePauseState);
873 _incoming = new _HttpIncoming(
874 _headers, transferLength, _bodyController.stream);
875 _pauseParsing(); // Needed to handle detaching - don't start on the body!
876 }
877
878 void _closeIncoming() {
879 assert(_incoming != null);
880 var tmp = _incoming;
881 _incoming = null;
882 tmp.close();
883 if (_bodyController != null) {
884 _bodyController.close();
885 _bodyController = null;
886 }
887 _updateParsePauseState();
888 }
889
890 void _continueParsing() {
891 _paused = false;
892 if (!_parserCalled && _buffer != null) _parse();
893 }
894
895 void _pauseParsing() {
896 _paused = true;
897 }
898
899 void _updateParsePauseState() {
900 if (_bodyController != null) {
901 if (_bodyController.hasSubscribers && !_bodyController.isPaused) {
902 _continueParsing();
903 } else {
904 _pauseParsing();
905 }
906 } else {
907 if (_controller.hasSubscribers && !_controller.isPaused) {
908 _continueParsing();
909 } else {
910 _pauseParsing();
911 }
912 }
913 }
914
915 void error(error) {
916 if (_socketSubscription != null) _socketSubscription.cancel();
917 _state = _State.FAILURE;
918 _controller.signalError(error);
919 _controller.close();
920 }
921
922 // State.
923 bool _parserCalled = false;
924
747 // The data that is currently being parsed. 925 // The data that is currently being parsed.
748 List<int> _buffer; 926 List<int> _buffer;
749 int _index; 927 int _index;
750 int _lastIndex;
751 928
752 bool _requestParser; 929 final bool _requestParser;
753 int _state; 930 int _state;
754 int _httpVersionIndex; 931 int _httpVersionIndex;
755 int _messageType; 932 int _messageType;
756 int _statusCode; 933 int _statusCode;
757 List _method_or_status_code; 934 List _method_or_status_code;
758 List _uri_or_reason_phrase; 935 List _uri_or_reason_phrase;
759 List _headerField; 936 List _headerField;
760 List _headerValue; 937 List _headerValue;
761 938
762 int _httpVersion; 939 int _httpVersion;
763 int _contentLength; 940 int _transferLength;
764 bool _persistentConnection; 941 bool _persistentConnection;
765 bool _connectionUpgrade; 942 bool _connectionUpgrade;
766 bool _chunked; 943 bool _chunked;
767 944
945 bool _noMessageBody;
768 String _responseToMethod; // Indicates the method used for the request. 946 String _responseToMethod; // Indicates the method used for the request.
769 int _remainingContent; 947 int _remainingContent;
770 948
771 _HttpHeaders _headers = new _HttpHeaders(); 949 _HttpHeaders _headers;
772 950
773 // Callbacks. 951 // The current incoming connection.
774 Function requestStart; 952 _HttpIncoming _incoming;
775 Function responseStart; 953 StreamSubscription _socketSubscription;
776 Function dataReceived; 954 bool _paused = false;
777 Function dataEnd; 955 Completer _pauseCompleter;
778 Function error; 956 StreamController<_HttpIncoming> _controller;
779 Function closed; 957 StreamController<List<int>> _bodyController;
780 } 958 }
781 959
782 960
783 class HttpParserException implements Exception { 961 class HttpParserException implements Exception {
784 const HttpParserException([String this.message = ""]); 962 const HttpParserException([String this.message = ""]);
785 String toString() => "HttpParserException: $message"; 963 String toString() => "HttpParserException: $message";
786 final String message; 964 final String message;
787 } 965 }
OLDNEW
« no previous file with comments | « sdk/lib/io/http_impl.dart ('k') | sdk/lib/io/http_session.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698