Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(433)

Unified Diff: sdk/lib/io/http_parser.dart

Issue 12316036: Merge IO v2 branch to bleeding edge (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebased to r18818 Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/io/http_impl.dart ('k') | sdk/lib/io/http_session.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/io/http_parser.dart
diff --git a/sdk/lib/io/http_parser.dart b/sdk/lib/io/http_parser.dart
index c0a2819ae8aaf1f7c1a77417e33730ebb9aa7a6e..587d89e67ca39f0fd3e0e09cb7f79e9f75171720 100644
--- a/sdk/lib/io/http_parser.dart
+++ b/sdk/lib/io/http_parser.dart
@@ -74,11 +74,9 @@ class _State {
static const int BODY = 24;
static const int CLOSED = 25;
static const int UPGRADED = 26;
- static const int CANCELED = 27;
- static const int FAILURE = 28;
+ static const int FAILURE = 27;
static const int FIRST_BODY_STATE = CHUNK_SIZE_STARTING_CR;
- static const int FIRST_PARSE_STOP_STATE = CLOSED;
}
// HTTP version of the request or response being parsed.
@@ -95,22 +93,87 @@ class _MessageType {
static const int RESPONSE = 0;
}
+class _HttpDetachedIncoming extends Stream<List<int>> {
+ StreamController<List<int>> controller;
+ final StreamSubscription subscription;
+
+ List<int> carryOverData;
+ bool paused;
+
+ Completer resumeCompleter;
+
+ _HttpDetachedIncoming(StreamSubscription this.subscription,
+ List<int> this.carryOverData,
+ Completer oldResumeCompleter) {
+ controller = new StreamController<List<int>>(
+ onSubscriptionStateChange: onSubscriptionStateChange,
+ onPauseStateChange: onPauseStateChange);
+ pause();
+ if (oldResumeCompleter != null) oldResumeCompleter.complete();
+ subscription.resume();
+ subscription.onData(controller.add);
+ subscription.onDone(controller.close);
+ subscription.onError(controller.signalError);
+ }
+
+ StreamSubscription<List<int>> listen(void onData(List<int> event),
+ {void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError}) {
+ return controller.stream.listen(
+ onData,
+ onError: onError,
+ onDone: onDone,
+ unsubscribeOnError: unsubscribeOnError);
+ }
+
+ void resume() {
+ paused = false;
+ if (carryOverData != null) {
+ var data = carryOverData;
+ carryOverData = null;
+ controller.add(data);
+ // If the consumer pauses again after the carry-over data, we'll not
+ // continue our subscriber until the next resume.
+ if (paused) return;
+ }
+ if (resumeCompleter != null) {
+ resumeCompleter.complete();
+ resumeCompleter = null;
+ }
+ }
+
+ void pause() {
+ paused = true;
+ if (resumeCompleter == null) {
+ resumeCompleter = new Completer();
+ subscription.pause(resumeCompleter.future);
+ }
+ }
+
+ void onPauseStateChange() {
+ if (controller.isPaused) {
+ pause();
+ } else {
+ resume();
+ }
+ }
+
+ void onSubscriptionStateChange() {
+ if (controller.hasSubscribers) {
+ resume();
+ } else {
+ subscription.cancel();
+ }
+ }
+}
+
/**
- * HTTP parser which parses the HTTP stream as data is supplied
- * through the [:streamData:] and [:streamDone:] methods. As the
- * data is parsed the following callbacks are called:
- *
- * [:requestStart:]
- * [:responseStart:]
- * [:dataReceived:]
- * [:dataEnd:]
- * [:closed:]
- * [:error:]
+ * HTTP parser which parses the data stream given to [consume].
*
- * If an HTTP parser error occours it is possible to get an exception
- * thrown from the [:streamData:] and [:streamDone:] methods if
- * the error callback is not set.
+ * If an HTTP parser error occours, the parser will signal an error to either
+ * the current _HttpIncoming or the _parser itself.
*
* The connection upgrades (e.g. switching from HTTP/1.1 to the
* WebSocket protocol) is handled in a special way. If connection
@@ -126,16 +189,53 @@ class _MessageType {
* and should be handled according to whatever protocol is being
* upgraded to.
*/
-class _HttpParser {
- _HttpParser.requestParser() {
- _requestParser = true;
- _reset();
+class _HttpParser
+ extends Stream<_HttpIncoming>
+ implements StreamConsumer<List<int>, _HttpParser> {
+
+ factory _HttpParser.requestParser() {
+ return new _HttpParser._(true);
}
- _HttpParser.responseParser() {
- _requestParser = false;
+
+ factory _HttpParser.responseParser() {
+ return new _HttpParser._(false);
+ }
+
+ _HttpParser._(this._requestParser) {
+ _controller = new StreamController<_HttpIncoming>(
+ onSubscriptionStateChange: _updateParsePauseState,
+ onPauseStateChange: _updateParsePauseState);
_reset();
}
+
+ StreamSubscription<_HttpIncoming> listen(void onData(List<int> event),
+ {void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError}) {
+ return _controller.stream.listen(onData,
+ onError: onError,
+ onDone: onDone,
+ unsubscribeOnError: unsubscribeOnError);
+ }
+
+ Future<_HttpParser> consume(Stream<List<int>> stream) {
+ // Listen to the stream and handle data accordingly. When a
+ // _HttpIncoming is created, _dataPause, _dataResume, _dataDone is
+ // given to provide a way of controlling the parser.
+ // TODO(ajohnsen): Remove _dataPause, _dataResume and _dataDone and clean up
+ // how the _HttpIncoming signals the parser.
+ var completer = new Completer();
+ _socketSubscription = stream.listen(
+ _onData,
+ onError: _onError,
+ onDone: () {
+ _onDone();
+ completer.complete(this);
+ });
+ return completer.future;
+ }
+
// From RFC 2616.
// generic-message = start-line
// *(message-header CRLF)
@@ -146,22 +246,23 @@ class _HttpParser {
// Status-Line = HTTP-Version SP Status-Code SP Reason-Phrase CRLF
// message-header = field-name ":" [ field-value ]
void _parse() {
+ assert(!_parserCalled);
+ _parserCalled = true;
try {
if (_state == _State.CLOSED) {
throw new HttpParserException("Data on closed connection");
}
- if (_state == _State.UPGRADED) {
- throw new HttpParserException("Data on upgraded connection");
- }
if (_state == _State.FAILURE) {
throw new HttpParserException("Data on failed connection");
}
- if (_state == _State.CANCELED) {
- throw new HttpParserException("Data on canceled connection");
- }
while (_buffer != null &&
- _index < _lastIndex &&
- _state <= _State.FIRST_PARSE_STOP_STATE) {
+ _index < _buffer.length &&
+ _state != _State.FAILURE &&
+ _state != _State.UPGRADED) {
+ if (_paused) {
+ _parserCalled = false;
+ return;
+ }
int byte = _buffer[_index++];
switch (_state) {
case _State.START:
@@ -328,15 +429,20 @@ class _HttpParser {
case _State.RESPONSE_LINE_ENDING:
_expect(byte, _CharCode.LF);
_messageType == _MessageType.RESPONSE;
- _statusCode = int.parse(
- new String.fromCharCodes(_method_or_status_code));
+ _statusCode = int.parse(
+ new String.fromCharCodes(_method_or_status_code));
if (_statusCode < 100 || _statusCode > 599) {
throw new HttpParserException("Invalid response status code");
+ } else {
+ // Check whether this response will never have a body.
+ _noMessageBody = _statusCode <= 199 || _statusCode == 204 ||
+ _statusCode == 304;
}
_state = _State.HEADER_START;
break;
case _State.HEADER_START:
+ _headers = new _HttpHeaders(version);
if (byte == _CharCode.CR) {
_state = _State.HEADER_ENDING;
} else {
@@ -386,27 +492,19 @@ class _HttpParser {
} else {
String headerField = new String.fromCharCodes(_headerField);
String headerValue = new String.fromCharCodes(_headerValue);
- bool reportHeader = true;
+ if (headerField == "transfer-encoding" &&
+ headerValue.toLowerCase() == "chunked") {
+ _chunked = true;
+ }
if (headerField == "connection") {
List<String> tokens = _tokenizeFieldValue(headerValue);
for (int i = 0; i < tokens.length; i++) {
- String token = tokens[i].toLowerCase();
- if (token == "keep-alive") {
- _persistentConnection = true;
- } else if (token == "close") {
- _persistentConnection = false;
- } else if (token == "upgrade") {
+ if (tokens[i].toLowerCase() == "upgrade") {
_connectionUpgrade = true;
}
- _headers.add(headerField, token);
-
+ _headers.add(headerField, tokens[i]);
}
- reportHeader = false;
- } else if (headerField == "transfer-encoding" &&
- headerValue.toLowerCase() == "chunked") {
- _chunked = true;
- }
- if (reportHeader) {
+ } else {
_headers.add(headerField, headerValue);
}
_headerField.clear();
@@ -426,62 +524,62 @@ class _HttpParser {
_expect(byte, _CharCode.LF);
_headers._mutable = false;
- _contentLength = _headers.contentLength;
+ _transferLength = _headers.contentLength;
// Ignore the Content-Length header if Transfer-Encoding
// is chunked (RFC 2616 section 4.4)
- if (_chunked) _contentLength = -1;
+ if (_chunked) _transferLength = -1;
// If a request message has neither Content-Length nor
// Transfer-Encoding the message must not have a body (RFC
// 2616 section 4.3).
if (_messageType == _MessageType.REQUEST &&
- _contentLength < 0 &&
+ _transferLength < 0 &&
_chunked == false) {
- _contentLength = 0;
+ _transferLength = 0;
}
if (_connectionUpgrade) {
_state = _State.UPGRADED;
+ _transferLength = 0;
}
- var noBody;
+ _createIncoming(_transferLength);
if (_requestParser) {
- noBody = _contentLength == 0;
- requestStart(new String.fromCharCodes(_method_or_status_code),
- new String.fromCharCodes(_uri_or_reason_phrase),
- version,
- _headers,
- !noBody);
+ _incoming.method =
+ new String.fromCharCodes(_method_or_status_code);
+ _incoming.uri =
+ Uri.parse(
+ new String.fromCharCodes(_uri_or_reason_phrase));
} else {
- // Check whether this response will never have a body.
- noBody = _contentLength == 0 ||
- _statusCode <= 199 ||
- _statusCode == HttpStatus.NO_CONTENT ||
- _statusCode == HttpStatus.NOT_MODIFIED ||
- _responseToMethod == "HEAD";
- responseStart(_statusCode,
- new String.fromCharCodes(_uri_or_reason_phrase),
- version,
- _headers,
- !noBody);
+ _incoming.statusCode = _statusCode;
+ _incoming.reasonPhrase =
+ new String.fromCharCodes(_uri_or_reason_phrase);
}
_method_or_status_code.clear();
_uri_or_reason_phrase.clear();
- if (_state == _State.CANCELED) continue;
- if (!_connectionUpgrade) {
- if (noBody) {
- _bodyEnd();
- _reset();
- } else if (_chunked) {
- _state = _State.CHUNK_SIZE;
- _remainingContent = 0;
- } else if (_contentLength > 0) {
- _remainingContent = _contentLength;
- _state = _State.BODY;
- } else {
- // Neither chunked nor content length. End of body
- // indicated by close.
- _state = _State.BODY;
- }
+ if (_connectionUpgrade) {
+ _incoming.upgraded = true;
+ _controller.add(_incoming);
+ break;
+ }
+ if (_chunked) {
+ _state = _State.CHUNK_SIZE;
+ _remainingContent = 0;
+ } else if (_transferLength == 0 ||
+ (_messageType == _MessageType.RESPONSE &&
+ (_noMessageBody || _responseToMethod == "HEAD"))) {
+ _state = _State.START;
+ var tmp = _incoming;
+ _closeIncoming();
+ _controller.add(tmp);
+ break;
+ } else if (_transferLength > 0) {
+ _remainingContent = _transferLength;
+ _state = _State.BODY;
+ } else {
+ // Neither chunked nor content length. End of body
+ // indicated by close.
+ _state = _State.BODY;
}
+ _controller.add(_incoming);
break;
case _State.CHUNK_SIZE_STARTING_CR:
@@ -527,15 +625,14 @@ class _HttpParser {
case _State.CHUNKED_BODY_DONE_LF:
_expect(byte, _CharCode.LF);
- _bodyEnd();
- if (_state == _State.CANCELED) continue;
- _reset();
+ _state = _State.START;
+ _closeIncoming();
break;
case _State.BODY:
// The body is not handled one byte at a time but in blocks.
_index--;
- int dataAvailable = _lastIndex - _index;
+ int dataAvailable = _buffer.length - _index;
List<int> data;
if (_remainingContent == null ||
dataAvailable <= _remainingContent) {
@@ -546,17 +643,15 @@ class _HttpParser {
data.setRange(0, _remainingContent, _buffer, _index);
}
- dataReceived(data);
- if (_state == _State.CANCELED) continue;
+ _bodyController.add(data);
if (_remainingContent != null) {
_remainingContent -= data.length;
}
_index += data.length;
if (_remainingContent == 0) {
if (!_chunked) {
- _bodyEnd();
- if (_state == _State.CANCELED) continue;
- _reset();
+ _state = _State.START;
+ _closeIncoming();
} else {
_state = _State.CHUNK_SIZE_STARTING_CR;
}
@@ -574,36 +669,62 @@ class _HttpParser {
break;
}
}
- } catch (e) {
+ } catch (e, s) {
_state = _State.FAILURE;
- error(e);
+ error(new AsyncError(e, s));
}
- // If all data is parsed or not needed due to failure there is no
- // need to hold on to the buffer.
- if (_state != _State.UPGRADED) _releaseBuffer();
+ _parserCalled = false;
+ if (_buffer != null && _index == _buffer.length) {
+ // If all data is parsed release the buffer and resume receiving
+ // data.
+ _releaseBuffer();
+ if (_state != _State.UPGRADED && _state != _State.FAILURE) {
+ _socketSubscription.resume();
+ }
+ }
}
- void streamData(List<int> buffer) {
+ void _onData(List<int> buffer) {
+ _socketSubscription.pause();
assert(_buffer == null);
_buffer = buffer;
_index = 0;
- _lastIndex = buffer.length;
_parse();
}
- void streamDone() {
- String type() => _requestParser ? "request" : "response";
-
+ void _onDone() {
+ // onDone cancles the subscription.
+ _socketSubscription = null;
+ if (_state == _State.CLOSED || _state == _State.FAILURE) return;
+
+ if (_incoming != null) {
+ if (_state != _State.UPGRADED &&
+ !(_state == _State.START && !_requestParser) &&
+ !(_state == _State.BODY && !_chunked && _transferLength == -1)) {
+ _bodyController.signalError(
+ new AsyncError(
+ new HttpParserException(
+ "Connection closed while receiving data")));
+ }
+ _closeIncoming();
+ _controller.close();
+ return;
+ }
// If the connection is idle the HTTP stream is closed.
if (_state == _State.START) {
- if (_requestParser) {
- closed();
- } else {
+ if (!_requestParser) {
error(
- new HttpParserException(
- "Connection closed before full ${type()} header was received"));
+ new AsyncError(
+ new HttpParserException(
+ "Connection closed before full header was received")));
}
+ _controller.close();
+ return;
+ }
+
+ if (_state == _State.UPGRADED) {
+ _controller.close();
return;
}
@@ -612,27 +733,29 @@ class _HttpParser {
// Report the error through the error callback if any. Otherwise
// throw the error.
error(
- new HttpParserException(
- "Connection closed before full ${type()} header was received"));
+ new AsyncError(
+ new HttpParserException(
+ "Connection closed before full header was received")));
+ _controller.close();
return;
}
- if (!_chunked && _contentLength == -1) {
- dataEnd(true);
+ if (!_chunked && _transferLength == -1) {
_state = _State.CLOSED;
- closed();
} else {
_state = _State.FAILURE;
// Report the error through the error callback if any. Otherwise
// throw the error.
error(
- new HttpParserException(
- "Connection closed before full ${type()} body was received"));
+ new AsyncError(
+ new HttpParserException(
+ "Connection closed before full body was received")));
}
+ _controller.close();
}
- void streamError(e) {
- error(e);
+ void _onError(e) {
+ _controller.signalError(e);
}
String get version {
@@ -645,34 +768,31 @@ class _HttpParser {
return null;
}
- void cancel() {
- _state = _State.CANCELED;
- }
-
- void restart() {
- _reset();
- }
-
int get messageType => _messageType;
- int get contentLength => _contentLength;
+ int get transferLength => _transferLength;
bool get upgrade => _connectionUpgrade && _state == _State.UPGRADED;
bool get persistentConnection => _persistentConnection;
void set responseToMethod(String method) { _responseToMethod = method; }
+ _HttpDetachedIncoming detachIncoming() {
+ var completer = _pauseCompleter;
+ _pauseCompleter = null;
+ return new _HttpDetachedIncoming(_socketSubscription,
+ readUnparsedData(),
+ completer);
+ }
+
List<int> readUnparsedData() {
- if (_buffer == null) return [];
- if (_index == _lastIndex) return [];
- var result = _buffer.getRange(_index, _lastIndex - _index);
+ if (_buffer == null) return null;
+ if (_index == _buffer.length) return null;
+ var result = _buffer.getRange(_index, _buffer.length - _index);
_releaseBuffer();
return result;
}
- void _bodyEnd() {
- dataEnd(_messageType == _MessageType.RESPONSE && !_persistentConnection);
- }
-
_reset() {
+ if (_state == _State.UPGRADED) return;
_state = _State.START;
_messageType = _MessageType.UNDETERMINED;
_headerField = new List();
@@ -681,21 +801,21 @@ class _HttpParser {
_uri_or_reason_phrase = new List();
_httpVersion = _HttpVersion.UNDETERMINED;
- _contentLength = -1;
+ _transferLength = -1;
_persistentConnection = false;
_connectionUpgrade = false;
_chunked = false;
+ _noMessageBody = false;
_responseToMethod = null;
_remainingContent = null;
- _headers = new _HttpHeaders();
+ _headers = null;
}
_releaseBuffer() {
_buffer = null;
_index = null;
- _lastIndex = null;
}
bool _isTokenChar(int byte) {
@@ -744,12 +864,69 @@ class _HttpParser {
}
}
+ void _createIncoming(int transferLength) {
+ assert(_incoming == null);
+ assert(_bodyController == null);
+ _bodyController = new StreamController<List<int>>(
+ onSubscriptionStateChange: _updateParsePauseState,
+ onPauseStateChange: _updateParsePauseState);
+ _incoming = new _HttpIncoming(
+ _headers, transferLength, _bodyController.stream);
+ _pauseParsing(); // Needed to handle detaching - don't start on the body!
+ }
+
+ void _closeIncoming() {
+ assert(_incoming != null);
+ var tmp = _incoming;
+ _incoming = null;
+ tmp.close();
+ if (_bodyController != null) {
+ _bodyController.close();
+ _bodyController = null;
+ }
+ _updateParsePauseState();
+ }
+
+ void _continueParsing() {
+ _paused = false;
+ if (!_parserCalled && _buffer != null) _parse();
+ }
+
+ void _pauseParsing() {
+ _paused = true;
+ }
+
+ void _updateParsePauseState() {
+ if (_bodyController != null) {
+ if (_bodyController.hasSubscribers && !_bodyController.isPaused) {
+ _continueParsing();
+ } else {
+ _pauseParsing();
+ }
+ } else {
+ if (_controller.hasSubscribers && !_controller.isPaused) {
+ _continueParsing();
+ } else {
+ _pauseParsing();
+ }
+ }
+ }
+
+ void error(error) {
+ if (_socketSubscription != null) _socketSubscription.cancel();
+ _state = _State.FAILURE;
+ _controller.signalError(error);
+ _controller.close();
+ }
+
+ // State.
+ bool _parserCalled = false;
+
// The data that is currently being parsed.
List<int> _buffer;
int _index;
- int _lastIndex;
- bool _requestParser;
+ final bool _requestParser;
int _state;
int _httpVersionIndex;
int _messageType;
@@ -760,23 +937,24 @@ class _HttpParser {
List _headerValue;
int _httpVersion;
- int _contentLength;
+ int _transferLength;
bool _persistentConnection;
bool _connectionUpgrade;
bool _chunked;
+ bool _noMessageBody;
String _responseToMethod; // Indicates the method used for the request.
int _remainingContent;
- _HttpHeaders _headers = new _HttpHeaders();
+ _HttpHeaders _headers;
- // Callbacks.
- Function requestStart;
- Function responseStart;
- Function dataReceived;
- Function dataEnd;
- Function error;
- Function closed;
+ // The current incoming connection.
+ _HttpIncoming _incoming;
+ StreamSubscription _socketSubscription;
+ bool _paused = false;
+ Completer _pauseCompleter;
+ StreamController<_HttpIncoming> _controller;
+ StreamController<List<int>> _bodyController;
}
« no previous file with comments | « sdk/lib/io/http_impl.dart ('k') | sdk/lib/io/http_session.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698