OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 // Global constants. | 7 // Global constants. |
8 class _Const { | 8 class _Const { |
9 // Bytes for "HTTP". | 9 // Bytes for "HTTP". |
10 static const HTTP = const [72, 84, 84, 80]; | 10 static const HTTP = const [72, 84, 84, 80]; |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
67 static const int CHUNK_SIZE_STARTING_CR = 17; | 67 static const int CHUNK_SIZE_STARTING_CR = 17; |
68 static const int CHUNK_SIZE_STARTING_LF = 18; | 68 static const int CHUNK_SIZE_STARTING_LF = 18; |
69 static const int CHUNK_SIZE = 19; | 69 static const int CHUNK_SIZE = 19; |
70 static const int CHUNK_SIZE_EXTENSION = 20; | 70 static const int CHUNK_SIZE_EXTENSION = 20; |
71 static const int CHUNK_SIZE_ENDING = 21; | 71 static const int CHUNK_SIZE_ENDING = 21; |
72 static const int CHUNKED_BODY_DONE_CR = 22; | 72 static const int CHUNKED_BODY_DONE_CR = 22; |
73 static const int CHUNKED_BODY_DONE_LF = 23; | 73 static const int CHUNKED_BODY_DONE_LF = 23; |
74 static const int BODY = 24; | 74 static const int BODY = 24; |
75 static const int CLOSED = 25; | 75 static const int CLOSED = 25; |
76 static const int UPGRADED = 26; | 76 static const int UPGRADED = 26; |
77 static const int CANCELED = 27; | 77 static const int FAILURE = 27; |
78 static const int FAILURE = 28; | |
79 | 78 |
80 static const int FIRST_BODY_STATE = CHUNK_SIZE_STARTING_CR; | 79 static const int FIRST_BODY_STATE = CHUNK_SIZE_STARTING_CR; |
81 static const int FIRST_PARSE_STOP_STATE = CLOSED; | |
82 } | 80 } |
83 | 81 |
84 // HTTP version of the request or response being parsed. | 82 // HTTP version of the request or response being parsed. |
85 class _HttpVersion { | 83 class _HttpVersion { |
86 static const int UNDETERMINED = 0; | 84 static const int UNDETERMINED = 0; |
87 static const int HTTP10 = 1; | 85 static const int HTTP10 = 1; |
88 static const int HTTP11 = 2; | 86 static const int HTTP11 = 2; |
89 } | 87 } |
90 | 88 |
91 // States of the HTTP parser state machine. | 89 // States of the HTTP parser state machine. |
92 class _MessageType { | 90 class _MessageType { |
93 static const int UNDETERMINED = 0; | 91 static const int UNDETERMINED = 0; |
94 static const int REQUEST = 1; | 92 static const int REQUEST = 1; |
95 static const int RESPONSE = 0; | 93 static const int RESPONSE = 0; |
96 } | 94 } |
97 | 95 |
| 96 class _HttpDetachedIncoming extends Stream<List<int>> { |
| 97 StreamController<List<int>> controller; |
| 98 final StreamSubscription subscription; |
| 99 |
| 100 List<int> carryOverData; |
| 101 bool paused; |
| 102 |
| 103 Completer resumeCompleter; |
| 104 |
| 105 _HttpDetachedIncoming(StreamSubscription this.subscription, |
| 106 List<int> this.carryOverData, |
| 107 Completer oldResumeCompleter) { |
| 108 controller = new StreamController<List<int>>( |
| 109 onSubscriptionStateChange: onSubscriptionStateChange, |
| 110 onPauseStateChange: onPauseStateChange); |
| 111 pause(); |
| 112 if (oldResumeCompleter != null) oldResumeCompleter.complete(); |
| 113 subscription.resume(); |
| 114 subscription.onData(controller.add); |
| 115 subscription.onDone(controller.close); |
| 116 subscription.onError(controller.signalError); |
| 117 } |
| 118 |
| 119 StreamSubscription<List<int>> listen(void onData(List<int> event), |
| 120 {void onError(AsyncError error), |
| 121 void onDone(), |
| 122 bool unsubscribeOnError}) { |
| 123 return controller.stream.listen( |
| 124 onData, |
| 125 onError: onError, |
| 126 onDone: onDone, |
| 127 unsubscribeOnError: unsubscribeOnError); |
| 128 } |
| 129 |
| 130 void resume() { |
| 131 paused = false; |
| 132 if (carryOverData != null) { |
| 133 var data = carryOverData; |
| 134 carryOverData = null; |
| 135 controller.add(data); |
| 136 // If the consumer pauses again after the carry-over data, we'll not |
| 137 // continue our subscriber until the next resume. |
| 138 if (paused) return; |
| 139 } |
| 140 if (resumeCompleter != null) { |
| 141 resumeCompleter.complete(); |
| 142 resumeCompleter = null; |
| 143 } |
| 144 } |
| 145 |
| 146 void pause() { |
| 147 paused = true; |
| 148 if (resumeCompleter == null) { |
| 149 resumeCompleter = new Completer(); |
| 150 subscription.pause(resumeCompleter.future); |
| 151 } |
| 152 } |
| 153 |
| 154 void onPauseStateChange() { |
| 155 if (controller.isPaused) { |
| 156 pause(); |
| 157 } else { |
| 158 resume(); |
| 159 } |
| 160 } |
| 161 |
| 162 void onSubscriptionStateChange() { |
| 163 if (controller.hasSubscribers) { |
| 164 resume(); |
| 165 } else { |
| 166 subscription.cancel(); |
| 167 } |
| 168 } |
| 169 } |
| 170 |
98 | 171 |
99 /** | 172 /** |
100 * HTTP parser which parses the HTTP stream as data is supplied | 173 * HTTP parser which parses the data stream given to [consume]. |
101 * through the [:streamData:] and [:streamDone:] methods. As the | |
102 * data is parsed the following callbacks are called: | |
103 * | 174 * |
104 * [:requestStart:] | 175 * If an HTTP parser error occours, the parser will signal an error to either |
105 * [:responseStart:] | 176 * the current _HttpIncoming or the _parser itself. |
106 * [:dataReceived:] | |
107 * [:dataEnd:] | |
108 * [:closed:] | |
109 * [:error:] | |
110 * | |
111 * If an HTTP parser error occours it is possible to get an exception | |
112 * thrown from the [:streamData:] and [:streamDone:] methods if | |
113 * the error callback is not set. | |
114 * | 177 * |
115 * The connection upgrades (e.g. switching from HTTP/1.1 to the | 178 * The connection upgrades (e.g. switching from HTTP/1.1 to the |
116 * WebSocket protocol) is handled in a special way. If connection | 179 * WebSocket protocol) is handled in a special way. If connection |
117 * upgrade is specified in the headers, then on the callback to | 180 * upgrade is specified in the headers, then on the callback to |
118 * [:responseStart:] the [:upgrade:] property on the [:HttpParser:] | 181 * [:responseStart:] the [:upgrade:] property on the [:HttpParser:] |
119 * object will be [:true:] indicating that from now on the protocol is | 182 * object will be [:true:] indicating that from now on the protocol is |
120 * not HTTP anymore and no more callbacks will happen, that is | 183 * not HTTP anymore and no more callbacks will happen, that is |
121 * [:dataReceived:] and [:dataEnd:] are not called in this case as | 184 * [:dataReceived:] and [:dataEnd:] are not called in this case as |
122 * there is no more HTTP data. After the upgrade the method | 185 * there is no more HTTP data. After the upgrade the method |
123 * [:readUnparsedData:] can be used to read any remaining bytes in the | 186 * [:readUnparsedData:] can be used to read any remaining bytes in the |
124 * HTTP parser which are part of the protocol the connection is | 187 * HTTP parser which are part of the protocol the connection is |
125 * upgrading to. These bytes cannot be processed by the HTTP parser | 188 * upgrading to. These bytes cannot be processed by the HTTP parser |
126 * and should be handled according to whatever protocol is being | 189 * and should be handled according to whatever protocol is being |
127 * upgraded to. | 190 * upgraded to. |
128 */ | 191 */ |
129 class _HttpParser { | 192 class _HttpParser |
130 _HttpParser.requestParser() { | 193 extends Stream<_HttpIncoming> |
131 _requestParser = true; | 194 implements StreamConsumer<List<int>, _HttpParser> { |
| 195 |
| 196 factory _HttpParser.requestParser() { |
| 197 return new _HttpParser._(true); |
| 198 } |
| 199 |
| 200 factory _HttpParser.responseParser() { |
| 201 return new _HttpParser._(false); |
| 202 } |
| 203 |
| 204 _HttpParser._(this._requestParser) { |
| 205 _controller = new StreamController<_HttpIncoming>( |
| 206 onSubscriptionStateChange: _updateParsePauseState, |
| 207 onPauseStateChange: _updateParsePauseState); |
132 _reset(); | 208 _reset(); |
133 } | 209 } |
134 _HttpParser.responseParser() { | 210 |
135 _requestParser = false; | 211 |
136 _reset(); | 212 StreamSubscription<_HttpIncoming> listen(void onData(List<int> event), |
| 213 {void onError(AsyncError error), |
| 214 void onDone(), |
| 215 bool unsubscribeOnError}) { |
| 216 return _controller.stream.listen(onData, |
| 217 onError: onError, |
| 218 onDone: onDone, |
| 219 unsubscribeOnError: unsubscribeOnError); |
| 220 } |
| 221 |
| 222 Future<_HttpParser> consume(Stream<List<int>> stream) { |
| 223 // Listen to the stream and handle data accordingly. When a |
| 224 // _HttpIncoming is created, _dataPause, _dataResume, _dataDone is |
| 225 // given to provide a way of controlling the parser. |
| 226 // TODO(ajohnsen): Remove _dataPause, _dataResume and _dataDone and clean up |
| 227 // how the _HttpIncoming signals the parser. |
| 228 var completer = new Completer(); |
| 229 _socketSubscription = stream.listen( |
| 230 _onData, |
| 231 onError: _onError, |
| 232 onDone: () { |
| 233 _onDone(); |
| 234 completer.complete(this); |
| 235 }); |
| 236 return completer.future; |
137 } | 237 } |
138 | 238 |
139 // From RFC 2616. | 239 // From RFC 2616. |
140 // generic-message = start-line | 240 // generic-message = start-line |
141 // *(message-header CRLF) | 241 // *(message-header CRLF) |
142 // CRLF | 242 // CRLF |
143 // [ message-body ] | 243 // [ message-body ] |
144 // start-line = Request-Line | Status-Line | 244 // start-line = Request-Line | Status-Line |
145 // Request-Line = Method SP Request-URI SP HTTP-Version CRLF | 245 // Request-Line = Method SP Request-URI SP HTTP-Version CRLF |
146 // Status-Line = HTTP-Version SP Status-Code SP Reason-Phrase CRLF | 246 // Status-Line = HTTP-Version SP Status-Code SP Reason-Phrase CRLF |
147 // message-header = field-name ":" [ field-value ] | 247 // message-header = field-name ":" [ field-value ] |
148 void _parse() { | 248 void _parse() { |
| 249 assert(!_parserCalled); |
| 250 _parserCalled = true; |
149 try { | 251 try { |
150 if (_state == _State.CLOSED) { | 252 if (_state == _State.CLOSED) { |
151 throw new HttpParserException("Data on closed connection"); | 253 throw new HttpParserException("Data on closed connection"); |
152 } | 254 } |
153 if (_state == _State.UPGRADED) { | |
154 throw new HttpParserException("Data on upgraded connection"); | |
155 } | |
156 if (_state == _State.FAILURE) { | 255 if (_state == _State.FAILURE) { |
157 throw new HttpParserException("Data on failed connection"); | 256 throw new HttpParserException("Data on failed connection"); |
158 } | 257 } |
159 if (_state == _State.CANCELED) { | |
160 throw new HttpParserException("Data on canceled connection"); | |
161 } | |
162 while (_buffer != null && | 258 while (_buffer != null && |
163 _index < _lastIndex && | 259 _index < _buffer.length && |
164 _state <= _State.FIRST_PARSE_STOP_STATE) { | 260 _state != _State.FAILURE && |
| 261 _state != _State.UPGRADED) { |
| 262 if (_paused) { |
| 263 _parserCalled = false; |
| 264 return; |
| 265 } |
165 int byte = _buffer[_index++]; | 266 int byte = _buffer[_index++]; |
166 switch (_state) { | 267 switch (_state) { |
167 case _State.START: | 268 case _State.START: |
168 if (byte == _Const.HTTP[0]) { | 269 if (byte == _Const.HTTP[0]) { |
169 // Start parsing method or HTTP version. | 270 // Start parsing method or HTTP version. |
170 _httpVersionIndex = 1; | 271 _httpVersionIndex = 1; |
171 _state = _State.METHOD_OR_RESPONSE_HTTP_VERSION; | 272 _state = _State.METHOD_OR_RESPONSE_HTTP_VERSION; |
172 } else { | 273 } else { |
173 // Start parsing method. | 274 // Start parsing method. |
174 if (!_isTokenChar(byte)) { | 275 if (!_isTokenChar(byte)) { |
(...skipping 146 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
321 if (byte == _CharCode.CR || byte == _CharCode.LF) { | 422 if (byte == _CharCode.CR || byte == _CharCode.LF) { |
322 throw new HttpParserException("Invalid response reason phrase"); | 423 throw new HttpParserException("Invalid response reason phrase"); |
323 } | 424 } |
324 _uri_or_reason_phrase.add(byte); | 425 _uri_or_reason_phrase.add(byte); |
325 } | 426 } |
326 break; | 427 break; |
327 | 428 |
328 case _State.RESPONSE_LINE_ENDING: | 429 case _State.RESPONSE_LINE_ENDING: |
329 _expect(byte, _CharCode.LF); | 430 _expect(byte, _CharCode.LF); |
330 _messageType == _MessageType.RESPONSE; | 431 _messageType == _MessageType.RESPONSE; |
331 _statusCode = int.parse( | 432 _statusCode = int.parse( |
332 new String.fromCharCodes(_method_or_status_code)); | 433 new String.fromCharCodes(_method_or_status_code)); |
333 if (_statusCode < 100 || _statusCode > 599) { | 434 if (_statusCode < 100 || _statusCode > 599) { |
334 throw new HttpParserException("Invalid response status code"); | 435 throw new HttpParserException("Invalid response status code"); |
| 436 } else { |
| 437 // Check whether this response will never have a body. |
| 438 _noMessageBody = _statusCode <= 199 || _statusCode == 204 || |
| 439 _statusCode == 304; |
335 } | 440 } |
336 _state = _State.HEADER_START; | 441 _state = _State.HEADER_START; |
337 break; | 442 break; |
338 | 443 |
339 case _State.HEADER_START: | 444 case _State.HEADER_START: |
| 445 _headers = new _HttpHeaders(version); |
340 if (byte == _CharCode.CR) { | 446 if (byte == _CharCode.CR) { |
341 _state = _State.HEADER_ENDING; | 447 _state = _State.HEADER_ENDING; |
342 } else { | 448 } else { |
343 // Start of new header field. | 449 // Start of new header field. |
344 _headerField.add(_toLowerCase(byte)); | 450 _headerField.add(_toLowerCase(byte)); |
345 _state = _State.HEADER_FIELD; | 451 _state = _State.HEADER_FIELD; |
346 } | 452 } |
347 break; | 453 break; |
348 | 454 |
349 case _State.HEADER_FIELD: | 455 case _State.HEADER_FIELD: |
(...skipping 29 matching lines...) Expand all Loading... |
379 _expect(byte, _CharCode.LF); | 485 _expect(byte, _CharCode.LF); |
380 _state = _State.HEADER_VALUE_FOLD_OR_END; | 486 _state = _State.HEADER_VALUE_FOLD_OR_END; |
381 break; | 487 break; |
382 | 488 |
383 case _State.HEADER_VALUE_FOLD_OR_END: | 489 case _State.HEADER_VALUE_FOLD_OR_END: |
384 if (byte == _CharCode.SP || byte == _CharCode.HT) { | 490 if (byte == _CharCode.SP || byte == _CharCode.HT) { |
385 _state = _State.HEADER_VALUE_START; | 491 _state = _State.HEADER_VALUE_START; |
386 } else { | 492 } else { |
387 String headerField = new String.fromCharCodes(_headerField); | 493 String headerField = new String.fromCharCodes(_headerField); |
388 String headerValue = new String.fromCharCodes(_headerValue); | 494 String headerValue = new String.fromCharCodes(_headerValue); |
389 bool reportHeader = true; | 495 if (headerField == "transfer-encoding" && |
| 496 headerValue.toLowerCase() == "chunked") { |
| 497 _chunked = true; |
| 498 } |
390 if (headerField == "connection") { | 499 if (headerField == "connection") { |
391 List<String> tokens = _tokenizeFieldValue(headerValue); | 500 List<String> tokens = _tokenizeFieldValue(headerValue); |
392 for (int i = 0; i < tokens.length; i++) { | 501 for (int i = 0; i < tokens.length; i++) { |
393 String token = tokens[i].toLowerCase(); | 502 if (tokens[i].toLowerCase() == "upgrade") { |
394 if (token == "keep-alive") { | |
395 _persistentConnection = true; | |
396 } else if (token == "close") { | |
397 _persistentConnection = false; | |
398 } else if (token == "upgrade") { | |
399 _connectionUpgrade = true; | 503 _connectionUpgrade = true; |
400 } | 504 } |
401 _headers.add(headerField, token); | 505 _headers.add(headerField, tokens[i]); |
402 | |
403 } | 506 } |
404 reportHeader = false; | 507 } else { |
405 } else if (headerField == "transfer-encoding" && | |
406 headerValue.toLowerCase() == "chunked") { | |
407 _chunked = true; | |
408 } | |
409 if (reportHeader) { | |
410 _headers.add(headerField, headerValue); | 508 _headers.add(headerField, headerValue); |
411 } | 509 } |
412 _headerField.clear(); | 510 _headerField.clear(); |
413 _headerValue.clear(); | 511 _headerValue.clear(); |
414 | 512 |
415 if (byte == _CharCode.CR) { | 513 if (byte == _CharCode.CR) { |
416 _state = _State.HEADER_ENDING; | 514 _state = _State.HEADER_ENDING; |
417 } else { | 515 } else { |
418 // Start of new header field. | 516 // Start of new header field. |
419 _headerField.add(_toLowerCase(byte)); | 517 _headerField.add(_toLowerCase(byte)); |
420 _state = _State.HEADER_FIELD; | 518 _state = _State.HEADER_FIELD; |
421 } | 519 } |
422 } | 520 } |
423 break; | 521 break; |
424 | 522 |
425 case _State.HEADER_ENDING: | 523 case _State.HEADER_ENDING: |
426 _expect(byte, _CharCode.LF); | 524 _expect(byte, _CharCode.LF); |
427 _headers._mutable = false; | 525 _headers._mutable = false; |
428 | 526 |
429 _contentLength = _headers.contentLength; | 527 _transferLength = _headers.contentLength; |
430 // Ignore the Content-Length header if Transfer-Encoding | 528 // Ignore the Content-Length header if Transfer-Encoding |
431 // is chunked (RFC 2616 section 4.4) | 529 // is chunked (RFC 2616 section 4.4) |
432 if (_chunked) _contentLength = -1; | 530 if (_chunked) _transferLength = -1; |
433 | 531 |
434 // If a request message has neither Content-Length nor | 532 // If a request message has neither Content-Length nor |
435 // Transfer-Encoding the message must not have a body (RFC | 533 // Transfer-Encoding the message must not have a body (RFC |
436 // 2616 section 4.3). | 534 // 2616 section 4.3). |
437 if (_messageType == _MessageType.REQUEST && | 535 if (_messageType == _MessageType.REQUEST && |
438 _contentLength < 0 && | 536 _transferLength < 0 && |
439 _chunked == false) { | 537 _chunked == false) { |
440 _contentLength = 0; | 538 _transferLength = 0; |
441 } | 539 } |
442 if (_connectionUpgrade) { | 540 if (_connectionUpgrade) { |
443 _state = _State.UPGRADED; | 541 _state = _State.UPGRADED; |
| 542 _transferLength = 0; |
444 } | 543 } |
445 var noBody; | 544 _createIncoming(_transferLength); |
446 if (_requestParser) { | 545 if (_requestParser) { |
447 noBody = _contentLength == 0; | 546 _incoming.method = |
448 requestStart(new String.fromCharCodes(_method_or_status_code), | 547 new String.fromCharCodes(_method_or_status_code); |
449 new String.fromCharCodes(_uri_or_reason_phrase), | 548 _incoming.uri = |
450 version, | 549 Uri.parse( |
451 _headers, | 550 new String.fromCharCodes(_uri_or_reason_phrase)); |
452 !noBody); | |
453 } else { | 551 } else { |
454 // Check whether this response will never have a body. | 552 _incoming.statusCode = _statusCode; |
455 noBody = _contentLength == 0 || | 553 _incoming.reasonPhrase = |
456 _statusCode <= 199 || | 554 new String.fromCharCodes(_uri_or_reason_phrase); |
457 _statusCode == HttpStatus.NO_CONTENT || | |
458 _statusCode == HttpStatus.NOT_MODIFIED || | |
459 _responseToMethod == "HEAD"; | |
460 responseStart(_statusCode, | |
461 new String.fromCharCodes(_uri_or_reason_phrase), | |
462 version, | |
463 _headers, | |
464 !noBody); | |
465 } | 555 } |
466 _method_or_status_code.clear(); | 556 _method_or_status_code.clear(); |
467 _uri_or_reason_phrase.clear(); | 557 _uri_or_reason_phrase.clear(); |
468 if (_state == _State.CANCELED) continue; | 558 if (_connectionUpgrade) { |
469 if (!_connectionUpgrade) { | 559 _incoming.upgraded = true; |
470 if (noBody) { | 560 _controller.add(_incoming); |
471 _bodyEnd(); | 561 break; |
472 _reset(); | |
473 } else if (_chunked) { | |
474 _state = _State.CHUNK_SIZE; | |
475 _remainingContent = 0; | |
476 } else if (_contentLength > 0) { | |
477 _remainingContent = _contentLength; | |
478 _state = _State.BODY; | |
479 } else { | |
480 // Neither chunked nor content length. End of body | |
481 // indicated by close. | |
482 _state = _State.BODY; | |
483 } | |
484 } | 562 } |
| 563 if (_chunked) { |
| 564 _state = _State.CHUNK_SIZE; |
| 565 _remainingContent = 0; |
| 566 } else if (_transferLength == 0 || |
| 567 (_messageType == _MessageType.RESPONSE && |
| 568 (_noMessageBody || _responseToMethod == "HEAD"))) { |
| 569 _state = _State.START; |
| 570 var tmp = _incoming; |
| 571 _closeIncoming(); |
| 572 _controller.add(tmp); |
| 573 break; |
| 574 } else if (_transferLength > 0) { |
| 575 _remainingContent = _transferLength; |
| 576 _state = _State.BODY; |
| 577 } else { |
| 578 // Neither chunked nor content length. End of body |
| 579 // indicated by close. |
| 580 _state = _State.BODY; |
| 581 } |
| 582 _controller.add(_incoming); |
485 break; | 583 break; |
486 | 584 |
487 case _State.CHUNK_SIZE_STARTING_CR: | 585 case _State.CHUNK_SIZE_STARTING_CR: |
488 _expect(byte, _CharCode.CR); | 586 _expect(byte, _CharCode.CR); |
489 _state = _State.CHUNK_SIZE_STARTING_LF; | 587 _state = _State.CHUNK_SIZE_STARTING_LF; |
490 break; | 588 break; |
491 | 589 |
492 case _State.CHUNK_SIZE_STARTING_LF: | 590 case _State.CHUNK_SIZE_STARTING_LF: |
493 _expect(byte, _CharCode.LF); | 591 _expect(byte, _CharCode.LF); |
494 _state = _State.CHUNK_SIZE; | 592 _state = _State.CHUNK_SIZE; |
(...skipping 25 matching lines...) Expand all Loading... |
520 } | 618 } |
521 break; | 619 break; |
522 | 620 |
523 case _State.CHUNKED_BODY_DONE_CR: | 621 case _State.CHUNKED_BODY_DONE_CR: |
524 _expect(byte, _CharCode.CR); | 622 _expect(byte, _CharCode.CR); |
525 _state = _State.CHUNKED_BODY_DONE_LF; | 623 _state = _State.CHUNKED_BODY_DONE_LF; |
526 break; | 624 break; |
527 | 625 |
528 case _State.CHUNKED_BODY_DONE_LF: | 626 case _State.CHUNKED_BODY_DONE_LF: |
529 _expect(byte, _CharCode.LF); | 627 _expect(byte, _CharCode.LF); |
530 _bodyEnd(); | 628 _state = _State.START; |
531 if (_state == _State.CANCELED) continue; | 629 _closeIncoming(); |
532 _reset(); | |
533 break; | 630 break; |
534 | 631 |
535 case _State.BODY: | 632 case _State.BODY: |
536 // The body is not handled one byte at a time but in blocks. | 633 // The body is not handled one byte at a time but in blocks. |
537 _index--; | 634 _index--; |
538 int dataAvailable = _lastIndex - _index; | 635 int dataAvailable = _buffer.length - _index; |
539 List<int> data; | 636 List<int> data; |
540 if (_remainingContent == null || | 637 if (_remainingContent == null || |
541 dataAvailable <= _remainingContent) { | 638 dataAvailable <= _remainingContent) { |
542 data = new Uint8List(dataAvailable); | 639 data = new Uint8List(dataAvailable); |
543 data.setRange(0, dataAvailable, _buffer, _index); | 640 data.setRange(0, dataAvailable, _buffer, _index); |
544 } else { | 641 } else { |
545 data = new Uint8List(_remainingContent); | 642 data = new Uint8List(_remainingContent); |
546 data.setRange(0, _remainingContent, _buffer, _index); | 643 data.setRange(0, _remainingContent, _buffer, _index); |
547 } | 644 } |
548 | 645 |
549 dataReceived(data); | 646 _bodyController.add(data); |
550 if (_state == _State.CANCELED) continue; | |
551 if (_remainingContent != null) { | 647 if (_remainingContent != null) { |
552 _remainingContent -= data.length; | 648 _remainingContent -= data.length; |
553 } | 649 } |
554 _index += data.length; | 650 _index += data.length; |
555 if (_remainingContent == 0) { | 651 if (_remainingContent == 0) { |
556 if (!_chunked) { | 652 if (!_chunked) { |
557 _bodyEnd(); | 653 _state = _State.START; |
558 if (_state == _State.CANCELED) continue; | 654 _closeIncoming(); |
559 _reset(); | |
560 } else { | 655 } else { |
561 _state = _State.CHUNK_SIZE_STARTING_CR; | 656 _state = _State.CHUNK_SIZE_STARTING_CR; |
562 } | 657 } |
563 } | 658 } |
564 break; | 659 break; |
565 | 660 |
566 case _State.FAILURE: | 661 case _State.FAILURE: |
567 // Should be unreachable. | 662 // Should be unreachable. |
568 assert(false); | 663 assert(false); |
569 break; | 664 break; |
570 | 665 |
571 default: | 666 default: |
572 // Should be unreachable. | 667 // Should be unreachable. |
573 assert(false); | 668 assert(false); |
574 break; | 669 break; |
575 } | 670 } |
576 } | 671 } |
577 } catch (e) { | 672 } catch (e, s) { |
578 _state = _State.FAILURE; | 673 _state = _State.FAILURE; |
579 error(e); | 674 error(new AsyncError(e, s)); |
580 } | 675 } |
581 | 676 |
582 // If all data is parsed or not needed due to failure there is no | 677 _parserCalled = false; |
583 // need to hold on to the buffer. | 678 if (_buffer != null && _index == _buffer.length) { |
584 if (_state != _State.UPGRADED) _releaseBuffer(); | 679 // If all data is parsed release the buffer and resume receiving |
| 680 // data. |
| 681 _releaseBuffer(); |
| 682 if (_state != _State.UPGRADED && _state != _State.FAILURE) { |
| 683 _socketSubscription.resume(); |
| 684 } |
| 685 } |
585 } | 686 } |
586 | 687 |
587 void streamData(List<int> buffer) { | 688 void _onData(List<int> buffer) { |
| 689 _socketSubscription.pause(); |
588 assert(_buffer == null); | 690 assert(_buffer == null); |
589 _buffer = buffer; | 691 _buffer = buffer; |
590 _index = 0; | 692 _index = 0; |
591 _lastIndex = buffer.length; | |
592 _parse(); | 693 _parse(); |
593 } | 694 } |
594 | 695 |
595 void streamDone() { | 696 void _onDone() { |
596 String type() => _requestParser ? "request" : "response"; | 697 // onDone cancles the subscription. |
| 698 _socketSubscription = null; |
| 699 if (_state == _State.CLOSED || _state == _State.FAILURE) return; |
597 | 700 |
| 701 if (_incoming != null) { |
| 702 if (_state != _State.UPGRADED && |
| 703 !(_state == _State.START && !_requestParser) && |
| 704 !(_state == _State.BODY && !_chunked && _transferLength == -1)) { |
| 705 _bodyController.signalError( |
| 706 new AsyncError( |
| 707 new HttpParserException( |
| 708 "Connection closed while receiving data"))); |
| 709 } |
| 710 _closeIncoming(); |
| 711 _controller.close(); |
| 712 return; |
| 713 } |
598 // If the connection is idle the HTTP stream is closed. | 714 // If the connection is idle the HTTP stream is closed. |
599 if (_state == _State.START) { | 715 if (_state == _State.START) { |
600 if (_requestParser) { | 716 if (!_requestParser) { |
601 closed(); | |
602 } else { | |
603 error( | 717 error( |
604 new HttpParserException( | 718 new AsyncError( |
605 "Connection closed before full ${type()} header was received")); | 719 new HttpParserException( |
| 720 "Connection closed before full header was received"))); |
606 } | 721 } |
| 722 _controller.close(); |
607 return; | 723 return; |
608 } | 724 } |
609 | 725 |
| 726 if (_state == _State.UPGRADED) { |
| 727 _controller.close(); |
| 728 return; |
| 729 } |
| 730 |
610 if (_state < _State.FIRST_BODY_STATE) { | 731 if (_state < _State.FIRST_BODY_STATE) { |
611 _state = _State.FAILURE; | 732 _state = _State.FAILURE; |
612 // Report the error through the error callback if any. Otherwise | 733 // Report the error through the error callback if any. Otherwise |
613 // throw the error. | 734 // throw the error. |
614 error( | 735 error( |
615 new HttpParserException( | 736 new AsyncError( |
616 "Connection closed before full ${type()} header was received")); | 737 new HttpParserException( |
| 738 "Connection closed before full header was received"))); |
| 739 _controller.close(); |
617 return; | 740 return; |
618 } | 741 } |
619 | 742 |
620 if (!_chunked && _contentLength == -1) { | 743 if (!_chunked && _transferLength == -1) { |
621 dataEnd(true); | |
622 _state = _State.CLOSED; | 744 _state = _State.CLOSED; |
623 closed(); | |
624 } else { | 745 } else { |
625 _state = _State.FAILURE; | 746 _state = _State.FAILURE; |
626 // Report the error through the error callback if any. Otherwise | 747 // Report the error through the error callback if any. Otherwise |
627 // throw the error. | 748 // throw the error. |
628 error( | 749 error( |
629 new HttpParserException( | 750 new AsyncError( |
630 "Connection closed before full ${type()} body was received")); | 751 new HttpParserException( |
| 752 "Connection closed before full body was received"))); |
631 } | 753 } |
| 754 _controller.close(); |
632 } | 755 } |
633 | 756 |
634 void streamError(e) { | 757 void _onError(e) { |
635 error(e); | 758 _controller.signalError(e); |
636 } | 759 } |
637 | 760 |
638 String get version { | 761 String get version { |
639 switch (_httpVersion) { | 762 switch (_httpVersion) { |
640 case _HttpVersion.HTTP10: | 763 case _HttpVersion.HTTP10: |
641 return "1.0"; | 764 return "1.0"; |
642 case _HttpVersion.HTTP11: | 765 case _HttpVersion.HTTP11: |
643 return "1.1"; | 766 return "1.1"; |
644 } | 767 } |
645 return null; | 768 return null; |
646 } | 769 } |
647 | 770 |
648 void cancel() { | |
649 _state = _State.CANCELED; | |
650 } | |
651 | |
652 void restart() { | |
653 _reset(); | |
654 } | |
655 | |
656 int get messageType => _messageType; | 771 int get messageType => _messageType; |
657 int get contentLength => _contentLength; | 772 int get transferLength => _transferLength; |
658 bool get upgrade => _connectionUpgrade && _state == _State.UPGRADED; | 773 bool get upgrade => _connectionUpgrade && _state == _State.UPGRADED; |
659 bool get persistentConnection => _persistentConnection; | 774 bool get persistentConnection => _persistentConnection; |
660 | 775 |
661 void set responseToMethod(String method) { _responseToMethod = method; } | 776 void set responseToMethod(String method) { _responseToMethod = method; } |
662 | 777 |
| 778 _HttpDetachedIncoming detachIncoming() { |
| 779 var completer = _pauseCompleter; |
| 780 _pauseCompleter = null; |
| 781 return new _HttpDetachedIncoming(_socketSubscription, |
| 782 readUnparsedData(), |
| 783 completer); |
| 784 } |
| 785 |
663 List<int> readUnparsedData() { | 786 List<int> readUnparsedData() { |
664 if (_buffer == null) return []; | 787 if (_buffer == null) return null; |
665 if (_index == _lastIndex) return []; | 788 if (_index == _buffer.length) return null; |
666 var result = _buffer.getRange(_index, _lastIndex - _index); | 789 var result = _buffer.getRange(_index, _buffer.length - _index); |
667 _releaseBuffer(); | 790 _releaseBuffer(); |
668 return result; | 791 return result; |
669 } | 792 } |
670 | 793 |
671 void _bodyEnd() { | |
672 dataEnd(_messageType == _MessageType.RESPONSE && !_persistentConnection); | |
673 } | |
674 | |
675 _reset() { | 794 _reset() { |
| 795 if (_state == _State.UPGRADED) return; |
676 _state = _State.START; | 796 _state = _State.START; |
677 _messageType = _MessageType.UNDETERMINED; | 797 _messageType = _MessageType.UNDETERMINED; |
678 _headerField = new List(); | 798 _headerField = new List(); |
679 _headerValue = new List(); | 799 _headerValue = new List(); |
680 _method_or_status_code = new List(); | 800 _method_or_status_code = new List(); |
681 _uri_or_reason_phrase = new List(); | 801 _uri_or_reason_phrase = new List(); |
682 | 802 |
683 _httpVersion = _HttpVersion.UNDETERMINED; | 803 _httpVersion = _HttpVersion.UNDETERMINED; |
684 _contentLength = -1; | 804 _transferLength = -1; |
685 _persistentConnection = false; | 805 _persistentConnection = false; |
686 _connectionUpgrade = false; | 806 _connectionUpgrade = false; |
687 _chunked = false; | 807 _chunked = false; |
688 | 808 |
| 809 _noMessageBody = false; |
689 _responseToMethod = null; | 810 _responseToMethod = null; |
690 _remainingContent = null; | 811 _remainingContent = null; |
691 | 812 |
692 _headers = new _HttpHeaders(); | 813 _headers = null; |
693 } | 814 } |
694 | 815 |
695 _releaseBuffer() { | 816 _releaseBuffer() { |
696 _buffer = null; | 817 _buffer = null; |
697 _index = null; | 818 _index = null; |
698 _lastIndex = null; | |
699 } | 819 } |
700 | 820 |
701 bool _isTokenChar(int byte) { | 821 bool _isTokenChar(int byte) { |
702 return byte > 31 && byte < 128 && _Const.SEPARATORS.indexOf(byte) == -1; | 822 return byte > 31 && byte < 128 && _Const.SEPARATORS.indexOf(byte) == -1; |
703 } | 823 } |
704 | 824 |
705 List<String> _tokenizeFieldValue(String headerValue) { | 825 List<String> _tokenizeFieldValue(String headerValue) { |
706 List<String> tokens = new List<String>(); | 826 List<String> tokens = new List<String>(); |
707 int start = 0; | 827 int start = 0; |
708 int index = 0; | 828 int index = 0; |
(...skipping 28 matching lines...) Expand all Loading... |
737 return byte - 0x30; // 0 - 9 | 857 return byte - 0x30; // 0 - 9 |
738 } else if (0x41 <= byte && byte <= 0x46) { | 858 } else if (0x41 <= byte && byte <= 0x46) { |
739 return byte - 0x41 + 10; // A - F | 859 return byte - 0x41 + 10; // A - F |
740 } else if (0x61 <= byte && byte <= 0x66) { | 860 } else if (0x61 <= byte && byte <= 0x66) { |
741 return byte - 0x61 + 10; // a - f | 861 return byte - 0x61 + 10; // a - f |
742 } else { | 862 } else { |
743 throw new HttpParserException("Failed to parse HTTP"); | 863 throw new HttpParserException("Failed to parse HTTP"); |
744 } | 864 } |
745 } | 865 } |
746 | 866 |
| 867 void _createIncoming(int transferLength) { |
| 868 assert(_incoming == null); |
| 869 assert(_bodyController == null); |
| 870 _bodyController = new StreamController<List<int>>( |
| 871 onSubscriptionStateChange: _updateParsePauseState, |
| 872 onPauseStateChange: _updateParsePauseState); |
| 873 _incoming = new _HttpIncoming( |
| 874 _headers, transferLength, _bodyController.stream); |
| 875 _pauseParsing(); // Needed to handle detaching - don't start on the body! |
| 876 } |
| 877 |
| 878 void _closeIncoming() { |
| 879 assert(_incoming != null); |
| 880 var tmp = _incoming; |
| 881 _incoming = null; |
| 882 tmp.close(); |
| 883 if (_bodyController != null) { |
| 884 _bodyController.close(); |
| 885 _bodyController = null; |
| 886 } |
| 887 _updateParsePauseState(); |
| 888 } |
| 889 |
| 890 void _continueParsing() { |
| 891 _paused = false; |
| 892 if (!_parserCalled && _buffer != null) _parse(); |
| 893 } |
| 894 |
| 895 void _pauseParsing() { |
| 896 _paused = true; |
| 897 } |
| 898 |
| 899 void _updateParsePauseState() { |
| 900 if (_bodyController != null) { |
| 901 if (_bodyController.hasSubscribers && !_bodyController.isPaused) { |
| 902 _continueParsing(); |
| 903 } else { |
| 904 _pauseParsing(); |
| 905 } |
| 906 } else { |
| 907 if (_controller.hasSubscribers && !_controller.isPaused) { |
| 908 _continueParsing(); |
| 909 } else { |
| 910 _pauseParsing(); |
| 911 } |
| 912 } |
| 913 } |
| 914 |
| 915 void error(error) { |
| 916 if (_socketSubscription != null) _socketSubscription.cancel(); |
| 917 _state = _State.FAILURE; |
| 918 _controller.signalError(error); |
| 919 _controller.close(); |
| 920 } |
| 921 |
| 922 // State. |
| 923 bool _parserCalled = false; |
| 924 |
747 // The data that is currently being parsed. | 925 // The data that is currently being parsed. |
748 List<int> _buffer; | 926 List<int> _buffer; |
749 int _index; | 927 int _index; |
750 int _lastIndex; | |
751 | 928 |
752 bool _requestParser; | 929 final bool _requestParser; |
753 int _state; | 930 int _state; |
754 int _httpVersionIndex; | 931 int _httpVersionIndex; |
755 int _messageType; | 932 int _messageType; |
756 int _statusCode; | 933 int _statusCode; |
757 List _method_or_status_code; | 934 List _method_or_status_code; |
758 List _uri_or_reason_phrase; | 935 List _uri_or_reason_phrase; |
759 List _headerField; | 936 List _headerField; |
760 List _headerValue; | 937 List _headerValue; |
761 | 938 |
762 int _httpVersion; | 939 int _httpVersion; |
763 int _contentLength; | 940 int _transferLength; |
764 bool _persistentConnection; | 941 bool _persistentConnection; |
765 bool _connectionUpgrade; | 942 bool _connectionUpgrade; |
766 bool _chunked; | 943 bool _chunked; |
767 | 944 |
| 945 bool _noMessageBody; |
768 String _responseToMethod; // Indicates the method used for the request. | 946 String _responseToMethod; // Indicates the method used for the request. |
769 int _remainingContent; | 947 int _remainingContent; |
770 | 948 |
771 _HttpHeaders _headers = new _HttpHeaders(); | 949 _HttpHeaders _headers; |
772 | 950 |
773 // Callbacks. | 951 // The current incoming connection. |
774 Function requestStart; | 952 _HttpIncoming _incoming; |
775 Function responseStart; | 953 StreamSubscription _socketSubscription; |
776 Function dataReceived; | 954 bool _paused = false; |
777 Function dataEnd; | 955 Completer _pauseCompleter; |
778 Function error; | 956 StreamController<_HttpIncoming> _controller; |
779 Function closed; | 957 StreamController<List<int>> _bodyController; |
780 } | 958 } |
781 | 959 |
782 | 960 |
783 class HttpParserException implements Exception { | 961 class HttpParserException implements Exception { |
784 const HttpParserException([String this.message = ""]); | 962 const HttpParserException([String this.message = ""]); |
785 String toString() => "HttpParserException: $message"; | 963 String toString() => "HttpParserException: $message"; |
786 final String message; | 964 final String message; |
787 } | 965 } |
OLD | NEW |