Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(420)

Unified Diff: sdk/lib/io/http_impl.dart

Issue 12316036: Merge IO v2 branch to bleeding edge (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebased to r18818 Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/io/http_headers.dart ('k') | sdk/lib/io/http_parser.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/io/http_impl.dart
diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart
index ede39a2418e518c3fc6b1ea5dba2681fca5107ac..1ed571eadda474cb4de614cdaa1f765847e3b1b9 100644
--- a/sdk/lib/io/http_impl.dart
+++ b/sdk/lib/io/http_impl.dart
@@ -4,520 +4,477 @@
part of dart.io;
-// The close queue handles graceful closing of HTTP connections. When
-// a connection is added to the queue it will enter a wait state
-// waiting for all data written and possibly socket shutdown from
-// peer.
-class _CloseQueue {
- _CloseQueue() : _q = new Set<_HttpConnectionBase>();
-
- void add(_HttpConnectionBase connection) {
- void closeIfDone() {
- // When either the client has closed or all data has been
- // written to the client we close the underlying socket
- // completely.
- if (connection._isWriteClosed || connection._isReadClosed) {
- _q.remove(connection);
- connection._socket.close();
- if (connection.onClosed != null) connection.onClosed();
- }
- }
+class _HttpIncoming
+ extends Stream<List<int>> implements StreamSink<List<int>> {
+ final int _transferLength;
+ final Completer _dataCompleter = new Completer();
+ Stream<List<int>> _stream;
- // If the connection is already fully closed don't insert it into
- // the queue.
- if (connection._isFullyClosed) {
- connection._socket.close();
- if (connection.onClosed != null) connection.onClosed();
- return;
- }
+ bool fullBodyRead = false;
- connection._state |= _HttpConnectionBase.CLOSING;
- _q.add(connection);
-
- // If the output stream is not closed for writing, close it now and
- // wait for callback when closed.
- if (!connection._isWriteClosed) {
- connection._socket.outputStream.close();
- connection._socket.outputStream.onClosed = () {
- connection._state |= _HttpConnectionBase.WRITE_CLOSED;
- closeIfDone();
- };
- } else {
- connection._socket.outputStream.onClosed = () { assert(false); };
- }
+ // Common properties.
+ final _HttpHeaders headers;
+ bool upgraded = false;
- // If the request is not already fully read wait for the socket to close.
- // As the _isReadClosed state from the HTTP request processing indicate
- // that the response has been parsed this does not necesarily mean tha
- // the socket is closed.
- if (!connection._isReadClosed) {
- connection._socket.onClosed = () {
- connection._state |= _HttpConnectionBase.READ_CLOSED;
- closeIfDone();
- };
- }
+ // ClientResponse properties.
+ int statusCode;
+ String reasonPhrase;
+
+ // Request properties.
+ String method;
+ Uri uri;
- // Ignore any data on a socket in the close queue.
- connection._socket.onData = connection._socket.read;
+ // The transfer length if the length of the message body as it
+ // appears in the message (RFC 2616 section 4.4). This can be -1 if
+ // the length of the massage body is not known due to transfer
+ // codings.
+ int get transferLength => _transferLength;
- // If an error occurs immediately close the socket.
- connection._socket.onError = (e) {
- connection._state |= _HttpConnectionBase.READ_CLOSED;
- connection._state |= _HttpConnectionBase.WRITE_CLOSED;
- closeIfDone();
- };
+ _HttpIncoming(_HttpHeaders this.headers,
+ int this._transferLength,
+ Stream<List<int>> this._stream) {
}
- void shutdown() {
- _q.forEach((_HttpConnectionBase connection) {
- connection._socket.close();
- });
+ StreamSubscription<List<int>> listen(void onData(List<int> event),
+ {void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError}) {
+ return _stream.listen(onData,
+ onError: onError,
+ onDone: onDone,
+ unsubscribeOnError: unsubscribeOnError);
}
- final Set<_HttpConnectionBase> _q;
+ // Is completed once all data have been received.
+ Future get dataDone => _dataCompleter.future;
+
+ void close() {
+ fullBodyRead = true;
+ _dataCompleter.complete();
+ }
}
+class _HttpInboundMessage extends Stream<List<int>> {
+ final _HttpIncoming _incoming;
+ List<Cookie> _cookies;
-class _HttpRequestResponseBase {
- static const int START = 0;
- static const int HEADER_SENT = 1;
- static const int DONE = 2;
- static const int UPGRADED = 3;
+ _HttpInboundMessage(_HttpIncoming this._incoming);
- _HttpRequestResponseBase(_HttpConnectionBase this._httpConnection)
- : _state = START, _headResponse = false;
+ List<Cookie> get cookies {
+ if (_cookies != null) return _cookies;
+ return _cookies = headers._parseCookies();
+ }
- int get contentLength => _headers.contentLength;
- HttpHeaders get headers => _headers;
+ HttpHeaders get headers => _incoming.headers;
+ String get protocolVersion => headers.protocolVersion;
+ int get contentLength => headers.contentLength;
+ bool get persistentConnection => headers.persistentConnection;
+}
- bool get persistentConnection {
- List<String> connection = headers[HttpHeaders.CONNECTION];
- if (_protocolVersion == "1.1") {
- if (connection == null) return true;
- return !headers[HttpHeaders.CONNECTION].any(
- (value) => value.toLowerCase() == "close");
- } else {
- if (connection == null) return false;
- return headers[HttpHeaders.CONNECTION].any(
- (value) => value.toLowerCase() == "keep-alive");
- }
- }
- X509Certificate get certificate {
- var socket = _httpConnection._socket as SecureSocket;
- return socket == null ? socket : socket.peerCertificate;
- }
+class _HttpRequest extends _HttpInboundMessage implements HttpRequest {
+ final HttpResponse response;
- void set persistentConnection(bool persistentConnection) {
- if (_outputStream != null) throw new HttpException("Header already sent");
+ // Lazy initialized parsed query parameters.
+ Map<String, String> _queryParameters;
- // Determine the value of the "Connection" header.
- headers.remove(HttpHeaders.CONNECTION, "close");
- headers.remove(HttpHeaders.CONNECTION, "keep-alive");
- if (_protocolVersion == "1.1" && !persistentConnection) {
- headers.add(HttpHeaders.CONNECTION, "close");
- } else if (_protocolVersion == "1.0" && persistentConnection) {
- headers.add(HttpHeaders.CONNECTION, "keep-alive");
- }
- }
+ final _HttpServer _httpServer;
+ final _HttpConnection _httpConnection;
- bool _write(List<int> data, bool copyBuffer) {
- if (_headResponse) return true;
- _ensureHeadersSent();
- bool allWritten = true;
- if (data.length > 0) {
- if (_headers.chunkedTransferEncoding) {
- // Write chunk size if transfer encoding is chunked.
- _writeHexString(data.length);
- _writeCRLF();
- _httpConnection._write(data, copyBuffer);
- allWritten = _writeCRLF();
- } else {
- _updateContentLength(data.length);
- allWritten = _httpConnection._write(data, copyBuffer);
- }
- }
- return allWritten;
- }
-
- bool _writeList(List<int> data, int offset, int count) {
- if (_headResponse) return true;
- _ensureHeadersSent();
- bool allWritten = true;
- if (count > 0) {
- if (_headers.chunkedTransferEncoding) {
- // Write chunk size if transfer encoding is chunked.
- _writeHexString(count);
- _writeCRLF();
- _httpConnection._writeFrom(data, offset, count);
- allWritten = _writeCRLF();
- } else {
- _updateContentLength(count);
- allWritten = _httpConnection._writeFrom(data, offset, count);
- }
- }
- return allWritten;
- }
+ HttpSession _session;
- bool _writeDone() {
- bool allWritten = true;
- if (_headers.chunkedTransferEncoding) {
- // Terminate the content if transfer encoding is chunked.
- allWritten = _httpConnection._write(_Const.END_CHUNKED);
- } else {
- if (!_headResponse && _bodyBytesWritten < _headers.contentLength) {
- throw new HttpException("Sending less than specified content length");
+ _HttpRequest(_HttpResponse this.response,
+ _HttpIncoming _incoming,
+ _HttpServer this._httpServer,
+ _HttpConnection this._httpConnection)
+ : super(_incoming) {
+ response.headers.persistentConnection = headers.persistentConnection;
+
+ if (_httpServer._sessionManagerInstance != null) {
+ // Map to session if exists.
+ var sessionId = cookies.reduce(null, (last, cookie) {
+ if (last != null) return last;
+ return cookie.name.toUpperCase() == _DART_SESSION_ID ?
+ cookie.value : null;
+ });
+ if (sessionId != null) {
+ _session = _httpServer._sessionManager.getSession(sessionId);
+ if (_session != null) {
+ _session._markSeen();
+ }
}
- assert(_headResponse || _bodyBytesWritten == _headers.contentLength);
}
- return allWritten;
}
- bool _writeHeaders() {
- _headers._write(_httpConnection);
- // Terminate header.
- return _writeCRLF();
+ StreamSubscription<List<int>> listen(void onData(List<int> event),
+ {void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError}) {
+ return _incoming.listen(onData,
+ onError: onError,
+ onDone: onDone,
+ unsubscribeOnError: unsubscribeOnError);
}
- bool _writeHexString(int x) {
- final List<int> hexDigits = [0x30, 0x31, 0x32, 0x33, 0x34,
- 0x35, 0x36, 0x37, 0x38, 0x39,
- 0x41, 0x42, 0x43, 0x44, 0x45, 0x46];
- List<int> hex = new Uint8List(10);
- int index = hex.length;
- while (x > 0) {
- index--;
- hex[index] = hexDigits[x % 16];
- x = x >> 4;
+ Map<String, String> get queryParameters {
+ if (_queryParameters == null) {
+ _queryParameters = _HttpUtils.splitQueryString(uri.query);
}
- return _httpConnection._writeFrom(hex, index, hex.length - index);
+ return _queryParameters;
}
- bool _writeCRLF() {
- final CRLF = const [_CharCode.CR, _CharCode.LF];
- return _httpConnection._write(CRLF);
- }
+ Uri get uri => _incoming.uri;
- bool _writeSP() {
- final SP = const [_CharCode.SP];
- return _httpConnection._write(SP);
- }
+ String get method => _incoming.method;
- void _ensureHeadersSent() {
- // Ensure that headers are written.
- if (_state == START) {
- _writeHeader();
- }
- }
-
- void _updateContentLength(int bytes) {
- if (_bodyBytesWritten + bytes > _headers.contentLength) {
- throw new HttpException("Writing more than specified content length");
+ HttpSession get session {
+ if (_session != null) {
+ // It's already mapped, use it.
+ return _session;
}
- _bodyBytesWritten += bytes;
+ // Create session, store it in connection, and return.
+ return _session = _httpServer._sessionManager.createSession();
}
HttpConnectionInfo get connectionInfo => _httpConnection.connectionInfo;
- bool get _done => _state == DONE;
-
- int _state;
- bool _headResponse;
-
- _HttpConnectionBase _httpConnection;
- _HttpHeaders _headers;
- List<Cookie> _cookies;
- String _protocolVersion = "1.1";
-
- // Number of body bytes written. This is only actual body data not
- // including headers or chunk information of using chinked transfer
- // encoding.
- int _bodyBytesWritten = 0;
+ X509Certificate get certificate {
+ Socket socket = _httpConnection._socket;
+ if (socket is SecureSocket) return socket.peerCertificate;
+ return null;
+ }
}
-// Parsed HTTP request providing information on the HTTP headers.
-class _HttpRequest extends _HttpRequestResponseBase implements HttpRequest {
- _HttpRequest(_HttpConnection connection) : super(connection);
-
- String get method => _method;
- String get uri => _uri;
- String get path => _path;
- String get queryString => _queryString;
- Map get queryParameters => _queryParameters;
-
- List<Cookie> get cookies {
- if (_cookies != null) return _cookies;
-
- // Parse a Cookie header value according to the rules in RFC 6265.
- void _parseCookieString(String s) {
- int index = 0;
+class _HttpClientResponse
+ extends _HttpInboundMessage implements HttpClientResponse {
+ List<RedirectInfo> get redirects => _httpRequest._responseRedirects;
- bool done() => index == s.length;
+ // The HttpClient this response belongs to.
+ final _HttpClient _httpClient;
- void skipWS() {
- while (!done()) {
- if (s[index] != " " && s[index] != "\t") return;
- index++;
- }
- }
+ // The HttpClientRequest of this response.
+ final _HttpClientRequest _httpRequest;
- String parseName() {
- int start = index;
- while (!done()) {
- if (s[index] == " " || s[index] == "\t" || s[index] == "=") break;
- index++;
- }
- return s.substring(start, index).toLowerCase();
- }
+ List<Cookie> _cookies;
- String parseValue() {
- int start = index;
- while (!done()) {
- if (s[index] == " " || s[index] == "\t" || s[index] == ";") break;
- index++;
- }
- return s.substring(start, index).toLowerCase();
- }
+ _HttpClientResponse(_HttpIncoming _incoming,
+ _HttpClientRequest this._httpRequest,
+ _HttpClient this._httpClient)
+ : super(_incoming);
- void expect(String expected) {
- if (done()) {
- throw new HttpException("Failed to parse header value [$s]");
- }
- if (s[index] != expected) {
- throw new HttpException("Failed to parse header value [$s]");
- }
- index++;
- }
-
- while (!done()) {
- skipWS();
- if (done()) return;
- String name = parseName();
- skipWS();
- expect("=");
- skipWS();
- String value = parseValue();
- _cookies.add(new _Cookie(name, value));
- skipWS();
- if (done()) return;
- expect(";");
- }
- }
+ int get statusCode => _incoming.statusCode;
+ String get reasonPhrase => _incoming.reasonPhrase;
+ List<Cookie> get cookies {
+ if (_cookies != null) return _cookies;
_cookies = new List<Cookie>();
- List<String> headerValues = headers["cookie"];
- if (headerValues != null) {
- headerValues.forEach((headerValue) => _parseCookieString(headerValue));
+ List<String> values = headers["set-cookie"];
+ if (values != null) {
+ values.forEach((value) {
+ _cookies.add(new Cookie.fromSetCookieValue(value));
+ });
}
return _cookies;
}
- InputStream get inputStream {
- if (_inputStream == null) {
- _inputStream = new _HttpInputStream(this);
+ bool get isRedirect {
+ if (_httpRequest.method == "GET" || _httpRequest.method == "HEAD") {
+ return statusCode == HttpStatus.MOVED_PERMANENTLY ||
+ statusCode == HttpStatus.FOUND ||
+ statusCode == HttpStatus.SEE_OTHER ||
+ statusCode == HttpStatus.TEMPORARY_REDIRECT;
+ } else if (_httpRequest.method == "POST") {
+ return statusCode == HttpStatus.SEE_OTHER;
}
- return _inputStream;
+ return false;
}
- String get protocolVersion => _protocolVersion;
-
- HttpSession session([init(HttpSession session)]) {
- if (_session != null) {
- // It's already mapped, use it.
- return _session;
+ Future<HttpClientResponse> redirect([String method,
+ Uri url,
+ bool followLoops]) {
+ if (method == null) {
+ // Set method as defined by RFC 2616 section 10.3.4.
+ if (statusCode == HttpStatus.SEE_OTHER && _httpRequest.method == "POST") {
+ method = "GET";
+ } else {
+ method = _httpRequest.method;
+ }
}
- // Create session, store it in connection, and return.
- var sessionManager = _httpConnection._server._sessionManager;
- return _session = sessionManager.createSession(init);
- }
-
- void _onRequestReceived(String method,
- String uri,
- String version,
- _HttpHeaders headers) {
- _method = method;
- _uri = uri;
- _parseRequestUri(uri);
- _headers = headers;
- if (_httpConnection._server._sessionManagerInstance != null) {
- // Map to session if exists.
- var sessionId = cookies.reduce(null, (last, cookie) {
- if (last != null) return last;
- return cookie.name.toUpperCase() == _DART_SESSION_ID ?
- cookie.value : null;
- });
- if (sessionId != null) {
- var sessionManager = _httpConnection._server._sessionManager;
- _session = sessionManager.getSession(sessionId);
- if (_session != null) {
- _session._markSeen();
+ if (url == null) {
+ String location = headers.value(HttpHeaders.LOCATION);
+ if (location == null) {
+ throw new StateError("Response has no Location header for redirect");
+ }
+ url = Uri.parse(location);
+ }
+ if (followLoops != true) {
+ for (var redirect in redirects) {
+ if (redirect.location == url) {
+ return new Future.immediateError(
+ new RedirectLoopException(redirects));
}
}
}
-
- // Prepare for receiving data.
- _buffer = new _BufferList();
+ return _httpClient._openUrlFromRequest(method, url, _httpRequest)
+ .then((request) {
+ request._responseRedirects.addAll(this.redirects);
+ request._responseRedirects.add(new _RedirectInfo(statusCode,
+ method,
+ url));
+ return request.close();
+ });
}
- void _onDataReceived(List<int> data) {
- _buffer.add(data);
- if (_inputStream != null) _inputStream._dataReceived();
+ StreamSubscription<List<int>> listen(void onData(List<int> event),
+ {void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError}) {
+ return _incoming.listen(onData,
+ onError: onError,
+ onDone: onDone,
+ unsubscribeOnError: unsubscribeOnError);
}
- void _onDataEnd() {
- if (_inputStream != null) {
- _inputStream._closeReceived();
- } else {
- inputStream._streamMarkedClosed = true;
- }
+ Future<Socket> detachSocket() {
+ _httpClient._connectionClosed(_httpRequest._httpClientConnection);
+ return _httpRequest._httpClientConnection.detachSocket();
}
- // Escaped characters in uri are expected to have been parsed.
- void _parseRequestUri(String uri) {
- int position;
- position = uri.indexOf("?", 0);
- if (position == -1) {
- _path = _HttpUtils.decodeUrlEncodedString(_uri);
- _queryString = null;
- _queryParameters = new Map();
- } else {
- _path = _HttpUtils.decodeUrlEncodedString(_uri.substring(0, position));
- _queryString = _uri.substring(position + 1);
- _queryParameters = _HttpUtils.splitQueryString(_queryString);
- }
- }
-
- // Delegate functions for the HttpInputStream implementation.
- int _streamAvailable() {
- return _buffer.length;
- }
+ HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo;
- List<int> _streamRead(int bytesToRead) {
- return _buffer.readBytes(bytesToRead);
+ bool get _shouldAuthenticate {
+ // Only try to authenticate if there is a challenge in the response.
+ List<String> challenge = headers[HttpHeaders.WWW_AUTHENTICATE];
+ return statusCode == HttpStatus.UNAUTHORIZED &&
+ challenge != null && challenge.length == 1;
}
- int _streamReadInto(List<int> buffer, int offset, int len) {
- List<int> data = _buffer.readBytes(len);
- buffer.setRange(offset, data.length, data);
- return data.length;
- }
+ Future<HttpClientResponse> _authenticate() {
+ Future<HttpClientResponse> retryWithCredentials(_Credentials cr) {
+ if (cr != null) {
+ // TODO(sgjesse): Support digest.
+ if (cr.scheme == _AuthenticationScheme.BASIC) {
+ // Drain body and retry.
+ return reduce(null, (x, y) {}).then((_) {
+ return _httpClient._openUrlFromRequest(_httpRequest.method,
+ _httpRequest.uri,
+ _httpRequest)
+ .then((request) => request.close());
+ });
+ }
+ }
- void _streamSetErrorHandler(callback(e)) {
- _streamErrorHandler = callback;
+ // Fall through to here to perform normal response handling if
+ // there is no sensible authorization handling.
+ return new Future.immediate(this);
+ }
+
+ List<String> challenge = headers[HttpHeaders.WWW_AUTHENTICATE];
+ assert(challenge != null || challenge.length == 1);
+ _HeaderValue header =
+ new _HeaderValue.fromString(challenge[0], parameterSeparator: ",");
+ _AuthenticationScheme scheme =
+ new _AuthenticationScheme.fromString(header.value);
+ String realm = header.parameters["realm"];
+
+ // See if any credentials are available.
+ _Credentials cr = _httpClient._findCredentials(_httpRequest.uri, scheme);
+
+ if (cr != null && !cr.used) {
+ // If credentials found prepare for retrying the request.
+ return retryWithCredentials(cr);
+ }
+
+ // Ask for more credentials if none found or the one found has
+ // already been used. If it has already been used it must now be
+ // invalid and is removed.
+ if (cr != null) {
+ _httpClient._removeCredentials(cr);
+ cr = null;
+ }
+ if (_httpClient._authenticate != null) {
+ Future authComplete = _httpClient._authenticate(_httpRequest.uri,
+ scheme.toString(),
+ realm);
+ return authComplete.then((credsAvailable) {
+ if (credsAvailable) {
+ cr = _httpClient._findCredentials(_httpRequest.uri, scheme);
+ return retryWithCredentials(cr);
+ } else {
+ // No credentials available, complete with original response.
+ return this;
+ }
+ });
+ }
+ // No credentials were found and the callback was not set.
+ return new Future.immediate(this);
}
-
- String _method;
- String _uri;
- String _path;
- String _queryString;
- Map<String, String> _queryParameters;
- _HttpInputStream _inputStream;
- _BufferList _buffer;
- Function _streamErrorHandler;
- _HttpSession _session;
}
-// HTTP response object for sending a HTTP response.
-class _HttpResponse extends _HttpRequestResponseBase implements HttpResponse {
- _HttpResponse(_HttpConnection httpConnection)
- : super(httpConnection),
- _statusCode = HttpStatus.OK {
- _headers = new _HttpHeaders();
- }
+class _HttpOutboundMessage<T> extends IOSink {
+ // Used to mark when the body should be written. This is used for HEAD
+ // requests and in error handling.
+ bool _ignoreBody = false;
+
+ _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing)
+ : super(outgoing),
+ _outgoing = outgoing,
+ headers = new _HttpHeaders(protocolVersion);
+ int get contentLength => headers.contentLength;
void set contentLength(int contentLength) {
- if (_state >= _HttpRequestResponseBase.HEADER_SENT) {
- throw new HttpException("Header already sent");
- }
- _headers.contentLength = contentLength;
+ headers.contentLength = contentLength;
}
- int get statusCode => _statusCode;
- void set statusCode(int statusCode) {
- if (_outputStream != null) throw new HttpException("Header already sent");
- _statusCode = statusCode;
+ bool get persistentConnection => headers.persistentConnection;
+ bool set persistentConnection(bool p) {
+ headers.persistentConnection = p;
}
- String get reasonPhrase => _findReasonPhrase(_statusCode);
- void set reasonPhrase(String reasonPhrase) {
- if (_outputStream != null) throw new HttpException("Header already sent");
- _reasonPhrase = reasonPhrase;
+ Future<T> consume(Stream<List<int>> stream) {
+ _writeHeaders();
+ if (_ignoreBody) return new Future.immediate(this);
+ if (_chunked) {
+ // Transform when chunked.
+ stream = stream.transform(new _ChunkedTransformer());
+ }
+ return super.consume(stream).then((_) => this);
}
- List<Cookie> get cookies {
- if (_cookies == null) _cookies = new List<Cookie>();
- return _cookies;
+ void add(List<int> data) {
+ _writeHeaders();
+ if (_ignoreBody) return;
+ if (_chunked) {
+ _ChunkedTransformer._addChunk(data, super.add);
+ } else {
+ super.add(data);
+ }
}
- OutputStream get outputStream {
- if (_state >= _HttpRequestResponseBase.DONE) {
- throw new HttpException("Response closed");
+ void close() {
+ if (!_headersWritten && !_ignoreBody && headers.chunkedTransferEncoding) {
+ // If no body was written, _ignoreBody is false (it's not a HEAD
+ // request) and the content-length is unspecified, set contentLength to 0.
+ headers.contentLength = 0;
}
- if (_outputStream == null) {
- _outputStream = new _HttpOutputStream(this);
+ _writeHeaders();
+ if (!_ignoreBody) {
+ if (_chunked) {
+ _ChunkedTransformer._addChunk([], super.add);
+ }
}
- return _outputStream;
+ super.close();
}
- DetachedSocket detachSocket() {
- if (_state >= _HttpRequestResponseBase.DONE) {
- throw new HttpException("Response closed");
+ void _writeHeaders() {
+ if (_headersWritten) return;
+ bool _tmpIgnoreBody = _ignoreBody;
+ _ignoreBody = false;
+ _headersWritten = true;
+ _writeHeader();
+ _ignoreBody = _tmpIgnoreBody;
+ if (_ignoreBody) {
+ super.close();
+ return;
}
- // Ensure that headers are written.
- if (_state == _HttpRequestResponseBase.START) {
- _writeHeader();
+ _chunked = headers.chunkedTransferEncoding;
+ if (!_chunked) {
+ _outgoing.setTransferLength(headers.contentLength);
}
- _state = _HttpRequestResponseBase.UPGRADED;
- // Ensure that any trailing data is written.
- _writeDone();
- // Indicate to the connection that the response handling is done.
- return _httpConnection._detachSocket();
}
- // Delegate functions for the HttpOutputStream implementation.
- bool _streamWrite(List<int> buffer, bool copyBuffer) {
- if (_done) throw new HttpException("Response closed");
- return _write(buffer, copyBuffer);
+ void _writeHeader(); // TODO(ajohnsen): Better name.
+
+ final _HttpHeaders headers;
+
+ final _HttpOutgoing _outgoing;
+ bool _headersWritten = false;
+ bool _chunked = false;
+}
+
+
+class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
+ implements HttpResponse {
+ int statusCode = 200;
+ String _reasonPhrase;
+ List<Cookie> _cookies;
+ _HttpRequest _httpRequest;
+
+ _HttpResponse(String protocolVersion,
+ _HttpOutgoing _outgoing)
+ : super(protocolVersion, _outgoing);
+
+ List<Cookie> get cookies {
+ if (_cookies == null) _cookies = new List<Cookie>();
+ return _cookies;
}
- bool _streamWriteFrom(List<int> buffer, int offset, int len) {
- if (_done) throw new HttpException("Response closed");
- return _writeList(buffer, offset, len);
+ String get reasonPhrase => _findReasonPhrase(statusCode);
+ void set reasonPhrase(String reasonPhrase) {
+ if (_headersWritten) throw new StateError("Header already sent");
+ _reasonPhrase = reasonPhrase;
}
- void _streamFlush() {
- _httpConnection._flush();
+ Future<Socket> detachSocket() {
+ if (_headersWritten) throw new StateError("Headers already sent");
+ _writeHeaders();
+ var future = _httpRequest._httpConnection.detachSocket();
+ // Close connection so the socket is 'free'.
+ close();
+ return future;
}
- void _streamClose() {
- _ensureHeadersSent();
- _state = _HttpRequestResponseBase.DONE;
- // Stop tracking no pending write events.
- _httpConnection._onNoPendingWrites = null;
- // Ensure that any trailing data is written.
- _writeDone();
- // Indicate to the connection that the response handling is done.
- _httpConnection._responseClosed();
- if (_streamClosedHandler != null) {
- Timer.run(_streamClosedHandler);
+ HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo;
+
+ void _writeHeader() {
+ writeSP() => add([_CharCode.SP]);
+ writeCRLF() => add([_CharCode.CR, _CharCode.LF]);
+
+ // Write status line.
+ if (headers.protocolVersion == "1.1") {
+ add(_Const.HTTP11);
+ } else {
+ add(_Const.HTTP10);
}
- }
+ writeSP();
+ addString(statusCode.toString());
+ writeSP();
+ addString(reasonPhrase);
+ writeCRLF();
- void _streamSetNoPendingWriteHandler(callback()) {
- if (_state != _HttpRequestResponseBase.DONE) {
- _httpConnection._onNoPendingWrites = callback;
+ var session = _httpRequest._session;
+ if (session != null && !session._destroyed) {
+ // Mark as not new.
+ session._isNew = false;
+ // Make sure we only send the current session id.
+ bool found = false;
+ for (int i = 0; i < cookies.length; i++) {
+ if (cookies[i].name.toUpperCase() == _DART_SESSION_ID) {
+ cookies[i].value = session.id;
+ cookies[i].httpOnly = true;
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ cookies.add(new Cookie(_DART_SESSION_ID, session.id)..httpOnly = true);
+ }
+ }
+ // Add all the cookies set to the headers.
+ if (_cookies != null) {
+ _cookies.forEach((cookie) {
+ headers.add("set-cookie", cookie);
+ });
}
- }
- void _streamSetClosedHandler(callback()) {
- _streamClosedHandler = callback;
- }
+ headers._finalize();
- void _streamSetErrorHandler(callback(e)) {
- _streamErrorHandler = callback;
+ // Write headers.
+ headers._write(this);
+ writeCRLF();
}
String _findReasonPhrase(int statusCode) {
@@ -574,1231 +531,915 @@ class _HttpResponse extends _HttpRequestResponseBase implements HttpResponse {
default: return "Status $statusCode";
}
}
-
- bool _writeHeader() {
- List<int> data;
-
- // Write status line.
- if (_protocolVersion == "1.1") {
- _httpConnection._write(_Const.HTTP11);
- } else {
- _httpConnection._write(_Const.HTTP10);
- }
- _writeSP();
- data = _statusCode.toString().charCodes;
- _httpConnection._write(data);
- _writeSP();
- data = reasonPhrase.charCodes;
- _httpConnection._write(data);
- _writeCRLF();
-
- var session = _httpConnection._request._session;
- if (session != null && !session._destroyed) {
- // Make sure we only send the current session id.
- bool found = false;
- for (int i = 0; i < cookies.length; i++) {
- if (cookies[i].name.toUpperCase() == _DART_SESSION_ID) {
- cookies[i].value = session.id;
- cookies[i].httpOnly = true;
- found = true;
- break;
- }
- }
- if (!found) {
- cookies.add(new Cookie(_DART_SESSION_ID, session.id)..httpOnly = true);
- }
- }
- // Add all the cookies set to the headers.
- if (_cookies != null) {
- _cookies.forEach((cookie) {
- _headers.add("set-cookie", cookie);
- });
- }
-
- // Write headers.
- _headers._finalize(_protocolVersion);
- bool allWritten = _writeHeaders();
- _state = _HttpRequestResponseBase.HEADER_SENT;
- return allWritten;
- }
-
- int _statusCode; // Response status code.
- String _reasonPhrase; // Response reason phrase.
- _HttpOutputStream _outputStream;
- Function _streamClosedHandler;
- Function _streamErrorHandler;
}
-class _HttpInputStream extends _BaseDataInputStream implements InputStream {
- _HttpInputStream(_HttpRequestResponseBase this._requestOrResponse) {
- _checkScheduleCallbacks();
- }
-
- int available() {
- return _requestOrResponse._streamAvailable();
- }
-
- void pipe(OutputStream output, {bool close: true}) {
- _pipe(this, output, close: close);
- }
-
- List<int> _read(int bytesToRead) {
- List<int> result = _requestOrResponse._streamRead(bytesToRead);
- _checkScheduleCallbacks();
- return result;
- }
-
- void set onError(void callback(e)) {
- _requestOrResponse._streamSetErrorHandler(callback);
- }
-
- int _readInto(List<int> buffer, int offset, int len) {
- int result = _requestOrResponse._streamReadInto(buffer, offset, len);
- _checkScheduleCallbacks();
- return result;
- }
-
- void _close() {
- // TODO(sgjesse): Handle this.
- }
-
- void _dataReceived() {
- super._dataReceived();
- }
-
- _HttpRequestResponseBase _requestOrResponse;
-}
-
-
-class _HttpOutputStream extends _BaseOutputStream implements OutputStream {
- _HttpOutputStream(_HttpRequestResponseBase this._requestOrResponse);
-
- bool write(List<int> buffer, [bool copyBuffer = true]) {
- return _requestOrResponse._streamWrite(buffer, copyBuffer);
- }
-
- bool writeFrom(List<int> buffer, [int offset = 0, int len]) {
- if (offset < 0 || offset >= buffer.length) throw new ArgumentError();
- len = len != null ? len : buffer.length - offset;
- if (len < 0) throw new ArgumentError();
- return _requestOrResponse._streamWriteFrom(buffer, offset, len);
- }
-
- void flush() {
- _requestOrResponse._streamFlush();
- }
-
- void close() {
- _requestOrResponse._streamClose();
- }
-
- bool get closed => _requestOrResponse._done;
-
- void destroy() {
- throw "Not implemented";
- }
-
- void set onNoPendingWrites(void callback()) {
- _requestOrResponse._streamSetNoPendingWriteHandler(callback);
- }
-
- void set onClosed(void callback()) {
- _requestOrResponse._streamSetClosedHandler(callback);
- }
-
- void set onError(void callback(e)) {
- _requestOrResponse._streamSetErrorHandler(callback);
- }
-
- _HttpRequestResponseBase _requestOrResponse;
-}
-
-
-abstract class _HttpConnectionBase {
- static const int IDLE = 0;
- static const int ACTIVE = 1;
- static const int CLOSING = 2;
- static const int REQUEST_DONE = 4;
- static const int RESPONSE_DONE = 8;
- static const int ALL_DONE = REQUEST_DONE | RESPONSE_DONE;
- static const int READ_CLOSED = 16;
- static const int WRITE_CLOSED = 32;
- static const int FULLY_CLOSED = READ_CLOSED | WRITE_CLOSED;
-
- _HttpConnectionBase() : hashCode = _nextHashCode {
- _nextHashCode = (_nextHashCode + 1) & 0xFFFFFFF;
- }
-
- bool get _isIdle => (_state & ACTIVE) == 0;
- bool get _isActive => (_state & ACTIVE) == ACTIVE;
- bool get _isClosing => (_state & CLOSING) == CLOSING;
- bool get _isRequestDone => (_state & REQUEST_DONE) == REQUEST_DONE;
- bool get _isResponseDone => (_state & RESPONSE_DONE) == RESPONSE_DONE;
- bool get _isAllDone => (_state & ALL_DONE) == ALL_DONE;
- bool get _isReadClosed => (_state & READ_CLOSED) == READ_CLOSED;
- bool get _isWriteClosed => (_state & WRITE_CLOSED) == WRITE_CLOSED;
- bool get _isFullyClosed => (_state & FULLY_CLOSED) == FULLY_CLOSED;
-
- void _connectionEstablished(Socket socket) {
- _socket = socket;
- // Register handlers for socket events. All socket events are
- // passed to the HTTP parser.
- _socket.onData = () {
- List<int> buffer = _socket.read();
- if (buffer != null) {
- _httpParser.streamData(buffer);
- }
- };
- _socket.onClosed = _httpParser.streamDone;
- _socket.onError = _httpParser.streamError;
- _socket.outputStream.onError = _httpParser.streamError;
- }
-
- bool _write(List<int> data, [bool copyBuffer = false]);
- bool _writeFrom(List<int> buffer, [int offset, int len]);
- bool _flush();
- bool _close();
- bool _destroy();
- DetachedSocket _detachSocket();
-
- HttpConnectionInfo get connectionInfo {
- if (_socket == null) return null;
- try {
- _HttpConnectionInfo info = new _HttpConnectionInfo();
- info.remoteHost = _socket.remoteHost;
- info.remotePort = _socket.remotePort;
- info.localPort = _socket.port;
- return info;
- } catch (e) { }
- return null;
- }
-
- void set _onNoPendingWrites(void callback()) {
- _socket.outputStream.onNoPendingWrites = callback;
- }
-
- int _state = IDLE;
+class _HttpClientRequest extends _HttpOutboundMessage<HttpClientRequest>
+ implements HttpClientRequest {
+ final String method;
+ final Uri uri;
+ final List<Cookie> cookies = new List<Cookie>();
- Socket _socket;
- _HttpParser _httpParser;
+ // The HttpClient this request belongs to.
+ final _HttpClient _httpClient;
+ final _HttpClientConnection _httpClientConnection;
- // Callbacks.
- Function onDetach;
- Function onClosed;
+ final Completer<HttpClientResponse> _responseCompleter
+ = new Completer<HttpClientResponse>();
- // Hash code for HTTP connection. Currently this is just a counter.
- final int hashCode;
- static int _nextHashCode = 0;
-}
+ final bool _usingProxy;
+ // TODO(ajohnsen): Get default value from client?
+ bool _followRedirects = true;
-// HTTP server connection over a socket.
-class _HttpConnection extends _HttpConnectionBase {
- _HttpConnection(HttpServer this._server) {
- _httpParser = new _HttpParser.requestParser();
- // Register HTTP parser callbacks.
- _httpParser.requestStart = _onRequestReceived;
- _httpParser.dataReceived = _onDataReceived;
- _httpParser.dataEnd = _onDataEnd;
- _httpParser.error = _onError;
- _httpParser.closed = _onClosed;
- _httpParser.responseStart = (statusCode, reasonPhrase, version) {
- assert(false);
- };
- }
-
- void _bufferData(List<int> data, [bool copyBuffer = false]) {
- if (_buffer == null) _buffer = new _BufferList();
- if (copyBuffer) data = data.getRange(0, data.length);
- _buffer.add(data);
- }
-
- void _writeBufferedResponse() {
- if (_buffer != null) {
- while (!_buffer.isEmpty) {
- var data = _buffer.first;
- _socket.outputStream.write(data, false);
- _buffer.removeBytes(data.length);
- }
- _buffer = null;
- }
- }
+ int _maxRedirects = 5;
- bool _write(List<int> data, [bool copyBuffer = false]) {
- if (_isRequestDone || !_hasBody || _httpParser.upgrade) {
- return _socket.outputStream.write(data, copyBuffer);
- } else {
- _bufferData(data, copyBuffer);
- return false;
- }
- }
+ List<RedirectInfo> _responseRedirects = [];
- bool _writeFrom(List<int> data, [int offset, int len]) {
- if (_isRequestDone || !_hasBody || _httpParser.upgrade) {
- return _socket.outputStream.writeFrom(data, offset, len);
- } else {
- if (offset == null) offset = 0;
- if (len == null) len = buffer.length - offset;
- _bufferData(data.getRange(offset, len), false);
- return false;
+ _HttpClientRequest(_HttpOutgoing outgoing,
+ Uri this.uri,
+ String this.method,
+ bool this._usingProxy,
+ _HttpClient this._httpClient,
+ _HttpClientConnection this._httpClientConnection)
+ : super("1.1", outgoing) {
+ // GET and HEAD have 'content-length: 0' by default.
+ if (method == "GET" || method == "HEAD") {
+ contentLength = 0;
}
}
- bool _flush() {
- _socket.outputStream.flush();
- }
-
- bool _close() {
- _socket.outputStream.close();
- }
+ Future<HttpClientResponse> get response => _responseCompleter.future;
- bool _destroy() {
- _socket.close();
+ Future<HttpClientResponse> close() {
+ super.close();
+ return response;
}
- void _onClosed() {
- _state |= _HttpConnectionBase.READ_CLOSED;
- _checkDone();
+ int get maxRedirects => _maxRedirects;
+ void set maxRedirects(int maxRedirects) {
+ if (_headersWritten) throw new StateError("Request already sent");
+ _maxRedirects = maxRedirects;
}
- DetachedSocket _detachSocket() {
- _socket.onData = null;
- _socket.onClosed = null;
- _socket.onError = null;
- _socket.outputStream.onNoPendingWrites = null;
- _writeBufferedResponse();
- Socket socket = _socket;
- _socket = null;
- if (onDetach != null) onDetach();
- return new _DetachedSocket(socket, _httpParser.readUnparsedData());
+ bool get followRedirects => _followRedirects;
+ void set followRedirects(bool followRedirects) {
+ if (_headersWritten) throw new StateError("Request already sent");
+ _followRedirects = followRedirects;
}
- void _onError(e) {
- // Don't report errors for a request parser when HTTP parser is in
- // idle state. Clients can close the connection and cause a
- // connection reset by peer error which is OK.
- _onClosed();
- if (_state == _HttpConnectionBase.IDLE) return;
+ HttpConnectionInfo get connectionInfo => _httpClientConnection.connectionInfo;
- // Propagate the error to the streams.
- if (_request != null &&
- !_isRequestDone &&
- _request._streamErrorHandler != null) {
- _request._streamErrorHandler(e);
- } else if (_response != null &&
- !_isResponseDone &&
- _response._streamErrorHandler != null) {
- _response._streamErrorHandler(e);
- } else {
- onError(e);
- }
- if (_socket != null) _socket.close();
- }
-
- void _onRequestReceived(String method,
- String uri,
- String version,
- _HttpHeaders headers,
- bool hasBody) {
- _state = _HttpConnectionBase.ACTIVE;
- // Create new request and response objects for this request.
- _request = new _HttpRequest(this);
- _response = new _HttpResponse(this);
- _request._onRequestReceived(method, uri, version, headers);
- _request._protocolVersion = version;
- _response._protocolVersion = version;
- _response._headResponse = method == "HEAD";
- _response.persistentConnection = _httpParser.persistentConnection;
- _hasBody = hasBody;
- if (onRequestReceived != null) {
- onRequestReceived(_request, _response);
- }
- _checkDone();
- }
-
- void _onDataReceived(List<int> data) {
- _request._onDataReceived(data);
- _checkDone();
- }
-
- void _checkDone() {
- if (_isReadClosed) {
- // If the client closes the conversation is ended.
- _server._closeQueue.add(this);
- } else if (_isAllDone) {
- // If we are done writing the response, and the connection is
- // not persistent, we must close. Also if using HTTP 1.0 and the
- // content length was not known we must close to indicate end of
- // body.
- bool close =
- !_response.persistentConnection ||
- (_response._protocolVersion == "1.0" && _response.contentLength < 0);
- _request = null;
- _response = null;
- if (close) {
- _httpParser.cancel();
- _server._closeQueue.add(this);
+ void _onIncoming(_HttpIncoming incoming) {
+ var response = new _HttpClientResponse(incoming,
+ this,
+ _httpClient);
+ Future<HttpClientResponse> future;
+ if (followRedirects && response.isRedirect) {
+ if (response.redirects.length < maxRedirects) {
+ // Redirect and drain response.
+ future = response.reduce(null, (x, y) {})
+ .then((_) => response.redirect());
} else {
- _state = _HttpConnectionBase.IDLE;
- }
- } else if (_isResponseDone && _hasBody) {
- // If the response is closed before the request is fully read
- // close this connection. If there is buffered output
- // (e.g. error response for invalid request where the server did
- // not care to read the request body) this is send.
- assert(!_isRequestDone);
- _writeBufferedResponse();
- _httpParser.cancel();
- _server._closeQueue.add(this);
- }
- }
-
- void _onDataEnd(bool close) {
- // Start sending queued response if any.
- _state |= _HttpConnectionBase.REQUEST_DONE;
- _writeBufferedResponse();
- _request._onDataEnd();
- _checkDone();
- }
-
- void _responseClosed() {
- _state |= _HttpConnectionBase.RESPONSE_DONE;
- }
-
- HttpServer _server;
- HttpRequest _request;
- HttpResponse _response;
- bool _hasBody = false;
-
- // Buffer for data written before full response has been processed.
- _BufferList _buffer;
-
- // Callbacks.
- Function onRequestReceived;
- Function onError;
-}
-
-
-class _RequestHandlerRegistration {
- _RequestHandlerRegistration(Function this._matcher, Function this._handler);
- Function _matcher;
- Function _handler;
-}
-
-// HTTP server waiting for socket connections. The connections are
-// managed by the server and as requests are received the request.
-// HTTPS connections are also supported, if the _HttpServer.httpsServer
-// constructor is used and a certificate name is provided in listen,
-// or a SecureServerSocket is provided to listenOn.
-class _HttpServer implements HttpServer, HttpsServer {
- _HttpServer() : this._internal(isSecure: false);
-
- _HttpServer.httpsServer() : this._internal(isSecure: true);
-
- _HttpServer._internal({ bool isSecure: false })
- : _secure = isSecure,
- _connections = new Set<_HttpConnection>(),
- _handlers = new List<_RequestHandlerRegistration>(),
- _closeQueue = new _CloseQueue();
-
- void listen(String host,
- int port,
- {int backlog: 128,
- String certificate_name,
- bool requestClientCertificate: false}) {
- if (_secure) {
- listenOn(new SecureServerSocket(
- host,
- port,
- backlog,
- certificate_name,
- requestClientCertificate: requestClientCertificate));
- } else {
- listenOn(new ServerSocket(host, port, backlog));
- }
- _closeServer = true;
- }
-
- void listenOn(ServerSocket serverSocket) {
- if (_secure && serverSocket is! SecureServerSocket) {
- throw new HttpException(
- 'HttpsServer.listenOn was called with non-secure server socket');
- } else if (!_secure && serverSocket is SecureServerSocket) {
- throw new HttpException(
- 'HttpServer.listenOn was called with a secure server socket');
- }
- void onConnection(Socket socket) {
- // Accept the client connection.
- _HttpConnection connection = new _HttpConnection(this);
- connection._connectionEstablished(socket);
- _connections.add(connection);
- connection.onRequestReceived = _handleRequest;
- connection.onClosed = () => _connections.remove(connection);
- connection.onDetach = () => _connections.remove(connection);
- connection.onError = (e) {
- _connections.remove(connection);
- if (_onError != null) {
- _onError(e);
- } else {
- throw(e);
- }
- };
- }
- serverSocket.onConnection = onConnection;
- _server = serverSocket;
- _closeServer = false;
- }
-
- addRequestHandler(bool matcher(HttpRequest request),
- void handler(HttpRequest request, HttpResponse response)) {
- _handlers.add(new _RequestHandlerRegistration(matcher, handler));
- }
-
- void set defaultRequestHandler(
- void handler(HttpRequest request, HttpResponse response)) {
- _defaultHandler = handler;
- }
-
- void close() {
- _closeQueue.shutdown();
- if (_sessionManagerInstance != null) {
- _sessionManagerInstance.close();
- _sessionManagerInstance = null;
- }
- if (_server != null && _closeServer) {
- _server.close();
- }
- _server = null;
- for (_HttpConnection connection in _connections) {
- connection._destroy();
- }
- _connections.clear();
- }
-
- int get port {
- if (_server == null) {
- throw new HttpException("The HttpServer is not listening on a port.");
- }
- return _server.port;
- }
-
- void set onError(void callback(e)) {
- _onError = callback;
- }
-
- set sessionTimeout(int timeout) {
- _sessionManager.sessionTimeout = timeout;
- }
-
- void _handleRequest(HttpRequest request, HttpResponse response) {
- for (int i = 0; i < _handlers.length; i++) {
- if (_handlers[i]._matcher(request)) {
- Function handler = _handlers[i]._handler;
- try {
- handler(request, response);
- } catch (e) {
- if (_onError != null) {
- _onError(e);
- } else {
- throw e;
- }
- }
- return;
+ // End with exception, too many redirects.
+ future = response.reduce(null, (x, y) {})
+ .then((_) => new Future.immediateError(
+ new RedirectLimitExceededException(response.redirects)));
}
- }
-
- if (_defaultHandler != null) {
- _defaultHandler(request, response);
+ } else if (response._shouldAuthenticate) {
+ future = response._authenticate();
} else {
- response.statusCode = HttpStatus.NOT_FOUND;
- response.contentLength = 0;
- response.outputStream.close();
- }
- }
-
- _HttpSessionManager get _sessionManager {
- // Lazy init.
- if (_sessionManagerInstance == null) {
- _sessionManagerInstance = new _HttpSessionManager();
- }
- return _sessionManagerInstance;
- }
-
- HttpConnectionsInfo connectionsInfo() {
- HttpConnectionsInfo result = new HttpConnectionsInfo();
- result.total = _connections.length;
- _connections.forEach((_HttpConnection conn) {
- if (conn._isActive) {
- result.active++;
- } else if (conn._isIdle) {
- result.idle++;
- } else {
- assert(result._isClosing);
- result.closing++;
- }
- });
- return result;
- }
-
- ServerSocket _server; // The server listen socket.
- bool _closeServer = false;
- bool _secure;
- Set<_HttpConnection> _connections; // Set of currently connected clients.
- List<_RequestHandlerRegistration> _handlers;
- Object _defaultHandler;
- Function _onError;
- _CloseQueue _closeQueue;
- _HttpSessionManager _sessionManagerInstance;
-}
-
-
-class _HttpClientRequest
- extends _HttpRequestResponseBase implements HttpClientRequest {
- _HttpClientRequest(String this._method,
- Uri this._uri,
- _HttpClientConnection connection)
- : super(connection) {
- _headers = new _HttpHeaders();
- _connection = connection;
- // Default GET and HEAD requests to have no content.
- if (_method == "GET" || _method == "HEAD") {
- contentLength = 0;
- }
- }
-
- void set contentLength(int contentLength) {
- if (_state >= _HttpRequestResponseBase.HEADER_SENT) {
- throw new HttpException("Header already sent");
- }
- _headers.contentLength = contentLength;
- }
-
- List<Cookie> get cookies {
- if (_cookies == null) _cookies = new List<Cookie>();
- return _cookies;
- }
-
- OutputStream get outputStream {
- if (_done) throw new HttpException("Request closed");
- if (_outputStream == null) {
- _outputStream = new _HttpOutputStream(this);
- }
- return _outputStream;
- }
-
- // Delegate functions for the HttpOutputStream implementation.
- bool _streamWrite(List<int> buffer, bool copyBuffer) {
- if (_done) throw new HttpException("Request closed");
- _emptyBody = _emptyBody && buffer.length == 0;
- return _write(buffer, copyBuffer);
- }
-
- bool _streamWriteFrom(List<int> buffer, int offset, int len) {
- if (_done) throw new HttpException("Request closed");
- _emptyBody = _emptyBody && buffer.length == 0;
- return _writeList(buffer, offset, len);
- }
-
- void _streamFlush() {
- _httpConnection._flush();
- }
-
- void _streamClose() {
- _ensureHeadersSent();
- _state = _HttpRequestResponseBase.DONE;
- // Stop tracking no pending write events.
- _httpConnection._onNoPendingWrites = null;
- // Ensure that any trailing data is written.
- _writeDone();
- _connection._requestClosed();
- if (_streamClosedHandler != null) {
- Timer.run(_streamClosedHandler);
- }
- }
-
- void _streamSetNoPendingWriteHandler(callback()) {
- if (_state != _HttpRequestResponseBase.DONE) {
- _httpConnection._onNoPendingWrites = callback;
+ future = new Future<HttpClientResponse>.immediate(response);
}
+ future.then(
+ (v) => _responseCompleter.complete(v),
+ onError: (e) {
+ _responseCompleter.completeError(e);
+ });
}
- void _streamSetClosedHandler(callback()) {
- _streamClosedHandler = callback;
- }
-
- void _streamSetErrorHandler(callback(e)) {
- _streamErrorHandler = callback;
+ void _onError(AsyncError error) {
+ _responseCompleter.completeError(error);
}
void _writeHeader() {
- List<int> data;
+ writeSP() => add([_CharCode.SP]);
+ writeCRLF() => add([_CharCode.CR, _CharCode.LF]);
- // Write request line.
- data = _method.toString().charCodes;
- _httpConnection._write(data);
- _writeSP();
+ addString(method);
+ writeSP();
// Send the path for direct connections and the whole URL for
// proxy connections.
- if (!_connection._usingProxy) {
- String path = _uri.path;
+ if (!_usingProxy) {
+ String path = uri.path;
if (path.length == 0) path = "/";
- if (_uri.query != "") {
- if (_uri.fragment != "") {
- path = "${path}?${_uri.query}#${_uri.fragment}";
+ if (uri.query != "") {
+ if (uri.fragment != "") {
+ path = "${path}?${uri.query}#${uri.fragment}";
} else {
- path = "${path}?${_uri.query}";
+ path = "${path}?${uri.query}";
}
}
- data = path.charCodes;
+ addString(path);
} else {
- data = _uri.toString().charCodes;
+ addString(uri.toString());
}
- _httpConnection._write(data);
- _writeSP();
- _httpConnection._write(_Const.HTTP11);
- _writeCRLF();
+ writeSP();
+ add(_Const.HTTP11);
+ writeCRLF();
// Add the cookies to the headers.
- if (_cookies != null) {
+ if (!cookies.isEmpty) {
StringBuffer sb = new StringBuffer();
- for (int i = 0; i < _cookies.length; i++) {
+ for (int i = 0; i < cookies.length; i++) {
if (i > 0) sb.add("; ");
- sb.add(_cookies[i].name);
+ sb.add(cookies[i].name);
sb.add("=");
- sb.add(_cookies[i].value);
+ sb.add(cookies[i].value);
}
- _headers.add("cookie", sb.toString());
+ headers.add("cookie", sb.toString());
}
+ headers._finalize();
+
// Write headers.
- _headers._finalize("1.1");
- _writeHeaders();
- _state = _HttpRequestResponseBase.HEADER_SENT;
+ headers._write(this);
+ writeCRLF();
}
-
- String _method;
- Uri _uri;
- _HttpClientConnection _connection;
- _HttpOutputStream _outputStream;
- Function _streamClosedHandler;
- Function _streamErrorHandler;
- bool _emptyBody = true;
}
-class _HttpClientResponse
- extends _HttpRequestResponseBase
- implements HttpClientResponse {
- _HttpClientResponse(_HttpClientConnection connection)
- : super(connection) {
- _connection = connection;
- }
- int get statusCode => _statusCode;
- String get reasonPhrase => _reasonPhrase;
-
- bool get isRedirect {
- var method = _connection._request._method;
- if (method == "GET" || method == "HEAD") {
- return statusCode == HttpStatus.MOVED_PERMANENTLY ||
- statusCode == HttpStatus.FOUND ||
- statusCode == HttpStatus.SEE_OTHER ||
- statusCode == HttpStatus.TEMPORARY_REDIRECT;
- } else if (method == "POST") {
- return statusCode == HttpStatus.SEE_OTHER;
+// Transformer that transforms data to HTTP Chunked Encoding.
+class _ChunkedTransformer implements StreamTransformer<List<int>, List<int>> {
+ final StreamController<List<int>> _controller
+ = new StreamController<List<int>>();
+
+ Stream<List<int>> bind(Stream<List<int>> stream) {
+ var subscription = stream.listen(
+ (data) {
+ if (data.length == 0) return; // Avoid close on 0-bytes payload.
+ _addChunk(data, _controller.add);
+ },
+ onDone: () {
+ _addChunk([], _controller.add);
+ _controller.close();
+ });
+ return _controller.stream;
+ }
+
+ static void _addChunk(List<int> data, void add(List<int> data)) {
+ add(_chunkHeader(data.length));
+ if (data.length > 0) add(data);
+ add(_chunkFooter);
+ }
+
+ static List<int> _chunkHeader(int length) {
+ const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37,
+ 0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46];
+ var header = [];
+ if (length == 0) {
+ header.add(hexDigits[length]);
+ } else {
+ while (length > 0) {
+ header.insertRange(0, 1, hexDigits[length % 16]);
+ length = length >> 4;
+ }
}
- return false;
+ header.add(_CharCode.CR);
+ header.add(_CharCode.LF);
+ return header;
}
- List<Cookie> get cookies {
- if (_cookies != null) return _cookies;
- _cookies = new List<Cookie>();
- List<String> values = _headers["set-cookie"];
- if (values != null) {
- values.forEach((value) {
- _cookies.add(new Cookie.fromSetCookieValue(value));
- });
- }
- return _cookies;
- }
+ // Footer is just a CRLF.
+ static List<int> get _chunkFooter => const [_CharCode.CR, _CharCode.LF];
+}
- InputStream get inputStream {
- if (_inputStream == null) {
- _inputStream = new _HttpInputStream(this);
- }
- return _inputStream;
+
+// Transformer that invokes [_onDone] when completed.
+class _DoneTransformer implements StreamTransformer<List<int>, List<int>> {
+ final StreamController<List<int>> _controller
+ = new StreamController<List<int>>();
+ final Function _onDone;
+
+ _DoneTransformer(this._onDone);
+
+ Stream<List<int>> bind(Stream<List<int>> stream) {
+ var subscription = stream.listen(
+ _controller.add,
+ onError: _controller.signalError,
+ onDone: () {
+ _onDone();
+ _controller.close();
+ });
+ return _controller.stream;
}
+}
- void _onResponseReceived(int statusCode,
- String reasonPhrase,
- String version,
- _HttpHeaders headers,
- bool hasBody) {
- _statusCode = statusCode;
- _reasonPhrase = reasonPhrase;
- _headers = headers;
-
- // Prepare for receiving data.
- _buffer = new _BufferList();
- if (isRedirect && _connection.followRedirects) {
- if (_connection._redirects == null ||
- _connection._redirects.length < _connection.maxRedirects) {
- // Check the location header.
- List<String> location = headers[HttpHeaders.LOCATION];
- if (location == null || location.length > 1) {
- throw new RedirectException("Invalid redirect",
- _connection._redirects);
- }
- // Check for redirect loop
- if (_connection._redirects != null) {
- Uri redirectUrl = Uri.parse(location[0]);
- for (int i = 0; i < _connection._redirects.length; i++) {
- if (_connection._redirects[i].location.toString() ==
- redirectUrl.toString()) {
- throw new RedirectLoopException(_connection._redirects);
+// Transformer that validates the data written.
+class _DataValidatorTransformer
+ implements StreamTransformer<List<int>, List<int>> {
+ final StreamController<List<int>> _controller
+ = new StreamController<List<int>>();
+ int _bytesWritten = 0;
+ Completer _completer = new Completer();
+
+ int expectedTransferLength;
+
+ _DataValidatorTransformer();
+
+ Future get validatorFuture => _completer.future;
+
+ Stream<List<int>> bind(Stream<List<int>> stream) {
+ var subscription;
+ subscription = stream.listen(
+ (data) {
+ if (expectedTransferLength != null) {
+ _bytesWritten += data.length;
+ if (_bytesWritten > expectedTransferLength) {
+ _controller.close();
+ subscription.cancel();
+ if (_completer != null) {
+ _completer.completeError(new HttpException(
+ "Content size exceeds specified contentLength. "
+ "$_bytesWritten bytes written while expected "
+ "$expectedTransferLength."));
+ _completer = null;
+ }
+ return;
}
}
- }
- if (!persistentConnection) {
- throw new RedirectException(
- "Non-persistent connections are currently not supported for "
- "redirects", _connection._redirects);
- }
- // Drain body and redirect.
- inputStream.onData = inputStream.read;
- if (_statusCode == HttpStatus.SEE_OTHER &&
- _connection._method == "POST") {
- _connection.redirect("GET");
- } else {
- _connection.redirect();
- }
- } else {
- throw new RedirectLimitExceededException(_connection._redirects);
- }
- } else if (statusCode == HttpStatus.UNAUTHORIZED) {
- _handleUnauthorized();
- } else if (_connection._onResponse != null) {
- _connection._onResponse(this);
- }
+ _controller.add(data);
+ },
+ onError: (error) {
+ _controller.close();
+ if (_completer != null) {
+ _completer.completeError(error);
+ _completer = null;
+ }
+ },
+ onDone: () {
+ _controller.close();
+ if (expectedTransferLength != null) {
+ if (_bytesWritten < expectedTransferLength) {
+ if (_completer != null) {
+ _completer.completeError(new HttpException(
+ "Content size below specified contentLength. "
+ " $_bytesWritten bytes written while expected "
+ "$expectedTransferLength."));
+ _completer = null;
+ return;
+ }
+ }
+ }
+ if (_completer != null) {
+ _completer.complete(this);
+ _completer = null;
+ }
+ },
+ unsubscribeOnError: true);
+ return _controller.stream;
}
+}
- void _handleUnauthorized() {
+// Extends StreamConsumer as this is an internal type, only used to pipe to.
+class _HttpOutgoing implements StreamConsumer<List<int>, dynamic> {
+ final Completer _dataCompleter = new Completer();
+ final Completer _streamCompleter = new Completer();
+ final _DataValidatorTransformer _validator = new _DataValidatorTransformer();
- void retryRequest(_Credentials cr) {
- if (cr != null) {
- // Drain body and retry.
- // TODO(sgjesse): Support digest.
- if (cr.scheme == _AuthenticationScheme.BASIC) {
- inputStream.onData = inputStream.read;
- _connection._retry();
- return;
- }
- }
+ // Future that completes when all data is written.
+ Future get dataDone => _dataCompleter.future;
- // Fall through to here to perform normal response handling if
- // there is no sensible authorization handling.
- if (_connection._onResponse != null) {
- _connection._onResponse(this);
- }
- }
+ // Future that completes with the Stream, once the _HttpClientConnection is
+ // bound to one.
+ Future<Stream<List<int>>> get stream => _streamCompleter.future;
- // Only try to authenticate if there is a challenge in the response.
- List<String> challenge = _headers[HttpHeaders.WWW_AUTHENTICATE];
- if (challenge != null && challenge.length == 1) {
- _HeaderValue header =
- new _HeaderValue.fromString(challenge[0], parameterSeparator: ",");
- _AuthenticationScheme scheme =
- new _AuthenticationScheme.fromString(header.value);
- String realm = header.parameters["realm"];
-
- // See if any credentials are available.
- _Credentials cr =
- _connection._client._findCredentials(
- _connection._request._uri, scheme);
-
- // Ask for more credentials if none found or the one found has
- // already been used. If it has already been used it must now be
- // invalid and is removed.
- if (cr == null || cr.used) {
- if (cr != null) {
- _connection._client._removeCredentials(cr);
- }
- cr = null;
- if (_connection._client._authenticate != null) {
- Future authComplete =
- _connection._client._authenticate(
- _connection._request._uri, scheme.toString(), realm);
- authComplete.then((credsAvailable) {
- if (credsAvailable) {
- cr = _connection._client._findCredentials(
- _connection._request._uri, scheme);
- retryRequest(cr);
- } else {
- if (_connection._onResponse != null) {
- _connection._onResponse(this);
- }
- }
- });
- return;
- }
- } else {
- // If credentials found prepare for retrying the request.
- retryRequest(cr);
- return;
- }
- }
+ void setTransferLength(int transferLength) {
+ _validator.expectedTransferLength = transferLength;
+ }
- // Fall through to here to perform normal response handling if
- // there is no sensible authorization handling.
- if (_connection._onResponse != null) {
- _connection._onResponse(this);
- }
+ Future consume(Stream<List<int>> stream) {
+ stream = stream.transform(_validator);
+ _streamCompleter.complete(stream);
+ _validator.validatorFuture.catchError((e) {
+ _dataCompleter.completeError(e);
+ });
+ return _validator.validatorFuture.then((v) {
+ _dataCompleter.complete();
+ return v;
+ });
}
+}
- void _onDataReceived(List<int> data) {
- _buffer.add(data);
- if (_inputStream != null) _inputStream._dataReceived();
+
+class _HttpClientConnection {
+ final String key;
+ final Socket _socket;
+ final _HttpParser _httpParser;
+ StreamSubscription _subscription;
+ final _HttpClient _httpClient;
+
+ Completer<_HttpIncoming> _nextResponseCompleter;
+ Future _writeDoneFuture;
+
+ _HttpClientConnection(String this.key,
+ Socket this._socket,
+ _HttpClient this._httpClient)
+ : _httpParser = new _HttpParser.responseParser() {
+ _socket.pipe(_httpParser);
+ _socket.done.catchError((e) { destroy(); });
+
+ // Set up handlers on the parser here, so we are sure to get 'onDone' from
+ // the parser.
+ _subscription = _httpParser.listen(
+ (incoming) {
+ // Only handle one incoming response at the time. Keep the
+ // stream paused until the response have been processed.
+ _subscription.pause();
+ // We assume the response is not here, until we have send the request.
+ assert(_nextResponseCompleter != null);
+ _nextResponseCompleter.complete(incoming);
+ },
+ onError: (error) {
+ if (_nextResponseCompleter != null) {
+ _nextResponseCompleter.completeError(error);
+ }
+ },
+ onDone: () {
+ close();
+ });
}
- void _onDataEnd() {
- if (_inputStream != null) {
- _inputStream._closeReceived();
- } else {
- inputStream._streamMarkedClosed = true;
- }
+ Future<_HttpIncoming> sendRequest(_HttpOutgoing outgoing) {
+ return outgoing.stream
+ .then((stream) {
+ // Close socket if output data is invalid.
+ outgoing.dataDone.catchError((e) {
+ close();
+ });
+ // Sending request, set up response completer.
+ _nextResponseCompleter = new Completer();
+ _writeDoneFuture = _socket.addStream(stream);
+ // Listen for response.
+ return _nextResponseCompleter.future
+ .whenComplete(() {
+ _nextResponseCompleter = null;
+ })
+ .then((incoming) {
+ incoming.dataDone.then((_) {
+ if (!incoming.headers.persistentConnection) {
+ close();
+ } else {
+ // Wait for the socket to be done with writing, before we
+ // continue.
+ _writeDoneFuture.then((_) {
+ _subscription.resume();
+ // Return connection, now we are done.
+ _httpClient._returnConnection(this);
+ });
+ }
+ });
+ // TODO(ajohnsen): Can there be an error on dataDone?
+ return incoming;
+ })
+ // If we see a state error, we failed to get the 'first' element.
+ // Transform the error to a HttpParserException, for consistency.
+ .catchError((error) {
+ throw new HttpParserException(
+ "Connection closed before data was received");
+ }, test: (error) => error is StateError)
+ .catchError((error) {
+ // We are done with the socket.
+ destroy();
+ throw error;
+ });
+ });
}
- // Delegate functions for the HttpInputStream implementation.
- int _streamAvailable() {
- return _buffer.length;
+ Future<Socket> detachSocket() {
+ return _writeDoneFuture.then((_) =>
+ new _DetachedSocket(_socket, _httpParser.detachIncoming()));
}
- List<int> _streamRead(int bytesToRead) {
- return _buffer.readBytes(bytesToRead);
+ void destroy() {
+ _socket.destroy();
+ _httpClient._connectionClosed(this);
}
- int _streamReadInto(List<int> buffer, int offset, int len) {
- List<int> data = _buffer.readBytes(len);
- buffer.setRange(offset, data.length, data);
- return data.length;
+ void close() {
+ var future = _writeDoneFuture;
+ if (future == null) future = new Future.immediate(null);
+ _httpClient._connectionClosed(this);
+ future.then((_) {
+ _socket.close();
+ // TODO(ajohnsen): Add timeout.
+ // Delay destroy until socket is actually done writing.
+ _socket.done.then((_) => _socket.destroy(),
+ onError: (_) => _socket.destroy());
+ });
}
- void _streamSetErrorHandler(callback(e)) {
- _streamErrorHandler = callback;
- }
+ HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket);
+}
- int _statusCode;
- String _reasonPhrase;
+class _ConnnectionInfo {
+ _ConnnectionInfo(_HttpClientConnection this.connection, _Proxy this.proxy);
+ final _HttpClientConnection connection;
+ final _Proxy proxy;
+}
- _HttpClientConnection _connection;
- _HttpInputStream _inputStream;
- _BufferList _buffer;
- Function _streamErrorHandler;
-}
+class _HttpClient implements HttpClient {
+ // TODO(ajohnsen): Use eviction timeout.
+ static const int DEFAULT_EVICTION_TIMEOUT = 60000;
+ bool _closing = false;
+ final Map<String, Queue<_HttpClientConnection>> _idleConnections
+ = new Map<String, Queue<_HttpClientConnection>>();
+ final Set<_HttpClientConnection> _activeConnections
+ = new Set<_HttpClientConnection>();
+ final List<_Credentials> _credentials = [];
+ Function _authenticate;
+ Function _findProxy;
-class _HttpClientConnection
- extends _HttpConnectionBase implements HttpClientConnection {
+ Future<HttpClientRequest> open(String method,
+ String host,
+ int port,
+ String path) {
+ // TODO(sgjesse): The path set here can contain both query and
+ // fragment. They should be cracked and set correctly.
+ return _openUrl(method, new Uri.fromComponents(
+ scheme: "http", domain: host, port: port, path: path));
+ }
- _HttpClientConnection(_HttpClient this._client) {
- _httpParser = new _HttpParser.responseParser();
+ Future<HttpClientRequest> openUrl(String method, Uri url) {
+ return _openUrl(method, url);
}
- bool _write(List<int> data, [bool copyBuffer = false]) {
- return _socket.outputStream.write(data, copyBuffer);
+ Future<HttpClientRequest> get(String host,
+ int port,
+ String path) {
+ return open("get", host, port, path);
}
- bool _writeFrom(List<int> data, [int offset, int len]) {
- return _socket.outputStream.writeFrom(data, offset, len);
+ Future<HttpClientRequest> getUrl(Uri url) {
+ return _openUrl("get", url);
}
- bool _flush() {
- _socket.outputStream.flush();
+ Future<HttpClientRequest> post(String host,
+ int port,
+ String path) {
+ return open("post", host, port, path);
}
- bool _close() {
- _socket.outputStream.close();
+ Future<HttpClientRequest> postUrl(Uri url) {
+ return _openUrl("post", url);
}
- bool _destroy() {
- _socket.close();
+ void close({bool force: false}) {
+ _closing = true;
+ // Create flattened copy of _idleConnections, as 'destory' will manipulate
+ // it.
+ var idle = _idleConnections.values.reduce(
+ [],
+ (l, e) {
+ l.addAll(e);
+ return l;
+ });
+ idle.forEach((e) {
+ e.close();
+ });
+ assert(_idleConnections.isEmpty);
+ if (force) {
+ for (var connection in _activeConnections.toList()) {
+ connection.destroy();
+ }
+ assert(_activeConnections.isEmpty);
+ _activeConnections.clear();
+ }
}
- DetachedSocket _detachSocket() {
- _socket.onData = null;
- _socket.onClosed = null;
- _socket.onError = null;
- _socket.outputStream.onNoPendingWrites = null;
- Socket socket = _socket;
- _socket = null;
- if (onDetach != null) onDetach();
- return new _DetachedSocket(socket, _httpParser.readUnparsedData());
+ set authenticate(Future<bool> f(Uri url, String scheme, String realm)) {
+ _authenticate = f;
}
- void _connectionEstablished(_SocketConnection socketConn) {
- super._connectionEstablished(socketConn._socket);
- _socketConn = socketConn;
- // Register HTTP parser callbacks.
- _httpParser.responseStart = _onResponseReceived;
- _httpParser.dataReceived = _onDataReceived;
- _httpParser.dataEnd = _onDataEnd;
- _httpParser.error = _onError;
- _httpParser.closed = _onClosed;
- _httpParser.requestStart = (method, uri, version) { assert(false); };
- _state = _HttpConnectionBase.ACTIVE;
+ void addCredentials(Uri url, String realm, HttpClientCredentials cr) {
+ _credentials.add(new _Credentials(url, realm, cr));
}
- void _checkSocketDone() {
- if (_isAllDone) {
- // If we are done writing the response, and either the server
- // has closed or the connection is not persistent, we must
- // close.
- if (_isReadClosed || !_response.persistentConnection) {
- this.onClosed = () {
- _client._closedSocketConnection(_socketConn);
- };
- _client._closeQueue.add(this);
- } else if (_socket != null) {
- _client._returnSocketConnection(_socketConn);
- _socket = null;
- _socketConn = null;
- assert(_pendingRedirect == null || _pendingRetry == null);
- if (_pendingRedirect != null) {
- _doRedirect(_pendingRedirect);
- _pendingRedirect = null;
- } else if (_pendingRetry != null) {
- _doRetry(_pendingRetry);
- _pendingRetry = null;
- }
+ set findProxy(String f(Uri uri)) => _findProxy = f;
+
+ Future<HttpClientRequest> _openUrl(String method, Uri uri) {
+ if (method == null) {
+ throw new ArgumentError(method);
+ }
+ if (uri.domain.isEmpty || (uri.scheme != "http" && uri.scheme != "https")) {
+ throw new ArgumentError("Unsupported scheme '${uri.scheme}' in $uri");
+ }
+
+ bool isSecure = (uri.scheme == "https");
+ int port = uri.port;
+ if (port == 0) {
+ port = isSecure ?
+ HttpClient.DEFAULT_HTTPS_PORT :
+ HttpClient.DEFAULT_HTTP_PORT;
+ }
+ // Check to see if a proxy server should be used for this connection.
+ var proxyConf = const _ProxyConfiguration.direct();
+ if (_findProxy != null) {
+ // TODO(sgjesse): Keep a map of these as normally only a few
+ // configuration strings will be used.
+ try {
+ proxyConf = new _ProxyConfiguration(_findProxy(uri));
+ } catch (error, stackTrace) {
+ return new Future.immediateError(error, stackTrace);
}
}
+ return _getConnection(uri.domain, port, proxyConf, isSecure).then((info) {
+ // Create new internal outgoing connection.
+ var outgoing = new _HttpOutgoing();
+ // Create new request object, wrapping the outgoing connection.
+ var request = new _HttpClientRequest(outgoing,
+ uri,
+ method.toUpperCase(),
+ !info.proxy.isDirect,
+ this,
+ info.connection);
+ request.headers.host = uri.domain;
+ request.headers.port = port;
+ if (uri.userInfo != null && !uri.userInfo.isEmpty) {
+ // If the URL contains user information use that for basic
+ // authorization
+ String auth =
+ CryptoUtils.bytesToBase64(_encodeString(uri.userInfo));
+ request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
+ } else {
+ // Look for credentials.
+ _Credentials cr = _findCredentials(uri);
+ if (cr != null) {
+ cr.authorize(request);
+ }
+ }
+ // Start sending the request (lazy, delayed until the user provides
+ // data).
+ info.connection._httpParser.responseToMethod = method;
+ info.connection.sendRequest(outgoing)
+ .then((incoming) {
+ // The full request have been sent and a response is received
+ // containing status-code, headers and etc.
+ request._onIncoming(incoming);
+ })
+ .catchError((error) {
+ // An error occoured before the http-header was parsed. This
+ // could be either a socket-error or parser-error.
+ request._onError(error);
+ });
+ // Return the request to the user. Immediate socket errors are not
+ // handled, thus forwarded to the user.
+ return request;
+ });
}
- void _requestClosed() {
- _state |= _HttpConnectionBase.REQUEST_DONE;
- _checkSocketDone();
+ Future<HttpClientRequest> _openUrlFromRequest(String method,
+ Uri uri,
+ _HttpClientRequest previous) {
+ return openUrl(method, uri).then((request) {
+ // Only follow redirects if initial request did.
+ request.followRedirects = previous.followRedirects;
+ // Allow same number of redirects.
+ request.maxRedirects = previous.maxRedirects;
+ // Copy headers
+ for (var header in previous.headers._headers.keys) {
+ if (request.headers[header] == null) {
+ request.headers.set(header, previous.headers[header]);
+ }
+ }
+ request.headers.chunkedTransferEncoding = false;
+ request.contentLength = 0;
+ return request;
+ });
}
- HttpClientRequest open(String method, Uri uri) {
- _method = method;
- // Tell the HTTP parser the method it is expecting a response to.
- _httpParser.responseToMethod = method;
- // If the connection already have a request this is a retry of a
- // request. In this case the request object is reused to ensure
- // that the same headers are send.
- if (_request != null) {
- _request._method = method;
- _request._uri = uri;
- _request._headers._mutable = true;
- _request._state = _HttpRequestResponseBase.START;
- } else {
- _request = new _HttpClientRequest(method, uri, this);
+ // Return a live connection to the idle pool.
+ void _returnConnection(_HttpClientConnection connection) {
+ _activeConnections.remove(connection);
+ if (_closing) {
+ connection.close();
+ return;
}
- _response = new _HttpClientResponse(this);
- return _request;
- }
-
- DetachedSocket detachSocket() {
- return _detachSocket();
+ // TODO(ajohnsen): Listen for socket close events.
+ if (!_idleConnections.containsKey(connection.key)) {
+ _idleConnections[connection.key] = new Queue();
+ }
+ _idleConnections[connection.key].addLast(connection);
}
- void _onClosed() {
- _state |= _HttpConnectionBase.READ_CLOSED;
- _checkSocketDone();
+ // Remove a closed connnection from the active set.
+ void _connectionClosed(_HttpClientConnection connection) {
+ _activeConnections.remove(connection);
+ if (_idleConnections.containsKey(connection.key)) {
+ _idleConnections[connection.key].remove(connection);
+ if (_idleConnections[connection.key].isEmpty) {
+ _idleConnections.remove(connection.key);
+ }
+ }
}
- void _onError(e) {
- // Cancel any pending data in the HTTP parser.
- _httpParser.cancel();
- if (_socketConn != null) {
- _client._closeSocketConnection(_socketConn);
+ // Get a new _HttpClientConnection, either from the idle pool or created from
+ // a new Socket.
+ Future<_ConnnectionInfo> _getConnection(String uriHost,
+ int uriPort,
+ _ProxyConfiguration proxyConf,
+ bool isSecure) {
+ Iterator<_Proxy> proxies = proxyConf.proxies.iterator;
+
+ Future<_ConnnectionInfo> connect(error) {
+ if (!proxies.moveNext()) return new Future.immediateError(error);
+ _Proxy proxy = proxies.current;
+ String host = proxy.isDirect ? uriHost: proxy.host;
+ int port = proxy.isDirect ? uriPort: proxy.port;
+ String key = isSecure ? "ssh:$host:$port" : "$host:$port";
+ if (_idleConnections.containsKey(key)) {
+ var connection = _idleConnections[key].removeFirst();
+ if (_idleConnections[key].isEmpty) {
+ _idleConnections.remove(key);
+ }
+ _activeConnections.add(connection);
+ return new Future.immediate(new _ConnnectionInfo(connection, proxy));
+ }
+ return (isSecure && proxy.isDirect
+ ? SecureSocket.connect(host,
+ port,
+ sendClientCertificate: true)
+ : Socket.connect(host, port))
+ .then((socket) {
+ var connection = new _HttpClientConnection(key, socket, this);
+ _activeConnections.add(connection);
+ return new _ConnnectionInfo(connection, proxy);
+ }, onError: (error) {
+ // Continue with next proxy.
+ return connect(error.error);
+ });
}
+ return connect(new HttpException("No proxies given"));
+ }
- // If it looks as if we got a bad connection from the connection
- // pool and the request can be retried do a retry.
- if (_socketConn != null && _socketConn._fromPool && _request._emptyBody) {
- String method = _request._method;
- Uri uri = _request._uri;
- _socketConn = null;
+ _Credentials _findCredentials(Uri url, [_AuthenticationScheme scheme]) {
+ // Look for credentials.
+ _Credentials cr =
+ _credentials.reduce(null, (_Credentials prev, _Credentials value) {
+ if (value.applies(url, scheme)) {
+ if (prev == null) return value;
+ return value.uri.path.length > prev.uri.path.length ? value : prev;
+ } else {
+ return prev;
+ }
+ });
+ return cr;
+ }
- // Retry the URL using the same connection instance.
- _httpParser.restart();
- _client._openUrl(method, uri, this);
- } else {
- // Report the error.
- if (_response != null && _response._streamErrorHandler != null) {
- _response._streamErrorHandler(e);
- } else if (_onErrorCallback != null) {
- _onErrorCallback(e);
- } else {
- throw e;
- }
+ void _removeCredentials(_Credentials cr) {
+ int index = _credentials.indexOf(cr);
+ if (index != -1) {
+ _credentials.removeAt(index);
}
}
+}
- void _onResponseReceived(int statusCode,
- String reasonPhrase,
- String version,
- _HttpHeaders headers,
- bool hasBody) {
- _response._onResponseReceived(
- statusCode, reasonPhrase, version, headers, hasBody);
- }
- void _onDataReceived(List<int> data) {
- _response._onDataReceived(data);
+class _HttpConnection {
+ static const _ACTIVE = 0;
+ static const _IDLE = 1;
+ static const _CLOSING = 2;
+ static const _DETACHED = 3;
+
+ int _state = _IDLE;
+
+ final Socket _socket;
+ final _HttpServer _httpServer;
+ final _HttpParser _httpParser;
+ StreamSubscription _subscription;
+
+ Future _writeDoneFuture;
+
+ _HttpConnection(Socket this._socket, _HttpServer this._httpServer)
+ : _httpParser = new _HttpParser.requestParser() {
+ _socket.pipe(_httpParser);
+ _socket.done.catchError((e) => destroy());
+ _subscription = _httpParser.listen(
+ (incoming) {
+ // Only handle one incoming request at the time. Keep the
+ // stream paused until the request has been send.
+ _subscription.pause();
+ _state = _ACTIVE;
+ var outgoing = new _HttpOutgoing();
+ _writeDoneFuture = outgoing.stream.then(_socket.addStream);
+ var response = new _HttpResponse(
+ incoming.headers.protocolVersion,
+ outgoing);
+ var request = new _HttpRequest(response, incoming, _httpServer, this);
+ response._ignoreBody = request.method == "HEAD";
+ response._httpRequest = request;
+ outgoing.dataDone.then((_) {
+ if (_state == _DETACHED) return;
+ if (response.headers.persistentConnection &&
+ incoming.fullBodyRead) {
+ // Wait for the socket to be done with writing, before we
+ // continue.
+ _writeDoneFuture.then((_) {
+ _state = _IDLE;
+ // Resume the subscription for incoming requests as the
+ // request is now processed.
+ _subscription.resume();
+ });
+ } else {
+ // Close socket, keep-alive not used or body sent before received
+ // data was handled.
+ close();
+ }
+ }).catchError((e) {
+ close();
+ });
+ _httpServer._handleRequest(request);
+ },
+ onDone: () {
+ close();
+ },
+ onError: (error) {
+ _httpServer._handleError(error);
+ destroy();
+ });
}
- void _onDataEnd(bool close) {
- _state |= _HttpConnectionBase.RESPONSE_DONE;
- _response._onDataEnd();
- _checkSocketDone();
+ void destroy() {
+ if (_state == _CLOSING || _state == _DETACHED) return;
+ _state = _CLOSING;
+ _socket.destroy();
+ _httpServer._connectionClosed(this);
}
- void _onClientShutdown() {
- if (!_isResponseDone) {
- _onError(new HttpException("Client shutdown"));
- }
+ void close() {
+ if (_state == _CLOSING || _state == _DETACHED) return;
+ _state = _CLOSING;
+ var future = _writeDoneFuture;
+ if (future == null) future = new Future.immediate(null);
+ _httpServer._connectionClosed(this);
+ future.then((_) {
+ _socket.close();
+ // TODO(ajohnsen): Add timeout.
+ // Delay destroy until socket is actually done writing.
+ _socket.done.then((_) => _socket.destroy(),
+ onError: (_) => _socket.destroy());
+ });
}
- void set onRequest(void handler(HttpClientRequest request)) {
- _onRequest = handler;
+ Future<Socket> detachSocket() {
+ _state = _DETACHED;
+ // Remove connection from server.
+ _httpServer._connectionClosed(this);
+
+ _HttpDetachedIncoming detachedIncoming = _httpParser.detachIncoming();
+
+ return _writeDoneFuture.then((_) {
+ return new _DetachedSocket(_socket, detachedIncoming);
+ });
}
- void set onResponse(void handler(HttpClientResponse response)) {
- _onResponse = handler;
+ HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket);
+
+ bool get _isActive => _state == _ACTIVE;
+ bool get _isIdle => _state == _IDLE;
+ bool get _isClosing => _state == _CLOSING;
+ bool get _isDetached => _state == _DETACHED;
+}
+
+
+// HTTP server waiting for socket connections.
+class _HttpServer extends Stream<HttpRequest> implements HttpServer {
+
+ static Future<HttpServer> bind(String host, int port, int backlog) {
+ return ServerSocket.bind(host, port, backlog).then((socket) {
+ return new _HttpServer._(socket, true);
+ });
}
- void set onError(void callback(e)) {
- _onErrorCallback = callback;
+ static Future<HttpServer> bindSecure(String host,
+ int port,
+ int backlog,
+ String certificate_name,
+ bool requestClientCertificate) {
+ return SecureServerSocket.bind(
+ host,
+ port,
+ backlog,
+ certificate_name,
+ requestClientCertificate: requestClientCertificate)
+ .then((socket) {
+ return new _HttpServer._(socket, true);
+ });
}
- void _doRetry(_RedirectInfo retry) {
- assert(_socketConn == null);
+ _HttpServer._(this._serverSocket, this._closeServer);
- // Retry the URL using the same connection instance.
- _state = _HttpConnectionBase.IDLE;
- _client._openUrl(retry.method, retry.location, this);
+ _HttpServer.listenOn(ServerSocket this._serverSocket)
+ : _closeServer = false;
+
+ StreamSubscription<HttpRequest> listen(void onData(HttpRequest event),
+ {void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError}) {
+ _serverSocket.listen(
+ (Socket socket) {
+ // Accept the client connection.
+ _HttpConnection connection = new _HttpConnection(socket, this);
+ _connections.add(connection);
+ },
+ onError: _controller.signalError,
+ onDone: _controller.close);
+ return _controller.stream.listen(onData,
+ onError: onError,
+ onDone: onDone,
+ unsubscribeOnError: unsubscribeOnError);
}
- void _retry() {
- var retry = new _RedirectInfo(_response.statusCode, _method, _request._uri);
- // The actual retry is postponed until both response and request
- // are done.
- if (_isAllDone) {
- _doRetry(retry);
- } else {
- // Prepare for retry.
- assert(_pendingRedirect == null);
- _pendingRetry = retry;
+ void close() {
+ closed = true;
+ if (_serverSocket != null && _closeServer) {
+ _serverSocket.close();
+ }
+ if (_sessionManagerInstance != null) {
+ _sessionManagerInstance.close();
+ _sessionManagerInstance = null;
+ }
+ for (_HttpConnection connection in _connections.toList()) {
+ connection.destroy();
}
+ _connections.clear();
}
- void _doRedirect(_RedirectInfo redirect) {
- assert(_socketConn == null);
-
- if (_redirects == null) {
- _redirects = new List<_RedirectInfo>();
- }
- _redirects.add(redirect);
- _doRetry(redirect);
+ int get port {
+ if (closed) throw new HttpException("HttpServer is not bound to a socket");
+ return _serverSocket.port;
}
- void redirect([String method, Uri url]) {
- if (method == null) method = _method;
- if (url == null) {
- url = Uri.parse(_response.headers.value(HttpHeaders.LOCATION));
- }
- // Always set the content length to 0 for redirects.
- var mutable = _request._headers._mutable;
- _request._headers._mutable = true;
- _request._headers.contentLength = 0;
- _request._headers._mutable = mutable;
- _request._bodyBytesWritten = 0;
- var redirect = new _RedirectInfo(_response.statusCode, method, url);
- // The actual redirect is postponed until both response and
- // request are done.
- assert(_pendingRetry == null);
- _pendingRedirect = redirect;
- }
-
- List<RedirectInfo> get redirects => _redirects;
-
- Function _onRequest;
- Function _onResponse;
- Function _onErrorCallback;
-
- _HttpClient _client;
- _SocketConnection _socketConn;
- HttpClientRequest _request;
- HttpClientResponse _response;
- String _method;
- bool _usingProxy;
-
- // Redirect handling
- bool followRedirects = true;
- int maxRedirects = 5;
- List<_RedirectInfo> _redirects;
- _RedirectInfo _pendingRedirect;
- _RedirectInfo _pendingRetry;
-
- // Callbacks.
- var requestReceived;
-}
+ set sessionTimeout(int timeout) {
+ _sessionManager.sessionTimeout = timeout;
+ }
+ void _handleRequest(HttpRequest request) {
+ _controller.add(request);
+ }
-// Class for holding keep-alive sockets in the cache for the HTTP
-// client together with the connection information.
-class _SocketConnection {
- _SocketConnection(String this._host,
- int this._port,
- Socket this._socket);
+ void _handleError(AsyncError error) {
+ if (!closed) _controller.signalError(error);
+ }
- void _markReturned() {
- // Any activity on the socket while waiting in the pool will
- // invalidate the connection os that it is not reused.
- _socket.onData = _invalidate;
- _socket.onClosed = _invalidate;
- _socket.onError = (_) => _invalidate();
- _returnTime = new DateTime.now();
- _httpClientConnection = null;
+ void _connectionClosed(_HttpConnection connection) {
+ _connections.remove(connection);
}
- void _markRetrieved() {
- _socket.onData = null;
- _socket.onClosed = null;
- _socket.onError = null;
- _httpClientConnection = null;
+ _HttpSessionManager get _sessionManager {
+ // Lazy init.
+ if (_sessionManagerInstance == null) {
+ _sessionManagerInstance = new _HttpSessionManager();
+ }
+ return _sessionManagerInstance;
}
- void _close() {
- _socket.onData = null;
- _socket.onClosed = null;
- _socket.onError = null;
- _httpClientConnection = null;
- _socket.close();
+ HttpConnectionsInfo connectionsInfo() {
+ HttpConnectionsInfo result = new HttpConnectionsInfo();
+ result.total = _connections.length;
+ _connections.forEach((_HttpConnection conn) {
+ if (conn._isActive) {
+ result.active++;
+ } else if (conn._isIdle) {
+ result.idle++;
+ } else {
+ assert(conn._isClosing);
+ result.closing++;
+ }
+ });
+ return result;
}
- Duration _idleTime(DateTime now) => now.difference(_returnTime);
+ _HttpSessionManager _sessionManagerInstance;
- bool get _fromPool => _returnTime != null;
+ // Indicated if the http server has been closed.
+ bool closed = false;
- void _invalidate() {
- _valid = false;
- _close();
- }
+ // The server listen socket.
+ final ServerSocket _serverSocket;
+ final bool _closeServer;
- int get hashCode => _socket.hashCode;
+ // Set of currently connected clients.
+ final Set<_HttpConnection> _connections = new Set<_HttpConnection>();
+ final StreamController<HttpRequest> _controller
+ = new StreamController<HttpRequest>();
- String _host;
- int _port;
- Socket _socket;
- DateTime _returnTime;
- bool _valid = true;
- HttpClientConnection _httpClientConnection;
+ // TODO(ajohnsen): Use close queue?
}
+
class _ProxyConfiguration {
static const String PROXY_PREFIX = "PROXY ";
static const String DIRECT_PREFIX = "DIRECT";
@@ -1844,6 +1485,7 @@ class _ProxyConfiguration {
final List<_Proxy> proxies;
}
+
class _Proxy {
const _Proxy(this.host, this.port) : isDirect = false;
const _Proxy.direct() : host = null, port = null, isDirect = true;
@@ -1853,376 +1495,59 @@ class _Proxy {
final bool isDirect;
}
-class _HttpClient implements HttpClient {
- static const int DEFAULT_EVICTION_TIMEOUT = 60000;
-
- _HttpClient() : _openSockets = new Map(),
- _activeSockets = new Set(),
- _closeQueue = new _CloseQueue(),
- credentials = new List<_Credentials>(),
- _shutdown = false;
-
- HttpClientConnection open(
- String method, String host, int port, String path) {
- // TODO(sgjesse): The path set here can contain both query and
- // fragment. They should be cracked and set correctly.
- return _open(method, new Uri.fromComponents(
- scheme: "http", domain: host, port: port, path: path));
- }
-
- HttpClientConnection _open(String method,
- Uri uri,
- [_HttpClientConnection connection]) {
- if (_shutdown) throw new HttpException("HttpClient shutdown");
- if (method == null || uri.domain.isEmpty) {
- throw new ArgumentError(null);
- }
- return _prepareHttpClientConnection(method, uri, connection);
- }
-
- HttpClientConnection openUrl(String method, Uri url) {
- return _openUrl(method, url);
- }
-
- HttpClientConnection _openUrl(String method,
- Uri url,
- [_HttpClientConnection connection]) {
- if (url.scheme != "http" && url.scheme != "https") {
- throw new HttpException("Unsupported URL scheme ${url.scheme}");
- }
- return _open(method, url, connection);
- }
-
- HttpClientConnection get(String host, int port, String path) {
- return open("GET", host, port, path);
- }
-
- HttpClientConnection getUrl(Uri url) => _openUrl("GET", url);
-
- HttpClientConnection post(String host, int port, String path) {
- return open("POST", host, port, path);
- }
-
- HttpClientConnection postUrl(Uri url) => _openUrl("POST", url);
-
- set authenticate(Future<bool> f(Uri url, String scheme, String realm)) {
- _authenticate = f;
- }
-
- void addCredentials(
- Uri url, String realm, HttpClientCredentials cr) {
- credentials.add(new _Credentials(url, realm, cr));
- }
-
- set sendClientCertificate(bool send) => _sendClientCertificate = send;
-
- set clientCertificate(String nickname) => _clientCertificate = nickname;
-
- set findProxy(String f(Uri uri)) => _findProxy = f;
-
- void shutdown({bool force: false}) {
- if (force) _closeQueue.shutdown();
- new Map.from(_openSockets).forEach(
- (String key, Queue<_SocketConnection> connections) {
- while (!connections.isEmpty) {
- _SocketConnection socketConn = connections.removeFirst();
- socketConn._socket.close();
- }
- });
- if (force) {
- _activeSockets.toList().forEach((_SocketConnection socketConn) {
- socketConn._httpClientConnection._onClientShutdown();
- socketConn._close();
- });
- }
- if (_evictionTimer != null) _cancelEvictionTimer();
- _shutdown = true;
- }
-
- void _cancelEvictionTimer() {
- _evictionTimer.cancel();
- _evictionTimer = null;
- }
-
- String _connectionKey(String host, int port) {
- return "$host:$port";
- }
-
- HttpClientConnection _prepareHttpClientConnection(
- String method,
- Uri url,
- [_HttpClientConnection connection]) {
-
- void _establishConnection(String host,
- int port,
- _ProxyConfiguration proxyConfiguration,
- int proxyIndex,
- bool reusedConnection,
- bool secure) {
-
- void _connectionOpened(_SocketConnection socketConn,
- _HttpClientConnection connection,
- bool usingProxy) {
- socketConn._httpClientConnection = connection;
- connection._usingProxy = usingProxy;
- connection._connectionEstablished(socketConn);
- HttpClientRequest request = connection.open(method, url);
- request.headers.host = host;
- request.headers.port = port;
- if (url.userInfo != null && !url.userInfo.isEmpty) {
- // If the URL contains user information use that for basic
- // authorization
- _UTF8Encoder encoder = new _UTF8Encoder();
- String auth =
- CryptoUtils.bytesToBase64(encoder.encodeString(url.userInfo));
- request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
- } else {
- // Look for credentials.
- _Credentials cr = _findCredentials(url);
- if (cr != null) {
- cr.authorize(request);
- }
- }
- // A reused connection is indicating either redirect or retry
- // where the onRequest callback should not be issued again.
- if (connection._onRequest != null && !reusedConnection) {
- connection._onRequest(request);
- } else {
- request.outputStream.close();
- }
- }
-
- assert(proxyIndex < proxyConfiguration.proxies.length);
-
- // Determine the actual host to connect to.
- String connectHost;
- int connectPort;
- _Proxy proxy = proxyConfiguration.proxies[proxyIndex];
- if (proxy.isDirect) {
- connectHost = host;
- connectPort = port;
- } else {
- connectHost = proxy.host;
- connectPort = proxy.port;
- }
-
- // If there are active connections for this key get the first one
- // otherwise create a new one.
- String key = _connectionKey(connectHost, connectPort);
- Queue socketConnections = _openSockets[key];
- // Remove active connections that are not valid any more or of
- // the wrong type (HTTP or HTTPS).
- if (socketConnections != null) {
- while (!socketConnections.isEmpty) {
- if (socketConnections.first._valid) {
- // If socket has the same properties, exit loop with found socket.
- var socket = socketConnections.first._socket;
- if (!secure && socket is! SecureSocket) break;
- if (secure && socket is SecureSocket &&
- _sendClientCertificate == socket.sendClientCertificate &&
- _clientCertificate == socket.certificateName) break;
- }
- socketConnections.removeFirst()._close();
- }
- }
- if (socketConnections == null || socketConnections.isEmpty) {
- Socket socket = secure && proxy.isDirect ?
- new SecureSocket(connectHost,
- connectPort,
- sendClientCertificate: _sendClientCertificate,
- certificateName: _clientCertificate) :
- new Socket(connectHost, connectPort);
- // Until the connection is established handle connection errors
- // here as the HttpClientConnection object is not yet associated
- // with the socket.
- socket.onError = (e) {
- proxyIndex++;
- if (proxyIndex < proxyConfiguration.proxies.length) {
- // Try the next proxy in the list.
- _establishConnection(
- host, port, proxyConfiguration, proxyIndex, false, secure);
- } else {
- // Report the error through the HttpClientConnection object to
- // the client.
- connection._onError(e);
- }
- };
- socket.onConnect = () {
- // When the connection is established, clear the error
- // callback as it will now be handled by the
- // HttpClientConnection object which will be associated with
- // the connected socket.
- socket.onError = null;
- _SocketConnection socketConn =
- new _SocketConnection(connectHost, connectPort, socket);
- _activeSockets.add(socketConn);
- _connectionOpened(socketConn, connection, !proxy.isDirect);
- };
- } else {
- _SocketConnection socketConn = socketConnections.removeFirst();
- socketConn._markRetrieved();
- _activeSockets.add(socketConn);
- Timer.run(() =>
- _connectionOpened(socketConn, connection, !proxy.isDirect));
-
- // Get rid of eviction timer if there are no more active connections.
- if (socketConnections.isEmpty) _openSockets.remove(key);
- if (_openSockets.isEmpty) _cancelEvictionTimer();
- }
- }
-
- // Find out if we want a secure socket.
- bool is_secure = (url.scheme == "https");
-
- // Find the TCP host and port.
- String host = url.domain;
- int port = url.port;
- if (port == 0) {
- port = is_secure ?
- HttpClient.DEFAULT_HTTPS_PORT :
- HttpClient.DEFAULT_HTTP_PORT;
- }
- // Create a new connection object if we are not re-using an existing one.
- var reusedConnection = false;
- if (connection == null) {
- connection = new _HttpClientConnection(this);
- } else {
- reusedConnection = true;
- }
- connection.onDetach = () => _activeSockets.remove(connection._socketConn);
-
- // Check to see if a proxy server should be used for this connection.
- _ProxyConfiguration proxyConfiguration = const _ProxyConfiguration.direct();
- if (_findProxy != null) {
- // TODO(sgjesse): Keep a map of these as normally only a few
- // configuration strings will be used.
- proxyConfiguration = new _ProxyConfiguration(_findProxy(url));
- }
-
- // Establish the connection starting with the first proxy configured.
- _establishConnection(host,
- port,
- proxyConfiguration,
- 0,
- reusedConnection,
- is_secure);
- return connection;
+class _HttpConnectionInfo implements HttpConnectionInfo {
+ static _HttpConnectionInfo create(Socket socket) {
+ if (socket == null) return null;
+ try {
+ _HttpConnectionInfo info = new _HttpConnectionInfo._();
+ info.remoteHost = socket.remoteHost;
+ info.remotePort = socket.remotePort;
+ info.localPort = socket.port;
+ return info;
+ } catch (e) { }
+ return null;
}
- void _returnSocketConnection(_SocketConnection socketConn) {
- // If the HTTP client is being shutdown don't return the connection.
- if (_shutdown) {
- socketConn._close();
- return;
- };
-
- // Mark socket as returned to unregister from the old connection.
- socketConn._markReturned();
-
- String key = _connectionKey(socketConn._host, socketConn._port);
+ _HttpConnectionInfo._();
- // Get or create the connection list for this key.
- Queue sockets = _openSockets[key];
- if (sockets == null) {
- sockets = new Queue();
- _openSockets[key] = sockets;
- }
-
- // If there is currently no eviction timer start one.
- if (_evictionTimer == null) {
- void _handleEviction(Timer timer) {
- DateTime now = new DateTime.now();
- List<String> emptyKeys = new List<String>();
- _openSockets.forEach(
- (String key, Queue<_SocketConnection> connections) {
- // As returned connections are added at the head of the
- // list remove from the tail.
- while (!connections.isEmpty) {
- _SocketConnection socketConn = connections.last;
- if (socketConn._idleTime(now).inMilliseconds >
- DEFAULT_EVICTION_TIMEOUT) {
- connections.removeLast();
- socketConn._socket.close();
- if (connections.isEmpty) emptyKeys.add(key);
- } else {
- break;
- }
- }
- });
+ String remoteHost;
+ int remotePort;
+ int localPort;
+}
- // Remove the keys for which here are no more open connections.
- emptyKeys.forEach((String key) => _openSockets.remove(key));
- // If all connections where evicted cancel the eviction timer.
- if (_openSockets.isEmpty) _cancelEvictionTimer();
- }
- _evictionTimer = new Timer.repeating(const Duration(seconds: 10),
- _handleEviction);
- }
+class _DetachedSocket implements Socket {
+ final Stream<List<int>> _incoming;
+ final Socket _socket;
- // Return connection.
- _activeSockets.remove(socketConn);
- sockets.addFirst(socketConn);
- }
+ _DetachedSocket(this._socket, this._incoming);
- void _closeSocketConnection(_SocketConnection socketConn) {
- socketConn._close();
- _activeSockets.remove(socketConn);
+ StreamSubscription<List<int>> listen(void onData(List<int> event),
+ {void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError}) {
+ return _incoming.listen(onData,
+ onError: onError,
+ onDone: onDone,
+ unsubscribeOnError: unsubscribeOnError);
}
- void _closedSocketConnection(_SocketConnection socketConn) {
- _activeSockets.remove(socketConn);
+ Future<Socket> consume(Stream<List<int>> stream) {
+ return _socket.consume(stream);
}
- _Credentials _findCredentials(Uri url, [_AuthenticationScheme scheme]) {
- // Look for credentials.
- _Credentials cr =
- credentials.reduce(null, (_Credentials prev, _Credentials value) {
- if (value.applies(url, scheme)) {
- if (prev == null) return value;
- return value.uri.path.length > prev.uri.path.length ? value : prev;
- } else {
- return prev;
- }
- });
- return cr;
+ Future<Socket> addStream(Stream<List<int>> stream) {
+ return _socket.addStream(stream);
}
- void _removeCredentials(_Credentials cr) {
- int index = credentials.indexOf(cr);
- if (index != -1) {
- credentials.removeAt(index);
- }
+ void addString(String string, [Encoding encoding = Encoding.UTF_8]) {
+ return _socket.addString(string, encoding);
}
- Function _onOpen;
- Map<String, Queue<_SocketConnection>> _openSockets;
- Set<_SocketConnection> _activeSockets;
- _CloseQueue _closeQueue;
- List<_Credentials> credentials;
- Timer _evictionTimer;
- Function _findProxy;
- Function _authenticate;
- bool _sendClientCertificate = false;
- String _clientCertificate;
- bool _shutdown; // Has this HTTP client been shutdown?
-}
-
-
-class _HttpConnectionInfo implements HttpConnectionInfo {
- String remoteHost;
- int remotePort;
- int localPort;
-}
-
-
-class _DetachedSocket implements DetachedSocket {
- _DetachedSocket(this._socket, this._unparsedData);
- Socket get socket => _socket;
- List<int> get unparsedData => _unparsedData;
- Socket _socket;
- List<int> _unparsedData;
+ void destroy() => _socket.destroy();
+ void add(List<int> data) => _socket.add(data);
+ Future<Socket> close() => _socket.close();
}
@@ -2300,10 +1625,8 @@ class _HttpClientBasicCredentials implements HttpClientBasicCredentials {
// Proxy-Authenticate headers, see
// http://tools.ietf.org/html/draft-reschke-basicauth-enc-06. For
// now always use UTF-8 encoding.
- _UTF8Encoder encoder = new _UTF8Encoder();
String auth =
- CryptoUtils.bytesToBase64(encoder.encodeString(
- "$username:$password"));
+ CryptoUtils.bytesToBase64(_encodeString("$username:$password"));
request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
}
@@ -2328,7 +1651,6 @@ class _HttpClientDigestCredentials implements HttpClientDigestCredentials {
}
-
class _RedirectInfo implements RedirectInfo {
const _RedirectInfo(int this.statusCode,
String this.method,
« no previous file with comments | « sdk/lib/io/http_headers.dart ('k') | sdk/lib/io/http_parser.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698