Index: sdk/lib/io/http_parser.dart |
diff --git a/sdk/lib/io/http_parser.dart b/sdk/lib/io/http_parser.dart |
index c0a2819ae8aaf1f7c1a77417e33730ebb9aa7a6e..587d89e67ca39f0fd3e0e09cb7f79e9f75171720 100644 |
--- a/sdk/lib/io/http_parser.dart |
+++ b/sdk/lib/io/http_parser.dart |
@@ -74,11 +74,9 @@ class _State { |
static const int BODY = 24; |
static const int CLOSED = 25; |
static const int UPGRADED = 26; |
- static const int CANCELED = 27; |
- static const int FAILURE = 28; |
+ static const int FAILURE = 27; |
static const int FIRST_BODY_STATE = CHUNK_SIZE_STARTING_CR; |
- static const int FIRST_PARSE_STOP_STATE = CLOSED; |
} |
// HTTP version of the request or response being parsed. |
@@ -95,22 +93,87 @@ class _MessageType { |
static const int RESPONSE = 0; |
} |
+class _HttpDetachedIncoming extends Stream<List<int>> { |
+ StreamController<List<int>> controller; |
+ final StreamSubscription subscription; |
+ |
+ List<int> carryOverData; |
+ bool paused; |
+ |
+ Completer resumeCompleter; |
+ |
+ _HttpDetachedIncoming(StreamSubscription this.subscription, |
+ List<int> this.carryOverData, |
+ Completer oldResumeCompleter) { |
+ controller = new StreamController<List<int>>( |
+ onSubscriptionStateChange: onSubscriptionStateChange, |
+ onPauseStateChange: onPauseStateChange); |
+ pause(); |
+ if (oldResumeCompleter != null) oldResumeCompleter.complete(); |
+ subscription.resume(); |
+ subscription.onData(controller.add); |
+ subscription.onDone(controller.close); |
+ subscription.onError(controller.signalError); |
+ } |
+ |
+ StreamSubscription<List<int>> listen(void onData(List<int> event), |
+ {void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError}) { |
+ return controller.stream.listen( |
+ onData, |
+ onError: onError, |
+ onDone: onDone, |
+ unsubscribeOnError: unsubscribeOnError); |
+ } |
+ |
+ void resume() { |
+ paused = false; |
+ if (carryOverData != null) { |
+ var data = carryOverData; |
+ carryOverData = null; |
+ controller.add(data); |
+ // If the consumer pauses again after the carry-over data, we'll not |
+ // continue our subscriber until the next resume. |
+ if (paused) return; |
+ } |
+ if (resumeCompleter != null) { |
+ resumeCompleter.complete(); |
+ resumeCompleter = null; |
+ } |
+ } |
+ |
+ void pause() { |
+ paused = true; |
+ if (resumeCompleter == null) { |
+ resumeCompleter = new Completer(); |
+ subscription.pause(resumeCompleter.future); |
+ } |
+ } |
+ |
+ void onPauseStateChange() { |
+ if (controller.isPaused) { |
+ pause(); |
+ } else { |
+ resume(); |
+ } |
+ } |
+ |
+ void onSubscriptionStateChange() { |
+ if (controller.hasSubscribers) { |
+ resume(); |
+ } else { |
+ subscription.cancel(); |
+ } |
+ } |
+} |
+ |
/** |
- * HTTP parser which parses the HTTP stream as data is supplied |
- * through the [:streamData:] and [:streamDone:] methods. As the |
- * data is parsed the following callbacks are called: |
- * |
- * [:requestStart:] |
- * [:responseStart:] |
- * [:dataReceived:] |
- * [:dataEnd:] |
- * [:closed:] |
- * [:error:] |
+ * HTTP parser which parses the data stream given to [consume]. |
* |
- * If an HTTP parser error occours it is possible to get an exception |
- * thrown from the [:streamData:] and [:streamDone:] methods if |
- * the error callback is not set. |
+ * If an HTTP parser error occours, the parser will signal an error to either |
+ * the current _HttpIncoming or the _parser itself. |
* |
* The connection upgrades (e.g. switching from HTTP/1.1 to the |
* WebSocket protocol) is handled in a special way. If connection |
@@ -126,16 +189,53 @@ class _MessageType { |
* and should be handled according to whatever protocol is being |
* upgraded to. |
*/ |
-class _HttpParser { |
- _HttpParser.requestParser() { |
- _requestParser = true; |
- _reset(); |
+class _HttpParser |
+ extends Stream<_HttpIncoming> |
+ implements StreamConsumer<List<int>, _HttpParser> { |
+ |
+ factory _HttpParser.requestParser() { |
+ return new _HttpParser._(true); |
} |
- _HttpParser.responseParser() { |
- _requestParser = false; |
+ |
+ factory _HttpParser.responseParser() { |
+ return new _HttpParser._(false); |
+ } |
+ |
+ _HttpParser._(this._requestParser) { |
+ _controller = new StreamController<_HttpIncoming>( |
+ onSubscriptionStateChange: _updateParsePauseState, |
+ onPauseStateChange: _updateParsePauseState); |
_reset(); |
} |
+ |
+ StreamSubscription<_HttpIncoming> listen(void onData(List<int> event), |
+ {void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError}) { |
+ return _controller.stream.listen(onData, |
+ onError: onError, |
+ onDone: onDone, |
+ unsubscribeOnError: unsubscribeOnError); |
+ } |
+ |
+ Future<_HttpParser> consume(Stream<List<int>> stream) { |
+ // Listen to the stream and handle data accordingly. When a |
+ // _HttpIncoming is created, _dataPause, _dataResume, _dataDone is |
+ // given to provide a way of controlling the parser. |
+ // TODO(ajohnsen): Remove _dataPause, _dataResume and _dataDone and clean up |
+ // how the _HttpIncoming signals the parser. |
+ var completer = new Completer(); |
+ _socketSubscription = stream.listen( |
+ _onData, |
+ onError: _onError, |
+ onDone: () { |
+ _onDone(); |
+ completer.complete(this); |
+ }); |
+ return completer.future; |
+ } |
+ |
// From RFC 2616. |
// generic-message = start-line |
// *(message-header CRLF) |
@@ -146,22 +246,23 @@ class _HttpParser { |
// Status-Line = HTTP-Version SP Status-Code SP Reason-Phrase CRLF |
// message-header = field-name ":" [ field-value ] |
void _parse() { |
+ assert(!_parserCalled); |
+ _parserCalled = true; |
try { |
if (_state == _State.CLOSED) { |
throw new HttpParserException("Data on closed connection"); |
} |
- if (_state == _State.UPGRADED) { |
- throw new HttpParserException("Data on upgraded connection"); |
- } |
if (_state == _State.FAILURE) { |
throw new HttpParserException("Data on failed connection"); |
} |
- if (_state == _State.CANCELED) { |
- throw new HttpParserException("Data on canceled connection"); |
- } |
while (_buffer != null && |
- _index < _lastIndex && |
- _state <= _State.FIRST_PARSE_STOP_STATE) { |
+ _index < _buffer.length && |
+ _state != _State.FAILURE && |
+ _state != _State.UPGRADED) { |
+ if (_paused) { |
+ _parserCalled = false; |
+ return; |
+ } |
int byte = _buffer[_index++]; |
switch (_state) { |
case _State.START: |
@@ -328,15 +429,20 @@ class _HttpParser { |
case _State.RESPONSE_LINE_ENDING: |
_expect(byte, _CharCode.LF); |
_messageType == _MessageType.RESPONSE; |
- _statusCode = int.parse( |
- new String.fromCharCodes(_method_or_status_code)); |
+ _statusCode = int.parse( |
+ new String.fromCharCodes(_method_or_status_code)); |
if (_statusCode < 100 || _statusCode > 599) { |
throw new HttpParserException("Invalid response status code"); |
+ } else { |
+ // Check whether this response will never have a body. |
+ _noMessageBody = _statusCode <= 199 || _statusCode == 204 || |
+ _statusCode == 304; |
} |
_state = _State.HEADER_START; |
break; |
case _State.HEADER_START: |
+ _headers = new _HttpHeaders(version); |
if (byte == _CharCode.CR) { |
_state = _State.HEADER_ENDING; |
} else { |
@@ -386,27 +492,19 @@ class _HttpParser { |
} else { |
String headerField = new String.fromCharCodes(_headerField); |
String headerValue = new String.fromCharCodes(_headerValue); |
- bool reportHeader = true; |
+ if (headerField == "transfer-encoding" && |
+ headerValue.toLowerCase() == "chunked") { |
+ _chunked = true; |
+ } |
if (headerField == "connection") { |
List<String> tokens = _tokenizeFieldValue(headerValue); |
for (int i = 0; i < tokens.length; i++) { |
- String token = tokens[i].toLowerCase(); |
- if (token == "keep-alive") { |
- _persistentConnection = true; |
- } else if (token == "close") { |
- _persistentConnection = false; |
- } else if (token == "upgrade") { |
+ if (tokens[i].toLowerCase() == "upgrade") { |
_connectionUpgrade = true; |
} |
- _headers.add(headerField, token); |
- |
+ _headers.add(headerField, tokens[i]); |
} |
- reportHeader = false; |
- } else if (headerField == "transfer-encoding" && |
- headerValue.toLowerCase() == "chunked") { |
- _chunked = true; |
- } |
- if (reportHeader) { |
+ } else { |
_headers.add(headerField, headerValue); |
} |
_headerField.clear(); |
@@ -426,62 +524,62 @@ class _HttpParser { |
_expect(byte, _CharCode.LF); |
_headers._mutable = false; |
- _contentLength = _headers.contentLength; |
+ _transferLength = _headers.contentLength; |
// Ignore the Content-Length header if Transfer-Encoding |
// is chunked (RFC 2616 section 4.4) |
- if (_chunked) _contentLength = -1; |
+ if (_chunked) _transferLength = -1; |
// If a request message has neither Content-Length nor |
// Transfer-Encoding the message must not have a body (RFC |
// 2616 section 4.3). |
if (_messageType == _MessageType.REQUEST && |
- _contentLength < 0 && |
+ _transferLength < 0 && |
_chunked == false) { |
- _contentLength = 0; |
+ _transferLength = 0; |
} |
if (_connectionUpgrade) { |
_state = _State.UPGRADED; |
+ _transferLength = 0; |
} |
- var noBody; |
+ _createIncoming(_transferLength); |
if (_requestParser) { |
- noBody = _contentLength == 0; |
- requestStart(new String.fromCharCodes(_method_or_status_code), |
- new String.fromCharCodes(_uri_or_reason_phrase), |
- version, |
- _headers, |
- !noBody); |
+ _incoming.method = |
+ new String.fromCharCodes(_method_or_status_code); |
+ _incoming.uri = |
+ Uri.parse( |
+ new String.fromCharCodes(_uri_or_reason_phrase)); |
} else { |
- // Check whether this response will never have a body. |
- noBody = _contentLength == 0 || |
- _statusCode <= 199 || |
- _statusCode == HttpStatus.NO_CONTENT || |
- _statusCode == HttpStatus.NOT_MODIFIED || |
- _responseToMethod == "HEAD"; |
- responseStart(_statusCode, |
- new String.fromCharCodes(_uri_or_reason_phrase), |
- version, |
- _headers, |
- !noBody); |
+ _incoming.statusCode = _statusCode; |
+ _incoming.reasonPhrase = |
+ new String.fromCharCodes(_uri_or_reason_phrase); |
} |
_method_or_status_code.clear(); |
_uri_or_reason_phrase.clear(); |
- if (_state == _State.CANCELED) continue; |
- if (!_connectionUpgrade) { |
- if (noBody) { |
- _bodyEnd(); |
- _reset(); |
- } else if (_chunked) { |
- _state = _State.CHUNK_SIZE; |
- _remainingContent = 0; |
- } else if (_contentLength > 0) { |
- _remainingContent = _contentLength; |
- _state = _State.BODY; |
- } else { |
- // Neither chunked nor content length. End of body |
- // indicated by close. |
- _state = _State.BODY; |
- } |
+ if (_connectionUpgrade) { |
+ _incoming.upgraded = true; |
+ _controller.add(_incoming); |
+ break; |
+ } |
+ if (_chunked) { |
+ _state = _State.CHUNK_SIZE; |
+ _remainingContent = 0; |
+ } else if (_transferLength == 0 || |
+ (_messageType == _MessageType.RESPONSE && |
+ (_noMessageBody || _responseToMethod == "HEAD"))) { |
+ _state = _State.START; |
+ var tmp = _incoming; |
+ _closeIncoming(); |
+ _controller.add(tmp); |
+ break; |
+ } else if (_transferLength > 0) { |
+ _remainingContent = _transferLength; |
+ _state = _State.BODY; |
+ } else { |
+ // Neither chunked nor content length. End of body |
+ // indicated by close. |
+ _state = _State.BODY; |
} |
+ _controller.add(_incoming); |
break; |
case _State.CHUNK_SIZE_STARTING_CR: |
@@ -527,15 +625,14 @@ class _HttpParser { |
case _State.CHUNKED_BODY_DONE_LF: |
_expect(byte, _CharCode.LF); |
- _bodyEnd(); |
- if (_state == _State.CANCELED) continue; |
- _reset(); |
+ _state = _State.START; |
+ _closeIncoming(); |
break; |
case _State.BODY: |
// The body is not handled one byte at a time but in blocks. |
_index--; |
- int dataAvailable = _lastIndex - _index; |
+ int dataAvailable = _buffer.length - _index; |
List<int> data; |
if (_remainingContent == null || |
dataAvailable <= _remainingContent) { |
@@ -546,17 +643,15 @@ class _HttpParser { |
data.setRange(0, _remainingContent, _buffer, _index); |
} |
- dataReceived(data); |
- if (_state == _State.CANCELED) continue; |
+ _bodyController.add(data); |
if (_remainingContent != null) { |
_remainingContent -= data.length; |
} |
_index += data.length; |
if (_remainingContent == 0) { |
if (!_chunked) { |
- _bodyEnd(); |
- if (_state == _State.CANCELED) continue; |
- _reset(); |
+ _state = _State.START; |
+ _closeIncoming(); |
} else { |
_state = _State.CHUNK_SIZE_STARTING_CR; |
} |
@@ -574,36 +669,62 @@ class _HttpParser { |
break; |
} |
} |
- } catch (e) { |
+ } catch (e, s) { |
_state = _State.FAILURE; |
- error(e); |
+ error(new AsyncError(e, s)); |
} |
- // If all data is parsed or not needed due to failure there is no |
- // need to hold on to the buffer. |
- if (_state != _State.UPGRADED) _releaseBuffer(); |
+ _parserCalled = false; |
+ if (_buffer != null && _index == _buffer.length) { |
+ // If all data is parsed release the buffer and resume receiving |
+ // data. |
+ _releaseBuffer(); |
+ if (_state != _State.UPGRADED && _state != _State.FAILURE) { |
+ _socketSubscription.resume(); |
+ } |
+ } |
} |
- void streamData(List<int> buffer) { |
+ void _onData(List<int> buffer) { |
+ _socketSubscription.pause(); |
assert(_buffer == null); |
_buffer = buffer; |
_index = 0; |
- _lastIndex = buffer.length; |
_parse(); |
} |
- void streamDone() { |
- String type() => _requestParser ? "request" : "response"; |
- |
+ void _onDone() { |
+ // onDone cancles the subscription. |
+ _socketSubscription = null; |
+ if (_state == _State.CLOSED || _state == _State.FAILURE) return; |
+ |
+ if (_incoming != null) { |
+ if (_state != _State.UPGRADED && |
+ !(_state == _State.START && !_requestParser) && |
+ !(_state == _State.BODY && !_chunked && _transferLength == -1)) { |
+ _bodyController.signalError( |
+ new AsyncError( |
+ new HttpParserException( |
+ "Connection closed while receiving data"))); |
+ } |
+ _closeIncoming(); |
+ _controller.close(); |
+ return; |
+ } |
// If the connection is idle the HTTP stream is closed. |
if (_state == _State.START) { |
- if (_requestParser) { |
- closed(); |
- } else { |
+ if (!_requestParser) { |
error( |
- new HttpParserException( |
- "Connection closed before full ${type()} header was received")); |
+ new AsyncError( |
+ new HttpParserException( |
+ "Connection closed before full header was received"))); |
} |
+ _controller.close(); |
+ return; |
+ } |
+ |
+ if (_state == _State.UPGRADED) { |
+ _controller.close(); |
return; |
} |
@@ -612,27 +733,29 @@ class _HttpParser { |
// Report the error through the error callback if any. Otherwise |
// throw the error. |
error( |
- new HttpParserException( |
- "Connection closed before full ${type()} header was received")); |
+ new AsyncError( |
+ new HttpParserException( |
+ "Connection closed before full header was received"))); |
+ _controller.close(); |
return; |
} |
- if (!_chunked && _contentLength == -1) { |
- dataEnd(true); |
+ if (!_chunked && _transferLength == -1) { |
_state = _State.CLOSED; |
- closed(); |
} else { |
_state = _State.FAILURE; |
// Report the error through the error callback if any. Otherwise |
// throw the error. |
error( |
- new HttpParserException( |
- "Connection closed before full ${type()} body was received")); |
+ new AsyncError( |
+ new HttpParserException( |
+ "Connection closed before full body was received"))); |
} |
+ _controller.close(); |
} |
- void streamError(e) { |
- error(e); |
+ void _onError(e) { |
+ _controller.signalError(e); |
} |
String get version { |
@@ -645,34 +768,31 @@ class _HttpParser { |
return null; |
} |
- void cancel() { |
- _state = _State.CANCELED; |
- } |
- |
- void restart() { |
- _reset(); |
- } |
- |
int get messageType => _messageType; |
- int get contentLength => _contentLength; |
+ int get transferLength => _transferLength; |
bool get upgrade => _connectionUpgrade && _state == _State.UPGRADED; |
bool get persistentConnection => _persistentConnection; |
void set responseToMethod(String method) { _responseToMethod = method; } |
+ _HttpDetachedIncoming detachIncoming() { |
+ var completer = _pauseCompleter; |
+ _pauseCompleter = null; |
+ return new _HttpDetachedIncoming(_socketSubscription, |
+ readUnparsedData(), |
+ completer); |
+ } |
+ |
List<int> readUnparsedData() { |
- if (_buffer == null) return []; |
- if (_index == _lastIndex) return []; |
- var result = _buffer.getRange(_index, _lastIndex - _index); |
+ if (_buffer == null) return null; |
+ if (_index == _buffer.length) return null; |
+ var result = _buffer.getRange(_index, _buffer.length - _index); |
_releaseBuffer(); |
return result; |
} |
- void _bodyEnd() { |
- dataEnd(_messageType == _MessageType.RESPONSE && !_persistentConnection); |
- } |
- |
_reset() { |
+ if (_state == _State.UPGRADED) return; |
_state = _State.START; |
_messageType = _MessageType.UNDETERMINED; |
_headerField = new List(); |
@@ -681,21 +801,21 @@ class _HttpParser { |
_uri_or_reason_phrase = new List(); |
_httpVersion = _HttpVersion.UNDETERMINED; |
- _contentLength = -1; |
+ _transferLength = -1; |
_persistentConnection = false; |
_connectionUpgrade = false; |
_chunked = false; |
+ _noMessageBody = false; |
_responseToMethod = null; |
_remainingContent = null; |
- _headers = new _HttpHeaders(); |
+ _headers = null; |
} |
_releaseBuffer() { |
_buffer = null; |
_index = null; |
- _lastIndex = null; |
} |
bool _isTokenChar(int byte) { |
@@ -744,12 +864,69 @@ class _HttpParser { |
} |
} |
+ void _createIncoming(int transferLength) { |
+ assert(_incoming == null); |
+ assert(_bodyController == null); |
+ _bodyController = new StreamController<List<int>>( |
+ onSubscriptionStateChange: _updateParsePauseState, |
+ onPauseStateChange: _updateParsePauseState); |
+ _incoming = new _HttpIncoming( |
+ _headers, transferLength, _bodyController.stream); |
+ _pauseParsing(); // Needed to handle detaching - don't start on the body! |
+ } |
+ |
+ void _closeIncoming() { |
+ assert(_incoming != null); |
+ var tmp = _incoming; |
+ _incoming = null; |
+ tmp.close(); |
+ if (_bodyController != null) { |
+ _bodyController.close(); |
+ _bodyController = null; |
+ } |
+ _updateParsePauseState(); |
+ } |
+ |
+ void _continueParsing() { |
+ _paused = false; |
+ if (!_parserCalled && _buffer != null) _parse(); |
+ } |
+ |
+ void _pauseParsing() { |
+ _paused = true; |
+ } |
+ |
+ void _updateParsePauseState() { |
+ if (_bodyController != null) { |
+ if (_bodyController.hasSubscribers && !_bodyController.isPaused) { |
+ _continueParsing(); |
+ } else { |
+ _pauseParsing(); |
+ } |
+ } else { |
+ if (_controller.hasSubscribers && !_controller.isPaused) { |
+ _continueParsing(); |
+ } else { |
+ _pauseParsing(); |
+ } |
+ } |
+ } |
+ |
+ void error(error) { |
+ if (_socketSubscription != null) _socketSubscription.cancel(); |
+ _state = _State.FAILURE; |
+ _controller.signalError(error); |
+ _controller.close(); |
+ } |
+ |
+ // State. |
+ bool _parserCalled = false; |
+ |
// The data that is currently being parsed. |
List<int> _buffer; |
int _index; |
- int _lastIndex; |
- bool _requestParser; |
+ final bool _requestParser; |
int _state; |
int _httpVersionIndex; |
int _messageType; |
@@ -760,23 +937,24 @@ class _HttpParser { |
List _headerValue; |
int _httpVersion; |
- int _contentLength; |
+ int _transferLength; |
bool _persistentConnection; |
bool _connectionUpgrade; |
bool _chunked; |
+ bool _noMessageBody; |
String _responseToMethod; // Indicates the method used for the request. |
int _remainingContent; |
- _HttpHeaders _headers = new _HttpHeaders(); |
+ _HttpHeaders _headers; |
- // Callbacks. |
- Function requestStart; |
- Function responseStart; |
- Function dataReceived; |
- Function dataEnd; |
- Function error; |
- Function closed; |
+ // The current incoming connection. |
+ _HttpIncoming _incoming; |
+ StreamSubscription _socketSubscription; |
+ bool _paused = false; |
+ Completer _pauseCompleter; |
+ StreamController<_HttpIncoming> _controller; |
+ StreamController<List<int>> _bodyController; |
} |