Index: sdk/lib/io/http_impl.dart |
diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart |
index ede39a2418e518c3fc6b1ea5dba2681fca5107ac..1ed571eadda474cb4de614cdaa1f765847e3b1b9 100644 |
--- a/sdk/lib/io/http_impl.dart |
+++ b/sdk/lib/io/http_impl.dart |
@@ -4,520 +4,477 @@ |
part of dart.io; |
-// The close queue handles graceful closing of HTTP connections. When |
-// a connection is added to the queue it will enter a wait state |
-// waiting for all data written and possibly socket shutdown from |
-// peer. |
-class _CloseQueue { |
- _CloseQueue() : _q = new Set<_HttpConnectionBase>(); |
- |
- void add(_HttpConnectionBase connection) { |
- void closeIfDone() { |
- // When either the client has closed or all data has been |
- // written to the client we close the underlying socket |
- // completely. |
- if (connection._isWriteClosed || connection._isReadClosed) { |
- _q.remove(connection); |
- connection._socket.close(); |
- if (connection.onClosed != null) connection.onClosed(); |
- } |
- } |
+class _HttpIncoming |
+ extends Stream<List<int>> implements StreamSink<List<int>> { |
+ final int _transferLength; |
+ final Completer _dataCompleter = new Completer(); |
+ Stream<List<int>> _stream; |
- // If the connection is already fully closed don't insert it into |
- // the queue. |
- if (connection._isFullyClosed) { |
- connection._socket.close(); |
- if (connection.onClosed != null) connection.onClosed(); |
- return; |
- } |
+ bool fullBodyRead = false; |
- connection._state |= _HttpConnectionBase.CLOSING; |
- _q.add(connection); |
- |
- // If the output stream is not closed for writing, close it now and |
- // wait for callback when closed. |
- if (!connection._isWriteClosed) { |
- connection._socket.outputStream.close(); |
- connection._socket.outputStream.onClosed = () { |
- connection._state |= _HttpConnectionBase.WRITE_CLOSED; |
- closeIfDone(); |
- }; |
- } else { |
- connection._socket.outputStream.onClosed = () { assert(false); }; |
- } |
+ // Common properties. |
+ final _HttpHeaders headers; |
+ bool upgraded = false; |
- // If the request is not already fully read wait for the socket to close. |
- // As the _isReadClosed state from the HTTP request processing indicate |
- // that the response has been parsed this does not necesarily mean tha |
- // the socket is closed. |
- if (!connection._isReadClosed) { |
- connection._socket.onClosed = () { |
- connection._state |= _HttpConnectionBase.READ_CLOSED; |
- closeIfDone(); |
- }; |
- } |
+ // ClientResponse properties. |
+ int statusCode; |
+ String reasonPhrase; |
+ |
+ // Request properties. |
+ String method; |
+ Uri uri; |
- // Ignore any data on a socket in the close queue. |
- connection._socket.onData = connection._socket.read; |
+ // The transfer length if the length of the message body as it |
+ // appears in the message (RFC 2616 section 4.4). This can be -1 if |
+ // the length of the massage body is not known due to transfer |
+ // codings. |
+ int get transferLength => _transferLength; |
- // If an error occurs immediately close the socket. |
- connection._socket.onError = (e) { |
- connection._state |= _HttpConnectionBase.READ_CLOSED; |
- connection._state |= _HttpConnectionBase.WRITE_CLOSED; |
- closeIfDone(); |
- }; |
+ _HttpIncoming(_HttpHeaders this.headers, |
+ int this._transferLength, |
+ Stream<List<int>> this._stream) { |
} |
- void shutdown() { |
- _q.forEach((_HttpConnectionBase connection) { |
- connection._socket.close(); |
- }); |
+ StreamSubscription<List<int>> listen(void onData(List<int> event), |
+ {void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError}) { |
+ return _stream.listen(onData, |
+ onError: onError, |
+ onDone: onDone, |
+ unsubscribeOnError: unsubscribeOnError); |
} |
- final Set<_HttpConnectionBase> _q; |
+ // Is completed once all data have been received. |
+ Future get dataDone => _dataCompleter.future; |
+ |
+ void close() { |
+ fullBodyRead = true; |
+ _dataCompleter.complete(); |
+ } |
} |
+class _HttpInboundMessage extends Stream<List<int>> { |
+ final _HttpIncoming _incoming; |
+ List<Cookie> _cookies; |
-class _HttpRequestResponseBase { |
- static const int START = 0; |
- static const int HEADER_SENT = 1; |
- static const int DONE = 2; |
- static const int UPGRADED = 3; |
+ _HttpInboundMessage(_HttpIncoming this._incoming); |
- _HttpRequestResponseBase(_HttpConnectionBase this._httpConnection) |
- : _state = START, _headResponse = false; |
+ List<Cookie> get cookies { |
+ if (_cookies != null) return _cookies; |
+ return _cookies = headers._parseCookies(); |
+ } |
- int get contentLength => _headers.contentLength; |
- HttpHeaders get headers => _headers; |
+ HttpHeaders get headers => _incoming.headers; |
+ String get protocolVersion => headers.protocolVersion; |
+ int get contentLength => headers.contentLength; |
+ bool get persistentConnection => headers.persistentConnection; |
+} |
- bool get persistentConnection { |
- List<String> connection = headers[HttpHeaders.CONNECTION]; |
- if (_protocolVersion == "1.1") { |
- if (connection == null) return true; |
- return !headers[HttpHeaders.CONNECTION].any( |
- (value) => value.toLowerCase() == "close"); |
- } else { |
- if (connection == null) return false; |
- return headers[HttpHeaders.CONNECTION].any( |
- (value) => value.toLowerCase() == "keep-alive"); |
- } |
- } |
- X509Certificate get certificate { |
- var socket = _httpConnection._socket as SecureSocket; |
- return socket == null ? socket : socket.peerCertificate; |
- } |
+class _HttpRequest extends _HttpInboundMessage implements HttpRequest { |
+ final HttpResponse response; |
- void set persistentConnection(bool persistentConnection) { |
- if (_outputStream != null) throw new HttpException("Header already sent"); |
+ // Lazy initialized parsed query parameters. |
+ Map<String, String> _queryParameters; |
- // Determine the value of the "Connection" header. |
- headers.remove(HttpHeaders.CONNECTION, "close"); |
- headers.remove(HttpHeaders.CONNECTION, "keep-alive"); |
- if (_protocolVersion == "1.1" && !persistentConnection) { |
- headers.add(HttpHeaders.CONNECTION, "close"); |
- } else if (_protocolVersion == "1.0" && persistentConnection) { |
- headers.add(HttpHeaders.CONNECTION, "keep-alive"); |
- } |
- } |
+ final _HttpServer _httpServer; |
+ final _HttpConnection _httpConnection; |
- bool _write(List<int> data, bool copyBuffer) { |
- if (_headResponse) return true; |
- _ensureHeadersSent(); |
- bool allWritten = true; |
- if (data.length > 0) { |
- if (_headers.chunkedTransferEncoding) { |
- // Write chunk size if transfer encoding is chunked. |
- _writeHexString(data.length); |
- _writeCRLF(); |
- _httpConnection._write(data, copyBuffer); |
- allWritten = _writeCRLF(); |
- } else { |
- _updateContentLength(data.length); |
- allWritten = _httpConnection._write(data, copyBuffer); |
- } |
- } |
- return allWritten; |
- } |
- |
- bool _writeList(List<int> data, int offset, int count) { |
- if (_headResponse) return true; |
- _ensureHeadersSent(); |
- bool allWritten = true; |
- if (count > 0) { |
- if (_headers.chunkedTransferEncoding) { |
- // Write chunk size if transfer encoding is chunked. |
- _writeHexString(count); |
- _writeCRLF(); |
- _httpConnection._writeFrom(data, offset, count); |
- allWritten = _writeCRLF(); |
- } else { |
- _updateContentLength(count); |
- allWritten = _httpConnection._writeFrom(data, offset, count); |
- } |
- } |
- return allWritten; |
- } |
+ HttpSession _session; |
- bool _writeDone() { |
- bool allWritten = true; |
- if (_headers.chunkedTransferEncoding) { |
- // Terminate the content if transfer encoding is chunked. |
- allWritten = _httpConnection._write(_Const.END_CHUNKED); |
- } else { |
- if (!_headResponse && _bodyBytesWritten < _headers.contentLength) { |
- throw new HttpException("Sending less than specified content length"); |
+ _HttpRequest(_HttpResponse this.response, |
+ _HttpIncoming _incoming, |
+ _HttpServer this._httpServer, |
+ _HttpConnection this._httpConnection) |
+ : super(_incoming) { |
+ response.headers.persistentConnection = headers.persistentConnection; |
+ |
+ if (_httpServer._sessionManagerInstance != null) { |
+ // Map to session if exists. |
+ var sessionId = cookies.reduce(null, (last, cookie) { |
+ if (last != null) return last; |
+ return cookie.name.toUpperCase() == _DART_SESSION_ID ? |
+ cookie.value : null; |
+ }); |
+ if (sessionId != null) { |
+ _session = _httpServer._sessionManager.getSession(sessionId); |
+ if (_session != null) { |
+ _session._markSeen(); |
+ } |
} |
- assert(_headResponse || _bodyBytesWritten == _headers.contentLength); |
} |
- return allWritten; |
} |
- bool _writeHeaders() { |
- _headers._write(_httpConnection); |
- // Terminate header. |
- return _writeCRLF(); |
+ StreamSubscription<List<int>> listen(void onData(List<int> event), |
+ {void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError}) { |
+ return _incoming.listen(onData, |
+ onError: onError, |
+ onDone: onDone, |
+ unsubscribeOnError: unsubscribeOnError); |
} |
- bool _writeHexString(int x) { |
- final List<int> hexDigits = [0x30, 0x31, 0x32, 0x33, 0x34, |
- 0x35, 0x36, 0x37, 0x38, 0x39, |
- 0x41, 0x42, 0x43, 0x44, 0x45, 0x46]; |
- List<int> hex = new Uint8List(10); |
- int index = hex.length; |
- while (x > 0) { |
- index--; |
- hex[index] = hexDigits[x % 16]; |
- x = x >> 4; |
+ Map<String, String> get queryParameters { |
+ if (_queryParameters == null) { |
+ _queryParameters = _HttpUtils.splitQueryString(uri.query); |
} |
- return _httpConnection._writeFrom(hex, index, hex.length - index); |
+ return _queryParameters; |
} |
- bool _writeCRLF() { |
- final CRLF = const [_CharCode.CR, _CharCode.LF]; |
- return _httpConnection._write(CRLF); |
- } |
+ Uri get uri => _incoming.uri; |
- bool _writeSP() { |
- final SP = const [_CharCode.SP]; |
- return _httpConnection._write(SP); |
- } |
+ String get method => _incoming.method; |
- void _ensureHeadersSent() { |
- // Ensure that headers are written. |
- if (_state == START) { |
- _writeHeader(); |
- } |
- } |
- |
- void _updateContentLength(int bytes) { |
- if (_bodyBytesWritten + bytes > _headers.contentLength) { |
- throw new HttpException("Writing more than specified content length"); |
+ HttpSession get session { |
+ if (_session != null) { |
+ // It's already mapped, use it. |
+ return _session; |
} |
- _bodyBytesWritten += bytes; |
+ // Create session, store it in connection, and return. |
+ return _session = _httpServer._sessionManager.createSession(); |
} |
HttpConnectionInfo get connectionInfo => _httpConnection.connectionInfo; |
- bool get _done => _state == DONE; |
- |
- int _state; |
- bool _headResponse; |
- |
- _HttpConnectionBase _httpConnection; |
- _HttpHeaders _headers; |
- List<Cookie> _cookies; |
- String _protocolVersion = "1.1"; |
- |
- // Number of body bytes written. This is only actual body data not |
- // including headers or chunk information of using chinked transfer |
- // encoding. |
- int _bodyBytesWritten = 0; |
+ X509Certificate get certificate { |
+ Socket socket = _httpConnection._socket; |
+ if (socket is SecureSocket) return socket.peerCertificate; |
+ return null; |
+ } |
} |
-// Parsed HTTP request providing information on the HTTP headers. |
-class _HttpRequest extends _HttpRequestResponseBase implements HttpRequest { |
- _HttpRequest(_HttpConnection connection) : super(connection); |
- |
- String get method => _method; |
- String get uri => _uri; |
- String get path => _path; |
- String get queryString => _queryString; |
- Map get queryParameters => _queryParameters; |
- |
- List<Cookie> get cookies { |
- if (_cookies != null) return _cookies; |
- |
- // Parse a Cookie header value according to the rules in RFC 6265. |
- void _parseCookieString(String s) { |
- int index = 0; |
+class _HttpClientResponse |
+ extends _HttpInboundMessage implements HttpClientResponse { |
+ List<RedirectInfo> get redirects => _httpRequest._responseRedirects; |
- bool done() => index == s.length; |
+ // The HttpClient this response belongs to. |
+ final _HttpClient _httpClient; |
- void skipWS() { |
- while (!done()) { |
- if (s[index] != " " && s[index] != "\t") return; |
- index++; |
- } |
- } |
+ // The HttpClientRequest of this response. |
+ final _HttpClientRequest _httpRequest; |
- String parseName() { |
- int start = index; |
- while (!done()) { |
- if (s[index] == " " || s[index] == "\t" || s[index] == "=") break; |
- index++; |
- } |
- return s.substring(start, index).toLowerCase(); |
- } |
+ List<Cookie> _cookies; |
- String parseValue() { |
- int start = index; |
- while (!done()) { |
- if (s[index] == " " || s[index] == "\t" || s[index] == ";") break; |
- index++; |
- } |
- return s.substring(start, index).toLowerCase(); |
- } |
+ _HttpClientResponse(_HttpIncoming _incoming, |
+ _HttpClientRequest this._httpRequest, |
+ _HttpClient this._httpClient) |
+ : super(_incoming); |
- void expect(String expected) { |
- if (done()) { |
- throw new HttpException("Failed to parse header value [$s]"); |
- } |
- if (s[index] != expected) { |
- throw new HttpException("Failed to parse header value [$s]"); |
- } |
- index++; |
- } |
- |
- while (!done()) { |
- skipWS(); |
- if (done()) return; |
- String name = parseName(); |
- skipWS(); |
- expect("="); |
- skipWS(); |
- String value = parseValue(); |
- _cookies.add(new _Cookie(name, value)); |
- skipWS(); |
- if (done()) return; |
- expect(";"); |
- } |
- } |
+ int get statusCode => _incoming.statusCode; |
+ String get reasonPhrase => _incoming.reasonPhrase; |
+ List<Cookie> get cookies { |
+ if (_cookies != null) return _cookies; |
_cookies = new List<Cookie>(); |
- List<String> headerValues = headers["cookie"]; |
- if (headerValues != null) { |
- headerValues.forEach((headerValue) => _parseCookieString(headerValue)); |
+ List<String> values = headers["set-cookie"]; |
+ if (values != null) { |
+ values.forEach((value) { |
+ _cookies.add(new Cookie.fromSetCookieValue(value)); |
+ }); |
} |
return _cookies; |
} |
- InputStream get inputStream { |
- if (_inputStream == null) { |
- _inputStream = new _HttpInputStream(this); |
+ bool get isRedirect { |
+ if (_httpRequest.method == "GET" || _httpRequest.method == "HEAD") { |
+ return statusCode == HttpStatus.MOVED_PERMANENTLY || |
+ statusCode == HttpStatus.FOUND || |
+ statusCode == HttpStatus.SEE_OTHER || |
+ statusCode == HttpStatus.TEMPORARY_REDIRECT; |
+ } else if (_httpRequest.method == "POST") { |
+ return statusCode == HttpStatus.SEE_OTHER; |
} |
- return _inputStream; |
+ return false; |
} |
- String get protocolVersion => _protocolVersion; |
- |
- HttpSession session([init(HttpSession session)]) { |
- if (_session != null) { |
- // It's already mapped, use it. |
- return _session; |
+ Future<HttpClientResponse> redirect([String method, |
+ Uri url, |
+ bool followLoops]) { |
+ if (method == null) { |
+ // Set method as defined by RFC 2616 section 10.3.4. |
+ if (statusCode == HttpStatus.SEE_OTHER && _httpRequest.method == "POST") { |
+ method = "GET"; |
+ } else { |
+ method = _httpRequest.method; |
+ } |
} |
- // Create session, store it in connection, and return. |
- var sessionManager = _httpConnection._server._sessionManager; |
- return _session = sessionManager.createSession(init); |
- } |
- |
- void _onRequestReceived(String method, |
- String uri, |
- String version, |
- _HttpHeaders headers) { |
- _method = method; |
- _uri = uri; |
- _parseRequestUri(uri); |
- _headers = headers; |
- if (_httpConnection._server._sessionManagerInstance != null) { |
- // Map to session if exists. |
- var sessionId = cookies.reduce(null, (last, cookie) { |
- if (last != null) return last; |
- return cookie.name.toUpperCase() == _DART_SESSION_ID ? |
- cookie.value : null; |
- }); |
- if (sessionId != null) { |
- var sessionManager = _httpConnection._server._sessionManager; |
- _session = sessionManager.getSession(sessionId); |
- if (_session != null) { |
- _session._markSeen(); |
+ if (url == null) { |
+ String location = headers.value(HttpHeaders.LOCATION); |
+ if (location == null) { |
+ throw new StateError("Response has no Location header for redirect"); |
+ } |
+ url = Uri.parse(location); |
+ } |
+ if (followLoops != true) { |
+ for (var redirect in redirects) { |
+ if (redirect.location == url) { |
+ return new Future.immediateError( |
+ new RedirectLoopException(redirects)); |
} |
} |
} |
- |
- // Prepare for receiving data. |
- _buffer = new _BufferList(); |
+ return _httpClient._openUrlFromRequest(method, url, _httpRequest) |
+ .then((request) { |
+ request._responseRedirects.addAll(this.redirects); |
+ request._responseRedirects.add(new _RedirectInfo(statusCode, |
+ method, |
+ url)); |
+ return request.close(); |
+ }); |
} |
- void _onDataReceived(List<int> data) { |
- _buffer.add(data); |
- if (_inputStream != null) _inputStream._dataReceived(); |
+ StreamSubscription<List<int>> listen(void onData(List<int> event), |
+ {void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError}) { |
+ return _incoming.listen(onData, |
+ onError: onError, |
+ onDone: onDone, |
+ unsubscribeOnError: unsubscribeOnError); |
} |
- void _onDataEnd() { |
- if (_inputStream != null) { |
- _inputStream._closeReceived(); |
- } else { |
- inputStream._streamMarkedClosed = true; |
- } |
+ Future<Socket> detachSocket() { |
+ _httpClient._connectionClosed(_httpRequest._httpClientConnection); |
+ return _httpRequest._httpClientConnection.detachSocket(); |
} |
- // Escaped characters in uri are expected to have been parsed. |
- void _parseRequestUri(String uri) { |
- int position; |
- position = uri.indexOf("?", 0); |
- if (position == -1) { |
- _path = _HttpUtils.decodeUrlEncodedString(_uri); |
- _queryString = null; |
- _queryParameters = new Map(); |
- } else { |
- _path = _HttpUtils.decodeUrlEncodedString(_uri.substring(0, position)); |
- _queryString = _uri.substring(position + 1); |
- _queryParameters = _HttpUtils.splitQueryString(_queryString); |
- } |
- } |
- |
- // Delegate functions for the HttpInputStream implementation. |
- int _streamAvailable() { |
- return _buffer.length; |
- } |
+ HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; |
- List<int> _streamRead(int bytesToRead) { |
- return _buffer.readBytes(bytesToRead); |
+ bool get _shouldAuthenticate { |
+ // Only try to authenticate if there is a challenge in the response. |
+ List<String> challenge = headers[HttpHeaders.WWW_AUTHENTICATE]; |
+ return statusCode == HttpStatus.UNAUTHORIZED && |
+ challenge != null && challenge.length == 1; |
} |
- int _streamReadInto(List<int> buffer, int offset, int len) { |
- List<int> data = _buffer.readBytes(len); |
- buffer.setRange(offset, data.length, data); |
- return data.length; |
- } |
+ Future<HttpClientResponse> _authenticate() { |
+ Future<HttpClientResponse> retryWithCredentials(_Credentials cr) { |
+ if (cr != null) { |
+ // TODO(sgjesse): Support digest. |
+ if (cr.scheme == _AuthenticationScheme.BASIC) { |
+ // Drain body and retry. |
+ return reduce(null, (x, y) {}).then((_) { |
+ return _httpClient._openUrlFromRequest(_httpRequest.method, |
+ _httpRequest.uri, |
+ _httpRequest) |
+ .then((request) => request.close()); |
+ }); |
+ } |
+ } |
- void _streamSetErrorHandler(callback(e)) { |
- _streamErrorHandler = callback; |
+ // Fall through to here to perform normal response handling if |
+ // there is no sensible authorization handling. |
+ return new Future.immediate(this); |
+ } |
+ |
+ List<String> challenge = headers[HttpHeaders.WWW_AUTHENTICATE]; |
+ assert(challenge != null || challenge.length == 1); |
+ _HeaderValue header = |
+ new _HeaderValue.fromString(challenge[0], parameterSeparator: ","); |
+ _AuthenticationScheme scheme = |
+ new _AuthenticationScheme.fromString(header.value); |
+ String realm = header.parameters["realm"]; |
+ |
+ // See if any credentials are available. |
+ _Credentials cr = _httpClient._findCredentials(_httpRequest.uri, scheme); |
+ |
+ if (cr != null && !cr.used) { |
+ // If credentials found prepare for retrying the request. |
+ return retryWithCredentials(cr); |
+ } |
+ |
+ // Ask for more credentials if none found or the one found has |
+ // already been used. If it has already been used it must now be |
+ // invalid and is removed. |
+ if (cr != null) { |
+ _httpClient._removeCredentials(cr); |
+ cr = null; |
+ } |
+ if (_httpClient._authenticate != null) { |
+ Future authComplete = _httpClient._authenticate(_httpRequest.uri, |
+ scheme.toString(), |
+ realm); |
+ return authComplete.then((credsAvailable) { |
+ if (credsAvailable) { |
+ cr = _httpClient._findCredentials(_httpRequest.uri, scheme); |
+ return retryWithCredentials(cr); |
+ } else { |
+ // No credentials available, complete with original response. |
+ return this; |
+ } |
+ }); |
+ } |
+ // No credentials were found and the callback was not set. |
+ return new Future.immediate(this); |
} |
- |
- String _method; |
- String _uri; |
- String _path; |
- String _queryString; |
- Map<String, String> _queryParameters; |
- _HttpInputStream _inputStream; |
- _BufferList _buffer; |
- Function _streamErrorHandler; |
- _HttpSession _session; |
} |
-// HTTP response object for sending a HTTP response. |
-class _HttpResponse extends _HttpRequestResponseBase implements HttpResponse { |
- _HttpResponse(_HttpConnection httpConnection) |
- : super(httpConnection), |
- _statusCode = HttpStatus.OK { |
- _headers = new _HttpHeaders(); |
- } |
+class _HttpOutboundMessage<T> extends IOSink { |
+ // Used to mark when the body should be written. This is used for HEAD |
+ // requests and in error handling. |
+ bool _ignoreBody = false; |
+ |
+ _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing) |
+ : super(outgoing), |
+ _outgoing = outgoing, |
+ headers = new _HttpHeaders(protocolVersion); |
+ int get contentLength => headers.contentLength; |
void set contentLength(int contentLength) { |
- if (_state >= _HttpRequestResponseBase.HEADER_SENT) { |
- throw new HttpException("Header already sent"); |
- } |
- _headers.contentLength = contentLength; |
+ headers.contentLength = contentLength; |
} |
- int get statusCode => _statusCode; |
- void set statusCode(int statusCode) { |
- if (_outputStream != null) throw new HttpException("Header already sent"); |
- _statusCode = statusCode; |
+ bool get persistentConnection => headers.persistentConnection; |
+ bool set persistentConnection(bool p) { |
+ headers.persistentConnection = p; |
} |
- String get reasonPhrase => _findReasonPhrase(_statusCode); |
- void set reasonPhrase(String reasonPhrase) { |
- if (_outputStream != null) throw new HttpException("Header already sent"); |
- _reasonPhrase = reasonPhrase; |
+ Future<T> consume(Stream<List<int>> stream) { |
+ _writeHeaders(); |
+ if (_ignoreBody) return new Future.immediate(this); |
+ if (_chunked) { |
+ // Transform when chunked. |
+ stream = stream.transform(new _ChunkedTransformer()); |
+ } |
+ return super.consume(stream).then((_) => this); |
} |
- List<Cookie> get cookies { |
- if (_cookies == null) _cookies = new List<Cookie>(); |
- return _cookies; |
+ void add(List<int> data) { |
+ _writeHeaders(); |
+ if (_ignoreBody) return; |
+ if (_chunked) { |
+ _ChunkedTransformer._addChunk(data, super.add); |
+ } else { |
+ super.add(data); |
+ } |
} |
- OutputStream get outputStream { |
- if (_state >= _HttpRequestResponseBase.DONE) { |
- throw new HttpException("Response closed"); |
+ void close() { |
+ if (!_headersWritten && !_ignoreBody && headers.chunkedTransferEncoding) { |
+ // If no body was written, _ignoreBody is false (it's not a HEAD |
+ // request) and the content-length is unspecified, set contentLength to 0. |
+ headers.contentLength = 0; |
} |
- if (_outputStream == null) { |
- _outputStream = new _HttpOutputStream(this); |
+ _writeHeaders(); |
+ if (!_ignoreBody) { |
+ if (_chunked) { |
+ _ChunkedTransformer._addChunk([], super.add); |
+ } |
} |
- return _outputStream; |
+ super.close(); |
} |
- DetachedSocket detachSocket() { |
- if (_state >= _HttpRequestResponseBase.DONE) { |
- throw new HttpException("Response closed"); |
+ void _writeHeaders() { |
+ if (_headersWritten) return; |
+ bool _tmpIgnoreBody = _ignoreBody; |
+ _ignoreBody = false; |
+ _headersWritten = true; |
+ _writeHeader(); |
+ _ignoreBody = _tmpIgnoreBody; |
+ if (_ignoreBody) { |
+ super.close(); |
+ return; |
} |
- // Ensure that headers are written. |
- if (_state == _HttpRequestResponseBase.START) { |
- _writeHeader(); |
+ _chunked = headers.chunkedTransferEncoding; |
+ if (!_chunked) { |
+ _outgoing.setTransferLength(headers.contentLength); |
} |
- _state = _HttpRequestResponseBase.UPGRADED; |
- // Ensure that any trailing data is written. |
- _writeDone(); |
- // Indicate to the connection that the response handling is done. |
- return _httpConnection._detachSocket(); |
} |
- // Delegate functions for the HttpOutputStream implementation. |
- bool _streamWrite(List<int> buffer, bool copyBuffer) { |
- if (_done) throw new HttpException("Response closed"); |
- return _write(buffer, copyBuffer); |
+ void _writeHeader(); // TODO(ajohnsen): Better name. |
+ |
+ final _HttpHeaders headers; |
+ |
+ final _HttpOutgoing _outgoing; |
+ bool _headersWritten = false; |
+ bool _chunked = false; |
+} |
+ |
+ |
+class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
+ implements HttpResponse { |
+ int statusCode = 200; |
+ String _reasonPhrase; |
+ List<Cookie> _cookies; |
+ _HttpRequest _httpRequest; |
+ |
+ _HttpResponse(String protocolVersion, |
+ _HttpOutgoing _outgoing) |
+ : super(protocolVersion, _outgoing); |
+ |
+ List<Cookie> get cookies { |
+ if (_cookies == null) _cookies = new List<Cookie>(); |
+ return _cookies; |
} |
- bool _streamWriteFrom(List<int> buffer, int offset, int len) { |
- if (_done) throw new HttpException("Response closed"); |
- return _writeList(buffer, offset, len); |
+ String get reasonPhrase => _findReasonPhrase(statusCode); |
+ void set reasonPhrase(String reasonPhrase) { |
+ if (_headersWritten) throw new StateError("Header already sent"); |
+ _reasonPhrase = reasonPhrase; |
} |
- void _streamFlush() { |
- _httpConnection._flush(); |
+ Future<Socket> detachSocket() { |
+ if (_headersWritten) throw new StateError("Headers already sent"); |
+ _writeHeaders(); |
+ var future = _httpRequest._httpConnection.detachSocket(); |
+ // Close connection so the socket is 'free'. |
+ close(); |
+ return future; |
} |
- void _streamClose() { |
- _ensureHeadersSent(); |
- _state = _HttpRequestResponseBase.DONE; |
- // Stop tracking no pending write events. |
- _httpConnection._onNoPendingWrites = null; |
- // Ensure that any trailing data is written. |
- _writeDone(); |
- // Indicate to the connection that the response handling is done. |
- _httpConnection._responseClosed(); |
- if (_streamClosedHandler != null) { |
- Timer.run(_streamClosedHandler); |
+ HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; |
+ |
+ void _writeHeader() { |
+ writeSP() => add([_CharCode.SP]); |
+ writeCRLF() => add([_CharCode.CR, _CharCode.LF]); |
+ |
+ // Write status line. |
+ if (headers.protocolVersion == "1.1") { |
+ add(_Const.HTTP11); |
+ } else { |
+ add(_Const.HTTP10); |
} |
- } |
+ writeSP(); |
+ addString(statusCode.toString()); |
+ writeSP(); |
+ addString(reasonPhrase); |
+ writeCRLF(); |
- void _streamSetNoPendingWriteHandler(callback()) { |
- if (_state != _HttpRequestResponseBase.DONE) { |
- _httpConnection._onNoPendingWrites = callback; |
+ var session = _httpRequest._session; |
+ if (session != null && !session._destroyed) { |
+ // Mark as not new. |
+ session._isNew = false; |
+ // Make sure we only send the current session id. |
+ bool found = false; |
+ for (int i = 0; i < cookies.length; i++) { |
+ if (cookies[i].name.toUpperCase() == _DART_SESSION_ID) { |
+ cookies[i].value = session.id; |
+ cookies[i].httpOnly = true; |
+ found = true; |
+ break; |
+ } |
+ } |
+ if (!found) { |
+ cookies.add(new Cookie(_DART_SESSION_ID, session.id)..httpOnly = true); |
+ } |
+ } |
+ // Add all the cookies set to the headers. |
+ if (_cookies != null) { |
+ _cookies.forEach((cookie) { |
+ headers.add("set-cookie", cookie); |
+ }); |
} |
- } |
- void _streamSetClosedHandler(callback()) { |
- _streamClosedHandler = callback; |
- } |
+ headers._finalize(); |
- void _streamSetErrorHandler(callback(e)) { |
- _streamErrorHandler = callback; |
+ // Write headers. |
+ headers._write(this); |
+ writeCRLF(); |
} |
String _findReasonPhrase(int statusCode) { |
@@ -574,1231 +531,915 @@ class _HttpResponse extends _HttpRequestResponseBase implements HttpResponse { |
default: return "Status $statusCode"; |
} |
} |
- |
- bool _writeHeader() { |
- List<int> data; |
- |
- // Write status line. |
- if (_protocolVersion == "1.1") { |
- _httpConnection._write(_Const.HTTP11); |
- } else { |
- _httpConnection._write(_Const.HTTP10); |
- } |
- _writeSP(); |
- data = _statusCode.toString().charCodes; |
- _httpConnection._write(data); |
- _writeSP(); |
- data = reasonPhrase.charCodes; |
- _httpConnection._write(data); |
- _writeCRLF(); |
- |
- var session = _httpConnection._request._session; |
- if (session != null && !session._destroyed) { |
- // Make sure we only send the current session id. |
- bool found = false; |
- for (int i = 0; i < cookies.length; i++) { |
- if (cookies[i].name.toUpperCase() == _DART_SESSION_ID) { |
- cookies[i].value = session.id; |
- cookies[i].httpOnly = true; |
- found = true; |
- break; |
- } |
- } |
- if (!found) { |
- cookies.add(new Cookie(_DART_SESSION_ID, session.id)..httpOnly = true); |
- } |
- } |
- // Add all the cookies set to the headers. |
- if (_cookies != null) { |
- _cookies.forEach((cookie) { |
- _headers.add("set-cookie", cookie); |
- }); |
- } |
- |
- // Write headers. |
- _headers._finalize(_protocolVersion); |
- bool allWritten = _writeHeaders(); |
- _state = _HttpRequestResponseBase.HEADER_SENT; |
- return allWritten; |
- } |
- |
- int _statusCode; // Response status code. |
- String _reasonPhrase; // Response reason phrase. |
- _HttpOutputStream _outputStream; |
- Function _streamClosedHandler; |
- Function _streamErrorHandler; |
} |
-class _HttpInputStream extends _BaseDataInputStream implements InputStream { |
- _HttpInputStream(_HttpRequestResponseBase this._requestOrResponse) { |
- _checkScheduleCallbacks(); |
- } |
- |
- int available() { |
- return _requestOrResponse._streamAvailable(); |
- } |
- |
- void pipe(OutputStream output, {bool close: true}) { |
- _pipe(this, output, close: close); |
- } |
- |
- List<int> _read(int bytesToRead) { |
- List<int> result = _requestOrResponse._streamRead(bytesToRead); |
- _checkScheduleCallbacks(); |
- return result; |
- } |
- |
- void set onError(void callback(e)) { |
- _requestOrResponse._streamSetErrorHandler(callback); |
- } |
- |
- int _readInto(List<int> buffer, int offset, int len) { |
- int result = _requestOrResponse._streamReadInto(buffer, offset, len); |
- _checkScheduleCallbacks(); |
- return result; |
- } |
- |
- void _close() { |
- // TODO(sgjesse): Handle this. |
- } |
- |
- void _dataReceived() { |
- super._dataReceived(); |
- } |
- |
- _HttpRequestResponseBase _requestOrResponse; |
-} |
- |
- |
-class _HttpOutputStream extends _BaseOutputStream implements OutputStream { |
- _HttpOutputStream(_HttpRequestResponseBase this._requestOrResponse); |
- |
- bool write(List<int> buffer, [bool copyBuffer = true]) { |
- return _requestOrResponse._streamWrite(buffer, copyBuffer); |
- } |
- |
- bool writeFrom(List<int> buffer, [int offset = 0, int len]) { |
- if (offset < 0 || offset >= buffer.length) throw new ArgumentError(); |
- len = len != null ? len : buffer.length - offset; |
- if (len < 0) throw new ArgumentError(); |
- return _requestOrResponse._streamWriteFrom(buffer, offset, len); |
- } |
- |
- void flush() { |
- _requestOrResponse._streamFlush(); |
- } |
- |
- void close() { |
- _requestOrResponse._streamClose(); |
- } |
- |
- bool get closed => _requestOrResponse._done; |
- |
- void destroy() { |
- throw "Not implemented"; |
- } |
- |
- void set onNoPendingWrites(void callback()) { |
- _requestOrResponse._streamSetNoPendingWriteHandler(callback); |
- } |
- |
- void set onClosed(void callback()) { |
- _requestOrResponse._streamSetClosedHandler(callback); |
- } |
- |
- void set onError(void callback(e)) { |
- _requestOrResponse._streamSetErrorHandler(callback); |
- } |
- |
- _HttpRequestResponseBase _requestOrResponse; |
-} |
- |
- |
-abstract class _HttpConnectionBase { |
- static const int IDLE = 0; |
- static const int ACTIVE = 1; |
- static const int CLOSING = 2; |
- static const int REQUEST_DONE = 4; |
- static const int RESPONSE_DONE = 8; |
- static const int ALL_DONE = REQUEST_DONE | RESPONSE_DONE; |
- static const int READ_CLOSED = 16; |
- static const int WRITE_CLOSED = 32; |
- static const int FULLY_CLOSED = READ_CLOSED | WRITE_CLOSED; |
- |
- _HttpConnectionBase() : hashCode = _nextHashCode { |
- _nextHashCode = (_nextHashCode + 1) & 0xFFFFFFF; |
- } |
- |
- bool get _isIdle => (_state & ACTIVE) == 0; |
- bool get _isActive => (_state & ACTIVE) == ACTIVE; |
- bool get _isClosing => (_state & CLOSING) == CLOSING; |
- bool get _isRequestDone => (_state & REQUEST_DONE) == REQUEST_DONE; |
- bool get _isResponseDone => (_state & RESPONSE_DONE) == RESPONSE_DONE; |
- bool get _isAllDone => (_state & ALL_DONE) == ALL_DONE; |
- bool get _isReadClosed => (_state & READ_CLOSED) == READ_CLOSED; |
- bool get _isWriteClosed => (_state & WRITE_CLOSED) == WRITE_CLOSED; |
- bool get _isFullyClosed => (_state & FULLY_CLOSED) == FULLY_CLOSED; |
- |
- void _connectionEstablished(Socket socket) { |
- _socket = socket; |
- // Register handlers for socket events. All socket events are |
- // passed to the HTTP parser. |
- _socket.onData = () { |
- List<int> buffer = _socket.read(); |
- if (buffer != null) { |
- _httpParser.streamData(buffer); |
- } |
- }; |
- _socket.onClosed = _httpParser.streamDone; |
- _socket.onError = _httpParser.streamError; |
- _socket.outputStream.onError = _httpParser.streamError; |
- } |
- |
- bool _write(List<int> data, [bool copyBuffer = false]); |
- bool _writeFrom(List<int> buffer, [int offset, int len]); |
- bool _flush(); |
- bool _close(); |
- bool _destroy(); |
- DetachedSocket _detachSocket(); |
- |
- HttpConnectionInfo get connectionInfo { |
- if (_socket == null) return null; |
- try { |
- _HttpConnectionInfo info = new _HttpConnectionInfo(); |
- info.remoteHost = _socket.remoteHost; |
- info.remotePort = _socket.remotePort; |
- info.localPort = _socket.port; |
- return info; |
- } catch (e) { } |
- return null; |
- } |
- |
- void set _onNoPendingWrites(void callback()) { |
- _socket.outputStream.onNoPendingWrites = callback; |
- } |
- |
- int _state = IDLE; |
+class _HttpClientRequest extends _HttpOutboundMessage<HttpClientRequest> |
+ implements HttpClientRequest { |
+ final String method; |
+ final Uri uri; |
+ final List<Cookie> cookies = new List<Cookie>(); |
- Socket _socket; |
- _HttpParser _httpParser; |
+ // The HttpClient this request belongs to. |
+ final _HttpClient _httpClient; |
+ final _HttpClientConnection _httpClientConnection; |
- // Callbacks. |
- Function onDetach; |
- Function onClosed; |
+ final Completer<HttpClientResponse> _responseCompleter |
+ = new Completer<HttpClientResponse>(); |
- // Hash code for HTTP connection. Currently this is just a counter. |
- final int hashCode; |
- static int _nextHashCode = 0; |
-} |
+ final bool _usingProxy; |
+ // TODO(ajohnsen): Get default value from client? |
+ bool _followRedirects = true; |
-// HTTP server connection over a socket. |
-class _HttpConnection extends _HttpConnectionBase { |
- _HttpConnection(HttpServer this._server) { |
- _httpParser = new _HttpParser.requestParser(); |
- // Register HTTP parser callbacks. |
- _httpParser.requestStart = _onRequestReceived; |
- _httpParser.dataReceived = _onDataReceived; |
- _httpParser.dataEnd = _onDataEnd; |
- _httpParser.error = _onError; |
- _httpParser.closed = _onClosed; |
- _httpParser.responseStart = (statusCode, reasonPhrase, version) { |
- assert(false); |
- }; |
- } |
- |
- void _bufferData(List<int> data, [bool copyBuffer = false]) { |
- if (_buffer == null) _buffer = new _BufferList(); |
- if (copyBuffer) data = data.getRange(0, data.length); |
- _buffer.add(data); |
- } |
- |
- void _writeBufferedResponse() { |
- if (_buffer != null) { |
- while (!_buffer.isEmpty) { |
- var data = _buffer.first; |
- _socket.outputStream.write(data, false); |
- _buffer.removeBytes(data.length); |
- } |
- _buffer = null; |
- } |
- } |
+ int _maxRedirects = 5; |
- bool _write(List<int> data, [bool copyBuffer = false]) { |
- if (_isRequestDone || !_hasBody || _httpParser.upgrade) { |
- return _socket.outputStream.write(data, copyBuffer); |
- } else { |
- _bufferData(data, copyBuffer); |
- return false; |
- } |
- } |
+ List<RedirectInfo> _responseRedirects = []; |
- bool _writeFrom(List<int> data, [int offset, int len]) { |
- if (_isRequestDone || !_hasBody || _httpParser.upgrade) { |
- return _socket.outputStream.writeFrom(data, offset, len); |
- } else { |
- if (offset == null) offset = 0; |
- if (len == null) len = buffer.length - offset; |
- _bufferData(data.getRange(offset, len), false); |
- return false; |
+ _HttpClientRequest(_HttpOutgoing outgoing, |
+ Uri this.uri, |
+ String this.method, |
+ bool this._usingProxy, |
+ _HttpClient this._httpClient, |
+ _HttpClientConnection this._httpClientConnection) |
+ : super("1.1", outgoing) { |
+ // GET and HEAD have 'content-length: 0' by default. |
+ if (method == "GET" || method == "HEAD") { |
+ contentLength = 0; |
} |
} |
- bool _flush() { |
- _socket.outputStream.flush(); |
- } |
- |
- bool _close() { |
- _socket.outputStream.close(); |
- } |
+ Future<HttpClientResponse> get response => _responseCompleter.future; |
- bool _destroy() { |
- _socket.close(); |
+ Future<HttpClientResponse> close() { |
+ super.close(); |
+ return response; |
} |
- void _onClosed() { |
- _state |= _HttpConnectionBase.READ_CLOSED; |
- _checkDone(); |
+ int get maxRedirects => _maxRedirects; |
+ void set maxRedirects(int maxRedirects) { |
+ if (_headersWritten) throw new StateError("Request already sent"); |
+ _maxRedirects = maxRedirects; |
} |
- DetachedSocket _detachSocket() { |
- _socket.onData = null; |
- _socket.onClosed = null; |
- _socket.onError = null; |
- _socket.outputStream.onNoPendingWrites = null; |
- _writeBufferedResponse(); |
- Socket socket = _socket; |
- _socket = null; |
- if (onDetach != null) onDetach(); |
- return new _DetachedSocket(socket, _httpParser.readUnparsedData()); |
+ bool get followRedirects => _followRedirects; |
+ void set followRedirects(bool followRedirects) { |
+ if (_headersWritten) throw new StateError("Request already sent"); |
+ _followRedirects = followRedirects; |
} |
- void _onError(e) { |
- // Don't report errors for a request parser when HTTP parser is in |
- // idle state. Clients can close the connection and cause a |
- // connection reset by peer error which is OK. |
- _onClosed(); |
- if (_state == _HttpConnectionBase.IDLE) return; |
+ HttpConnectionInfo get connectionInfo => _httpClientConnection.connectionInfo; |
- // Propagate the error to the streams. |
- if (_request != null && |
- !_isRequestDone && |
- _request._streamErrorHandler != null) { |
- _request._streamErrorHandler(e); |
- } else if (_response != null && |
- !_isResponseDone && |
- _response._streamErrorHandler != null) { |
- _response._streamErrorHandler(e); |
- } else { |
- onError(e); |
- } |
- if (_socket != null) _socket.close(); |
- } |
- |
- void _onRequestReceived(String method, |
- String uri, |
- String version, |
- _HttpHeaders headers, |
- bool hasBody) { |
- _state = _HttpConnectionBase.ACTIVE; |
- // Create new request and response objects for this request. |
- _request = new _HttpRequest(this); |
- _response = new _HttpResponse(this); |
- _request._onRequestReceived(method, uri, version, headers); |
- _request._protocolVersion = version; |
- _response._protocolVersion = version; |
- _response._headResponse = method == "HEAD"; |
- _response.persistentConnection = _httpParser.persistentConnection; |
- _hasBody = hasBody; |
- if (onRequestReceived != null) { |
- onRequestReceived(_request, _response); |
- } |
- _checkDone(); |
- } |
- |
- void _onDataReceived(List<int> data) { |
- _request._onDataReceived(data); |
- _checkDone(); |
- } |
- |
- void _checkDone() { |
- if (_isReadClosed) { |
- // If the client closes the conversation is ended. |
- _server._closeQueue.add(this); |
- } else if (_isAllDone) { |
- // If we are done writing the response, and the connection is |
- // not persistent, we must close. Also if using HTTP 1.0 and the |
- // content length was not known we must close to indicate end of |
- // body. |
- bool close = |
- !_response.persistentConnection || |
- (_response._protocolVersion == "1.0" && _response.contentLength < 0); |
- _request = null; |
- _response = null; |
- if (close) { |
- _httpParser.cancel(); |
- _server._closeQueue.add(this); |
+ void _onIncoming(_HttpIncoming incoming) { |
+ var response = new _HttpClientResponse(incoming, |
+ this, |
+ _httpClient); |
+ Future<HttpClientResponse> future; |
+ if (followRedirects && response.isRedirect) { |
+ if (response.redirects.length < maxRedirects) { |
+ // Redirect and drain response. |
+ future = response.reduce(null, (x, y) {}) |
+ .then((_) => response.redirect()); |
} else { |
- _state = _HttpConnectionBase.IDLE; |
- } |
- } else if (_isResponseDone && _hasBody) { |
- // If the response is closed before the request is fully read |
- // close this connection. If there is buffered output |
- // (e.g. error response for invalid request where the server did |
- // not care to read the request body) this is send. |
- assert(!_isRequestDone); |
- _writeBufferedResponse(); |
- _httpParser.cancel(); |
- _server._closeQueue.add(this); |
- } |
- } |
- |
- void _onDataEnd(bool close) { |
- // Start sending queued response if any. |
- _state |= _HttpConnectionBase.REQUEST_DONE; |
- _writeBufferedResponse(); |
- _request._onDataEnd(); |
- _checkDone(); |
- } |
- |
- void _responseClosed() { |
- _state |= _HttpConnectionBase.RESPONSE_DONE; |
- } |
- |
- HttpServer _server; |
- HttpRequest _request; |
- HttpResponse _response; |
- bool _hasBody = false; |
- |
- // Buffer for data written before full response has been processed. |
- _BufferList _buffer; |
- |
- // Callbacks. |
- Function onRequestReceived; |
- Function onError; |
-} |
- |
- |
-class _RequestHandlerRegistration { |
- _RequestHandlerRegistration(Function this._matcher, Function this._handler); |
- Function _matcher; |
- Function _handler; |
-} |
- |
-// HTTP server waiting for socket connections. The connections are |
-// managed by the server and as requests are received the request. |
-// HTTPS connections are also supported, if the _HttpServer.httpsServer |
-// constructor is used and a certificate name is provided in listen, |
-// or a SecureServerSocket is provided to listenOn. |
-class _HttpServer implements HttpServer, HttpsServer { |
- _HttpServer() : this._internal(isSecure: false); |
- |
- _HttpServer.httpsServer() : this._internal(isSecure: true); |
- |
- _HttpServer._internal({ bool isSecure: false }) |
- : _secure = isSecure, |
- _connections = new Set<_HttpConnection>(), |
- _handlers = new List<_RequestHandlerRegistration>(), |
- _closeQueue = new _CloseQueue(); |
- |
- void listen(String host, |
- int port, |
- {int backlog: 128, |
- String certificate_name, |
- bool requestClientCertificate: false}) { |
- if (_secure) { |
- listenOn(new SecureServerSocket( |
- host, |
- port, |
- backlog, |
- certificate_name, |
- requestClientCertificate: requestClientCertificate)); |
- } else { |
- listenOn(new ServerSocket(host, port, backlog)); |
- } |
- _closeServer = true; |
- } |
- |
- void listenOn(ServerSocket serverSocket) { |
- if (_secure && serverSocket is! SecureServerSocket) { |
- throw new HttpException( |
- 'HttpsServer.listenOn was called with non-secure server socket'); |
- } else if (!_secure && serverSocket is SecureServerSocket) { |
- throw new HttpException( |
- 'HttpServer.listenOn was called with a secure server socket'); |
- } |
- void onConnection(Socket socket) { |
- // Accept the client connection. |
- _HttpConnection connection = new _HttpConnection(this); |
- connection._connectionEstablished(socket); |
- _connections.add(connection); |
- connection.onRequestReceived = _handleRequest; |
- connection.onClosed = () => _connections.remove(connection); |
- connection.onDetach = () => _connections.remove(connection); |
- connection.onError = (e) { |
- _connections.remove(connection); |
- if (_onError != null) { |
- _onError(e); |
- } else { |
- throw(e); |
- } |
- }; |
- } |
- serverSocket.onConnection = onConnection; |
- _server = serverSocket; |
- _closeServer = false; |
- } |
- |
- addRequestHandler(bool matcher(HttpRequest request), |
- void handler(HttpRequest request, HttpResponse response)) { |
- _handlers.add(new _RequestHandlerRegistration(matcher, handler)); |
- } |
- |
- void set defaultRequestHandler( |
- void handler(HttpRequest request, HttpResponse response)) { |
- _defaultHandler = handler; |
- } |
- |
- void close() { |
- _closeQueue.shutdown(); |
- if (_sessionManagerInstance != null) { |
- _sessionManagerInstance.close(); |
- _sessionManagerInstance = null; |
- } |
- if (_server != null && _closeServer) { |
- _server.close(); |
- } |
- _server = null; |
- for (_HttpConnection connection in _connections) { |
- connection._destroy(); |
- } |
- _connections.clear(); |
- } |
- |
- int get port { |
- if (_server == null) { |
- throw new HttpException("The HttpServer is not listening on a port."); |
- } |
- return _server.port; |
- } |
- |
- void set onError(void callback(e)) { |
- _onError = callback; |
- } |
- |
- set sessionTimeout(int timeout) { |
- _sessionManager.sessionTimeout = timeout; |
- } |
- |
- void _handleRequest(HttpRequest request, HttpResponse response) { |
- for (int i = 0; i < _handlers.length; i++) { |
- if (_handlers[i]._matcher(request)) { |
- Function handler = _handlers[i]._handler; |
- try { |
- handler(request, response); |
- } catch (e) { |
- if (_onError != null) { |
- _onError(e); |
- } else { |
- throw e; |
- } |
- } |
- return; |
+ // End with exception, too many redirects. |
+ future = response.reduce(null, (x, y) {}) |
+ .then((_) => new Future.immediateError( |
+ new RedirectLimitExceededException(response.redirects))); |
} |
- } |
- |
- if (_defaultHandler != null) { |
- _defaultHandler(request, response); |
+ } else if (response._shouldAuthenticate) { |
+ future = response._authenticate(); |
} else { |
- response.statusCode = HttpStatus.NOT_FOUND; |
- response.contentLength = 0; |
- response.outputStream.close(); |
- } |
- } |
- |
- _HttpSessionManager get _sessionManager { |
- // Lazy init. |
- if (_sessionManagerInstance == null) { |
- _sessionManagerInstance = new _HttpSessionManager(); |
- } |
- return _sessionManagerInstance; |
- } |
- |
- HttpConnectionsInfo connectionsInfo() { |
- HttpConnectionsInfo result = new HttpConnectionsInfo(); |
- result.total = _connections.length; |
- _connections.forEach((_HttpConnection conn) { |
- if (conn._isActive) { |
- result.active++; |
- } else if (conn._isIdle) { |
- result.idle++; |
- } else { |
- assert(result._isClosing); |
- result.closing++; |
- } |
- }); |
- return result; |
- } |
- |
- ServerSocket _server; // The server listen socket. |
- bool _closeServer = false; |
- bool _secure; |
- Set<_HttpConnection> _connections; // Set of currently connected clients. |
- List<_RequestHandlerRegistration> _handlers; |
- Object _defaultHandler; |
- Function _onError; |
- _CloseQueue _closeQueue; |
- _HttpSessionManager _sessionManagerInstance; |
-} |
- |
- |
-class _HttpClientRequest |
- extends _HttpRequestResponseBase implements HttpClientRequest { |
- _HttpClientRequest(String this._method, |
- Uri this._uri, |
- _HttpClientConnection connection) |
- : super(connection) { |
- _headers = new _HttpHeaders(); |
- _connection = connection; |
- // Default GET and HEAD requests to have no content. |
- if (_method == "GET" || _method == "HEAD") { |
- contentLength = 0; |
- } |
- } |
- |
- void set contentLength(int contentLength) { |
- if (_state >= _HttpRequestResponseBase.HEADER_SENT) { |
- throw new HttpException("Header already sent"); |
- } |
- _headers.contentLength = contentLength; |
- } |
- |
- List<Cookie> get cookies { |
- if (_cookies == null) _cookies = new List<Cookie>(); |
- return _cookies; |
- } |
- |
- OutputStream get outputStream { |
- if (_done) throw new HttpException("Request closed"); |
- if (_outputStream == null) { |
- _outputStream = new _HttpOutputStream(this); |
- } |
- return _outputStream; |
- } |
- |
- // Delegate functions for the HttpOutputStream implementation. |
- bool _streamWrite(List<int> buffer, bool copyBuffer) { |
- if (_done) throw new HttpException("Request closed"); |
- _emptyBody = _emptyBody && buffer.length == 0; |
- return _write(buffer, copyBuffer); |
- } |
- |
- bool _streamWriteFrom(List<int> buffer, int offset, int len) { |
- if (_done) throw new HttpException("Request closed"); |
- _emptyBody = _emptyBody && buffer.length == 0; |
- return _writeList(buffer, offset, len); |
- } |
- |
- void _streamFlush() { |
- _httpConnection._flush(); |
- } |
- |
- void _streamClose() { |
- _ensureHeadersSent(); |
- _state = _HttpRequestResponseBase.DONE; |
- // Stop tracking no pending write events. |
- _httpConnection._onNoPendingWrites = null; |
- // Ensure that any trailing data is written. |
- _writeDone(); |
- _connection._requestClosed(); |
- if (_streamClosedHandler != null) { |
- Timer.run(_streamClosedHandler); |
- } |
- } |
- |
- void _streamSetNoPendingWriteHandler(callback()) { |
- if (_state != _HttpRequestResponseBase.DONE) { |
- _httpConnection._onNoPendingWrites = callback; |
+ future = new Future<HttpClientResponse>.immediate(response); |
} |
+ future.then( |
+ (v) => _responseCompleter.complete(v), |
+ onError: (e) { |
+ _responseCompleter.completeError(e); |
+ }); |
} |
- void _streamSetClosedHandler(callback()) { |
- _streamClosedHandler = callback; |
- } |
- |
- void _streamSetErrorHandler(callback(e)) { |
- _streamErrorHandler = callback; |
+ void _onError(AsyncError error) { |
+ _responseCompleter.completeError(error); |
} |
void _writeHeader() { |
- List<int> data; |
+ writeSP() => add([_CharCode.SP]); |
+ writeCRLF() => add([_CharCode.CR, _CharCode.LF]); |
- // Write request line. |
- data = _method.toString().charCodes; |
- _httpConnection._write(data); |
- _writeSP(); |
+ addString(method); |
+ writeSP(); |
// Send the path for direct connections and the whole URL for |
// proxy connections. |
- if (!_connection._usingProxy) { |
- String path = _uri.path; |
+ if (!_usingProxy) { |
+ String path = uri.path; |
if (path.length == 0) path = "/"; |
- if (_uri.query != "") { |
- if (_uri.fragment != "") { |
- path = "${path}?${_uri.query}#${_uri.fragment}"; |
+ if (uri.query != "") { |
+ if (uri.fragment != "") { |
+ path = "${path}?${uri.query}#${uri.fragment}"; |
} else { |
- path = "${path}?${_uri.query}"; |
+ path = "${path}?${uri.query}"; |
} |
} |
- data = path.charCodes; |
+ addString(path); |
} else { |
- data = _uri.toString().charCodes; |
+ addString(uri.toString()); |
} |
- _httpConnection._write(data); |
- _writeSP(); |
- _httpConnection._write(_Const.HTTP11); |
- _writeCRLF(); |
+ writeSP(); |
+ add(_Const.HTTP11); |
+ writeCRLF(); |
// Add the cookies to the headers. |
- if (_cookies != null) { |
+ if (!cookies.isEmpty) { |
StringBuffer sb = new StringBuffer(); |
- for (int i = 0; i < _cookies.length; i++) { |
+ for (int i = 0; i < cookies.length; i++) { |
if (i > 0) sb.add("; "); |
- sb.add(_cookies[i].name); |
+ sb.add(cookies[i].name); |
sb.add("="); |
- sb.add(_cookies[i].value); |
+ sb.add(cookies[i].value); |
} |
- _headers.add("cookie", sb.toString()); |
+ headers.add("cookie", sb.toString()); |
} |
+ headers._finalize(); |
+ |
// Write headers. |
- _headers._finalize("1.1"); |
- _writeHeaders(); |
- _state = _HttpRequestResponseBase.HEADER_SENT; |
+ headers._write(this); |
+ writeCRLF(); |
} |
- |
- String _method; |
- Uri _uri; |
- _HttpClientConnection _connection; |
- _HttpOutputStream _outputStream; |
- Function _streamClosedHandler; |
- Function _streamErrorHandler; |
- bool _emptyBody = true; |
} |
-class _HttpClientResponse |
- extends _HttpRequestResponseBase |
- implements HttpClientResponse { |
- _HttpClientResponse(_HttpClientConnection connection) |
- : super(connection) { |
- _connection = connection; |
- } |
- int get statusCode => _statusCode; |
- String get reasonPhrase => _reasonPhrase; |
- |
- bool get isRedirect { |
- var method = _connection._request._method; |
- if (method == "GET" || method == "HEAD") { |
- return statusCode == HttpStatus.MOVED_PERMANENTLY || |
- statusCode == HttpStatus.FOUND || |
- statusCode == HttpStatus.SEE_OTHER || |
- statusCode == HttpStatus.TEMPORARY_REDIRECT; |
- } else if (method == "POST") { |
- return statusCode == HttpStatus.SEE_OTHER; |
+// Transformer that transforms data to HTTP Chunked Encoding. |
+class _ChunkedTransformer implements StreamTransformer<List<int>, List<int>> { |
+ final StreamController<List<int>> _controller |
+ = new StreamController<List<int>>(); |
+ |
+ Stream<List<int>> bind(Stream<List<int>> stream) { |
+ var subscription = stream.listen( |
+ (data) { |
+ if (data.length == 0) return; // Avoid close on 0-bytes payload. |
+ _addChunk(data, _controller.add); |
+ }, |
+ onDone: () { |
+ _addChunk([], _controller.add); |
+ _controller.close(); |
+ }); |
+ return _controller.stream; |
+ } |
+ |
+ static void _addChunk(List<int> data, void add(List<int> data)) { |
+ add(_chunkHeader(data.length)); |
+ if (data.length > 0) add(data); |
+ add(_chunkFooter); |
+ } |
+ |
+ static List<int> _chunkHeader(int length) { |
+ const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, |
+ 0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46]; |
+ var header = []; |
+ if (length == 0) { |
+ header.add(hexDigits[length]); |
+ } else { |
+ while (length > 0) { |
+ header.insertRange(0, 1, hexDigits[length % 16]); |
+ length = length >> 4; |
+ } |
} |
- return false; |
+ header.add(_CharCode.CR); |
+ header.add(_CharCode.LF); |
+ return header; |
} |
- List<Cookie> get cookies { |
- if (_cookies != null) return _cookies; |
- _cookies = new List<Cookie>(); |
- List<String> values = _headers["set-cookie"]; |
- if (values != null) { |
- values.forEach((value) { |
- _cookies.add(new Cookie.fromSetCookieValue(value)); |
- }); |
- } |
- return _cookies; |
- } |
+ // Footer is just a CRLF. |
+ static List<int> get _chunkFooter => const [_CharCode.CR, _CharCode.LF]; |
+} |
- InputStream get inputStream { |
- if (_inputStream == null) { |
- _inputStream = new _HttpInputStream(this); |
- } |
- return _inputStream; |
+ |
+// Transformer that invokes [_onDone] when completed. |
+class _DoneTransformer implements StreamTransformer<List<int>, List<int>> { |
+ final StreamController<List<int>> _controller |
+ = new StreamController<List<int>>(); |
+ final Function _onDone; |
+ |
+ _DoneTransformer(this._onDone); |
+ |
+ Stream<List<int>> bind(Stream<List<int>> stream) { |
+ var subscription = stream.listen( |
+ _controller.add, |
+ onError: _controller.signalError, |
+ onDone: () { |
+ _onDone(); |
+ _controller.close(); |
+ }); |
+ return _controller.stream; |
} |
+} |
- void _onResponseReceived(int statusCode, |
- String reasonPhrase, |
- String version, |
- _HttpHeaders headers, |
- bool hasBody) { |
- _statusCode = statusCode; |
- _reasonPhrase = reasonPhrase; |
- _headers = headers; |
- |
- // Prepare for receiving data. |
- _buffer = new _BufferList(); |
- if (isRedirect && _connection.followRedirects) { |
- if (_connection._redirects == null || |
- _connection._redirects.length < _connection.maxRedirects) { |
- // Check the location header. |
- List<String> location = headers[HttpHeaders.LOCATION]; |
- if (location == null || location.length > 1) { |
- throw new RedirectException("Invalid redirect", |
- _connection._redirects); |
- } |
- // Check for redirect loop |
- if (_connection._redirects != null) { |
- Uri redirectUrl = Uri.parse(location[0]); |
- for (int i = 0; i < _connection._redirects.length; i++) { |
- if (_connection._redirects[i].location.toString() == |
- redirectUrl.toString()) { |
- throw new RedirectLoopException(_connection._redirects); |
+// Transformer that validates the data written. |
+class _DataValidatorTransformer |
+ implements StreamTransformer<List<int>, List<int>> { |
+ final StreamController<List<int>> _controller |
+ = new StreamController<List<int>>(); |
+ int _bytesWritten = 0; |
+ Completer _completer = new Completer(); |
+ |
+ int expectedTransferLength; |
+ |
+ _DataValidatorTransformer(); |
+ |
+ Future get validatorFuture => _completer.future; |
+ |
+ Stream<List<int>> bind(Stream<List<int>> stream) { |
+ var subscription; |
+ subscription = stream.listen( |
+ (data) { |
+ if (expectedTransferLength != null) { |
+ _bytesWritten += data.length; |
+ if (_bytesWritten > expectedTransferLength) { |
+ _controller.close(); |
+ subscription.cancel(); |
+ if (_completer != null) { |
+ _completer.completeError(new HttpException( |
+ "Content size exceeds specified contentLength. " |
+ "$_bytesWritten bytes written while expected " |
+ "$expectedTransferLength.")); |
+ _completer = null; |
+ } |
+ return; |
} |
} |
- } |
- if (!persistentConnection) { |
- throw new RedirectException( |
- "Non-persistent connections are currently not supported for " |
- "redirects", _connection._redirects); |
- } |
- // Drain body and redirect. |
- inputStream.onData = inputStream.read; |
- if (_statusCode == HttpStatus.SEE_OTHER && |
- _connection._method == "POST") { |
- _connection.redirect("GET"); |
- } else { |
- _connection.redirect(); |
- } |
- } else { |
- throw new RedirectLimitExceededException(_connection._redirects); |
- } |
- } else if (statusCode == HttpStatus.UNAUTHORIZED) { |
- _handleUnauthorized(); |
- } else if (_connection._onResponse != null) { |
- _connection._onResponse(this); |
- } |
+ _controller.add(data); |
+ }, |
+ onError: (error) { |
+ _controller.close(); |
+ if (_completer != null) { |
+ _completer.completeError(error); |
+ _completer = null; |
+ } |
+ }, |
+ onDone: () { |
+ _controller.close(); |
+ if (expectedTransferLength != null) { |
+ if (_bytesWritten < expectedTransferLength) { |
+ if (_completer != null) { |
+ _completer.completeError(new HttpException( |
+ "Content size below specified contentLength. " |
+ " $_bytesWritten bytes written while expected " |
+ "$expectedTransferLength.")); |
+ _completer = null; |
+ return; |
+ } |
+ } |
+ } |
+ if (_completer != null) { |
+ _completer.complete(this); |
+ _completer = null; |
+ } |
+ }, |
+ unsubscribeOnError: true); |
+ return _controller.stream; |
} |
+} |
- void _handleUnauthorized() { |
+// Extends StreamConsumer as this is an internal type, only used to pipe to. |
+class _HttpOutgoing implements StreamConsumer<List<int>, dynamic> { |
+ final Completer _dataCompleter = new Completer(); |
+ final Completer _streamCompleter = new Completer(); |
+ final _DataValidatorTransformer _validator = new _DataValidatorTransformer(); |
- void retryRequest(_Credentials cr) { |
- if (cr != null) { |
- // Drain body and retry. |
- // TODO(sgjesse): Support digest. |
- if (cr.scheme == _AuthenticationScheme.BASIC) { |
- inputStream.onData = inputStream.read; |
- _connection._retry(); |
- return; |
- } |
- } |
+ // Future that completes when all data is written. |
+ Future get dataDone => _dataCompleter.future; |
- // Fall through to here to perform normal response handling if |
- // there is no sensible authorization handling. |
- if (_connection._onResponse != null) { |
- _connection._onResponse(this); |
- } |
- } |
+ // Future that completes with the Stream, once the _HttpClientConnection is |
+ // bound to one. |
+ Future<Stream<List<int>>> get stream => _streamCompleter.future; |
- // Only try to authenticate if there is a challenge in the response. |
- List<String> challenge = _headers[HttpHeaders.WWW_AUTHENTICATE]; |
- if (challenge != null && challenge.length == 1) { |
- _HeaderValue header = |
- new _HeaderValue.fromString(challenge[0], parameterSeparator: ","); |
- _AuthenticationScheme scheme = |
- new _AuthenticationScheme.fromString(header.value); |
- String realm = header.parameters["realm"]; |
- |
- // See if any credentials are available. |
- _Credentials cr = |
- _connection._client._findCredentials( |
- _connection._request._uri, scheme); |
- |
- // Ask for more credentials if none found or the one found has |
- // already been used. If it has already been used it must now be |
- // invalid and is removed. |
- if (cr == null || cr.used) { |
- if (cr != null) { |
- _connection._client._removeCredentials(cr); |
- } |
- cr = null; |
- if (_connection._client._authenticate != null) { |
- Future authComplete = |
- _connection._client._authenticate( |
- _connection._request._uri, scheme.toString(), realm); |
- authComplete.then((credsAvailable) { |
- if (credsAvailable) { |
- cr = _connection._client._findCredentials( |
- _connection._request._uri, scheme); |
- retryRequest(cr); |
- } else { |
- if (_connection._onResponse != null) { |
- _connection._onResponse(this); |
- } |
- } |
- }); |
- return; |
- } |
- } else { |
- // If credentials found prepare for retrying the request. |
- retryRequest(cr); |
- return; |
- } |
- } |
+ void setTransferLength(int transferLength) { |
+ _validator.expectedTransferLength = transferLength; |
+ } |
- // Fall through to here to perform normal response handling if |
- // there is no sensible authorization handling. |
- if (_connection._onResponse != null) { |
- _connection._onResponse(this); |
- } |
+ Future consume(Stream<List<int>> stream) { |
+ stream = stream.transform(_validator); |
+ _streamCompleter.complete(stream); |
+ _validator.validatorFuture.catchError((e) { |
+ _dataCompleter.completeError(e); |
+ }); |
+ return _validator.validatorFuture.then((v) { |
+ _dataCompleter.complete(); |
+ return v; |
+ }); |
} |
+} |
- void _onDataReceived(List<int> data) { |
- _buffer.add(data); |
- if (_inputStream != null) _inputStream._dataReceived(); |
+ |
+class _HttpClientConnection { |
+ final String key; |
+ final Socket _socket; |
+ final _HttpParser _httpParser; |
+ StreamSubscription _subscription; |
+ final _HttpClient _httpClient; |
+ |
+ Completer<_HttpIncoming> _nextResponseCompleter; |
+ Future _writeDoneFuture; |
+ |
+ _HttpClientConnection(String this.key, |
+ Socket this._socket, |
+ _HttpClient this._httpClient) |
+ : _httpParser = new _HttpParser.responseParser() { |
+ _socket.pipe(_httpParser); |
+ _socket.done.catchError((e) { destroy(); }); |
+ |
+ // Set up handlers on the parser here, so we are sure to get 'onDone' from |
+ // the parser. |
+ _subscription = _httpParser.listen( |
+ (incoming) { |
+ // Only handle one incoming response at the time. Keep the |
+ // stream paused until the response have been processed. |
+ _subscription.pause(); |
+ // We assume the response is not here, until we have send the request. |
+ assert(_nextResponseCompleter != null); |
+ _nextResponseCompleter.complete(incoming); |
+ }, |
+ onError: (error) { |
+ if (_nextResponseCompleter != null) { |
+ _nextResponseCompleter.completeError(error); |
+ } |
+ }, |
+ onDone: () { |
+ close(); |
+ }); |
} |
- void _onDataEnd() { |
- if (_inputStream != null) { |
- _inputStream._closeReceived(); |
- } else { |
- inputStream._streamMarkedClosed = true; |
- } |
+ Future<_HttpIncoming> sendRequest(_HttpOutgoing outgoing) { |
+ return outgoing.stream |
+ .then((stream) { |
+ // Close socket if output data is invalid. |
+ outgoing.dataDone.catchError((e) { |
+ close(); |
+ }); |
+ // Sending request, set up response completer. |
+ _nextResponseCompleter = new Completer(); |
+ _writeDoneFuture = _socket.addStream(stream); |
+ // Listen for response. |
+ return _nextResponseCompleter.future |
+ .whenComplete(() { |
+ _nextResponseCompleter = null; |
+ }) |
+ .then((incoming) { |
+ incoming.dataDone.then((_) { |
+ if (!incoming.headers.persistentConnection) { |
+ close(); |
+ } else { |
+ // Wait for the socket to be done with writing, before we |
+ // continue. |
+ _writeDoneFuture.then((_) { |
+ _subscription.resume(); |
+ // Return connection, now we are done. |
+ _httpClient._returnConnection(this); |
+ }); |
+ } |
+ }); |
+ // TODO(ajohnsen): Can there be an error on dataDone? |
+ return incoming; |
+ }) |
+ // If we see a state error, we failed to get the 'first' element. |
+ // Transform the error to a HttpParserException, for consistency. |
+ .catchError((error) { |
+ throw new HttpParserException( |
+ "Connection closed before data was received"); |
+ }, test: (error) => error is StateError) |
+ .catchError((error) { |
+ // We are done with the socket. |
+ destroy(); |
+ throw error; |
+ }); |
+ }); |
} |
- // Delegate functions for the HttpInputStream implementation. |
- int _streamAvailable() { |
- return _buffer.length; |
+ Future<Socket> detachSocket() { |
+ return _writeDoneFuture.then((_) => |
+ new _DetachedSocket(_socket, _httpParser.detachIncoming())); |
} |
- List<int> _streamRead(int bytesToRead) { |
- return _buffer.readBytes(bytesToRead); |
+ void destroy() { |
+ _socket.destroy(); |
+ _httpClient._connectionClosed(this); |
} |
- int _streamReadInto(List<int> buffer, int offset, int len) { |
- List<int> data = _buffer.readBytes(len); |
- buffer.setRange(offset, data.length, data); |
- return data.length; |
+ void close() { |
+ var future = _writeDoneFuture; |
+ if (future == null) future = new Future.immediate(null); |
+ _httpClient._connectionClosed(this); |
+ future.then((_) { |
+ _socket.close(); |
+ // TODO(ajohnsen): Add timeout. |
+ // Delay destroy until socket is actually done writing. |
+ _socket.done.then((_) => _socket.destroy(), |
+ onError: (_) => _socket.destroy()); |
+ }); |
} |
- void _streamSetErrorHandler(callback(e)) { |
- _streamErrorHandler = callback; |
- } |
+ HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket); |
+} |
- int _statusCode; |
- String _reasonPhrase; |
+class _ConnnectionInfo { |
+ _ConnnectionInfo(_HttpClientConnection this.connection, _Proxy this.proxy); |
+ final _HttpClientConnection connection; |
+ final _Proxy proxy; |
+} |
- _HttpClientConnection _connection; |
- _HttpInputStream _inputStream; |
- _BufferList _buffer; |
- Function _streamErrorHandler; |
-} |
+class _HttpClient implements HttpClient { |
+ // TODO(ajohnsen): Use eviction timeout. |
+ static const int DEFAULT_EVICTION_TIMEOUT = 60000; |
+ bool _closing = false; |
+ final Map<String, Queue<_HttpClientConnection>> _idleConnections |
+ = new Map<String, Queue<_HttpClientConnection>>(); |
+ final Set<_HttpClientConnection> _activeConnections |
+ = new Set<_HttpClientConnection>(); |
+ final List<_Credentials> _credentials = []; |
+ Function _authenticate; |
+ Function _findProxy; |
-class _HttpClientConnection |
- extends _HttpConnectionBase implements HttpClientConnection { |
+ Future<HttpClientRequest> open(String method, |
+ String host, |
+ int port, |
+ String path) { |
+ // TODO(sgjesse): The path set here can contain both query and |
+ // fragment. They should be cracked and set correctly. |
+ return _openUrl(method, new Uri.fromComponents( |
+ scheme: "http", domain: host, port: port, path: path)); |
+ } |
- _HttpClientConnection(_HttpClient this._client) { |
- _httpParser = new _HttpParser.responseParser(); |
+ Future<HttpClientRequest> openUrl(String method, Uri url) { |
+ return _openUrl(method, url); |
} |
- bool _write(List<int> data, [bool copyBuffer = false]) { |
- return _socket.outputStream.write(data, copyBuffer); |
+ Future<HttpClientRequest> get(String host, |
+ int port, |
+ String path) { |
+ return open("get", host, port, path); |
} |
- bool _writeFrom(List<int> data, [int offset, int len]) { |
- return _socket.outputStream.writeFrom(data, offset, len); |
+ Future<HttpClientRequest> getUrl(Uri url) { |
+ return _openUrl("get", url); |
} |
- bool _flush() { |
- _socket.outputStream.flush(); |
+ Future<HttpClientRequest> post(String host, |
+ int port, |
+ String path) { |
+ return open("post", host, port, path); |
} |
- bool _close() { |
- _socket.outputStream.close(); |
+ Future<HttpClientRequest> postUrl(Uri url) { |
+ return _openUrl("post", url); |
} |
- bool _destroy() { |
- _socket.close(); |
+ void close({bool force: false}) { |
+ _closing = true; |
+ // Create flattened copy of _idleConnections, as 'destory' will manipulate |
+ // it. |
+ var idle = _idleConnections.values.reduce( |
+ [], |
+ (l, e) { |
+ l.addAll(e); |
+ return l; |
+ }); |
+ idle.forEach((e) { |
+ e.close(); |
+ }); |
+ assert(_idleConnections.isEmpty); |
+ if (force) { |
+ for (var connection in _activeConnections.toList()) { |
+ connection.destroy(); |
+ } |
+ assert(_activeConnections.isEmpty); |
+ _activeConnections.clear(); |
+ } |
} |
- DetachedSocket _detachSocket() { |
- _socket.onData = null; |
- _socket.onClosed = null; |
- _socket.onError = null; |
- _socket.outputStream.onNoPendingWrites = null; |
- Socket socket = _socket; |
- _socket = null; |
- if (onDetach != null) onDetach(); |
- return new _DetachedSocket(socket, _httpParser.readUnparsedData()); |
+ set authenticate(Future<bool> f(Uri url, String scheme, String realm)) { |
+ _authenticate = f; |
} |
- void _connectionEstablished(_SocketConnection socketConn) { |
- super._connectionEstablished(socketConn._socket); |
- _socketConn = socketConn; |
- // Register HTTP parser callbacks. |
- _httpParser.responseStart = _onResponseReceived; |
- _httpParser.dataReceived = _onDataReceived; |
- _httpParser.dataEnd = _onDataEnd; |
- _httpParser.error = _onError; |
- _httpParser.closed = _onClosed; |
- _httpParser.requestStart = (method, uri, version) { assert(false); }; |
- _state = _HttpConnectionBase.ACTIVE; |
+ void addCredentials(Uri url, String realm, HttpClientCredentials cr) { |
+ _credentials.add(new _Credentials(url, realm, cr)); |
} |
- void _checkSocketDone() { |
- if (_isAllDone) { |
- // If we are done writing the response, and either the server |
- // has closed or the connection is not persistent, we must |
- // close. |
- if (_isReadClosed || !_response.persistentConnection) { |
- this.onClosed = () { |
- _client._closedSocketConnection(_socketConn); |
- }; |
- _client._closeQueue.add(this); |
- } else if (_socket != null) { |
- _client._returnSocketConnection(_socketConn); |
- _socket = null; |
- _socketConn = null; |
- assert(_pendingRedirect == null || _pendingRetry == null); |
- if (_pendingRedirect != null) { |
- _doRedirect(_pendingRedirect); |
- _pendingRedirect = null; |
- } else if (_pendingRetry != null) { |
- _doRetry(_pendingRetry); |
- _pendingRetry = null; |
- } |
+ set findProxy(String f(Uri uri)) => _findProxy = f; |
+ |
+ Future<HttpClientRequest> _openUrl(String method, Uri uri) { |
+ if (method == null) { |
+ throw new ArgumentError(method); |
+ } |
+ if (uri.domain.isEmpty || (uri.scheme != "http" && uri.scheme != "https")) { |
+ throw new ArgumentError("Unsupported scheme '${uri.scheme}' in $uri"); |
+ } |
+ |
+ bool isSecure = (uri.scheme == "https"); |
+ int port = uri.port; |
+ if (port == 0) { |
+ port = isSecure ? |
+ HttpClient.DEFAULT_HTTPS_PORT : |
+ HttpClient.DEFAULT_HTTP_PORT; |
+ } |
+ // Check to see if a proxy server should be used for this connection. |
+ var proxyConf = const _ProxyConfiguration.direct(); |
+ if (_findProxy != null) { |
+ // TODO(sgjesse): Keep a map of these as normally only a few |
+ // configuration strings will be used. |
+ try { |
+ proxyConf = new _ProxyConfiguration(_findProxy(uri)); |
+ } catch (error, stackTrace) { |
+ return new Future.immediateError(error, stackTrace); |
} |
} |
+ return _getConnection(uri.domain, port, proxyConf, isSecure).then((info) { |
+ // Create new internal outgoing connection. |
+ var outgoing = new _HttpOutgoing(); |
+ // Create new request object, wrapping the outgoing connection. |
+ var request = new _HttpClientRequest(outgoing, |
+ uri, |
+ method.toUpperCase(), |
+ !info.proxy.isDirect, |
+ this, |
+ info.connection); |
+ request.headers.host = uri.domain; |
+ request.headers.port = port; |
+ if (uri.userInfo != null && !uri.userInfo.isEmpty) { |
+ // If the URL contains user information use that for basic |
+ // authorization |
+ String auth = |
+ CryptoUtils.bytesToBase64(_encodeString(uri.userInfo)); |
+ request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); |
+ } else { |
+ // Look for credentials. |
+ _Credentials cr = _findCredentials(uri); |
+ if (cr != null) { |
+ cr.authorize(request); |
+ } |
+ } |
+ // Start sending the request (lazy, delayed until the user provides |
+ // data). |
+ info.connection._httpParser.responseToMethod = method; |
+ info.connection.sendRequest(outgoing) |
+ .then((incoming) { |
+ // The full request have been sent and a response is received |
+ // containing status-code, headers and etc. |
+ request._onIncoming(incoming); |
+ }) |
+ .catchError((error) { |
+ // An error occoured before the http-header was parsed. This |
+ // could be either a socket-error or parser-error. |
+ request._onError(error); |
+ }); |
+ // Return the request to the user. Immediate socket errors are not |
+ // handled, thus forwarded to the user. |
+ return request; |
+ }); |
} |
- void _requestClosed() { |
- _state |= _HttpConnectionBase.REQUEST_DONE; |
- _checkSocketDone(); |
+ Future<HttpClientRequest> _openUrlFromRequest(String method, |
+ Uri uri, |
+ _HttpClientRequest previous) { |
+ return openUrl(method, uri).then((request) { |
+ // Only follow redirects if initial request did. |
+ request.followRedirects = previous.followRedirects; |
+ // Allow same number of redirects. |
+ request.maxRedirects = previous.maxRedirects; |
+ // Copy headers |
+ for (var header in previous.headers._headers.keys) { |
+ if (request.headers[header] == null) { |
+ request.headers.set(header, previous.headers[header]); |
+ } |
+ } |
+ request.headers.chunkedTransferEncoding = false; |
+ request.contentLength = 0; |
+ return request; |
+ }); |
} |
- HttpClientRequest open(String method, Uri uri) { |
- _method = method; |
- // Tell the HTTP parser the method it is expecting a response to. |
- _httpParser.responseToMethod = method; |
- // If the connection already have a request this is a retry of a |
- // request. In this case the request object is reused to ensure |
- // that the same headers are send. |
- if (_request != null) { |
- _request._method = method; |
- _request._uri = uri; |
- _request._headers._mutable = true; |
- _request._state = _HttpRequestResponseBase.START; |
- } else { |
- _request = new _HttpClientRequest(method, uri, this); |
+ // Return a live connection to the idle pool. |
+ void _returnConnection(_HttpClientConnection connection) { |
+ _activeConnections.remove(connection); |
+ if (_closing) { |
+ connection.close(); |
+ return; |
} |
- _response = new _HttpClientResponse(this); |
- return _request; |
- } |
- |
- DetachedSocket detachSocket() { |
- return _detachSocket(); |
+ // TODO(ajohnsen): Listen for socket close events. |
+ if (!_idleConnections.containsKey(connection.key)) { |
+ _idleConnections[connection.key] = new Queue(); |
+ } |
+ _idleConnections[connection.key].addLast(connection); |
} |
- void _onClosed() { |
- _state |= _HttpConnectionBase.READ_CLOSED; |
- _checkSocketDone(); |
+ // Remove a closed connnection from the active set. |
+ void _connectionClosed(_HttpClientConnection connection) { |
+ _activeConnections.remove(connection); |
+ if (_idleConnections.containsKey(connection.key)) { |
+ _idleConnections[connection.key].remove(connection); |
+ if (_idleConnections[connection.key].isEmpty) { |
+ _idleConnections.remove(connection.key); |
+ } |
+ } |
} |
- void _onError(e) { |
- // Cancel any pending data in the HTTP parser. |
- _httpParser.cancel(); |
- if (_socketConn != null) { |
- _client._closeSocketConnection(_socketConn); |
+ // Get a new _HttpClientConnection, either from the idle pool or created from |
+ // a new Socket. |
+ Future<_ConnnectionInfo> _getConnection(String uriHost, |
+ int uriPort, |
+ _ProxyConfiguration proxyConf, |
+ bool isSecure) { |
+ Iterator<_Proxy> proxies = proxyConf.proxies.iterator; |
+ |
+ Future<_ConnnectionInfo> connect(error) { |
+ if (!proxies.moveNext()) return new Future.immediateError(error); |
+ _Proxy proxy = proxies.current; |
+ String host = proxy.isDirect ? uriHost: proxy.host; |
+ int port = proxy.isDirect ? uriPort: proxy.port; |
+ String key = isSecure ? "ssh:$host:$port" : "$host:$port"; |
+ if (_idleConnections.containsKey(key)) { |
+ var connection = _idleConnections[key].removeFirst(); |
+ if (_idleConnections[key].isEmpty) { |
+ _idleConnections.remove(key); |
+ } |
+ _activeConnections.add(connection); |
+ return new Future.immediate(new _ConnnectionInfo(connection, proxy)); |
+ } |
+ return (isSecure && proxy.isDirect |
+ ? SecureSocket.connect(host, |
+ port, |
+ sendClientCertificate: true) |
+ : Socket.connect(host, port)) |
+ .then((socket) { |
+ var connection = new _HttpClientConnection(key, socket, this); |
+ _activeConnections.add(connection); |
+ return new _ConnnectionInfo(connection, proxy); |
+ }, onError: (error) { |
+ // Continue with next proxy. |
+ return connect(error.error); |
+ }); |
} |
+ return connect(new HttpException("No proxies given")); |
+ } |
- // If it looks as if we got a bad connection from the connection |
- // pool and the request can be retried do a retry. |
- if (_socketConn != null && _socketConn._fromPool && _request._emptyBody) { |
- String method = _request._method; |
- Uri uri = _request._uri; |
- _socketConn = null; |
+ _Credentials _findCredentials(Uri url, [_AuthenticationScheme scheme]) { |
+ // Look for credentials. |
+ _Credentials cr = |
+ _credentials.reduce(null, (_Credentials prev, _Credentials value) { |
+ if (value.applies(url, scheme)) { |
+ if (prev == null) return value; |
+ return value.uri.path.length > prev.uri.path.length ? value : prev; |
+ } else { |
+ return prev; |
+ } |
+ }); |
+ return cr; |
+ } |
- // Retry the URL using the same connection instance. |
- _httpParser.restart(); |
- _client._openUrl(method, uri, this); |
- } else { |
- // Report the error. |
- if (_response != null && _response._streamErrorHandler != null) { |
- _response._streamErrorHandler(e); |
- } else if (_onErrorCallback != null) { |
- _onErrorCallback(e); |
- } else { |
- throw e; |
- } |
+ void _removeCredentials(_Credentials cr) { |
+ int index = _credentials.indexOf(cr); |
+ if (index != -1) { |
+ _credentials.removeAt(index); |
} |
} |
+} |
- void _onResponseReceived(int statusCode, |
- String reasonPhrase, |
- String version, |
- _HttpHeaders headers, |
- bool hasBody) { |
- _response._onResponseReceived( |
- statusCode, reasonPhrase, version, headers, hasBody); |
- } |
- void _onDataReceived(List<int> data) { |
- _response._onDataReceived(data); |
+class _HttpConnection { |
+ static const _ACTIVE = 0; |
+ static const _IDLE = 1; |
+ static const _CLOSING = 2; |
+ static const _DETACHED = 3; |
+ |
+ int _state = _IDLE; |
+ |
+ final Socket _socket; |
+ final _HttpServer _httpServer; |
+ final _HttpParser _httpParser; |
+ StreamSubscription _subscription; |
+ |
+ Future _writeDoneFuture; |
+ |
+ _HttpConnection(Socket this._socket, _HttpServer this._httpServer) |
+ : _httpParser = new _HttpParser.requestParser() { |
+ _socket.pipe(_httpParser); |
+ _socket.done.catchError((e) => destroy()); |
+ _subscription = _httpParser.listen( |
+ (incoming) { |
+ // Only handle one incoming request at the time. Keep the |
+ // stream paused until the request has been send. |
+ _subscription.pause(); |
+ _state = _ACTIVE; |
+ var outgoing = new _HttpOutgoing(); |
+ _writeDoneFuture = outgoing.stream.then(_socket.addStream); |
+ var response = new _HttpResponse( |
+ incoming.headers.protocolVersion, |
+ outgoing); |
+ var request = new _HttpRequest(response, incoming, _httpServer, this); |
+ response._ignoreBody = request.method == "HEAD"; |
+ response._httpRequest = request; |
+ outgoing.dataDone.then((_) { |
+ if (_state == _DETACHED) return; |
+ if (response.headers.persistentConnection && |
+ incoming.fullBodyRead) { |
+ // Wait for the socket to be done with writing, before we |
+ // continue. |
+ _writeDoneFuture.then((_) { |
+ _state = _IDLE; |
+ // Resume the subscription for incoming requests as the |
+ // request is now processed. |
+ _subscription.resume(); |
+ }); |
+ } else { |
+ // Close socket, keep-alive not used or body sent before received |
+ // data was handled. |
+ close(); |
+ } |
+ }).catchError((e) { |
+ close(); |
+ }); |
+ _httpServer._handleRequest(request); |
+ }, |
+ onDone: () { |
+ close(); |
+ }, |
+ onError: (error) { |
+ _httpServer._handleError(error); |
+ destroy(); |
+ }); |
} |
- void _onDataEnd(bool close) { |
- _state |= _HttpConnectionBase.RESPONSE_DONE; |
- _response._onDataEnd(); |
- _checkSocketDone(); |
+ void destroy() { |
+ if (_state == _CLOSING || _state == _DETACHED) return; |
+ _state = _CLOSING; |
+ _socket.destroy(); |
+ _httpServer._connectionClosed(this); |
} |
- void _onClientShutdown() { |
- if (!_isResponseDone) { |
- _onError(new HttpException("Client shutdown")); |
- } |
+ void close() { |
+ if (_state == _CLOSING || _state == _DETACHED) return; |
+ _state = _CLOSING; |
+ var future = _writeDoneFuture; |
+ if (future == null) future = new Future.immediate(null); |
+ _httpServer._connectionClosed(this); |
+ future.then((_) { |
+ _socket.close(); |
+ // TODO(ajohnsen): Add timeout. |
+ // Delay destroy until socket is actually done writing. |
+ _socket.done.then((_) => _socket.destroy(), |
+ onError: (_) => _socket.destroy()); |
+ }); |
} |
- void set onRequest(void handler(HttpClientRequest request)) { |
- _onRequest = handler; |
+ Future<Socket> detachSocket() { |
+ _state = _DETACHED; |
+ // Remove connection from server. |
+ _httpServer._connectionClosed(this); |
+ |
+ _HttpDetachedIncoming detachedIncoming = _httpParser.detachIncoming(); |
+ |
+ return _writeDoneFuture.then((_) { |
+ return new _DetachedSocket(_socket, detachedIncoming); |
+ }); |
} |
- void set onResponse(void handler(HttpClientResponse response)) { |
- _onResponse = handler; |
+ HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket); |
+ |
+ bool get _isActive => _state == _ACTIVE; |
+ bool get _isIdle => _state == _IDLE; |
+ bool get _isClosing => _state == _CLOSING; |
+ bool get _isDetached => _state == _DETACHED; |
+} |
+ |
+ |
+// HTTP server waiting for socket connections. |
+class _HttpServer extends Stream<HttpRequest> implements HttpServer { |
+ |
+ static Future<HttpServer> bind(String host, int port, int backlog) { |
+ return ServerSocket.bind(host, port, backlog).then((socket) { |
+ return new _HttpServer._(socket, true); |
+ }); |
} |
- void set onError(void callback(e)) { |
- _onErrorCallback = callback; |
+ static Future<HttpServer> bindSecure(String host, |
+ int port, |
+ int backlog, |
+ String certificate_name, |
+ bool requestClientCertificate) { |
+ return SecureServerSocket.bind( |
+ host, |
+ port, |
+ backlog, |
+ certificate_name, |
+ requestClientCertificate: requestClientCertificate) |
+ .then((socket) { |
+ return new _HttpServer._(socket, true); |
+ }); |
} |
- void _doRetry(_RedirectInfo retry) { |
- assert(_socketConn == null); |
+ _HttpServer._(this._serverSocket, this._closeServer); |
- // Retry the URL using the same connection instance. |
- _state = _HttpConnectionBase.IDLE; |
- _client._openUrl(retry.method, retry.location, this); |
+ _HttpServer.listenOn(ServerSocket this._serverSocket) |
+ : _closeServer = false; |
+ |
+ StreamSubscription<HttpRequest> listen(void onData(HttpRequest event), |
+ {void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError}) { |
+ _serverSocket.listen( |
+ (Socket socket) { |
+ // Accept the client connection. |
+ _HttpConnection connection = new _HttpConnection(socket, this); |
+ _connections.add(connection); |
+ }, |
+ onError: _controller.signalError, |
+ onDone: _controller.close); |
+ return _controller.stream.listen(onData, |
+ onError: onError, |
+ onDone: onDone, |
+ unsubscribeOnError: unsubscribeOnError); |
} |
- void _retry() { |
- var retry = new _RedirectInfo(_response.statusCode, _method, _request._uri); |
- // The actual retry is postponed until both response and request |
- // are done. |
- if (_isAllDone) { |
- _doRetry(retry); |
- } else { |
- // Prepare for retry. |
- assert(_pendingRedirect == null); |
- _pendingRetry = retry; |
+ void close() { |
+ closed = true; |
+ if (_serverSocket != null && _closeServer) { |
+ _serverSocket.close(); |
+ } |
+ if (_sessionManagerInstance != null) { |
+ _sessionManagerInstance.close(); |
+ _sessionManagerInstance = null; |
+ } |
+ for (_HttpConnection connection in _connections.toList()) { |
+ connection.destroy(); |
} |
+ _connections.clear(); |
} |
- void _doRedirect(_RedirectInfo redirect) { |
- assert(_socketConn == null); |
- |
- if (_redirects == null) { |
- _redirects = new List<_RedirectInfo>(); |
- } |
- _redirects.add(redirect); |
- _doRetry(redirect); |
+ int get port { |
+ if (closed) throw new HttpException("HttpServer is not bound to a socket"); |
+ return _serverSocket.port; |
} |
- void redirect([String method, Uri url]) { |
- if (method == null) method = _method; |
- if (url == null) { |
- url = Uri.parse(_response.headers.value(HttpHeaders.LOCATION)); |
- } |
- // Always set the content length to 0 for redirects. |
- var mutable = _request._headers._mutable; |
- _request._headers._mutable = true; |
- _request._headers.contentLength = 0; |
- _request._headers._mutable = mutable; |
- _request._bodyBytesWritten = 0; |
- var redirect = new _RedirectInfo(_response.statusCode, method, url); |
- // The actual redirect is postponed until both response and |
- // request are done. |
- assert(_pendingRetry == null); |
- _pendingRedirect = redirect; |
- } |
- |
- List<RedirectInfo> get redirects => _redirects; |
- |
- Function _onRequest; |
- Function _onResponse; |
- Function _onErrorCallback; |
- |
- _HttpClient _client; |
- _SocketConnection _socketConn; |
- HttpClientRequest _request; |
- HttpClientResponse _response; |
- String _method; |
- bool _usingProxy; |
- |
- // Redirect handling |
- bool followRedirects = true; |
- int maxRedirects = 5; |
- List<_RedirectInfo> _redirects; |
- _RedirectInfo _pendingRedirect; |
- _RedirectInfo _pendingRetry; |
- |
- // Callbacks. |
- var requestReceived; |
-} |
+ set sessionTimeout(int timeout) { |
+ _sessionManager.sessionTimeout = timeout; |
+ } |
+ void _handleRequest(HttpRequest request) { |
+ _controller.add(request); |
+ } |
-// Class for holding keep-alive sockets in the cache for the HTTP |
-// client together with the connection information. |
-class _SocketConnection { |
- _SocketConnection(String this._host, |
- int this._port, |
- Socket this._socket); |
+ void _handleError(AsyncError error) { |
+ if (!closed) _controller.signalError(error); |
+ } |
- void _markReturned() { |
- // Any activity on the socket while waiting in the pool will |
- // invalidate the connection os that it is not reused. |
- _socket.onData = _invalidate; |
- _socket.onClosed = _invalidate; |
- _socket.onError = (_) => _invalidate(); |
- _returnTime = new DateTime.now(); |
- _httpClientConnection = null; |
+ void _connectionClosed(_HttpConnection connection) { |
+ _connections.remove(connection); |
} |
- void _markRetrieved() { |
- _socket.onData = null; |
- _socket.onClosed = null; |
- _socket.onError = null; |
- _httpClientConnection = null; |
+ _HttpSessionManager get _sessionManager { |
+ // Lazy init. |
+ if (_sessionManagerInstance == null) { |
+ _sessionManagerInstance = new _HttpSessionManager(); |
+ } |
+ return _sessionManagerInstance; |
} |
- void _close() { |
- _socket.onData = null; |
- _socket.onClosed = null; |
- _socket.onError = null; |
- _httpClientConnection = null; |
- _socket.close(); |
+ HttpConnectionsInfo connectionsInfo() { |
+ HttpConnectionsInfo result = new HttpConnectionsInfo(); |
+ result.total = _connections.length; |
+ _connections.forEach((_HttpConnection conn) { |
+ if (conn._isActive) { |
+ result.active++; |
+ } else if (conn._isIdle) { |
+ result.idle++; |
+ } else { |
+ assert(conn._isClosing); |
+ result.closing++; |
+ } |
+ }); |
+ return result; |
} |
- Duration _idleTime(DateTime now) => now.difference(_returnTime); |
+ _HttpSessionManager _sessionManagerInstance; |
- bool get _fromPool => _returnTime != null; |
+ // Indicated if the http server has been closed. |
+ bool closed = false; |
- void _invalidate() { |
- _valid = false; |
- _close(); |
- } |
+ // The server listen socket. |
+ final ServerSocket _serverSocket; |
+ final bool _closeServer; |
- int get hashCode => _socket.hashCode; |
+ // Set of currently connected clients. |
+ final Set<_HttpConnection> _connections = new Set<_HttpConnection>(); |
+ final StreamController<HttpRequest> _controller |
+ = new StreamController<HttpRequest>(); |
- String _host; |
- int _port; |
- Socket _socket; |
- DateTime _returnTime; |
- bool _valid = true; |
- HttpClientConnection _httpClientConnection; |
+ // TODO(ajohnsen): Use close queue? |
} |
+ |
class _ProxyConfiguration { |
static const String PROXY_PREFIX = "PROXY "; |
static const String DIRECT_PREFIX = "DIRECT"; |
@@ -1844,6 +1485,7 @@ class _ProxyConfiguration { |
final List<_Proxy> proxies; |
} |
+ |
class _Proxy { |
const _Proxy(this.host, this.port) : isDirect = false; |
const _Proxy.direct() : host = null, port = null, isDirect = true; |
@@ -1853,376 +1495,59 @@ class _Proxy { |
final bool isDirect; |
} |
-class _HttpClient implements HttpClient { |
- static const int DEFAULT_EVICTION_TIMEOUT = 60000; |
- |
- _HttpClient() : _openSockets = new Map(), |
- _activeSockets = new Set(), |
- _closeQueue = new _CloseQueue(), |
- credentials = new List<_Credentials>(), |
- _shutdown = false; |
- |
- HttpClientConnection open( |
- String method, String host, int port, String path) { |
- // TODO(sgjesse): The path set here can contain both query and |
- // fragment. They should be cracked and set correctly. |
- return _open(method, new Uri.fromComponents( |
- scheme: "http", domain: host, port: port, path: path)); |
- } |
- |
- HttpClientConnection _open(String method, |
- Uri uri, |
- [_HttpClientConnection connection]) { |
- if (_shutdown) throw new HttpException("HttpClient shutdown"); |
- if (method == null || uri.domain.isEmpty) { |
- throw new ArgumentError(null); |
- } |
- return _prepareHttpClientConnection(method, uri, connection); |
- } |
- |
- HttpClientConnection openUrl(String method, Uri url) { |
- return _openUrl(method, url); |
- } |
- |
- HttpClientConnection _openUrl(String method, |
- Uri url, |
- [_HttpClientConnection connection]) { |
- if (url.scheme != "http" && url.scheme != "https") { |
- throw new HttpException("Unsupported URL scheme ${url.scheme}"); |
- } |
- return _open(method, url, connection); |
- } |
- |
- HttpClientConnection get(String host, int port, String path) { |
- return open("GET", host, port, path); |
- } |
- |
- HttpClientConnection getUrl(Uri url) => _openUrl("GET", url); |
- |
- HttpClientConnection post(String host, int port, String path) { |
- return open("POST", host, port, path); |
- } |
- |
- HttpClientConnection postUrl(Uri url) => _openUrl("POST", url); |
- |
- set authenticate(Future<bool> f(Uri url, String scheme, String realm)) { |
- _authenticate = f; |
- } |
- |
- void addCredentials( |
- Uri url, String realm, HttpClientCredentials cr) { |
- credentials.add(new _Credentials(url, realm, cr)); |
- } |
- |
- set sendClientCertificate(bool send) => _sendClientCertificate = send; |
- |
- set clientCertificate(String nickname) => _clientCertificate = nickname; |
- |
- set findProxy(String f(Uri uri)) => _findProxy = f; |
- |
- void shutdown({bool force: false}) { |
- if (force) _closeQueue.shutdown(); |
- new Map.from(_openSockets).forEach( |
- (String key, Queue<_SocketConnection> connections) { |
- while (!connections.isEmpty) { |
- _SocketConnection socketConn = connections.removeFirst(); |
- socketConn._socket.close(); |
- } |
- }); |
- if (force) { |
- _activeSockets.toList().forEach((_SocketConnection socketConn) { |
- socketConn._httpClientConnection._onClientShutdown(); |
- socketConn._close(); |
- }); |
- } |
- if (_evictionTimer != null) _cancelEvictionTimer(); |
- _shutdown = true; |
- } |
- |
- void _cancelEvictionTimer() { |
- _evictionTimer.cancel(); |
- _evictionTimer = null; |
- } |
- |
- String _connectionKey(String host, int port) { |
- return "$host:$port"; |
- } |
- |
- HttpClientConnection _prepareHttpClientConnection( |
- String method, |
- Uri url, |
- [_HttpClientConnection connection]) { |
- |
- void _establishConnection(String host, |
- int port, |
- _ProxyConfiguration proxyConfiguration, |
- int proxyIndex, |
- bool reusedConnection, |
- bool secure) { |
- |
- void _connectionOpened(_SocketConnection socketConn, |
- _HttpClientConnection connection, |
- bool usingProxy) { |
- socketConn._httpClientConnection = connection; |
- connection._usingProxy = usingProxy; |
- connection._connectionEstablished(socketConn); |
- HttpClientRequest request = connection.open(method, url); |
- request.headers.host = host; |
- request.headers.port = port; |
- if (url.userInfo != null && !url.userInfo.isEmpty) { |
- // If the URL contains user information use that for basic |
- // authorization |
- _UTF8Encoder encoder = new _UTF8Encoder(); |
- String auth = |
- CryptoUtils.bytesToBase64(encoder.encodeString(url.userInfo)); |
- request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); |
- } else { |
- // Look for credentials. |
- _Credentials cr = _findCredentials(url); |
- if (cr != null) { |
- cr.authorize(request); |
- } |
- } |
- // A reused connection is indicating either redirect or retry |
- // where the onRequest callback should not be issued again. |
- if (connection._onRequest != null && !reusedConnection) { |
- connection._onRequest(request); |
- } else { |
- request.outputStream.close(); |
- } |
- } |
- |
- assert(proxyIndex < proxyConfiguration.proxies.length); |
- |
- // Determine the actual host to connect to. |
- String connectHost; |
- int connectPort; |
- _Proxy proxy = proxyConfiguration.proxies[proxyIndex]; |
- if (proxy.isDirect) { |
- connectHost = host; |
- connectPort = port; |
- } else { |
- connectHost = proxy.host; |
- connectPort = proxy.port; |
- } |
- |
- // If there are active connections for this key get the first one |
- // otherwise create a new one. |
- String key = _connectionKey(connectHost, connectPort); |
- Queue socketConnections = _openSockets[key]; |
- // Remove active connections that are not valid any more or of |
- // the wrong type (HTTP or HTTPS). |
- if (socketConnections != null) { |
- while (!socketConnections.isEmpty) { |
- if (socketConnections.first._valid) { |
- // If socket has the same properties, exit loop with found socket. |
- var socket = socketConnections.first._socket; |
- if (!secure && socket is! SecureSocket) break; |
- if (secure && socket is SecureSocket && |
- _sendClientCertificate == socket.sendClientCertificate && |
- _clientCertificate == socket.certificateName) break; |
- } |
- socketConnections.removeFirst()._close(); |
- } |
- } |
- if (socketConnections == null || socketConnections.isEmpty) { |
- Socket socket = secure && proxy.isDirect ? |
- new SecureSocket(connectHost, |
- connectPort, |
- sendClientCertificate: _sendClientCertificate, |
- certificateName: _clientCertificate) : |
- new Socket(connectHost, connectPort); |
- // Until the connection is established handle connection errors |
- // here as the HttpClientConnection object is not yet associated |
- // with the socket. |
- socket.onError = (e) { |
- proxyIndex++; |
- if (proxyIndex < proxyConfiguration.proxies.length) { |
- // Try the next proxy in the list. |
- _establishConnection( |
- host, port, proxyConfiguration, proxyIndex, false, secure); |
- } else { |
- // Report the error through the HttpClientConnection object to |
- // the client. |
- connection._onError(e); |
- } |
- }; |
- socket.onConnect = () { |
- // When the connection is established, clear the error |
- // callback as it will now be handled by the |
- // HttpClientConnection object which will be associated with |
- // the connected socket. |
- socket.onError = null; |
- _SocketConnection socketConn = |
- new _SocketConnection(connectHost, connectPort, socket); |
- _activeSockets.add(socketConn); |
- _connectionOpened(socketConn, connection, !proxy.isDirect); |
- }; |
- } else { |
- _SocketConnection socketConn = socketConnections.removeFirst(); |
- socketConn._markRetrieved(); |
- _activeSockets.add(socketConn); |
- Timer.run(() => |
- _connectionOpened(socketConn, connection, !proxy.isDirect)); |
- |
- // Get rid of eviction timer if there are no more active connections. |
- if (socketConnections.isEmpty) _openSockets.remove(key); |
- if (_openSockets.isEmpty) _cancelEvictionTimer(); |
- } |
- } |
- |
- // Find out if we want a secure socket. |
- bool is_secure = (url.scheme == "https"); |
- |
- // Find the TCP host and port. |
- String host = url.domain; |
- int port = url.port; |
- if (port == 0) { |
- port = is_secure ? |
- HttpClient.DEFAULT_HTTPS_PORT : |
- HttpClient.DEFAULT_HTTP_PORT; |
- } |
- // Create a new connection object if we are not re-using an existing one. |
- var reusedConnection = false; |
- if (connection == null) { |
- connection = new _HttpClientConnection(this); |
- } else { |
- reusedConnection = true; |
- } |
- connection.onDetach = () => _activeSockets.remove(connection._socketConn); |
- |
- // Check to see if a proxy server should be used for this connection. |
- _ProxyConfiguration proxyConfiguration = const _ProxyConfiguration.direct(); |
- if (_findProxy != null) { |
- // TODO(sgjesse): Keep a map of these as normally only a few |
- // configuration strings will be used. |
- proxyConfiguration = new _ProxyConfiguration(_findProxy(url)); |
- } |
- |
- // Establish the connection starting with the first proxy configured. |
- _establishConnection(host, |
- port, |
- proxyConfiguration, |
- 0, |
- reusedConnection, |
- is_secure); |
- return connection; |
+class _HttpConnectionInfo implements HttpConnectionInfo { |
+ static _HttpConnectionInfo create(Socket socket) { |
+ if (socket == null) return null; |
+ try { |
+ _HttpConnectionInfo info = new _HttpConnectionInfo._(); |
+ info.remoteHost = socket.remoteHost; |
+ info.remotePort = socket.remotePort; |
+ info.localPort = socket.port; |
+ return info; |
+ } catch (e) { } |
+ return null; |
} |
- void _returnSocketConnection(_SocketConnection socketConn) { |
- // If the HTTP client is being shutdown don't return the connection. |
- if (_shutdown) { |
- socketConn._close(); |
- return; |
- }; |
- |
- // Mark socket as returned to unregister from the old connection. |
- socketConn._markReturned(); |
- |
- String key = _connectionKey(socketConn._host, socketConn._port); |
+ _HttpConnectionInfo._(); |
- // Get or create the connection list for this key. |
- Queue sockets = _openSockets[key]; |
- if (sockets == null) { |
- sockets = new Queue(); |
- _openSockets[key] = sockets; |
- } |
- |
- // If there is currently no eviction timer start one. |
- if (_evictionTimer == null) { |
- void _handleEviction(Timer timer) { |
- DateTime now = new DateTime.now(); |
- List<String> emptyKeys = new List<String>(); |
- _openSockets.forEach( |
- (String key, Queue<_SocketConnection> connections) { |
- // As returned connections are added at the head of the |
- // list remove from the tail. |
- while (!connections.isEmpty) { |
- _SocketConnection socketConn = connections.last; |
- if (socketConn._idleTime(now).inMilliseconds > |
- DEFAULT_EVICTION_TIMEOUT) { |
- connections.removeLast(); |
- socketConn._socket.close(); |
- if (connections.isEmpty) emptyKeys.add(key); |
- } else { |
- break; |
- } |
- } |
- }); |
+ String remoteHost; |
+ int remotePort; |
+ int localPort; |
+} |
- // Remove the keys for which here are no more open connections. |
- emptyKeys.forEach((String key) => _openSockets.remove(key)); |
- // If all connections where evicted cancel the eviction timer. |
- if (_openSockets.isEmpty) _cancelEvictionTimer(); |
- } |
- _evictionTimer = new Timer.repeating(const Duration(seconds: 10), |
- _handleEviction); |
- } |
+class _DetachedSocket implements Socket { |
+ final Stream<List<int>> _incoming; |
+ final Socket _socket; |
- // Return connection. |
- _activeSockets.remove(socketConn); |
- sockets.addFirst(socketConn); |
- } |
+ _DetachedSocket(this._socket, this._incoming); |
- void _closeSocketConnection(_SocketConnection socketConn) { |
- socketConn._close(); |
- _activeSockets.remove(socketConn); |
+ StreamSubscription<List<int>> listen(void onData(List<int> event), |
+ {void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError}) { |
+ return _incoming.listen(onData, |
+ onError: onError, |
+ onDone: onDone, |
+ unsubscribeOnError: unsubscribeOnError); |
} |
- void _closedSocketConnection(_SocketConnection socketConn) { |
- _activeSockets.remove(socketConn); |
+ Future<Socket> consume(Stream<List<int>> stream) { |
+ return _socket.consume(stream); |
} |
- _Credentials _findCredentials(Uri url, [_AuthenticationScheme scheme]) { |
- // Look for credentials. |
- _Credentials cr = |
- credentials.reduce(null, (_Credentials prev, _Credentials value) { |
- if (value.applies(url, scheme)) { |
- if (prev == null) return value; |
- return value.uri.path.length > prev.uri.path.length ? value : prev; |
- } else { |
- return prev; |
- } |
- }); |
- return cr; |
+ Future<Socket> addStream(Stream<List<int>> stream) { |
+ return _socket.addStream(stream); |
} |
- void _removeCredentials(_Credentials cr) { |
- int index = credentials.indexOf(cr); |
- if (index != -1) { |
- credentials.removeAt(index); |
- } |
+ void addString(String string, [Encoding encoding = Encoding.UTF_8]) { |
+ return _socket.addString(string, encoding); |
} |
- Function _onOpen; |
- Map<String, Queue<_SocketConnection>> _openSockets; |
- Set<_SocketConnection> _activeSockets; |
- _CloseQueue _closeQueue; |
- List<_Credentials> credentials; |
- Timer _evictionTimer; |
- Function _findProxy; |
- Function _authenticate; |
- bool _sendClientCertificate = false; |
- String _clientCertificate; |
- bool _shutdown; // Has this HTTP client been shutdown? |
-} |
- |
- |
-class _HttpConnectionInfo implements HttpConnectionInfo { |
- String remoteHost; |
- int remotePort; |
- int localPort; |
-} |
- |
- |
-class _DetachedSocket implements DetachedSocket { |
- _DetachedSocket(this._socket, this._unparsedData); |
- Socket get socket => _socket; |
- List<int> get unparsedData => _unparsedData; |
- Socket _socket; |
- List<int> _unparsedData; |
+ void destroy() => _socket.destroy(); |
+ void add(List<int> data) => _socket.add(data); |
+ Future<Socket> close() => _socket.close(); |
} |
@@ -2300,10 +1625,8 @@ class _HttpClientBasicCredentials implements HttpClientBasicCredentials { |
// Proxy-Authenticate headers, see |
// http://tools.ietf.org/html/draft-reschke-basicauth-enc-06. For |
// now always use UTF-8 encoding. |
- _UTF8Encoder encoder = new _UTF8Encoder(); |
String auth = |
- CryptoUtils.bytesToBase64(encoder.encodeString( |
- "$username:$password")); |
+ CryptoUtils.bytesToBase64(_encodeString("$username:$password")); |
request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); |
} |
@@ -2328,7 +1651,6 @@ class _HttpClientDigestCredentials implements HttpClientDigestCredentials { |
} |
- |
class _RedirectInfo implements RedirectInfo { |
const _RedirectInfo(int this.statusCode, |
String this.method, |