OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 // The close queue handles graceful closing of HTTP connections. When | 7 class _HttpIncoming |
8 // a connection is added to the queue it will enter a wait state | 8 extends Stream<List<int>> implements StreamSink<List<int>> { |
9 // waiting for all data written and possibly socket shutdown from | 9 final int _transferLength; |
10 // peer. | 10 final Completer _dataCompleter = new Completer(); |
11 class _CloseQueue { | 11 Stream<List<int>> _stream; |
12 _CloseQueue() : _q = new Set<_HttpConnectionBase>(); | 12 |
13 | 13 bool fullBodyRead = false; |
14 void add(_HttpConnectionBase connection) { | 14 |
15 void closeIfDone() { | 15 // Common properties. |
16 // When either the client has closed or all data has been | 16 final _HttpHeaders headers; |
17 // written to the client we close the underlying socket | 17 bool upgraded = false; |
18 // completely. | 18 |
19 if (connection._isWriteClosed || connection._isReadClosed) { | 19 // ClientResponse properties. |
20 _q.remove(connection); | 20 int statusCode; |
21 connection._socket.close(); | 21 String reasonPhrase; |
22 if (connection.onClosed != null) connection.onClosed(); | 22 |
23 } | 23 // Request properties. |
24 } | 24 String method; |
25 | 25 Uri uri; |
26 // If the connection is already fully closed don't insert it into | 26 |
27 // the queue. | 27 // The transfer length if the length of the message body as it |
28 if (connection._isFullyClosed) { | 28 // appears in the message (RFC 2616 section 4.4). This can be -1 if |
29 connection._socket.close(); | 29 // the length of the massage body is not known due to transfer |
30 if (connection.onClosed != null) connection.onClosed(); | 30 // codings. |
31 return; | 31 int get transferLength => _transferLength; |
32 } | 32 |
33 | 33 _HttpIncoming(_HttpHeaders this.headers, |
34 connection._state |= _HttpConnectionBase.CLOSING; | 34 int this._transferLength, |
35 _q.add(connection); | 35 Stream<List<int>> this._stream) { |
36 | 36 } |
37 // If the output stream is not closed for writing, close it now and | 37 |
38 // wait for callback when closed. | 38 StreamSubscription<List<int>> listen(void onData(List<int> event), |
39 if (!connection._isWriteClosed) { | 39 {void onError(AsyncError error), |
40 connection._socket.outputStream.close(); | 40 void onDone(), |
41 connection._socket.outputStream.onClosed = () { | 41 bool unsubscribeOnError}) { |
42 connection._state |= _HttpConnectionBase.WRITE_CLOSED; | 42 return _stream.listen(onData, |
43 closeIfDone(); | 43 onError: onError, |
44 }; | 44 onDone: onDone, |
45 } else { | 45 unsubscribeOnError: unsubscribeOnError); |
46 connection._socket.outputStream.onClosed = () { assert(false); }; | 46 } |
47 } | 47 |
48 | 48 // Is completed once all data have been received. |
49 // If the request is not already fully read wait for the socket to close. | 49 Future get dataDone => _dataCompleter.future; |
50 // As the _isReadClosed state from the HTTP request processing indicate | 50 |
51 // that the response has been parsed this does not necesarily mean tha | 51 void close() { |
52 // the socket is closed. | 52 fullBodyRead = true; |
53 if (!connection._isReadClosed) { | 53 _dataCompleter.complete(); |
54 connection._socket.onClosed = () { | 54 } |
55 connection._state |= _HttpConnectionBase.READ_CLOSED; | |
56 closeIfDone(); | |
57 }; | |
58 } | |
59 | |
60 // Ignore any data on a socket in the close queue. | |
61 connection._socket.onData = connection._socket.read; | |
62 | |
63 // If an error occurs immediately close the socket. | |
64 connection._socket.onError = (e) { | |
65 connection._state |= _HttpConnectionBase.READ_CLOSED; | |
66 connection._state |= _HttpConnectionBase.WRITE_CLOSED; | |
67 closeIfDone(); | |
68 }; | |
69 } | |
70 | |
71 void shutdown() { | |
72 _q.forEach((_HttpConnectionBase connection) { | |
73 connection._socket.close(); | |
74 }); | |
75 } | |
76 | |
77 final Set<_HttpConnectionBase> _q; | |
78 } | 55 } |
79 | 56 |
80 | 57 class _HttpInboundMessage extends Stream<List<int>> { |
81 class _HttpRequestResponseBase { | 58 final _HttpIncoming _incoming; |
82 static const int START = 0; | |
83 static const int HEADER_SENT = 1; | |
84 static const int DONE = 2; | |
85 static const int UPGRADED = 3; | |
86 | |
87 _HttpRequestResponseBase(_HttpConnectionBase this._httpConnection) | |
88 : _state = START, _headResponse = false; | |
89 | |
90 int get contentLength => _headers.contentLength; | |
91 HttpHeaders get headers => _headers; | |
92 | |
93 bool get persistentConnection { | |
94 List<String> connection = headers[HttpHeaders.CONNECTION]; | |
95 if (_protocolVersion == "1.1") { | |
96 if (connection == null) return true; | |
97 return !headers[HttpHeaders.CONNECTION].any( | |
98 (value) => value.toLowerCase() == "close"); | |
99 } else { | |
100 if (connection == null) return false; | |
101 return headers[HttpHeaders.CONNECTION].any( | |
102 (value) => value.toLowerCase() == "keep-alive"); | |
103 } | |
104 } | |
105 | |
106 X509Certificate get certificate { | |
107 var socket = _httpConnection._socket as SecureSocket; | |
108 return socket == null ? socket : socket.peerCertificate; | |
109 } | |
110 | |
111 void set persistentConnection(bool persistentConnection) { | |
112 if (_outputStream != null) throw new HttpException("Header already sent"); | |
113 | |
114 // Determine the value of the "Connection" header. | |
115 headers.remove(HttpHeaders.CONNECTION, "close"); | |
116 headers.remove(HttpHeaders.CONNECTION, "keep-alive"); | |
117 if (_protocolVersion == "1.1" && !persistentConnection) { | |
118 headers.add(HttpHeaders.CONNECTION, "close"); | |
119 } else if (_protocolVersion == "1.0" && persistentConnection) { | |
120 headers.add(HttpHeaders.CONNECTION, "keep-alive"); | |
121 } | |
122 } | |
123 | |
124 | |
125 bool _write(List<int> data, bool copyBuffer) { | |
126 if (_headResponse) return true; | |
127 _ensureHeadersSent(); | |
128 bool allWritten = true; | |
129 if (data.length > 0) { | |
130 if (_headers.chunkedTransferEncoding) { | |
131 // Write chunk size if transfer encoding is chunked. | |
132 _writeHexString(data.length); | |
133 _writeCRLF(); | |
134 _httpConnection._write(data, copyBuffer); | |
135 allWritten = _writeCRLF(); | |
136 } else { | |
137 _updateContentLength(data.length); | |
138 allWritten = _httpConnection._write(data, copyBuffer); | |
139 } | |
140 } | |
141 return allWritten; | |
142 } | |
143 | |
144 bool _writeList(List<int> data, int offset, int count) { | |
145 if (_headResponse) return true; | |
146 _ensureHeadersSent(); | |
147 bool allWritten = true; | |
148 if (count > 0) { | |
149 if (_headers.chunkedTransferEncoding) { | |
150 // Write chunk size if transfer encoding is chunked. | |
151 _writeHexString(count); | |
152 _writeCRLF(); | |
153 _httpConnection._writeFrom(data, offset, count); | |
154 allWritten = _writeCRLF(); | |
155 } else { | |
156 _updateContentLength(count); | |
157 allWritten = _httpConnection._writeFrom(data, offset, count); | |
158 } | |
159 } | |
160 return allWritten; | |
161 } | |
162 | |
163 bool _writeDone() { | |
164 bool allWritten = true; | |
165 if (_headers.chunkedTransferEncoding) { | |
166 // Terminate the content if transfer encoding is chunked. | |
167 allWritten = _httpConnection._write(_Const.END_CHUNKED); | |
168 } else { | |
169 if (!_headResponse && _bodyBytesWritten < _headers.contentLength) { | |
170 throw new HttpException("Sending less than specified content length"); | |
171 } | |
172 assert(_headResponse || _bodyBytesWritten == _headers.contentLength); | |
173 } | |
174 return allWritten; | |
175 } | |
176 | |
177 bool _writeHeaders() { | |
178 _headers._write(_httpConnection); | |
179 // Terminate header. | |
180 return _writeCRLF(); | |
181 } | |
182 | |
183 bool _writeHexString(int x) { | |
184 final List<int> hexDigits = [0x30, 0x31, 0x32, 0x33, 0x34, | |
185 0x35, 0x36, 0x37, 0x38, 0x39, | |
186 0x41, 0x42, 0x43, 0x44, 0x45, 0x46]; | |
187 List<int> hex = new Uint8List(10); | |
188 int index = hex.length; | |
189 while (x > 0) { | |
190 index--; | |
191 hex[index] = hexDigits[x % 16]; | |
192 x = x >> 4; | |
193 } | |
194 return _httpConnection._writeFrom(hex, index, hex.length - index); | |
195 } | |
196 | |
197 bool _writeCRLF() { | |
198 final CRLF = const [_CharCode.CR, _CharCode.LF]; | |
199 return _httpConnection._write(CRLF); | |
200 } | |
201 | |
202 bool _writeSP() { | |
203 final SP = const [_CharCode.SP]; | |
204 return _httpConnection._write(SP); | |
205 } | |
206 | |
207 void _ensureHeadersSent() { | |
208 // Ensure that headers are written. | |
209 if (_state == START) { | |
210 _writeHeader(); | |
211 } | |
212 } | |
213 | |
214 void _updateContentLength(int bytes) { | |
215 if (_bodyBytesWritten + bytes > _headers.contentLength) { | |
216 throw new HttpException("Writing more than specified content length"); | |
217 } | |
218 _bodyBytesWritten += bytes; | |
219 } | |
220 | |
221 HttpConnectionInfo get connectionInfo => _httpConnection.connectionInfo; | |
222 | |
223 bool get _done => _state == DONE; | |
224 | |
225 int _state; | |
226 bool _headResponse; | |
227 | |
228 _HttpConnectionBase _httpConnection; | |
229 _HttpHeaders _headers; | |
230 List<Cookie> _cookies; | 59 List<Cookie> _cookies; |
231 String _protocolVersion = "1.1"; | 60 |
232 | 61 _HttpInboundMessage(_HttpIncoming this._incoming); |
233 // Number of body bytes written. This is only actual body data not | |
234 // including headers or chunk information of using chinked transfer | |
235 // encoding. | |
236 int _bodyBytesWritten = 0; | |
237 } | |
238 | |
239 | |
240 // Parsed HTTP request providing information on the HTTP headers. | |
241 class _HttpRequest extends _HttpRequestResponseBase implements HttpRequest { | |
242 _HttpRequest(_HttpConnection connection) : super(connection); | |
243 | |
244 String get method => _method; | |
245 String get uri => _uri; | |
246 String get path => _path; | |
247 String get queryString => _queryString; | |
248 Map get queryParameters => _queryParameters; | |
249 | 62 |
250 List<Cookie> get cookies { | 63 List<Cookie> get cookies { |
251 if (_cookies != null) return _cookies; | 64 if (_cookies != null) return _cookies; |
252 | 65 return _cookies = headers._parseCookies(); |
253 // Parse a Cookie header value according to the rules in RFC 6265. | 66 } |
254 void _parseCookieString(String s) { | 67 |
255 int index = 0; | 68 HttpHeaders get headers => _incoming.headers; |
256 | 69 String get protocolVersion => headers.protocolVersion; |
257 bool done() => index == s.length; | 70 int get contentLength => headers.contentLength; |
258 | 71 bool get persistentConnection => headers.persistentConnection; |
259 void skipWS() { | 72 } |
260 while (!done()) { | 73 |
261 if (s[index] != " " && s[index] != "\t") return; | 74 |
262 index++; | 75 class _HttpRequest extends _HttpInboundMessage implements HttpRequest { |
263 } | 76 final HttpResponse response; |
264 } | 77 |
265 | 78 // Lazy initialized parsed query parameters. |
266 String parseName() { | 79 Map<String, String> _queryParameters; |
267 int start = index; | 80 |
268 while (!done()) { | 81 final _HttpServer _httpServer; |
269 if (s[index] == " " || s[index] == "\t" || s[index] == "=") break; | 82 |
270 index++; | 83 final _HttpConnection _httpConnection; |
271 } | 84 |
272 return s.substring(start, index).toLowerCase(); | 85 HttpSession _session; |
273 } | 86 |
274 | 87 _HttpRequest(_HttpResponse this.response, |
275 String parseValue() { | 88 _HttpIncoming _incoming, |
276 int start = index; | 89 _HttpServer this._httpServer, |
277 while (!done()) { | 90 _HttpConnection this._httpConnection) |
278 if (s[index] == " " || s[index] == "\t" || s[index] == ";") break; | 91 : super(_incoming) { |
279 index++; | 92 response.headers.persistentConnection = headers.persistentConnection; |
280 } | 93 |
281 return s.substring(start, index).toLowerCase(); | 94 if (_httpServer._sessionManagerInstance != null) { |
282 } | |
283 | |
284 void expect(String expected) { | |
285 if (done()) { | |
286 throw new HttpException("Failed to parse header value [$s]"); | |
287 } | |
288 if (s[index] != expected) { | |
289 throw new HttpException("Failed to parse header value [$s]"); | |
290 } | |
291 index++; | |
292 } | |
293 | |
294 while (!done()) { | |
295 skipWS(); | |
296 if (done()) return; | |
297 String name = parseName(); | |
298 skipWS(); | |
299 expect("="); | |
300 skipWS(); | |
301 String value = parseValue(); | |
302 _cookies.add(new _Cookie(name, value)); | |
303 skipWS(); | |
304 if (done()) return; | |
305 expect(";"); | |
306 } | |
307 } | |
308 | |
309 _cookies = new List<Cookie>(); | |
310 List<String> headerValues = headers["cookie"]; | |
311 if (headerValues != null) { | |
312 headerValues.forEach((headerValue) => _parseCookieString(headerValue)); | |
313 } | |
314 return _cookies; | |
315 } | |
316 | |
317 InputStream get inputStream { | |
318 if (_inputStream == null) { | |
319 _inputStream = new _HttpInputStream(this); | |
320 } | |
321 return _inputStream; | |
322 } | |
323 | |
324 String get protocolVersion => _protocolVersion; | |
325 | |
326 HttpSession session([init(HttpSession session)]) { | |
327 if (_session != null) { | |
328 // It's already mapped, use it. | |
329 return _session; | |
330 } | |
331 // Create session, store it in connection, and return. | |
332 var sessionManager = _httpConnection._server._sessionManager; | |
333 return _session = sessionManager.createSession(init); | |
334 } | |
335 | |
336 void _onRequestReceived(String method, | |
337 String uri, | |
338 String version, | |
339 _HttpHeaders headers) { | |
340 _method = method; | |
341 _uri = uri; | |
342 _parseRequestUri(uri); | |
343 _headers = headers; | |
344 if (_httpConnection._server._sessionManagerInstance != null) { | |
345 // Map to session if exists. | 95 // Map to session if exists. |
346 var sessionId = cookies.reduce(null, (last, cookie) { | 96 var sessionId = cookies.reduce(null, (last, cookie) { |
347 if (last != null) return last; | 97 if (last != null) return last; |
348 return cookie.name.toUpperCase() == _DART_SESSION_ID ? | 98 return cookie.name.toUpperCase() == _DART_SESSION_ID ? |
349 cookie.value : null; | 99 cookie.value : null; |
350 }); | 100 }); |
351 if (sessionId != null) { | 101 if (sessionId != null) { |
352 var sessionManager = _httpConnection._server._sessionManager; | 102 _session = _httpServer._sessionManager.getSession(sessionId); |
353 _session = sessionManager.getSession(sessionId); | |
354 if (_session != null) { | 103 if (_session != null) { |
355 _session._markSeen(); | 104 _session._markSeen(); |
356 } | 105 } |
357 } | 106 } |
358 } | 107 } |
359 | 108 } |
360 // Prepare for receiving data. | 109 |
361 _buffer = new _BufferList(); | 110 StreamSubscription<List<int>> listen(void onData(List<int> event), |
362 } | 111 {void onError(AsyncError error), |
363 | 112 void onDone(), |
364 void _onDataReceived(List<int> data) { | 113 bool unsubscribeOnError}) { |
365 _buffer.add(data); | 114 return _incoming.listen(onData, |
366 if (_inputStream != null) _inputStream._dataReceived(); | 115 onError: onError, |
367 } | 116 onDone: onDone, |
368 | 117 unsubscribeOnError: unsubscribeOnError); |
369 void _onDataEnd() { | 118 } |
370 if (_inputStream != null) { | 119 |
371 _inputStream._closeReceived(); | 120 Map<String, String> get queryParameters { |
| 121 if (_queryParameters == null) { |
| 122 _queryParameters = _HttpUtils.splitQueryString(uri.query); |
| 123 } |
| 124 return _queryParameters; |
| 125 } |
| 126 |
| 127 Uri get uri => _incoming.uri; |
| 128 |
| 129 String get method => _incoming.method; |
| 130 |
| 131 HttpSession get session { |
| 132 if (_session != null) { |
| 133 // It's already mapped, use it. |
| 134 return _session; |
| 135 } |
| 136 // Create session, store it in connection, and return. |
| 137 return _session = _httpServer._sessionManager.createSession(); |
| 138 } |
| 139 |
| 140 HttpConnectionInfo get connectionInfo => _httpConnection.connectionInfo; |
| 141 |
| 142 X509Certificate get certificate { |
| 143 Socket socket = _httpConnection._socket; |
| 144 if (socket is SecureSocket) return socket.peerCertificate; |
| 145 return null; |
| 146 } |
| 147 } |
| 148 |
| 149 |
| 150 class _HttpClientResponse |
| 151 extends _HttpInboundMessage implements HttpClientResponse { |
| 152 List<RedirectInfo> get redirects => _httpRequest._responseRedirects; |
| 153 |
| 154 // The HttpClient this response belongs to. |
| 155 final _HttpClient _httpClient; |
| 156 |
| 157 // The HttpClientRequest of this response. |
| 158 final _HttpClientRequest _httpRequest; |
| 159 |
| 160 List<Cookie> _cookies; |
| 161 |
| 162 _HttpClientResponse(_HttpIncoming _incoming, |
| 163 _HttpClientRequest this._httpRequest, |
| 164 _HttpClient this._httpClient) |
| 165 : super(_incoming); |
| 166 |
| 167 int get statusCode => _incoming.statusCode; |
| 168 String get reasonPhrase => _incoming.reasonPhrase; |
| 169 |
| 170 List<Cookie> get cookies { |
| 171 if (_cookies != null) return _cookies; |
| 172 _cookies = new List<Cookie>(); |
| 173 List<String> values = headers["set-cookie"]; |
| 174 if (values != null) { |
| 175 values.forEach((value) { |
| 176 _cookies.add(new Cookie.fromSetCookieValue(value)); |
| 177 }); |
| 178 } |
| 179 return _cookies; |
| 180 } |
| 181 |
| 182 bool get isRedirect { |
| 183 if (_httpRequest.method == "GET" || _httpRequest.method == "HEAD") { |
| 184 return statusCode == HttpStatus.MOVED_PERMANENTLY || |
| 185 statusCode == HttpStatus.FOUND || |
| 186 statusCode == HttpStatus.SEE_OTHER || |
| 187 statusCode == HttpStatus.TEMPORARY_REDIRECT; |
| 188 } else if (_httpRequest.method == "POST") { |
| 189 return statusCode == HttpStatus.SEE_OTHER; |
| 190 } |
| 191 return false; |
| 192 } |
| 193 |
| 194 Future<HttpClientResponse> redirect([String method, |
| 195 Uri url, |
| 196 bool followLoops]) { |
| 197 if (method == null) { |
| 198 // Set method as defined by RFC 2616 section 10.3.4. |
| 199 if (statusCode == HttpStatus.SEE_OTHER && _httpRequest.method == "POST") { |
| 200 method = "GET"; |
| 201 } else { |
| 202 method = _httpRequest.method; |
| 203 } |
| 204 } |
| 205 if (url == null) { |
| 206 String location = headers.value(HttpHeaders.LOCATION); |
| 207 if (location == null) { |
| 208 throw new StateError("Response has no Location header for redirect"); |
| 209 } |
| 210 url = Uri.parse(location); |
| 211 } |
| 212 if (followLoops != true) { |
| 213 for (var redirect in redirects) { |
| 214 if (redirect.location == url) { |
| 215 return new Future.immediateError( |
| 216 new RedirectLoopException(redirects)); |
| 217 } |
| 218 } |
| 219 } |
| 220 return _httpClient._openUrlFromRequest(method, url, _httpRequest) |
| 221 .then((request) { |
| 222 request._responseRedirects.addAll(this.redirects); |
| 223 request._responseRedirects.add(new _RedirectInfo(statusCode, |
| 224 method, |
| 225 url)); |
| 226 return request.close(); |
| 227 }); |
| 228 } |
| 229 |
| 230 StreamSubscription<List<int>> listen(void onData(List<int> event), |
| 231 {void onError(AsyncError error), |
| 232 void onDone(), |
| 233 bool unsubscribeOnError}) { |
| 234 return _incoming.listen(onData, |
| 235 onError: onError, |
| 236 onDone: onDone, |
| 237 unsubscribeOnError: unsubscribeOnError); |
| 238 } |
| 239 |
| 240 Future<Socket> detachSocket() { |
| 241 _httpClient._connectionClosed(_httpRequest._httpClientConnection); |
| 242 return _httpRequest._httpClientConnection.detachSocket(); |
| 243 } |
| 244 |
| 245 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; |
| 246 |
| 247 bool get _shouldAuthenticate { |
| 248 // Only try to authenticate if there is a challenge in the response. |
| 249 List<String> challenge = headers[HttpHeaders.WWW_AUTHENTICATE]; |
| 250 return statusCode == HttpStatus.UNAUTHORIZED && |
| 251 challenge != null && challenge.length == 1; |
| 252 } |
| 253 |
| 254 Future<HttpClientResponse> _authenticate() { |
| 255 Future<HttpClientResponse> retryWithCredentials(_Credentials cr) { |
| 256 if (cr != null) { |
| 257 // TODO(sgjesse): Support digest. |
| 258 if (cr.scheme == _AuthenticationScheme.BASIC) { |
| 259 // Drain body and retry. |
| 260 return reduce(null, (x, y) {}).then((_) { |
| 261 return _httpClient._openUrlFromRequest(_httpRequest.method, |
| 262 _httpRequest.uri, |
| 263 _httpRequest) |
| 264 .then((request) => request.close()); |
| 265 }); |
| 266 } |
| 267 } |
| 268 |
| 269 // Fall through to here to perform normal response handling if |
| 270 // there is no sensible authorization handling. |
| 271 return new Future.immediate(this); |
| 272 } |
| 273 |
| 274 List<String> challenge = headers[HttpHeaders.WWW_AUTHENTICATE]; |
| 275 assert(challenge != null || challenge.length == 1); |
| 276 _HeaderValue header = |
| 277 new _HeaderValue.fromString(challenge[0], parameterSeparator: ","); |
| 278 _AuthenticationScheme scheme = |
| 279 new _AuthenticationScheme.fromString(header.value); |
| 280 String realm = header.parameters["realm"]; |
| 281 |
| 282 // See if any credentials are available. |
| 283 _Credentials cr = _httpClient._findCredentials(_httpRequest.uri, scheme); |
| 284 |
| 285 if (cr != null && !cr.used) { |
| 286 // If credentials found prepare for retrying the request. |
| 287 return retryWithCredentials(cr); |
| 288 } |
| 289 |
| 290 // Ask for more credentials if none found or the one found has |
| 291 // already been used. If it has already been used it must now be |
| 292 // invalid and is removed. |
| 293 if (cr != null) { |
| 294 _httpClient._removeCredentials(cr); |
| 295 cr = null; |
| 296 } |
| 297 if (_httpClient._authenticate != null) { |
| 298 Future authComplete = _httpClient._authenticate(_httpRequest.uri, |
| 299 scheme.toString(), |
| 300 realm); |
| 301 return authComplete.then((credsAvailable) { |
| 302 if (credsAvailable) { |
| 303 cr = _httpClient._findCredentials(_httpRequest.uri, scheme); |
| 304 return retryWithCredentials(cr); |
| 305 } else { |
| 306 // No credentials available, complete with original response. |
| 307 return this; |
| 308 } |
| 309 }); |
| 310 } |
| 311 // No credentials were found and the callback was not set. |
| 312 return new Future.immediate(this); |
| 313 } |
| 314 } |
| 315 |
| 316 |
| 317 class _HttpOutboundMessage<T> extends IOSink { |
| 318 // Used to mark when the body should be written. This is used for HEAD |
| 319 // requests and in error handling. |
| 320 bool _ignoreBody = false; |
| 321 |
| 322 _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing) |
| 323 : super(outgoing), |
| 324 _outgoing = outgoing, |
| 325 headers = new _HttpHeaders(protocolVersion); |
| 326 |
| 327 int get contentLength => headers.contentLength; |
| 328 void set contentLength(int contentLength) { |
| 329 headers.contentLength = contentLength; |
| 330 } |
| 331 |
| 332 bool get persistentConnection => headers.persistentConnection; |
| 333 bool set persistentConnection(bool p) { |
| 334 headers.persistentConnection = p; |
| 335 } |
| 336 |
| 337 Future<T> consume(Stream<List<int>> stream) { |
| 338 _writeHeaders(); |
| 339 if (_ignoreBody) return new Future.immediate(this); |
| 340 if (_chunked) { |
| 341 // Transform when chunked. |
| 342 stream = stream.transform(new _ChunkedTransformer()); |
| 343 } |
| 344 return super.consume(stream).then((_) => this); |
| 345 } |
| 346 |
| 347 void add(List<int> data) { |
| 348 _writeHeaders(); |
| 349 if (_ignoreBody) return; |
| 350 if (_chunked) { |
| 351 _ChunkedTransformer._addChunk(data, super.add); |
372 } else { | 352 } else { |
373 inputStream._streamMarkedClosed = true; | 353 super.add(data); |
374 } | 354 } |
375 } | 355 } |
376 | 356 |
377 // Escaped characters in uri are expected to have been parsed. | 357 void close() { |
378 void _parseRequestUri(String uri) { | 358 if (!_headersWritten && !_ignoreBody && headers.chunkedTransferEncoding) { |
379 int position; | 359 // If no body was written, _ignoreBody is false (it's not a HEAD |
380 position = uri.indexOf("?", 0); | 360 // request) and the content-length is unspecified, set contentLength to 0. |
381 if (position == -1) { | 361 headers.contentLength = 0; |
382 _path = _HttpUtils.decodeUrlEncodedString(_uri); | 362 } |
383 _queryString = null; | 363 _writeHeaders(); |
384 _queryParameters = new Map(); | 364 if (!_ignoreBody) { |
385 } else { | 365 if (_chunked) { |
386 _path = _HttpUtils.decodeUrlEncodedString(_uri.substring(0, position)); | 366 _ChunkedTransformer._addChunk([], super.add); |
387 _queryString = _uri.substring(position + 1); | 367 } |
388 _queryParameters = _HttpUtils.splitQueryString(_queryString); | 368 } |
389 } | 369 super.close(); |
390 } | 370 } |
391 | 371 |
392 // Delegate functions for the HttpInputStream implementation. | 372 void _writeHeaders() { |
393 int _streamAvailable() { | 373 if (_headersWritten) return; |
394 return _buffer.length; | 374 bool _tmpIgnoreBody = _ignoreBody; |
395 } | 375 _ignoreBody = false; |
396 | 376 _headersWritten = true; |
397 List<int> _streamRead(int bytesToRead) { | 377 _writeHeader(); |
398 return _buffer.readBytes(bytesToRead); | 378 _ignoreBody = _tmpIgnoreBody; |
399 } | 379 if (_ignoreBody) { |
400 | 380 super.close(); |
401 int _streamReadInto(List<int> buffer, int offset, int len) { | 381 return; |
402 List<int> data = _buffer.readBytes(len); | 382 } |
403 buffer.setRange(offset, data.length, data); | 383 _chunked = headers.chunkedTransferEncoding; |
404 return data.length; | 384 if (!_chunked) { |
405 } | 385 _outgoing.setTransferLength(headers.contentLength); |
406 | 386 } |
407 void _streamSetErrorHandler(callback(e)) { | 387 } |
408 _streamErrorHandler = callback; | 388 |
409 } | 389 void _writeHeader(); // TODO(ajohnsen): Better name. |
410 | 390 |
411 String _method; | 391 final _HttpHeaders headers; |
412 String _uri; | 392 |
413 String _path; | 393 final _HttpOutgoing _outgoing; |
414 String _queryString; | 394 bool _headersWritten = false; |
415 Map<String, String> _queryParameters; | 395 bool _chunked = false; |
416 _HttpInputStream _inputStream; | |
417 _BufferList _buffer; | |
418 Function _streamErrorHandler; | |
419 _HttpSession _session; | |
420 } | 396 } |
421 | 397 |
422 | 398 |
423 // HTTP response object for sending a HTTP response. | 399 class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
424 class _HttpResponse extends _HttpRequestResponseBase implements HttpResponse { | 400 implements HttpResponse { |
425 _HttpResponse(_HttpConnection httpConnection) | 401 int statusCode = 200; |
426 : super(httpConnection), | 402 String _reasonPhrase; |
427 _statusCode = HttpStatus.OK { | 403 List<Cookie> _cookies; |
428 _headers = new _HttpHeaders(); | 404 _HttpRequest _httpRequest; |
429 } | 405 |
430 | 406 _HttpResponse(String protocolVersion, |
431 void set contentLength(int contentLength) { | 407 _HttpOutgoing _outgoing) |
432 if (_state >= _HttpRequestResponseBase.HEADER_SENT) { | 408 : super(protocolVersion, _outgoing); |
433 throw new HttpException("Header already sent"); | |
434 } | |
435 _headers.contentLength = contentLength; | |
436 } | |
437 | |
438 int get statusCode => _statusCode; | |
439 void set statusCode(int statusCode) { | |
440 if (_outputStream != null) throw new HttpException("Header already sent"); | |
441 _statusCode = statusCode; | |
442 } | |
443 | |
444 String get reasonPhrase => _findReasonPhrase(_statusCode); | |
445 void set reasonPhrase(String reasonPhrase) { | |
446 if (_outputStream != null) throw new HttpException("Header already sent"); | |
447 _reasonPhrase = reasonPhrase; | |
448 } | |
449 | 409 |
450 List<Cookie> get cookies { | 410 List<Cookie> get cookies { |
451 if (_cookies == null) _cookies = new List<Cookie>(); | 411 if (_cookies == null) _cookies = new List<Cookie>(); |
452 return _cookies; | 412 return _cookies; |
453 } | 413 } |
454 | 414 |
455 OutputStream get outputStream { | 415 String get reasonPhrase => _findReasonPhrase(statusCode); |
456 if (_state >= _HttpRequestResponseBase.DONE) { | 416 void set reasonPhrase(String reasonPhrase) { |
457 throw new HttpException("Response closed"); | 417 if (_headersWritten) throw new StateError("Header already sent"); |
458 } | 418 _reasonPhrase = reasonPhrase; |
459 if (_outputStream == null) { | 419 } |
460 _outputStream = new _HttpOutputStream(this); | 420 |
461 } | 421 Future<Socket> detachSocket() { |
462 return _outputStream; | 422 if (_headersWritten) throw new StateError("Headers already sent"); |
463 } | 423 _writeHeaders(); |
464 | 424 var future = _httpRequest._httpConnection.detachSocket(); |
465 DetachedSocket detachSocket() { | 425 // Close connection so the socket is 'free'. |
466 if (_state >= _HttpRequestResponseBase.DONE) { | 426 close(); |
467 throw new HttpException("Response closed"); | 427 return future; |
468 } | 428 } |
469 // Ensure that headers are written. | 429 |
470 if (_state == _HttpRequestResponseBase.START) { | 430 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; |
471 _writeHeader(); | 431 |
472 } | 432 void _writeHeader() { |
473 _state = _HttpRequestResponseBase.UPGRADED; | 433 writeSP() => add([_CharCode.SP]); |
474 // Ensure that any trailing data is written. | 434 writeCRLF() => add([_CharCode.CR, _CharCode.LF]); |
475 _writeDone(); | 435 |
476 // Indicate to the connection that the response handling is done. | 436 // Write status line. |
477 return _httpConnection._detachSocket(); | 437 if (headers.protocolVersion == "1.1") { |
478 } | 438 add(_Const.HTTP11); |
479 | 439 } else { |
480 // Delegate functions for the HttpOutputStream implementation. | 440 add(_Const.HTTP10); |
481 bool _streamWrite(List<int> buffer, bool copyBuffer) { | 441 } |
482 if (_done) throw new HttpException("Response closed"); | 442 writeSP(); |
483 return _write(buffer, copyBuffer); | 443 addString(statusCode.toString()); |
484 } | 444 writeSP(); |
485 | 445 addString(reasonPhrase); |
486 bool _streamWriteFrom(List<int> buffer, int offset, int len) { | 446 writeCRLF(); |
487 if (_done) throw new HttpException("Response closed"); | 447 |
488 return _writeList(buffer, offset, len); | 448 var session = _httpRequest._session; |
489 } | 449 if (session != null && !session._destroyed) { |
490 | 450 // Mark as not new. |
491 void _streamFlush() { | 451 session._isNew = false; |
492 _httpConnection._flush(); | 452 // Make sure we only send the current session id. |
493 } | 453 bool found = false; |
494 | 454 for (int i = 0; i < cookies.length; i++) { |
495 void _streamClose() { | 455 if (cookies[i].name.toUpperCase() == _DART_SESSION_ID) { |
496 _ensureHeadersSent(); | 456 cookies[i].value = session.id; |
497 _state = _HttpRequestResponseBase.DONE; | 457 cookies[i].httpOnly = true; |
498 // Stop tracking no pending write events. | 458 found = true; |
499 _httpConnection._onNoPendingWrites = null; | 459 break; |
500 // Ensure that any trailing data is written. | 460 } |
501 _writeDone(); | 461 } |
502 // Indicate to the connection that the response handling is done. | 462 if (!found) { |
503 _httpConnection._responseClosed(); | 463 cookies.add(new Cookie(_DART_SESSION_ID, session.id)..httpOnly = true); |
504 if (_streamClosedHandler != null) { | 464 } |
505 Timer.run(_streamClosedHandler); | 465 } |
506 } | 466 // Add all the cookies set to the headers. |
507 } | 467 if (_cookies != null) { |
508 | 468 _cookies.forEach((cookie) { |
509 void _streamSetNoPendingWriteHandler(callback()) { | 469 headers.add("set-cookie", cookie); |
510 if (_state != _HttpRequestResponseBase.DONE) { | 470 }); |
511 _httpConnection._onNoPendingWrites = callback; | 471 } |
512 } | 472 |
513 } | 473 headers._finalize(); |
514 | 474 |
515 void _streamSetClosedHandler(callback()) { | 475 // Write headers. |
516 _streamClosedHandler = callback; | 476 headers._write(this); |
517 } | 477 writeCRLF(); |
518 | |
519 void _streamSetErrorHandler(callback(e)) { | |
520 _streamErrorHandler = callback; | |
521 } | 478 } |
522 | 479 |
523 String _findReasonPhrase(int statusCode) { | 480 String _findReasonPhrase(int statusCode) { |
524 if (_reasonPhrase != null) { | 481 if (_reasonPhrase != null) { |
525 return _reasonPhrase; | 482 return _reasonPhrase; |
526 } | 483 } |
527 | 484 |
528 switch (statusCode) { | 485 switch (statusCode) { |
529 case HttpStatus.CONTINUE: return "Continue"; | 486 case HttpStatus.CONTINUE: return "Continue"; |
530 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols"; | 487 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols"; |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
567 case HttpStatus.INTERNAL_SERVER_ERROR: return "Internal Server Error"; | 524 case HttpStatus.INTERNAL_SERVER_ERROR: return "Internal Server Error"; |
568 case HttpStatus.NOT_IMPLEMENTED: return "Not Implemented"; | 525 case HttpStatus.NOT_IMPLEMENTED: return "Not Implemented"; |
569 case HttpStatus.BAD_GATEWAY: return "Bad Gateway"; | 526 case HttpStatus.BAD_GATEWAY: return "Bad Gateway"; |
570 case HttpStatus.SERVICE_UNAVAILABLE: return "Service Unavailable"; | 527 case HttpStatus.SERVICE_UNAVAILABLE: return "Service Unavailable"; |
571 case HttpStatus.GATEWAY_TIMEOUT: return "Gateway Time-out"; | 528 case HttpStatus.GATEWAY_TIMEOUT: return "Gateway Time-out"; |
572 case HttpStatus.HTTP_VERSION_NOT_SUPPORTED: | 529 case HttpStatus.HTTP_VERSION_NOT_SUPPORTED: |
573 return "Http Version not supported"; | 530 return "Http Version not supported"; |
574 default: return "Status $statusCode"; | 531 default: return "Status $statusCode"; |
575 } | 532 } |
576 } | 533 } |
577 | 534 } |
578 bool _writeHeader() { | 535 |
579 List<int> data; | 536 |
580 | 537 class _HttpClientRequest extends _HttpOutboundMessage<HttpClientRequest> |
581 // Write status line. | 538 implements HttpClientRequest { |
582 if (_protocolVersion == "1.1") { | 539 final String method; |
583 _httpConnection._write(_Const.HTTP11); | 540 final Uri uri; |
| 541 final List<Cookie> cookies = new List<Cookie>(); |
| 542 |
| 543 // The HttpClient this request belongs to. |
| 544 final _HttpClient _httpClient; |
| 545 final _HttpClientConnection _httpClientConnection; |
| 546 |
| 547 final Completer<HttpClientResponse> _responseCompleter |
| 548 = new Completer<HttpClientResponse>(); |
| 549 |
| 550 final bool _usingProxy; |
| 551 |
| 552 // TODO(ajohnsen): Get default value from client? |
| 553 bool _followRedirects = true; |
| 554 |
| 555 int _maxRedirects = 5; |
| 556 |
| 557 List<RedirectInfo> _responseRedirects = []; |
| 558 |
| 559 _HttpClientRequest(_HttpOutgoing outgoing, |
| 560 Uri this.uri, |
| 561 String this.method, |
| 562 bool this._usingProxy, |
| 563 _HttpClient this._httpClient, |
| 564 _HttpClientConnection this._httpClientConnection) |
| 565 : super("1.1", outgoing) { |
| 566 // GET and HEAD have 'content-length: 0' by default. |
| 567 if (method == "GET" || method == "HEAD") { |
| 568 contentLength = 0; |
| 569 } |
| 570 } |
| 571 |
| 572 Future<HttpClientResponse> get response => _responseCompleter.future; |
| 573 |
| 574 Future<HttpClientResponse> close() { |
| 575 super.close(); |
| 576 return response; |
| 577 } |
| 578 |
| 579 int get maxRedirects => _maxRedirects; |
| 580 void set maxRedirects(int maxRedirects) { |
| 581 if (_headersWritten) throw new StateError("Request already sent"); |
| 582 _maxRedirects = maxRedirects; |
| 583 } |
| 584 |
| 585 bool get followRedirects => _followRedirects; |
| 586 void set followRedirects(bool followRedirects) { |
| 587 if (_headersWritten) throw new StateError("Request already sent"); |
| 588 _followRedirects = followRedirects; |
| 589 } |
| 590 |
| 591 HttpConnectionInfo get connectionInfo => _httpClientConnection.connectionInfo; |
| 592 |
| 593 void _onIncoming(_HttpIncoming incoming) { |
| 594 var response = new _HttpClientResponse(incoming, |
| 595 this, |
| 596 _httpClient); |
| 597 Future<HttpClientResponse> future; |
| 598 if (followRedirects && response.isRedirect) { |
| 599 if (response.redirects.length < maxRedirects) { |
| 600 // Redirect and drain response. |
| 601 future = response.reduce(null, (x, y) {}) |
| 602 .then((_) => response.redirect()); |
| 603 } else { |
| 604 // End with exception, too many redirects. |
| 605 future = response.reduce(null, (x, y) {}) |
| 606 .then((_) => new Future.immediateError( |
| 607 new RedirectLimitExceededException(response.redirects))); |
| 608 } |
| 609 } else if (response._shouldAuthenticate) { |
| 610 future = response._authenticate(); |
584 } else { | 611 } else { |
585 _httpConnection._write(_Const.HTTP10); | 612 future = new Future<HttpClientResponse>.immediate(response); |
586 } | 613 } |
587 _writeSP(); | 614 future.then( |
588 data = _statusCode.toString().charCodes; | 615 (v) => _responseCompleter.complete(v), |
589 _httpConnection._write(data); | 616 onError: (e) { |
590 _writeSP(); | 617 _responseCompleter.completeError(e); |
591 data = reasonPhrase.charCodes; | 618 }); |
592 _httpConnection._write(data); | 619 } |
593 _writeCRLF(); | 620 |
594 | 621 void _onError(AsyncError error) { |
595 var session = _httpConnection._request._session; | 622 _responseCompleter.completeError(error); |
596 if (session != null && !session._destroyed) { | 623 } |
597 // Make sure we only send the current session id. | 624 |
598 bool found = false; | 625 void _writeHeader() { |
599 for (int i = 0; i < cookies.length; i++) { | 626 writeSP() => add([_CharCode.SP]); |
600 if (cookies[i].name.toUpperCase() == _DART_SESSION_ID) { | 627 writeCRLF() => add([_CharCode.CR, _CharCode.LF]); |
601 cookies[i].value = session.id; | 628 |
602 cookies[i].httpOnly = true; | 629 addString(method); |
603 found = true; | 630 writeSP(); |
604 break; | 631 // Send the path for direct connections and the whole URL for |
| 632 // proxy connections. |
| 633 if (!_usingProxy) { |
| 634 String path = uri.path; |
| 635 if (path.length == 0) path = "/"; |
| 636 if (uri.query != "") { |
| 637 if (uri.fragment != "") { |
| 638 path = "${path}?${uri.query}#${uri.fragment}"; |
| 639 } else { |
| 640 path = "${path}?${uri.query}"; |
605 } | 641 } |
606 } | 642 } |
607 if (!found) { | 643 addString(path); |
608 cookies.add(new Cookie(_DART_SESSION_ID, session.id)..httpOnly = true); | 644 } else { |
| 645 addString(uri.toString()); |
| 646 } |
| 647 writeSP(); |
| 648 add(_Const.HTTP11); |
| 649 writeCRLF(); |
| 650 |
| 651 // Add the cookies to the headers. |
| 652 if (!cookies.isEmpty) { |
| 653 StringBuffer sb = new StringBuffer(); |
| 654 for (int i = 0; i < cookies.length; i++) { |
| 655 if (i > 0) sb.add("; "); |
| 656 sb.add(cookies[i].name); |
| 657 sb.add("="); |
| 658 sb.add(cookies[i].value); |
609 } | 659 } |
610 } | 660 headers.add("cookie", sb.toString()); |
611 // Add all the cookies set to the headers. | 661 } |
612 if (_cookies != null) { | 662 |
613 _cookies.forEach((cookie) { | 663 headers._finalize(); |
614 _headers.add("set-cookie", cookie); | |
615 }); | |
616 } | |
617 | 664 |
618 // Write headers. | 665 // Write headers. |
619 _headers._finalize(_protocolVersion); | 666 headers._write(this); |
620 bool allWritten = _writeHeaders(); | 667 writeCRLF(); |
621 _state = _HttpRequestResponseBase.HEADER_SENT; | 668 } |
622 return allWritten; | 669 } |
623 } | 670 |
624 | 671 |
625 int _statusCode; // Response status code. | 672 // Transformer that transforms data to HTTP Chunked Encoding. |
626 String _reasonPhrase; // Response reason phrase. | 673 class _ChunkedTransformer implements StreamTransformer<List<int>, List<int>> { |
627 _HttpOutputStream _outputStream; | 674 final StreamController<List<int>> _controller |
628 Function _streamClosedHandler; | 675 = new StreamController<List<int>>(); |
629 Function _streamErrorHandler; | 676 |
630 } | 677 Stream<List<int>> bind(Stream<List<int>> stream) { |
631 | 678 var subscription = stream.listen( |
632 | 679 (data) { |
633 class _HttpInputStream extends _BaseDataInputStream implements InputStream { | 680 if (data.length == 0) return; // Avoid close on 0-bytes payload. |
634 _HttpInputStream(_HttpRequestResponseBase this._requestOrResponse) { | 681 _addChunk(data, _controller.add); |
635 _checkScheduleCallbacks(); | 682 }, |
636 } | 683 onDone: () { |
637 | 684 _addChunk([], _controller.add); |
638 int available() { | 685 _controller.close(); |
639 return _requestOrResponse._streamAvailable(); | 686 }); |
640 } | 687 return _controller.stream; |
641 | 688 } |
642 void pipe(OutputStream output, {bool close: true}) { | 689 |
643 _pipe(this, output, close: close); | 690 static void _addChunk(List<int> data, void add(List<int> data)) { |
644 } | 691 add(_chunkHeader(data.length)); |
645 | 692 if (data.length > 0) add(data); |
646 List<int> _read(int bytesToRead) { | 693 add(_chunkFooter); |
647 List<int> result = _requestOrResponse._streamRead(bytesToRead); | 694 } |
648 _checkScheduleCallbacks(); | 695 |
649 return result; | 696 static List<int> _chunkHeader(int length) { |
650 } | 697 const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, |
651 | 698 0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46]; |
652 void set onError(void callback(e)) { | 699 var header = []; |
653 _requestOrResponse._streamSetErrorHandler(callback); | 700 if (length == 0) { |
654 } | 701 header.add(hexDigits[length]); |
655 | 702 } else { |
656 int _readInto(List<int> buffer, int offset, int len) { | 703 while (length > 0) { |
657 int result = _requestOrResponse._streamReadInto(buffer, offset, len); | 704 header.insertRange(0, 1, hexDigits[length % 16]); |
658 _checkScheduleCallbacks(); | 705 length = length >> 4; |
659 return result; | 706 } |
660 } | 707 } |
661 | 708 header.add(_CharCode.CR); |
662 void _close() { | 709 header.add(_CharCode.LF); |
663 // TODO(sgjesse): Handle this. | 710 return header; |
664 } | 711 } |
665 | 712 |
666 void _dataReceived() { | 713 // Footer is just a CRLF. |
667 super._dataReceived(); | 714 static List<int> get _chunkFooter => const [_CharCode.CR, _CharCode.LF]; |
668 } | 715 } |
669 | 716 |
670 _HttpRequestResponseBase _requestOrResponse; | 717 |
671 } | 718 // Transformer that invokes [_onDone] when completed. |
672 | 719 class _DoneTransformer implements StreamTransformer<List<int>, List<int>> { |
673 | 720 final StreamController<List<int>> _controller |
674 class _HttpOutputStream extends _BaseOutputStream implements OutputStream { | 721 = new StreamController<List<int>>(); |
675 _HttpOutputStream(_HttpRequestResponseBase this._requestOrResponse); | 722 final Function _onDone; |
676 | 723 |
677 bool write(List<int> buffer, [bool copyBuffer = true]) { | 724 _DoneTransformer(this._onDone); |
678 return _requestOrResponse._streamWrite(buffer, copyBuffer); | 725 |
679 } | 726 Stream<List<int>> bind(Stream<List<int>> stream) { |
680 | 727 var subscription = stream.listen( |
681 bool writeFrom(List<int> buffer, [int offset = 0, int len]) { | 728 _controller.add, |
682 if (offset < 0 || offset >= buffer.length) throw new ArgumentError(); | 729 onError: _controller.signalError, |
683 len = len != null ? len : buffer.length - offset; | 730 onDone: () { |
684 if (len < 0) throw new ArgumentError(); | 731 _onDone(); |
685 return _requestOrResponse._streamWriteFrom(buffer, offset, len); | 732 _controller.close(); |
686 } | 733 }); |
687 | 734 return _controller.stream; |
688 void flush() { | 735 } |
689 _requestOrResponse._streamFlush(); | 736 } |
| 737 |
| 738 // Transformer that validates the data written. |
| 739 class _DataValidatorTransformer |
| 740 implements StreamTransformer<List<int>, List<int>> { |
| 741 final StreamController<List<int>> _controller |
| 742 = new StreamController<List<int>>(); |
| 743 int _bytesWritten = 0; |
| 744 Completer _completer = new Completer(); |
| 745 |
| 746 int expectedTransferLength; |
| 747 |
| 748 _DataValidatorTransformer(); |
| 749 |
| 750 Future get validatorFuture => _completer.future; |
| 751 |
| 752 Stream<List<int>> bind(Stream<List<int>> stream) { |
| 753 var subscription; |
| 754 subscription = stream.listen( |
| 755 (data) { |
| 756 if (expectedTransferLength != null) { |
| 757 _bytesWritten += data.length; |
| 758 if (_bytesWritten > expectedTransferLength) { |
| 759 _controller.close(); |
| 760 subscription.cancel(); |
| 761 if (_completer != null) { |
| 762 _completer.completeError(new HttpException( |
| 763 "Content size exceeds specified contentLength. " |
| 764 "$_bytesWritten bytes written while expected " |
| 765 "$expectedTransferLength.")); |
| 766 _completer = null; |
| 767 } |
| 768 return; |
| 769 } |
| 770 } |
| 771 _controller.add(data); |
| 772 }, |
| 773 onError: (error) { |
| 774 _controller.close(); |
| 775 if (_completer != null) { |
| 776 _completer.completeError(error); |
| 777 _completer = null; |
| 778 } |
| 779 }, |
| 780 onDone: () { |
| 781 _controller.close(); |
| 782 if (expectedTransferLength != null) { |
| 783 if (_bytesWritten < expectedTransferLength) { |
| 784 if (_completer != null) { |
| 785 _completer.completeError(new HttpException( |
| 786 "Content size below specified contentLength. " |
| 787 " $_bytesWritten bytes written while expected " |
| 788 "$expectedTransferLength.")); |
| 789 _completer = null; |
| 790 return; |
| 791 } |
| 792 } |
| 793 } |
| 794 if (_completer != null) { |
| 795 _completer.complete(this); |
| 796 _completer = null; |
| 797 } |
| 798 }, |
| 799 unsubscribeOnError: true); |
| 800 return _controller.stream; |
| 801 } |
| 802 } |
| 803 |
| 804 // Extends StreamConsumer as this is an internal type, only used to pipe to. |
| 805 class _HttpOutgoing implements StreamConsumer<List<int>, dynamic> { |
| 806 final Completer _dataCompleter = new Completer(); |
| 807 final Completer _streamCompleter = new Completer(); |
| 808 final _DataValidatorTransformer _validator = new _DataValidatorTransformer(); |
| 809 |
| 810 // Future that completes when all data is written. |
| 811 Future get dataDone => _dataCompleter.future; |
| 812 |
| 813 // Future that completes with the Stream, once the _HttpClientConnection is |
| 814 // bound to one. |
| 815 Future<Stream<List<int>>> get stream => _streamCompleter.future; |
| 816 |
| 817 void setTransferLength(int transferLength) { |
| 818 _validator.expectedTransferLength = transferLength; |
| 819 } |
| 820 |
| 821 Future consume(Stream<List<int>> stream) { |
| 822 stream = stream.transform(_validator); |
| 823 _streamCompleter.complete(stream); |
| 824 _validator.validatorFuture.catchError((e) { |
| 825 _dataCompleter.completeError(e); |
| 826 }); |
| 827 return _validator.validatorFuture.then((v) { |
| 828 _dataCompleter.complete(); |
| 829 return v; |
| 830 }); |
| 831 } |
| 832 } |
| 833 |
| 834 |
| 835 class _HttpClientConnection { |
| 836 final String key; |
| 837 final Socket _socket; |
| 838 final _HttpParser _httpParser; |
| 839 StreamSubscription _subscription; |
| 840 final _HttpClient _httpClient; |
| 841 |
| 842 Completer<_HttpIncoming> _nextResponseCompleter; |
| 843 Future _writeDoneFuture; |
| 844 |
| 845 _HttpClientConnection(String this.key, |
| 846 Socket this._socket, |
| 847 _HttpClient this._httpClient) |
| 848 : _httpParser = new _HttpParser.responseParser() { |
| 849 _socket.pipe(_httpParser); |
| 850 _socket.done.catchError((e) { destroy(); }); |
| 851 |
| 852 // Set up handlers on the parser here, so we are sure to get 'onDone' from |
| 853 // the parser. |
| 854 _subscription = _httpParser.listen( |
| 855 (incoming) { |
| 856 // Only handle one incoming response at the time. Keep the |
| 857 // stream paused until the response have been processed. |
| 858 _subscription.pause(); |
| 859 // We assume the response is not here, until we have send the request. |
| 860 assert(_nextResponseCompleter != null); |
| 861 _nextResponseCompleter.complete(incoming); |
| 862 }, |
| 863 onError: (error) { |
| 864 if (_nextResponseCompleter != null) { |
| 865 _nextResponseCompleter.completeError(error); |
| 866 } |
| 867 }, |
| 868 onDone: () { |
| 869 close(); |
| 870 }); |
| 871 } |
| 872 |
| 873 Future<_HttpIncoming> sendRequest(_HttpOutgoing outgoing) { |
| 874 return outgoing.stream |
| 875 .then((stream) { |
| 876 // Close socket if output data is invalid. |
| 877 outgoing.dataDone.catchError((e) { |
| 878 close(); |
| 879 }); |
| 880 // Sending request, set up response completer. |
| 881 _nextResponseCompleter = new Completer(); |
| 882 _writeDoneFuture = _socket.addStream(stream); |
| 883 // Listen for response. |
| 884 return _nextResponseCompleter.future |
| 885 .whenComplete(() { |
| 886 _nextResponseCompleter = null; |
| 887 }) |
| 888 .then((incoming) { |
| 889 incoming.dataDone.then((_) { |
| 890 if (!incoming.headers.persistentConnection) { |
| 891 close(); |
| 892 } else { |
| 893 // Wait for the socket to be done with writing, before we |
| 894 // continue. |
| 895 _writeDoneFuture.then((_) { |
| 896 _subscription.resume(); |
| 897 // Return connection, now we are done. |
| 898 _httpClient._returnConnection(this); |
| 899 }); |
| 900 } |
| 901 }); |
| 902 // TODO(ajohnsen): Can there be an error on dataDone? |
| 903 return incoming; |
| 904 }) |
| 905 // If we see a state error, we failed to get the 'first' element. |
| 906 // Transform the error to a HttpParserException, for consistency. |
| 907 .catchError((error) { |
| 908 throw new HttpParserException( |
| 909 "Connection closed before data was received"); |
| 910 }, test: (error) => error is StateError) |
| 911 .catchError((error) { |
| 912 // We are done with the socket. |
| 913 destroy(); |
| 914 throw error; |
| 915 }); |
| 916 }); |
| 917 } |
| 918 |
| 919 Future<Socket> detachSocket() { |
| 920 return _writeDoneFuture.then((_) => |
| 921 new _DetachedSocket(_socket, _httpParser.detachIncoming())); |
| 922 } |
| 923 |
| 924 void destroy() { |
| 925 _socket.destroy(); |
| 926 _httpClient._connectionClosed(this); |
690 } | 927 } |
691 | 928 |
692 void close() { | 929 void close() { |
693 _requestOrResponse._streamClose(); | 930 var future = _writeDoneFuture; |
694 } | 931 if (future == null) future = new Future.immediate(null); |
695 | 932 _httpClient._connectionClosed(this); |
696 bool get closed => _requestOrResponse._done; | 933 future.then((_) { |
| 934 _socket.close(); |
| 935 // TODO(ajohnsen): Add timeout. |
| 936 // Delay destroy until socket is actually done writing. |
| 937 _socket.done.then((_) => _socket.destroy(), |
| 938 onError: (_) => _socket.destroy()); |
| 939 }); |
| 940 } |
| 941 |
| 942 HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket); |
| 943 } |
| 944 |
| 945 class _ConnnectionInfo { |
| 946 _ConnnectionInfo(_HttpClientConnection this.connection, _Proxy this.proxy); |
| 947 final _HttpClientConnection connection; |
| 948 final _Proxy proxy; |
| 949 } |
| 950 |
| 951 |
| 952 class _HttpClient implements HttpClient { |
| 953 // TODO(ajohnsen): Use eviction timeout. |
| 954 static const int DEFAULT_EVICTION_TIMEOUT = 60000; |
| 955 bool _closing = false; |
| 956 |
| 957 final Map<String, Queue<_HttpClientConnection>> _idleConnections |
| 958 = new Map<String, Queue<_HttpClientConnection>>(); |
| 959 final Set<_HttpClientConnection> _activeConnections |
| 960 = new Set<_HttpClientConnection>(); |
| 961 final List<_Credentials> _credentials = []; |
| 962 Function _authenticate; |
| 963 Function _findProxy; |
| 964 |
| 965 Future<HttpClientRequest> open(String method, |
| 966 String host, |
| 967 int port, |
| 968 String path) { |
| 969 // TODO(sgjesse): The path set here can contain both query and |
| 970 // fragment. They should be cracked and set correctly. |
| 971 return _openUrl(method, new Uri.fromComponents( |
| 972 scheme: "http", domain: host, port: port, path: path)); |
| 973 } |
| 974 |
| 975 Future<HttpClientRequest> openUrl(String method, Uri url) { |
| 976 return _openUrl(method, url); |
| 977 } |
| 978 |
| 979 Future<HttpClientRequest> get(String host, |
| 980 int port, |
| 981 String path) { |
| 982 return open("get", host, port, path); |
| 983 } |
| 984 |
| 985 Future<HttpClientRequest> getUrl(Uri url) { |
| 986 return _openUrl("get", url); |
| 987 } |
| 988 |
| 989 Future<HttpClientRequest> post(String host, |
| 990 int port, |
| 991 String path) { |
| 992 return open("post", host, port, path); |
| 993 } |
| 994 |
| 995 Future<HttpClientRequest> postUrl(Uri url) { |
| 996 return _openUrl("post", url); |
| 997 } |
| 998 |
| 999 void close({bool force: false}) { |
| 1000 _closing = true; |
| 1001 // Create flattened copy of _idleConnections, as 'destory' will manipulate |
| 1002 // it. |
| 1003 var idle = _idleConnections.values.reduce( |
| 1004 [], |
| 1005 (l, e) { |
| 1006 l.addAll(e); |
| 1007 return l; |
| 1008 }); |
| 1009 idle.forEach((e) { |
| 1010 e.close(); |
| 1011 }); |
| 1012 assert(_idleConnections.isEmpty); |
| 1013 if (force) { |
| 1014 for (var connection in _activeConnections.toList()) { |
| 1015 connection.destroy(); |
| 1016 } |
| 1017 assert(_activeConnections.isEmpty); |
| 1018 _activeConnections.clear(); |
| 1019 } |
| 1020 } |
| 1021 |
| 1022 set authenticate(Future<bool> f(Uri url, String scheme, String realm)) { |
| 1023 _authenticate = f; |
| 1024 } |
| 1025 |
| 1026 void addCredentials(Uri url, String realm, HttpClientCredentials cr) { |
| 1027 _credentials.add(new _Credentials(url, realm, cr)); |
| 1028 } |
| 1029 |
| 1030 set findProxy(String f(Uri uri)) => _findProxy = f; |
| 1031 |
| 1032 Future<HttpClientRequest> _openUrl(String method, Uri uri) { |
| 1033 if (method == null) { |
| 1034 throw new ArgumentError(method); |
| 1035 } |
| 1036 if (uri.domain.isEmpty || (uri.scheme != "http" && uri.scheme != "https")) { |
| 1037 throw new ArgumentError("Unsupported scheme '${uri.scheme}' in $uri"); |
| 1038 } |
| 1039 |
| 1040 bool isSecure = (uri.scheme == "https"); |
| 1041 int port = uri.port; |
| 1042 if (port == 0) { |
| 1043 port = isSecure ? |
| 1044 HttpClient.DEFAULT_HTTPS_PORT : |
| 1045 HttpClient.DEFAULT_HTTP_PORT; |
| 1046 } |
| 1047 // Check to see if a proxy server should be used for this connection. |
| 1048 var proxyConf = const _ProxyConfiguration.direct(); |
| 1049 if (_findProxy != null) { |
| 1050 // TODO(sgjesse): Keep a map of these as normally only a few |
| 1051 // configuration strings will be used. |
| 1052 try { |
| 1053 proxyConf = new _ProxyConfiguration(_findProxy(uri)); |
| 1054 } catch (error, stackTrace) { |
| 1055 return new Future.immediateError(error, stackTrace); |
| 1056 } |
| 1057 } |
| 1058 return _getConnection(uri.domain, port, proxyConf, isSecure).then((info) { |
| 1059 // Create new internal outgoing connection. |
| 1060 var outgoing = new _HttpOutgoing(); |
| 1061 // Create new request object, wrapping the outgoing connection. |
| 1062 var request = new _HttpClientRequest(outgoing, |
| 1063 uri, |
| 1064 method.toUpperCase(), |
| 1065 !info.proxy.isDirect, |
| 1066 this, |
| 1067 info.connection); |
| 1068 request.headers.host = uri.domain; |
| 1069 request.headers.port = port; |
| 1070 if (uri.userInfo != null && !uri.userInfo.isEmpty) { |
| 1071 // If the URL contains user information use that for basic |
| 1072 // authorization |
| 1073 String auth = |
| 1074 CryptoUtils.bytesToBase64(_encodeString(uri.userInfo)); |
| 1075 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); |
| 1076 } else { |
| 1077 // Look for credentials. |
| 1078 _Credentials cr = _findCredentials(uri); |
| 1079 if (cr != null) { |
| 1080 cr.authorize(request); |
| 1081 } |
| 1082 } |
| 1083 // Start sending the request (lazy, delayed until the user provides |
| 1084 // data). |
| 1085 info.connection._httpParser.responseToMethod = method; |
| 1086 info.connection.sendRequest(outgoing) |
| 1087 .then((incoming) { |
| 1088 // The full request have been sent and a response is received |
| 1089 // containing status-code, headers and etc. |
| 1090 request._onIncoming(incoming); |
| 1091 }) |
| 1092 .catchError((error) { |
| 1093 // An error occoured before the http-header was parsed. This |
| 1094 // could be either a socket-error or parser-error. |
| 1095 request._onError(error); |
| 1096 }); |
| 1097 // Return the request to the user. Immediate socket errors are not |
| 1098 // handled, thus forwarded to the user. |
| 1099 return request; |
| 1100 }); |
| 1101 } |
| 1102 |
| 1103 Future<HttpClientRequest> _openUrlFromRequest(String method, |
| 1104 Uri uri, |
| 1105 _HttpClientRequest previous) { |
| 1106 return openUrl(method, uri).then((request) { |
| 1107 // Only follow redirects if initial request did. |
| 1108 request.followRedirects = previous.followRedirects; |
| 1109 // Allow same number of redirects. |
| 1110 request.maxRedirects = previous.maxRedirects; |
| 1111 // Copy headers |
| 1112 for (var header in previous.headers._headers.keys) { |
| 1113 if (request.headers[header] == null) { |
| 1114 request.headers.set(header, previous.headers[header]); |
| 1115 } |
| 1116 } |
| 1117 request.headers.chunkedTransferEncoding = false; |
| 1118 request.contentLength = 0; |
| 1119 return request; |
| 1120 }); |
| 1121 } |
| 1122 |
| 1123 // Return a live connection to the idle pool. |
| 1124 void _returnConnection(_HttpClientConnection connection) { |
| 1125 _activeConnections.remove(connection); |
| 1126 if (_closing) { |
| 1127 connection.close(); |
| 1128 return; |
| 1129 } |
| 1130 // TODO(ajohnsen): Listen for socket close events. |
| 1131 if (!_idleConnections.containsKey(connection.key)) { |
| 1132 _idleConnections[connection.key] = new Queue(); |
| 1133 } |
| 1134 _idleConnections[connection.key].addLast(connection); |
| 1135 } |
| 1136 |
| 1137 // Remove a closed connnection from the active set. |
| 1138 void _connectionClosed(_HttpClientConnection connection) { |
| 1139 _activeConnections.remove(connection); |
| 1140 if (_idleConnections.containsKey(connection.key)) { |
| 1141 _idleConnections[connection.key].remove(connection); |
| 1142 if (_idleConnections[connection.key].isEmpty) { |
| 1143 _idleConnections.remove(connection.key); |
| 1144 } |
| 1145 } |
| 1146 } |
| 1147 |
| 1148 // Get a new _HttpClientConnection, either from the idle pool or created from |
| 1149 // a new Socket. |
| 1150 Future<_ConnnectionInfo> _getConnection(String uriHost, |
| 1151 int uriPort, |
| 1152 _ProxyConfiguration proxyConf, |
| 1153 bool isSecure) { |
| 1154 Iterator<_Proxy> proxies = proxyConf.proxies.iterator; |
| 1155 |
| 1156 Future<_ConnnectionInfo> connect(error) { |
| 1157 if (!proxies.moveNext()) return new Future.immediateError(error); |
| 1158 _Proxy proxy = proxies.current; |
| 1159 String host = proxy.isDirect ? uriHost: proxy.host; |
| 1160 int port = proxy.isDirect ? uriPort: proxy.port; |
| 1161 String key = isSecure ? "ssh:$host:$port" : "$host:$port"; |
| 1162 if (_idleConnections.containsKey(key)) { |
| 1163 var connection = _idleConnections[key].removeFirst(); |
| 1164 if (_idleConnections[key].isEmpty) { |
| 1165 _idleConnections.remove(key); |
| 1166 } |
| 1167 _activeConnections.add(connection); |
| 1168 return new Future.immediate(new _ConnnectionInfo(connection, proxy)); |
| 1169 } |
| 1170 return (isSecure && proxy.isDirect |
| 1171 ? SecureSocket.connect(host, |
| 1172 port, |
| 1173 sendClientCertificate: true) |
| 1174 : Socket.connect(host, port)) |
| 1175 .then((socket) { |
| 1176 var connection = new _HttpClientConnection(key, socket, this); |
| 1177 _activeConnections.add(connection); |
| 1178 return new _ConnnectionInfo(connection, proxy); |
| 1179 }, onError: (error) { |
| 1180 // Continue with next proxy. |
| 1181 return connect(error.error); |
| 1182 }); |
| 1183 } |
| 1184 return connect(new HttpException("No proxies given")); |
| 1185 } |
| 1186 |
| 1187 _Credentials _findCredentials(Uri url, [_AuthenticationScheme scheme]) { |
| 1188 // Look for credentials. |
| 1189 _Credentials cr = |
| 1190 _credentials.reduce(null, (_Credentials prev, _Credentials value) { |
| 1191 if (value.applies(url, scheme)) { |
| 1192 if (prev == null) return value; |
| 1193 return value.uri.path.length > prev.uri.path.length ? value : prev; |
| 1194 } else { |
| 1195 return prev; |
| 1196 } |
| 1197 }); |
| 1198 return cr; |
| 1199 } |
| 1200 |
| 1201 void _removeCredentials(_Credentials cr) { |
| 1202 int index = _credentials.indexOf(cr); |
| 1203 if (index != -1) { |
| 1204 _credentials.removeAt(index); |
| 1205 } |
| 1206 } |
| 1207 } |
| 1208 |
| 1209 |
| 1210 class _HttpConnection { |
| 1211 static const _ACTIVE = 0; |
| 1212 static const _IDLE = 1; |
| 1213 static const _CLOSING = 2; |
| 1214 static const _DETACHED = 3; |
| 1215 |
| 1216 int _state = _IDLE; |
| 1217 |
| 1218 final Socket _socket; |
| 1219 final _HttpServer _httpServer; |
| 1220 final _HttpParser _httpParser; |
| 1221 StreamSubscription _subscription; |
| 1222 |
| 1223 Future _writeDoneFuture; |
| 1224 |
| 1225 _HttpConnection(Socket this._socket, _HttpServer this._httpServer) |
| 1226 : _httpParser = new _HttpParser.requestParser() { |
| 1227 _socket.pipe(_httpParser); |
| 1228 _socket.done.catchError((e) => destroy()); |
| 1229 _subscription = _httpParser.listen( |
| 1230 (incoming) { |
| 1231 // Only handle one incoming request at the time. Keep the |
| 1232 // stream paused until the request has been send. |
| 1233 _subscription.pause(); |
| 1234 _state = _ACTIVE; |
| 1235 var outgoing = new _HttpOutgoing(); |
| 1236 _writeDoneFuture = outgoing.stream.then(_socket.addStream); |
| 1237 var response = new _HttpResponse( |
| 1238 incoming.headers.protocolVersion, |
| 1239 outgoing); |
| 1240 var request = new _HttpRequest(response, incoming, _httpServer, this); |
| 1241 response._ignoreBody = request.method == "HEAD"; |
| 1242 response._httpRequest = request; |
| 1243 outgoing.dataDone.then((_) { |
| 1244 if (_state == _DETACHED) return; |
| 1245 if (response.headers.persistentConnection && |
| 1246 incoming.fullBodyRead) { |
| 1247 // Wait for the socket to be done with writing, before we |
| 1248 // continue. |
| 1249 _writeDoneFuture.then((_) { |
| 1250 _state = _IDLE; |
| 1251 // Resume the subscription for incoming requests as the |
| 1252 // request is now processed. |
| 1253 _subscription.resume(); |
| 1254 }); |
| 1255 } else { |
| 1256 // Close socket, keep-alive not used or body sent before received |
| 1257 // data was handled. |
| 1258 close(); |
| 1259 } |
| 1260 }).catchError((e) { |
| 1261 close(); |
| 1262 }); |
| 1263 _httpServer._handleRequest(request); |
| 1264 }, |
| 1265 onDone: () { |
| 1266 close(); |
| 1267 }, |
| 1268 onError: (error) { |
| 1269 _httpServer._handleError(error); |
| 1270 destroy(); |
| 1271 }); |
| 1272 } |
697 | 1273 |
698 void destroy() { | 1274 void destroy() { |
699 throw "Not implemented"; | 1275 if (_state == _CLOSING || _state == _DETACHED) return; |
700 } | 1276 _state = _CLOSING; |
701 | 1277 _socket.destroy(); |
702 void set onNoPendingWrites(void callback()) { | 1278 _httpServer._connectionClosed(this); |
703 _requestOrResponse._streamSetNoPendingWriteHandler(callback); | |
704 } | |
705 | |
706 void set onClosed(void callback()) { | |
707 _requestOrResponse._streamSetClosedHandler(callback); | |
708 } | |
709 | |
710 void set onError(void callback(e)) { | |
711 _requestOrResponse._streamSetErrorHandler(callback); | |
712 } | |
713 | |
714 _HttpRequestResponseBase _requestOrResponse; | |
715 } | |
716 | |
717 | |
718 abstract class _HttpConnectionBase { | |
719 static const int IDLE = 0; | |
720 static const int ACTIVE = 1; | |
721 static const int CLOSING = 2; | |
722 static const int REQUEST_DONE = 4; | |
723 static const int RESPONSE_DONE = 8; | |
724 static const int ALL_DONE = REQUEST_DONE | RESPONSE_DONE; | |
725 static const int READ_CLOSED = 16; | |
726 static const int WRITE_CLOSED = 32; | |
727 static const int FULLY_CLOSED = READ_CLOSED | WRITE_CLOSED; | |
728 | |
729 _HttpConnectionBase() : hashCode = _nextHashCode { | |
730 _nextHashCode = (_nextHashCode + 1) & 0xFFFFFFF; | |
731 } | |
732 | |
733 bool get _isIdle => (_state & ACTIVE) == 0; | |
734 bool get _isActive => (_state & ACTIVE) == ACTIVE; | |
735 bool get _isClosing => (_state & CLOSING) == CLOSING; | |
736 bool get _isRequestDone => (_state & REQUEST_DONE) == REQUEST_DONE; | |
737 bool get _isResponseDone => (_state & RESPONSE_DONE) == RESPONSE_DONE; | |
738 bool get _isAllDone => (_state & ALL_DONE) == ALL_DONE; | |
739 bool get _isReadClosed => (_state & READ_CLOSED) == READ_CLOSED; | |
740 bool get _isWriteClosed => (_state & WRITE_CLOSED) == WRITE_CLOSED; | |
741 bool get _isFullyClosed => (_state & FULLY_CLOSED) == FULLY_CLOSED; | |
742 | |
743 void _connectionEstablished(Socket socket) { | |
744 _socket = socket; | |
745 // Register handlers for socket events. All socket events are | |
746 // passed to the HTTP parser. | |
747 _socket.onData = () { | |
748 List<int> buffer = _socket.read(); | |
749 if (buffer != null) { | |
750 _httpParser.streamData(buffer); | |
751 } | |
752 }; | |
753 _socket.onClosed = _httpParser.streamDone; | |
754 _socket.onError = _httpParser.streamError; | |
755 _socket.outputStream.onError = _httpParser.streamError; | |
756 } | |
757 | |
758 bool _write(List<int> data, [bool copyBuffer = false]); | |
759 bool _writeFrom(List<int> buffer, [int offset, int len]); | |
760 bool _flush(); | |
761 bool _close(); | |
762 bool _destroy(); | |
763 DetachedSocket _detachSocket(); | |
764 | |
765 HttpConnectionInfo get connectionInfo { | |
766 if (_socket == null) return null; | |
767 try { | |
768 _HttpConnectionInfo info = new _HttpConnectionInfo(); | |
769 info.remoteHost = _socket.remoteHost; | |
770 info.remotePort = _socket.remotePort; | |
771 info.localPort = _socket.port; | |
772 return info; | |
773 } catch (e) { } | |
774 return null; | |
775 } | |
776 | |
777 void set _onNoPendingWrites(void callback()) { | |
778 _socket.outputStream.onNoPendingWrites = callback; | |
779 } | |
780 | |
781 int _state = IDLE; | |
782 | |
783 Socket _socket; | |
784 _HttpParser _httpParser; | |
785 | |
786 // Callbacks. | |
787 Function onDetach; | |
788 Function onClosed; | |
789 | |
790 // Hash code for HTTP connection. Currently this is just a counter. | |
791 final int hashCode; | |
792 static int _nextHashCode = 0; | |
793 } | |
794 | |
795 | |
796 // HTTP server connection over a socket. | |
797 class _HttpConnection extends _HttpConnectionBase { | |
798 _HttpConnection(HttpServer this._server) { | |
799 _httpParser = new _HttpParser.requestParser(); | |
800 // Register HTTP parser callbacks. | |
801 _httpParser.requestStart = _onRequestReceived; | |
802 _httpParser.dataReceived = _onDataReceived; | |
803 _httpParser.dataEnd = _onDataEnd; | |
804 _httpParser.error = _onError; | |
805 _httpParser.closed = _onClosed; | |
806 _httpParser.responseStart = (statusCode, reasonPhrase, version) { | |
807 assert(false); | |
808 }; | |
809 } | |
810 | |
811 void _bufferData(List<int> data, [bool copyBuffer = false]) { | |
812 if (_buffer == null) _buffer = new _BufferList(); | |
813 if (copyBuffer) data = data.getRange(0, data.length); | |
814 _buffer.add(data); | |
815 } | |
816 | |
817 void _writeBufferedResponse() { | |
818 if (_buffer != null) { | |
819 while (!_buffer.isEmpty) { | |
820 var data = _buffer.first; | |
821 _socket.outputStream.write(data, false); | |
822 _buffer.removeBytes(data.length); | |
823 } | |
824 _buffer = null; | |
825 } | |
826 } | |
827 | |
828 bool _write(List<int> data, [bool copyBuffer = false]) { | |
829 if (_isRequestDone || !_hasBody || _httpParser.upgrade) { | |
830 return _socket.outputStream.write(data, copyBuffer); | |
831 } else { | |
832 _bufferData(data, copyBuffer); | |
833 return false; | |
834 } | |
835 } | |
836 | |
837 bool _writeFrom(List<int> data, [int offset, int len]) { | |
838 if (_isRequestDone || !_hasBody || _httpParser.upgrade) { | |
839 return _socket.outputStream.writeFrom(data, offset, len); | |
840 } else { | |
841 if (offset == null) offset = 0; | |
842 if (len == null) len = buffer.length - offset; | |
843 _bufferData(data.getRange(offset, len), false); | |
844 return false; | |
845 } | |
846 } | |
847 | |
848 bool _flush() { | |
849 _socket.outputStream.flush(); | |
850 } | |
851 | |
852 bool _close() { | |
853 _socket.outputStream.close(); | |
854 } | |
855 | |
856 bool _destroy() { | |
857 _socket.close(); | |
858 } | |
859 | |
860 void _onClosed() { | |
861 _state |= _HttpConnectionBase.READ_CLOSED; | |
862 _checkDone(); | |
863 } | |
864 | |
865 DetachedSocket _detachSocket() { | |
866 _socket.onData = null; | |
867 _socket.onClosed = null; | |
868 _socket.onError = null; | |
869 _socket.outputStream.onNoPendingWrites = null; | |
870 _writeBufferedResponse(); | |
871 Socket socket = _socket; | |
872 _socket = null; | |
873 if (onDetach != null) onDetach(); | |
874 return new _DetachedSocket(socket, _httpParser.readUnparsedData()); | |
875 } | |
876 | |
877 void _onError(e) { | |
878 // Don't report errors for a request parser when HTTP parser is in | |
879 // idle state. Clients can close the connection and cause a | |
880 // connection reset by peer error which is OK. | |
881 _onClosed(); | |
882 if (_state == _HttpConnectionBase.IDLE) return; | |
883 | |
884 // Propagate the error to the streams. | |
885 if (_request != null && | |
886 !_isRequestDone && | |
887 _request._streamErrorHandler != null) { | |
888 _request._streamErrorHandler(e); | |
889 } else if (_response != null && | |
890 !_isResponseDone && | |
891 _response._streamErrorHandler != null) { | |
892 _response._streamErrorHandler(e); | |
893 } else { | |
894 onError(e); | |
895 } | |
896 if (_socket != null) _socket.close(); | |
897 } | |
898 | |
899 void _onRequestReceived(String method, | |
900 String uri, | |
901 String version, | |
902 _HttpHeaders headers, | |
903 bool hasBody) { | |
904 _state = _HttpConnectionBase.ACTIVE; | |
905 // Create new request and response objects for this request. | |
906 _request = new _HttpRequest(this); | |
907 _response = new _HttpResponse(this); | |
908 _request._onRequestReceived(method, uri, version, headers); | |
909 _request._protocolVersion = version; | |
910 _response._protocolVersion = version; | |
911 _response._headResponse = method == "HEAD"; | |
912 _response.persistentConnection = _httpParser.persistentConnection; | |
913 _hasBody = hasBody; | |
914 if (onRequestReceived != null) { | |
915 onRequestReceived(_request, _response); | |
916 } | |
917 _checkDone(); | |
918 } | |
919 | |
920 void _onDataReceived(List<int> data) { | |
921 _request._onDataReceived(data); | |
922 _checkDone(); | |
923 } | |
924 | |
925 void _checkDone() { | |
926 if (_isReadClosed) { | |
927 // If the client closes the conversation is ended. | |
928 _server._closeQueue.add(this); | |
929 } else if (_isAllDone) { | |
930 // If we are done writing the response, and the connection is | |
931 // not persistent, we must close. Also if using HTTP 1.0 and the | |
932 // content length was not known we must close to indicate end of | |
933 // body. | |
934 bool close = | |
935 !_response.persistentConnection || | |
936 (_response._protocolVersion == "1.0" && _response.contentLength < 0); | |
937 _request = null; | |
938 _response = null; | |
939 if (close) { | |
940 _httpParser.cancel(); | |
941 _server._closeQueue.add(this); | |
942 } else { | |
943 _state = _HttpConnectionBase.IDLE; | |
944 } | |
945 } else if (_isResponseDone && _hasBody) { | |
946 // If the response is closed before the request is fully read | |
947 // close this connection. If there is buffered output | |
948 // (e.g. error response for invalid request where the server did | |
949 // not care to read the request body) this is send. | |
950 assert(!_isRequestDone); | |
951 _writeBufferedResponse(); | |
952 _httpParser.cancel(); | |
953 _server._closeQueue.add(this); | |
954 } | |
955 } | |
956 | |
957 void _onDataEnd(bool close) { | |
958 // Start sending queued response if any. | |
959 _state |= _HttpConnectionBase.REQUEST_DONE; | |
960 _writeBufferedResponse(); | |
961 _request._onDataEnd(); | |
962 _checkDone(); | |
963 } | |
964 | |
965 void _responseClosed() { | |
966 _state |= _HttpConnectionBase.RESPONSE_DONE; | |
967 } | |
968 | |
969 HttpServer _server; | |
970 HttpRequest _request; | |
971 HttpResponse _response; | |
972 bool _hasBody = false; | |
973 | |
974 // Buffer for data written before full response has been processed. | |
975 _BufferList _buffer; | |
976 | |
977 // Callbacks. | |
978 Function onRequestReceived; | |
979 Function onError; | |
980 } | |
981 | |
982 | |
983 class _RequestHandlerRegistration { | |
984 _RequestHandlerRegistration(Function this._matcher, Function this._handler); | |
985 Function _matcher; | |
986 Function _handler; | |
987 } | |
988 | |
989 // HTTP server waiting for socket connections. The connections are | |
990 // managed by the server and as requests are received the request. | |
991 // HTTPS connections are also supported, if the _HttpServer.httpsServer | |
992 // constructor is used and a certificate name is provided in listen, | |
993 // or a SecureServerSocket is provided to listenOn. | |
994 class _HttpServer implements HttpServer, HttpsServer { | |
995 _HttpServer() : this._internal(isSecure: false); | |
996 | |
997 _HttpServer.httpsServer() : this._internal(isSecure: true); | |
998 | |
999 _HttpServer._internal({ bool isSecure: false }) | |
1000 : _secure = isSecure, | |
1001 _connections = new Set<_HttpConnection>(), | |
1002 _handlers = new List<_RequestHandlerRegistration>(), | |
1003 _closeQueue = new _CloseQueue(); | |
1004 | |
1005 void listen(String host, | |
1006 int port, | |
1007 {int backlog: 128, | |
1008 String certificate_name, | |
1009 bool requestClientCertificate: false}) { | |
1010 if (_secure) { | |
1011 listenOn(new SecureServerSocket( | |
1012 host, | |
1013 port, | |
1014 backlog, | |
1015 certificate_name, | |
1016 requestClientCertificate: requestClientCertificate)); | |
1017 } else { | |
1018 listenOn(new ServerSocket(host, port, backlog)); | |
1019 } | |
1020 _closeServer = true; | |
1021 } | |
1022 | |
1023 void listenOn(ServerSocket serverSocket) { | |
1024 if (_secure && serverSocket is! SecureServerSocket) { | |
1025 throw new HttpException( | |
1026 'HttpsServer.listenOn was called with non-secure server socket'); | |
1027 } else if (!_secure && serverSocket is SecureServerSocket) { | |
1028 throw new HttpException( | |
1029 'HttpServer.listenOn was called with a secure server socket'); | |
1030 } | |
1031 void onConnection(Socket socket) { | |
1032 // Accept the client connection. | |
1033 _HttpConnection connection = new _HttpConnection(this); | |
1034 connection._connectionEstablished(socket); | |
1035 _connections.add(connection); | |
1036 connection.onRequestReceived = _handleRequest; | |
1037 connection.onClosed = () => _connections.remove(connection); | |
1038 connection.onDetach = () => _connections.remove(connection); | |
1039 connection.onError = (e) { | |
1040 _connections.remove(connection); | |
1041 if (_onError != null) { | |
1042 _onError(e); | |
1043 } else { | |
1044 throw(e); | |
1045 } | |
1046 }; | |
1047 } | |
1048 serverSocket.onConnection = onConnection; | |
1049 _server = serverSocket; | |
1050 _closeServer = false; | |
1051 } | |
1052 | |
1053 addRequestHandler(bool matcher(HttpRequest request), | |
1054 void handler(HttpRequest request, HttpResponse response)) { | |
1055 _handlers.add(new _RequestHandlerRegistration(matcher, handler)); | |
1056 } | |
1057 | |
1058 void set defaultRequestHandler( | |
1059 void handler(HttpRequest request, HttpResponse response)) { | |
1060 _defaultHandler = handler; | |
1061 } | 1279 } |
1062 | 1280 |
1063 void close() { | 1281 void close() { |
1064 _closeQueue.shutdown(); | 1282 if (_state == _CLOSING || _state == _DETACHED) return; |
| 1283 _state = _CLOSING; |
| 1284 var future = _writeDoneFuture; |
| 1285 if (future == null) future = new Future.immediate(null); |
| 1286 _httpServer._connectionClosed(this); |
| 1287 future.then((_) { |
| 1288 _socket.close(); |
| 1289 // TODO(ajohnsen): Add timeout. |
| 1290 // Delay destroy until socket is actually done writing. |
| 1291 _socket.done.then((_) => _socket.destroy(), |
| 1292 onError: (_) => _socket.destroy()); |
| 1293 }); |
| 1294 } |
| 1295 |
| 1296 Future<Socket> detachSocket() { |
| 1297 _state = _DETACHED; |
| 1298 // Remove connection from server. |
| 1299 _httpServer._connectionClosed(this); |
| 1300 |
| 1301 _HttpDetachedIncoming detachedIncoming = _httpParser.detachIncoming(); |
| 1302 |
| 1303 return _writeDoneFuture.then((_) { |
| 1304 return new _DetachedSocket(_socket, detachedIncoming); |
| 1305 }); |
| 1306 } |
| 1307 |
| 1308 HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket); |
| 1309 |
| 1310 bool get _isActive => _state == _ACTIVE; |
| 1311 bool get _isIdle => _state == _IDLE; |
| 1312 bool get _isClosing => _state == _CLOSING; |
| 1313 bool get _isDetached => _state == _DETACHED; |
| 1314 } |
| 1315 |
| 1316 |
| 1317 // HTTP server waiting for socket connections. |
| 1318 class _HttpServer extends Stream<HttpRequest> implements HttpServer { |
| 1319 |
| 1320 static Future<HttpServer> bind(String host, int port, int backlog) { |
| 1321 return ServerSocket.bind(host, port, backlog).then((socket) { |
| 1322 return new _HttpServer._(socket, true); |
| 1323 }); |
| 1324 } |
| 1325 |
| 1326 static Future<HttpServer> bindSecure(String host, |
| 1327 int port, |
| 1328 int backlog, |
| 1329 String certificate_name, |
| 1330 bool requestClientCertificate) { |
| 1331 return SecureServerSocket.bind( |
| 1332 host, |
| 1333 port, |
| 1334 backlog, |
| 1335 certificate_name, |
| 1336 requestClientCertificate: requestClientCertificate) |
| 1337 .then((socket) { |
| 1338 return new _HttpServer._(socket, true); |
| 1339 }); |
| 1340 } |
| 1341 |
| 1342 _HttpServer._(this._serverSocket, this._closeServer); |
| 1343 |
| 1344 _HttpServer.listenOn(ServerSocket this._serverSocket) |
| 1345 : _closeServer = false; |
| 1346 |
| 1347 StreamSubscription<HttpRequest> listen(void onData(HttpRequest event), |
| 1348 {void onError(AsyncError error), |
| 1349 void onDone(), |
| 1350 bool unsubscribeOnError}) { |
| 1351 _serverSocket.listen( |
| 1352 (Socket socket) { |
| 1353 // Accept the client connection. |
| 1354 _HttpConnection connection = new _HttpConnection(socket, this); |
| 1355 _connections.add(connection); |
| 1356 }, |
| 1357 onError: _controller.signalError, |
| 1358 onDone: _controller.close); |
| 1359 return _controller.stream.listen(onData, |
| 1360 onError: onError, |
| 1361 onDone: onDone, |
| 1362 unsubscribeOnError: unsubscribeOnError); |
| 1363 } |
| 1364 |
| 1365 void close() { |
| 1366 closed = true; |
| 1367 if (_serverSocket != null && _closeServer) { |
| 1368 _serverSocket.close(); |
| 1369 } |
1065 if (_sessionManagerInstance != null) { | 1370 if (_sessionManagerInstance != null) { |
1066 _sessionManagerInstance.close(); | 1371 _sessionManagerInstance.close(); |
1067 _sessionManagerInstance = null; | 1372 _sessionManagerInstance = null; |
1068 } | 1373 } |
1069 if (_server != null && _closeServer) { | 1374 for (_HttpConnection connection in _connections.toList()) { |
1070 _server.close(); | 1375 connection.destroy(); |
1071 } | |
1072 _server = null; | |
1073 for (_HttpConnection connection in _connections) { | |
1074 connection._destroy(); | |
1075 } | 1376 } |
1076 _connections.clear(); | 1377 _connections.clear(); |
1077 } | 1378 } |
1078 | 1379 |
1079 int get port { | 1380 int get port { |
1080 if (_server == null) { | 1381 if (closed) throw new HttpException("HttpServer is not bound to a socket"); |
1081 throw new HttpException("The HttpServer is not listening on a port."); | 1382 return _serverSocket.port; |
1082 } | |
1083 return _server.port; | |
1084 } | |
1085 | |
1086 void set onError(void callback(e)) { | |
1087 _onError = callback; | |
1088 } | 1383 } |
1089 | 1384 |
1090 set sessionTimeout(int timeout) { | 1385 set sessionTimeout(int timeout) { |
1091 _sessionManager.sessionTimeout = timeout; | 1386 _sessionManager.sessionTimeout = timeout; |
1092 } | 1387 } |
1093 | 1388 |
1094 void _handleRequest(HttpRequest request, HttpResponse response) { | 1389 void _handleRequest(HttpRequest request) { |
1095 for (int i = 0; i < _handlers.length; i++) { | 1390 _controller.add(request); |
1096 if (_handlers[i]._matcher(request)) { | 1391 } |
1097 Function handler = _handlers[i]._handler; | 1392 |
1098 try { | 1393 void _handleError(AsyncError error) { |
1099 handler(request, response); | 1394 if (!closed) _controller.signalError(error); |
1100 } catch (e) { | 1395 } |
1101 if (_onError != null) { | 1396 |
1102 _onError(e); | 1397 void _connectionClosed(_HttpConnection connection) { |
1103 } else { | 1398 _connections.remove(connection); |
1104 throw e; | |
1105 } | |
1106 } | |
1107 return; | |
1108 } | |
1109 } | |
1110 | |
1111 if (_defaultHandler != null) { | |
1112 _defaultHandler(request, response); | |
1113 } else { | |
1114 response.statusCode = HttpStatus.NOT_FOUND; | |
1115 response.contentLength = 0; | |
1116 response.outputStream.close(); | |
1117 } | |
1118 } | 1399 } |
1119 | 1400 |
1120 _HttpSessionManager get _sessionManager { | 1401 _HttpSessionManager get _sessionManager { |
1121 // Lazy init. | 1402 // Lazy init. |
1122 if (_sessionManagerInstance == null) { | 1403 if (_sessionManagerInstance == null) { |
1123 _sessionManagerInstance = new _HttpSessionManager(); | 1404 _sessionManagerInstance = new _HttpSessionManager(); |
1124 } | 1405 } |
1125 return _sessionManagerInstance; | 1406 return _sessionManagerInstance; |
1126 } | 1407 } |
1127 | 1408 |
1128 HttpConnectionsInfo connectionsInfo() { | 1409 HttpConnectionsInfo connectionsInfo() { |
1129 HttpConnectionsInfo result = new HttpConnectionsInfo(); | 1410 HttpConnectionsInfo result = new HttpConnectionsInfo(); |
1130 result.total = _connections.length; | 1411 result.total = _connections.length; |
1131 _connections.forEach((_HttpConnection conn) { | 1412 _connections.forEach((_HttpConnection conn) { |
1132 if (conn._isActive) { | 1413 if (conn._isActive) { |
1133 result.active++; | 1414 result.active++; |
1134 } else if (conn._isIdle) { | 1415 } else if (conn._isIdle) { |
1135 result.idle++; | 1416 result.idle++; |
1136 } else { | 1417 } else { |
1137 assert(result._isClosing); | 1418 assert(conn._isClosing); |
1138 result.closing++; | 1419 result.closing++; |
1139 } | 1420 } |
1140 }); | 1421 }); |
1141 return result; | 1422 return result; |
1142 } | 1423 } |
1143 | 1424 |
1144 ServerSocket _server; // The server listen socket. | |
1145 bool _closeServer = false; | |
1146 bool _secure; | |
1147 Set<_HttpConnection> _connections; // Set of currently connected clients. | |
1148 List<_RequestHandlerRegistration> _handlers; | |
1149 Object _defaultHandler; | |
1150 Function _onError; | |
1151 _CloseQueue _closeQueue; | |
1152 _HttpSessionManager _sessionManagerInstance; | 1425 _HttpSessionManager _sessionManagerInstance; |
1153 } | 1426 |
1154 | 1427 // Indicated if the http server has been closed. |
1155 | 1428 bool closed = false; |
1156 class _HttpClientRequest | 1429 |
1157 extends _HttpRequestResponseBase implements HttpClientRequest { | 1430 // The server listen socket. |
1158 _HttpClientRequest(String this._method, | 1431 final ServerSocket _serverSocket; |
1159 Uri this._uri, | 1432 final bool _closeServer; |
1160 _HttpClientConnection connection) | 1433 |
1161 : super(connection) { | 1434 // Set of currently connected clients. |
1162 _headers = new _HttpHeaders(); | 1435 final Set<_HttpConnection> _connections = new Set<_HttpConnection>(); |
1163 _connection = connection; | 1436 final StreamController<HttpRequest> _controller |
1164 // Default GET and HEAD requests to have no content. | 1437 = new StreamController<HttpRequest>(); |
1165 if (_method == "GET" || _method == "HEAD") { | 1438 |
1166 contentLength = 0; | 1439 // TODO(ajohnsen): Use close queue? |
1167 } | 1440 } |
1168 } | 1441 |
1169 | |
1170 void set contentLength(int contentLength) { | |
1171 if (_state >= _HttpRequestResponseBase.HEADER_SENT) { | |
1172 throw new HttpException("Header already sent"); | |
1173 } | |
1174 _headers.contentLength = contentLength; | |
1175 } | |
1176 | |
1177 List<Cookie> get cookies { | |
1178 if (_cookies == null) _cookies = new List<Cookie>(); | |
1179 return _cookies; | |
1180 } | |
1181 | |
1182 OutputStream get outputStream { | |
1183 if (_done) throw new HttpException("Request closed"); | |
1184 if (_outputStream == null) { | |
1185 _outputStream = new _HttpOutputStream(this); | |
1186 } | |
1187 return _outputStream; | |
1188 } | |
1189 | |
1190 // Delegate functions for the HttpOutputStream implementation. | |
1191 bool _streamWrite(List<int> buffer, bool copyBuffer) { | |
1192 if (_done) throw new HttpException("Request closed"); | |
1193 _emptyBody = _emptyBody && buffer.length == 0; | |
1194 return _write(buffer, copyBuffer); | |
1195 } | |
1196 | |
1197 bool _streamWriteFrom(List<int> buffer, int offset, int len) { | |
1198 if (_done) throw new HttpException("Request closed"); | |
1199 _emptyBody = _emptyBody && buffer.length == 0; | |
1200 return _writeList(buffer, offset, len); | |
1201 } | |
1202 | |
1203 void _streamFlush() { | |
1204 _httpConnection._flush(); | |
1205 } | |
1206 | |
1207 void _streamClose() { | |
1208 _ensureHeadersSent(); | |
1209 _state = _HttpRequestResponseBase.DONE; | |
1210 // Stop tracking no pending write events. | |
1211 _httpConnection._onNoPendingWrites = null; | |
1212 // Ensure that any trailing data is written. | |
1213 _writeDone(); | |
1214 _connection._requestClosed(); | |
1215 if (_streamClosedHandler != null) { | |
1216 Timer.run(_streamClosedHandler); | |
1217 } | |
1218 } | |
1219 | |
1220 void _streamSetNoPendingWriteHandler(callback()) { | |
1221 if (_state != _HttpRequestResponseBase.DONE) { | |
1222 _httpConnection._onNoPendingWrites = callback; | |
1223 } | |
1224 } | |
1225 | |
1226 void _streamSetClosedHandler(callback()) { | |
1227 _streamClosedHandler = callback; | |
1228 } | |
1229 | |
1230 void _streamSetErrorHandler(callback(e)) { | |
1231 _streamErrorHandler = callback; | |
1232 } | |
1233 | |
1234 void _writeHeader() { | |
1235 List<int> data; | |
1236 | |
1237 // Write request line. | |
1238 data = _method.toString().charCodes; | |
1239 _httpConnection._write(data); | |
1240 _writeSP(); | |
1241 // Send the path for direct connections and the whole URL for | |
1242 // proxy connections. | |
1243 if (!_connection._usingProxy) { | |
1244 String path = _uri.path; | |
1245 if (path.length == 0) path = "/"; | |
1246 if (_uri.query != "") { | |
1247 if (_uri.fragment != "") { | |
1248 path = "${path}?${_uri.query}#${_uri.fragment}"; | |
1249 } else { | |
1250 path = "${path}?${_uri.query}"; | |
1251 } | |
1252 } | |
1253 data = path.charCodes; | |
1254 } else { | |
1255 data = _uri.toString().charCodes; | |
1256 } | |
1257 _httpConnection._write(data); | |
1258 _writeSP(); | |
1259 _httpConnection._write(_Const.HTTP11); | |
1260 _writeCRLF(); | |
1261 | |
1262 // Add the cookies to the headers. | |
1263 if (_cookies != null) { | |
1264 StringBuffer sb = new StringBuffer(); | |
1265 for (int i = 0; i < _cookies.length; i++) { | |
1266 if (i > 0) sb.add("; "); | |
1267 sb.add(_cookies[i].name); | |
1268 sb.add("="); | |
1269 sb.add(_cookies[i].value); | |
1270 } | |
1271 _headers.add("cookie", sb.toString()); | |
1272 } | |
1273 | |
1274 // Write headers. | |
1275 _headers._finalize("1.1"); | |
1276 _writeHeaders(); | |
1277 _state = _HttpRequestResponseBase.HEADER_SENT; | |
1278 } | |
1279 | |
1280 String _method; | |
1281 Uri _uri; | |
1282 _HttpClientConnection _connection; | |
1283 _HttpOutputStream _outputStream; | |
1284 Function _streamClosedHandler; | |
1285 Function _streamErrorHandler; | |
1286 bool _emptyBody = true; | |
1287 } | |
1288 | |
1289 class _HttpClientResponse | |
1290 extends _HttpRequestResponseBase | |
1291 implements HttpClientResponse { | |
1292 _HttpClientResponse(_HttpClientConnection connection) | |
1293 : super(connection) { | |
1294 _connection = connection; | |
1295 } | |
1296 | |
1297 int get statusCode => _statusCode; | |
1298 String get reasonPhrase => _reasonPhrase; | |
1299 | |
1300 bool get isRedirect { | |
1301 var method = _connection._request._method; | |
1302 if (method == "GET" || method == "HEAD") { | |
1303 return statusCode == HttpStatus.MOVED_PERMANENTLY || | |
1304 statusCode == HttpStatus.FOUND || | |
1305 statusCode == HttpStatus.SEE_OTHER || | |
1306 statusCode == HttpStatus.TEMPORARY_REDIRECT; | |
1307 } else if (method == "POST") { | |
1308 return statusCode == HttpStatus.SEE_OTHER; | |
1309 } | |
1310 return false; | |
1311 } | |
1312 | |
1313 List<Cookie> get cookies { | |
1314 if (_cookies != null) return _cookies; | |
1315 _cookies = new List<Cookie>(); | |
1316 List<String> values = _headers["set-cookie"]; | |
1317 if (values != null) { | |
1318 values.forEach((value) { | |
1319 _cookies.add(new Cookie.fromSetCookieValue(value)); | |
1320 }); | |
1321 } | |
1322 return _cookies; | |
1323 } | |
1324 | |
1325 InputStream get inputStream { | |
1326 if (_inputStream == null) { | |
1327 _inputStream = new _HttpInputStream(this); | |
1328 } | |
1329 return _inputStream; | |
1330 } | |
1331 | |
1332 void _onResponseReceived(int statusCode, | |
1333 String reasonPhrase, | |
1334 String version, | |
1335 _HttpHeaders headers, | |
1336 bool hasBody) { | |
1337 _statusCode = statusCode; | |
1338 _reasonPhrase = reasonPhrase; | |
1339 _headers = headers; | |
1340 | |
1341 // Prepare for receiving data. | |
1342 _buffer = new _BufferList(); | |
1343 if (isRedirect && _connection.followRedirects) { | |
1344 if (_connection._redirects == null || | |
1345 _connection._redirects.length < _connection.maxRedirects) { | |
1346 // Check the location header. | |
1347 List<String> location = headers[HttpHeaders.LOCATION]; | |
1348 if (location == null || location.length > 1) { | |
1349 throw new RedirectException("Invalid redirect", | |
1350 _connection._redirects); | |
1351 } | |
1352 // Check for redirect loop | |
1353 if (_connection._redirects != null) { | |
1354 Uri redirectUrl = Uri.parse(location[0]); | |
1355 for (int i = 0; i < _connection._redirects.length; i++) { | |
1356 if (_connection._redirects[i].location.toString() == | |
1357 redirectUrl.toString()) { | |
1358 throw new RedirectLoopException(_connection._redirects); | |
1359 } | |
1360 } | |
1361 } | |
1362 if (!persistentConnection) { | |
1363 throw new RedirectException( | |
1364 "Non-persistent connections are currently not supported for " | |
1365 "redirects", _connection._redirects); | |
1366 } | |
1367 // Drain body and redirect. | |
1368 inputStream.onData = inputStream.read; | |
1369 if (_statusCode == HttpStatus.SEE_OTHER && | |
1370 _connection._method == "POST") { | |
1371 _connection.redirect("GET"); | |
1372 } else { | |
1373 _connection.redirect(); | |
1374 } | |
1375 } else { | |
1376 throw new RedirectLimitExceededException(_connection._redirects); | |
1377 } | |
1378 } else if (statusCode == HttpStatus.UNAUTHORIZED) { | |
1379 _handleUnauthorized(); | |
1380 } else if (_connection._onResponse != null) { | |
1381 _connection._onResponse(this); | |
1382 } | |
1383 } | |
1384 | |
1385 void _handleUnauthorized() { | |
1386 | |
1387 void retryRequest(_Credentials cr) { | |
1388 if (cr != null) { | |
1389 // Drain body and retry. | |
1390 // TODO(sgjesse): Support digest. | |
1391 if (cr.scheme == _AuthenticationScheme.BASIC) { | |
1392 inputStream.onData = inputStream.read; | |
1393 _connection._retry(); | |
1394 return; | |
1395 } | |
1396 } | |
1397 | |
1398 // Fall through to here to perform normal response handling if | |
1399 // there is no sensible authorization handling. | |
1400 if (_connection._onResponse != null) { | |
1401 _connection._onResponse(this); | |
1402 } | |
1403 } | |
1404 | |
1405 // Only try to authenticate if there is a challenge in the response. | |
1406 List<String> challenge = _headers[HttpHeaders.WWW_AUTHENTICATE]; | |
1407 if (challenge != null && challenge.length == 1) { | |
1408 _HeaderValue header = | |
1409 new _HeaderValue.fromString(challenge[0], parameterSeparator: ","); | |
1410 _AuthenticationScheme scheme = | |
1411 new _AuthenticationScheme.fromString(header.value); | |
1412 String realm = header.parameters["realm"]; | |
1413 | |
1414 // See if any credentials are available. | |
1415 _Credentials cr = | |
1416 _connection._client._findCredentials( | |
1417 _connection._request._uri, scheme); | |
1418 | |
1419 // Ask for more credentials if none found or the one found has | |
1420 // already been used. If it has already been used it must now be | |
1421 // invalid and is removed. | |
1422 if (cr == null || cr.used) { | |
1423 if (cr != null) { | |
1424 _connection._client._removeCredentials(cr); | |
1425 } | |
1426 cr = null; | |
1427 if (_connection._client._authenticate != null) { | |
1428 Future authComplete = | |
1429 _connection._client._authenticate( | |
1430 _connection._request._uri, scheme.toString(), realm); | |
1431 authComplete.then((credsAvailable) { | |
1432 if (credsAvailable) { | |
1433 cr = _connection._client._findCredentials( | |
1434 _connection._request._uri, scheme); | |
1435 retryRequest(cr); | |
1436 } else { | |
1437 if (_connection._onResponse != null) { | |
1438 _connection._onResponse(this); | |
1439 } | |
1440 } | |
1441 }); | |
1442 return; | |
1443 } | |
1444 } else { | |
1445 // If credentials found prepare for retrying the request. | |
1446 retryRequest(cr); | |
1447 return; | |
1448 } | |
1449 } | |
1450 | |
1451 // Fall through to here to perform normal response handling if | |
1452 // there is no sensible authorization handling. | |
1453 if (_connection._onResponse != null) { | |
1454 _connection._onResponse(this); | |
1455 } | |
1456 } | |
1457 | |
1458 void _onDataReceived(List<int> data) { | |
1459 _buffer.add(data); | |
1460 if (_inputStream != null) _inputStream._dataReceived(); | |
1461 } | |
1462 | |
1463 void _onDataEnd() { | |
1464 if (_inputStream != null) { | |
1465 _inputStream._closeReceived(); | |
1466 } else { | |
1467 inputStream._streamMarkedClosed = true; | |
1468 } | |
1469 } | |
1470 | |
1471 // Delegate functions for the HttpInputStream implementation. | |
1472 int _streamAvailable() { | |
1473 return _buffer.length; | |
1474 } | |
1475 | |
1476 List<int> _streamRead(int bytesToRead) { | |
1477 return _buffer.readBytes(bytesToRead); | |
1478 } | |
1479 | |
1480 int _streamReadInto(List<int> buffer, int offset, int len) { | |
1481 List<int> data = _buffer.readBytes(len); | |
1482 buffer.setRange(offset, data.length, data); | |
1483 return data.length; | |
1484 } | |
1485 | |
1486 void _streamSetErrorHandler(callback(e)) { | |
1487 _streamErrorHandler = callback; | |
1488 } | |
1489 | |
1490 int _statusCode; | |
1491 String _reasonPhrase; | |
1492 | |
1493 _HttpClientConnection _connection; | |
1494 _HttpInputStream _inputStream; | |
1495 _BufferList _buffer; | |
1496 | |
1497 Function _streamErrorHandler; | |
1498 } | |
1499 | |
1500 | |
1501 class _HttpClientConnection | |
1502 extends _HttpConnectionBase implements HttpClientConnection { | |
1503 | |
1504 _HttpClientConnection(_HttpClient this._client) { | |
1505 _httpParser = new _HttpParser.responseParser(); | |
1506 } | |
1507 | |
1508 bool _write(List<int> data, [bool copyBuffer = false]) { | |
1509 return _socket.outputStream.write(data, copyBuffer); | |
1510 } | |
1511 | |
1512 bool _writeFrom(List<int> data, [int offset, int len]) { | |
1513 return _socket.outputStream.writeFrom(data, offset, len); | |
1514 } | |
1515 | |
1516 bool _flush() { | |
1517 _socket.outputStream.flush(); | |
1518 } | |
1519 | |
1520 bool _close() { | |
1521 _socket.outputStream.close(); | |
1522 } | |
1523 | |
1524 bool _destroy() { | |
1525 _socket.close(); | |
1526 } | |
1527 | |
1528 DetachedSocket _detachSocket() { | |
1529 _socket.onData = null; | |
1530 _socket.onClosed = null; | |
1531 _socket.onError = null; | |
1532 _socket.outputStream.onNoPendingWrites = null; | |
1533 Socket socket = _socket; | |
1534 _socket = null; | |
1535 if (onDetach != null) onDetach(); | |
1536 return new _DetachedSocket(socket, _httpParser.readUnparsedData()); | |
1537 } | |
1538 | |
1539 void _connectionEstablished(_SocketConnection socketConn) { | |
1540 super._connectionEstablished(socketConn._socket); | |
1541 _socketConn = socketConn; | |
1542 // Register HTTP parser callbacks. | |
1543 _httpParser.responseStart = _onResponseReceived; | |
1544 _httpParser.dataReceived = _onDataReceived; | |
1545 _httpParser.dataEnd = _onDataEnd; | |
1546 _httpParser.error = _onError; | |
1547 _httpParser.closed = _onClosed; | |
1548 _httpParser.requestStart = (method, uri, version) { assert(false); }; | |
1549 _state = _HttpConnectionBase.ACTIVE; | |
1550 } | |
1551 | |
1552 void _checkSocketDone() { | |
1553 if (_isAllDone) { | |
1554 // If we are done writing the response, and either the server | |
1555 // has closed or the connection is not persistent, we must | |
1556 // close. | |
1557 if (_isReadClosed || !_response.persistentConnection) { | |
1558 this.onClosed = () { | |
1559 _client._closedSocketConnection(_socketConn); | |
1560 }; | |
1561 _client._closeQueue.add(this); | |
1562 } else if (_socket != null) { | |
1563 _client._returnSocketConnection(_socketConn); | |
1564 _socket = null; | |
1565 _socketConn = null; | |
1566 assert(_pendingRedirect == null || _pendingRetry == null); | |
1567 if (_pendingRedirect != null) { | |
1568 _doRedirect(_pendingRedirect); | |
1569 _pendingRedirect = null; | |
1570 } else if (_pendingRetry != null) { | |
1571 _doRetry(_pendingRetry); | |
1572 _pendingRetry = null; | |
1573 } | |
1574 } | |
1575 } | |
1576 } | |
1577 | |
1578 void _requestClosed() { | |
1579 _state |= _HttpConnectionBase.REQUEST_DONE; | |
1580 _checkSocketDone(); | |
1581 } | |
1582 | |
1583 HttpClientRequest open(String method, Uri uri) { | |
1584 _method = method; | |
1585 // Tell the HTTP parser the method it is expecting a response to. | |
1586 _httpParser.responseToMethod = method; | |
1587 // If the connection already have a request this is a retry of a | |
1588 // request. In this case the request object is reused to ensure | |
1589 // that the same headers are send. | |
1590 if (_request != null) { | |
1591 _request._method = method; | |
1592 _request._uri = uri; | |
1593 _request._headers._mutable = true; | |
1594 _request._state = _HttpRequestResponseBase.START; | |
1595 } else { | |
1596 _request = new _HttpClientRequest(method, uri, this); | |
1597 } | |
1598 _response = new _HttpClientResponse(this); | |
1599 return _request; | |
1600 } | |
1601 | |
1602 DetachedSocket detachSocket() { | |
1603 return _detachSocket(); | |
1604 } | |
1605 | |
1606 void _onClosed() { | |
1607 _state |= _HttpConnectionBase.READ_CLOSED; | |
1608 _checkSocketDone(); | |
1609 } | |
1610 | |
1611 void _onError(e) { | |
1612 // Cancel any pending data in the HTTP parser. | |
1613 _httpParser.cancel(); | |
1614 if (_socketConn != null) { | |
1615 _client._closeSocketConnection(_socketConn); | |
1616 } | |
1617 | |
1618 // If it looks as if we got a bad connection from the connection | |
1619 // pool and the request can be retried do a retry. | |
1620 if (_socketConn != null && _socketConn._fromPool && _request._emptyBody) { | |
1621 String method = _request._method; | |
1622 Uri uri = _request._uri; | |
1623 _socketConn = null; | |
1624 | |
1625 // Retry the URL using the same connection instance. | |
1626 _httpParser.restart(); | |
1627 _client._openUrl(method, uri, this); | |
1628 } else { | |
1629 // Report the error. | |
1630 if (_response != null && _response._streamErrorHandler != null) { | |
1631 _response._streamErrorHandler(e); | |
1632 } else if (_onErrorCallback != null) { | |
1633 _onErrorCallback(e); | |
1634 } else { | |
1635 throw e; | |
1636 } | |
1637 } | |
1638 } | |
1639 | |
1640 void _onResponseReceived(int statusCode, | |
1641 String reasonPhrase, | |
1642 String version, | |
1643 _HttpHeaders headers, | |
1644 bool hasBody) { | |
1645 _response._onResponseReceived( | |
1646 statusCode, reasonPhrase, version, headers, hasBody); | |
1647 } | |
1648 | |
1649 void _onDataReceived(List<int> data) { | |
1650 _response._onDataReceived(data); | |
1651 } | |
1652 | |
1653 void _onDataEnd(bool close) { | |
1654 _state |= _HttpConnectionBase.RESPONSE_DONE; | |
1655 _response._onDataEnd(); | |
1656 _checkSocketDone(); | |
1657 } | |
1658 | |
1659 void _onClientShutdown() { | |
1660 if (!_isResponseDone) { | |
1661 _onError(new HttpException("Client shutdown")); | |
1662 } | |
1663 } | |
1664 | |
1665 void set onRequest(void handler(HttpClientRequest request)) { | |
1666 _onRequest = handler; | |
1667 } | |
1668 | |
1669 void set onResponse(void handler(HttpClientResponse response)) { | |
1670 _onResponse = handler; | |
1671 } | |
1672 | |
1673 void set onError(void callback(e)) { | |
1674 _onErrorCallback = callback; | |
1675 } | |
1676 | |
1677 void _doRetry(_RedirectInfo retry) { | |
1678 assert(_socketConn == null); | |
1679 | |
1680 // Retry the URL using the same connection instance. | |
1681 _state = _HttpConnectionBase.IDLE; | |
1682 _client._openUrl(retry.method, retry.location, this); | |
1683 } | |
1684 | |
1685 void _retry() { | |
1686 var retry = new _RedirectInfo(_response.statusCode, _method, _request._uri); | |
1687 // The actual retry is postponed until both response and request | |
1688 // are done. | |
1689 if (_isAllDone) { | |
1690 _doRetry(retry); | |
1691 } else { | |
1692 // Prepare for retry. | |
1693 assert(_pendingRedirect == null); | |
1694 _pendingRetry = retry; | |
1695 } | |
1696 } | |
1697 | |
1698 void _doRedirect(_RedirectInfo redirect) { | |
1699 assert(_socketConn == null); | |
1700 | |
1701 if (_redirects == null) { | |
1702 _redirects = new List<_RedirectInfo>(); | |
1703 } | |
1704 _redirects.add(redirect); | |
1705 _doRetry(redirect); | |
1706 } | |
1707 | |
1708 void redirect([String method, Uri url]) { | |
1709 if (method == null) method = _method; | |
1710 if (url == null) { | |
1711 url = Uri.parse(_response.headers.value(HttpHeaders.LOCATION)); | |
1712 } | |
1713 // Always set the content length to 0 for redirects. | |
1714 var mutable = _request._headers._mutable; | |
1715 _request._headers._mutable = true; | |
1716 _request._headers.contentLength = 0; | |
1717 _request._headers._mutable = mutable; | |
1718 _request._bodyBytesWritten = 0; | |
1719 var redirect = new _RedirectInfo(_response.statusCode, method, url); | |
1720 // The actual redirect is postponed until both response and | |
1721 // request are done. | |
1722 assert(_pendingRetry == null); | |
1723 _pendingRedirect = redirect; | |
1724 } | |
1725 | |
1726 List<RedirectInfo> get redirects => _redirects; | |
1727 | |
1728 Function _onRequest; | |
1729 Function _onResponse; | |
1730 Function _onErrorCallback; | |
1731 | |
1732 _HttpClient _client; | |
1733 _SocketConnection _socketConn; | |
1734 HttpClientRequest _request; | |
1735 HttpClientResponse _response; | |
1736 String _method; | |
1737 bool _usingProxy; | |
1738 | |
1739 // Redirect handling | |
1740 bool followRedirects = true; | |
1741 int maxRedirects = 5; | |
1742 List<_RedirectInfo> _redirects; | |
1743 _RedirectInfo _pendingRedirect; | |
1744 _RedirectInfo _pendingRetry; | |
1745 | |
1746 // Callbacks. | |
1747 var requestReceived; | |
1748 } | |
1749 | |
1750 | |
1751 // Class for holding keep-alive sockets in the cache for the HTTP | |
1752 // client together with the connection information. | |
1753 class _SocketConnection { | |
1754 _SocketConnection(String this._host, | |
1755 int this._port, | |
1756 Socket this._socket); | |
1757 | |
1758 void _markReturned() { | |
1759 // Any activity on the socket while waiting in the pool will | |
1760 // invalidate the connection os that it is not reused. | |
1761 _socket.onData = _invalidate; | |
1762 _socket.onClosed = _invalidate; | |
1763 _socket.onError = (_) => _invalidate(); | |
1764 _returnTime = new DateTime.now(); | |
1765 _httpClientConnection = null; | |
1766 } | |
1767 | |
1768 void _markRetrieved() { | |
1769 _socket.onData = null; | |
1770 _socket.onClosed = null; | |
1771 _socket.onError = null; | |
1772 _httpClientConnection = null; | |
1773 } | |
1774 | |
1775 void _close() { | |
1776 _socket.onData = null; | |
1777 _socket.onClosed = null; | |
1778 _socket.onError = null; | |
1779 _httpClientConnection = null; | |
1780 _socket.close(); | |
1781 } | |
1782 | |
1783 Duration _idleTime(DateTime now) => now.difference(_returnTime); | |
1784 | |
1785 bool get _fromPool => _returnTime != null; | |
1786 | |
1787 void _invalidate() { | |
1788 _valid = false; | |
1789 _close(); | |
1790 } | |
1791 | |
1792 int get hashCode => _socket.hashCode; | |
1793 | |
1794 String _host; | |
1795 int _port; | |
1796 Socket _socket; | |
1797 DateTime _returnTime; | |
1798 bool _valid = true; | |
1799 HttpClientConnection _httpClientConnection; | |
1800 } | |
1801 | 1442 |
1802 class _ProxyConfiguration { | 1443 class _ProxyConfiguration { |
1803 static const String PROXY_PREFIX = "PROXY "; | 1444 static const String PROXY_PREFIX = "PROXY "; |
1804 static const String DIRECT_PREFIX = "DIRECT"; | 1445 static const String DIRECT_PREFIX = "DIRECT"; |
1805 | 1446 |
1806 _ProxyConfiguration(String configuration) : proxies = new List<_Proxy>() { | 1447 _ProxyConfiguration(String configuration) : proxies = new List<_Proxy>() { |
1807 if (configuration == null) { | 1448 if (configuration == null) { |
1808 throw new HttpException("Invalid proxy configuration $configuration"); | 1449 throw new HttpException("Invalid proxy configuration $configuration"); |
1809 } | 1450 } |
1810 List<String> list = configuration.split(";"); | 1451 List<String> list = configuration.split(";"); |
(...skipping 26 matching lines...) Expand all Loading... |
1837 } | 1478 } |
1838 }); | 1479 }); |
1839 } | 1480 } |
1840 | 1481 |
1841 const _ProxyConfiguration.direct() | 1482 const _ProxyConfiguration.direct() |
1842 : proxies = const [const _Proxy.direct()]; | 1483 : proxies = const [const _Proxy.direct()]; |
1843 | 1484 |
1844 final List<_Proxy> proxies; | 1485 final List<_Proxy> proxies; |
1845 } | 1486 } |
1846 | 1487 |
| 1488 |
1847 class _Proxy { | 1489 class _Proxy { |
1848 const _Proxy(this.host, this.port) : isDirect = false; | 1490 const _Proxy(this.host, this.port) : isDirect = false; |
1849 const _Proxy.direct() : host = null, port = null, isDirect = true; | 1491 const _Proxy.direct() : host = null, port = null, isDirect = true; |
1850 | 1492 |
1851 final String host; | 1493 final String host; |
1852 final int port; | 1494 final int port; |
1853 final bool isDirect; | 1495 final bool isDirect; |
1854 } | 1496 } |
1855 | 1497 |
1856 class _HttpClient implements HttpClient { | |
1857 static const int DEFAULT_EVICTION_TIMEOUT = 60000; | |
1858 | |
1859 _HttpClient() : _openSockets = new Map(), | |
1860 _activeSockets = new Set(), | |
1861 _closeQueue = new _CloseQueue(), | |
1862 credentials = new List<_Credentials>(), | |
1863 _shutdown = false; | |
1864 | |
1865 HttpClientConnection open( | |
1866 String method, String host, int port, String path) { | |
1867 // TODO(sgjesse): The path set here can contain both query and | |
1868 // fragment. They should be cracked and set correctly. | |
1869 return _open(method, new Uri.fromComponents( | |
1870 scheme: "http", domain: host, port: port, path: path)); | |
1871 } | |
1872 | |
1873 HttpClientConnection _open(String method, | |
1874 Uri uri, | |
1875 [_HttpClientConnection connection]) { | |
1876 if (_shutdown) throw new HttpException("HttpClient shutdown"); | |
1877 if (method == null || uri.domain.isEmpty) { | |
1878 throw new ArgumentError(null); | |
1879 } | |
1880 return _prepareHttpClientConnection(method, uri, connection); | |
1881 } | |
1882 | |
1883 HttpClientConnection openUrl(String method, Uri url) { | |
1884 return _openUrl(method, url); | |
1885 } | |
1886 | |
1887 HttpClientConnection _openUrl(String method, | |
1888 Uri url, | |
1889 [_HttpClientConnection connection]) { | |
1890 if (url.scheme != "http" && url.scheme != "https") { | |
1891 throw new HttpException("Unsupported URL scheme ${url.scheme}"); | |
1892 } | |
1893 return _open(method, url, connection); | |
1894 } | |
1895 | |
1896 HttpClientConnection get(String host, int port, String path) { | |
1897 return open("GET", host, port, path); | |
1898 } | |
1899 | |
1900 HttpClientConnection getUrl(Uri url) => _openUrl("GET", url); | |
1901 | |
1902 HttpClientConnection post(String host, int port, String path) { | |
1903 return open("POST", host, port, path); | |
1904 } | |
1905 | |
1906 HttpClientConnection postUrl(Uri url) => _openUrl("POST", url); | |
1907 | |
1908 set authenticate(Future<bool> f(Uri url, String scheme, String realm)) { | |
1909 _authenticate = f; | |
1910 } | |
1911 | |
1912 void addCredentials( | |
1913 Uri url, String realm, HttpClientCredentials cr) { | |
1914 credentials.add(new _Credentials(url, realm, cr)); | |
1915 } | |
1916 | |
1917 set sendClientCertificate(bool send) => _sendClientCertificate = send; | |
1918 | |
1919 set clientCertificate(String nickname) => _clientCertificate = nickname; | |
1920 | |
1921 set findProxy(String f(Uri uri)) => _findProxy = f; | |
1922 | |
1923 void shutdown({bool force: false}) { | |
1924 if (force) _closeQueue.shutdown(); | |
1925 new Map.from(_openSockets).forEach( | |
1926 (String key, Queue<_SocketConnection> connections) { | |
1927 while (!connections.isEmpty) { | |
1928 _SocketConnection socketConn = connections.removeFirst(); | |
1929 socketConn._socket.close(); | |
1930 } | |
1931 }); | |
1932 if (force) { | |
1933 _activeSockets.toList().forEach((_SocketConnection socketConn) { | |
1934 socketConn._httpClientConnection._onClientShutdown(); | |
1935 socketConn._close(); | |
1936 }); | |
1937 } | |
1938 if (_evictionTimer != null) _cancelEvictionTimer(); | |
1939 _shutdown = true; | |
1940 } | |
1941 | |
1942 void _cancelEvictionTimer() { | |
1943 _evictionTimer.cancel(); | |
1944 _evictionTimer = null; | |
1945 } | |
1946 | |
1947 String _connectionKey(String host, int port) { | |
1948 return "$host:$port"; | |
1949 } | |
1950 | |
1951 HttpClientConnection _prepareHttpClientConnection( | |
1952 String method, | |
1953 Uri url, | |
1954 [_HttpClientConnection connection]) { | |
1955 | |
1956 void _establishConnection(String host, | |
1957 int port, | |
1958 _ProxyConfiguration proxyConfiguration, | |
1959 int proxyIndex, | |
1960 bool reusedConnection, | |
1961 bool secure) { | |
1962 | |
1963 void _connectionOpened(_SocketConnection socketConn, | |
1964 _HttpClientConnection connection, | |
1965 bool usingProxy) { | |
1966 socketConn._httpClientConnection = connection; | |
1967 connection._usingProxy = usingProxy; | |
1968 connection._connectionEstablished(socketConn); | |
1969 HttpClientRequest request = connection.open(method, url); | |
1970 request.headers.host = host; | |
1971 request.headers.port = port; | |
1972 if (url.userInfo != null && !url.userInfo.isEmpty) { | |
1973 // If the URL contains user information use that for basic | |
1974 // authorization | |
1975 _UTF8Encoder encoder = new _UTF8Encoder(); | |
1976 String auth = | |
1977 CryptoUtils.bytesToBase64(encoder.encodeString(url.userInfo)); | |
1978 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); | |
1979 } else { | |
1980 // Look for credentials. | |
1981 _Credentials cr = _findCredentials(url); | |
1982 if (cr != null) { | |
1983 cr.authorize(request); | |
1984 } | |
1985 } | |
1986 // A reused connection is indicating either redirect or retry | |
1987 // where the onRequest callback should not be issued again. | |
1988 if (connection._onRequest != null && !reusedConnection) { | |
1989 connection._onRequest(request); | |
1990 } else { | |
1991 request.outputStream.close(); | |
1992 } | |
1993 } | |
1994 | |
1995 assert(proxyIndex < proxyConfiguration.proxies.length); | |
1996 | |
1997 // Determine the actual host to connect to. | |
1998 String connectHost; | |
1999 int connectPort; | |
2000 _Proxy proxy = proxyConfiguration.proxies[proxyIndex]; | |
2001 if (proxy.isDirect) { | |
2002 connectHost = host; | |
2003 connectPort = port; | |
2004 } else { | |
2005 connectHost = proxy.host; | |
2006 connectPort = proxy.port; | |
2007 } | |
2008 | |
2009 // If there are active connections for this key get the first one | |
2010 // otherwise create a new one. | |
2011 String key = _connectionKey(connectHost, connectPort); | |
2012 Queue socketConnections = _openSockets[key]; | |
2013 // Remove active connections that are not valid any more or of | |
2014 // the wrong type (HTTP or HTTPS). | |
2015 if (socketConnections != null) { | |
2016 while (!socketConnections.isEmpty) { | |
2017 if (socketConnections.first._valid) { | |
2018 // If socket has the same properties, exit loop with found socket. | |
2019 var socket = socketConnections.first._socket; | |
2020 if (!secure && socket is! SecureSocket) break; | |
2021 if (secure && socket is SecureSocket && | |
2022 _sendClientCertificate == socket.sendClientCertificate && | |
2023 _clientCertificate == socket.certificateName) break; | |
2024 } | |
2025 socketConnections.removeFirst()._close(); | |
2026 } | |
2027 } | |
2028 if (socketConnections == null || socketConnections.isEmpty) { | |
2029 Socket socket = secure && proxy.isDirect ? | |
2030 new SecureSocket(connectHost, | |
2031 connectPort, | |
2032 sendClientCertificate: _sendClientCertificate, | |
2033 certificateName: _clientCertificate) : | |
2034 new Socket(connectHost, connectPort); | |
2035 // Until the connection is established handle connection errors | |
2036 // here as the HttpClientConnection object is not yet associated | |
2037 // with the socket. | |
2038 socket.onError = (e) { | |
2039 proxyIndex++; | |
2040 if (proxyIndex < proxyConfiguration.proxies.length) { | |
2041 // Try the next proxy in the list. | |
2042 _establishConnection( | |
2043 host, port, proxyConfiguration, proxyIndex, false, secure); | |
2044 } else { | |
2045 // Report the error through the HttpClientConnection object to | |
2046 // the client. | |
2047 connection._onError(e); | |
2048 } | |
2049 }; | |
2050 socket.onConnect = () { | |
2051 // When the connection is established, clear the error | |
2052 // callback as it will now be handled by the | |
2053 // HttpClientConnection object which will be associated with | |
2054 // the connected socket. | |
2055 socket.onError = null; | |
2056 _SocketConnection socketConn = | |
2057 new _SocketConnection(connectHost, connectPort, socket); | |
2058 _activeSockets.add(socketConn); | |
2059 _connectionOpened(socketConn, connection, !proxy.isDirect); | |
2060 }; | |
2061 } else { | |
2062 _SocketConnection socketConn = socketConnections.removeFirst(); | |
2063 socketConn._markRetrieved(); | |
2064 _activeSockets.add(socketConn); | |
2065 Timer.run(() => | |
2066 _connectionOpened(socketConn, connection, !proxy.isDirect)); | |
2067 | |
2068 // Get rid of eviction timer if there are no more active connections. | |
2069 if (socketConnections.isEmpty) _openSockets.remove(key); | |
2070 if (_openSockets.isEmpty) _cancelEvictionTimer(); | |
2071 } | |
2072 } | |
2073 | |
2074 // Find out if we want a secure socket. | |
2075 bool is_secure = (url.scheme == "https"); | |
2076 | |
2077 // Find the TCP host and port. | |
2078 String host = url.domain; | |
2079 int port = url.port; | |
2080 if (port == 0) { | |
2081 port = is_secure ? | |
2082 HttpClient.DEFAULT_HTTPS_PORT : | |
2083 HttpClient.DEFAULT_HTTP_PORT; | |
2084 } | |
2085 // Create a new connection object if we are not re-using an existing one. | |
2086 var reusedConnection = false; | |
2087 if (connection == null) { | |
2088 connection = new _HttpClientConnection(this); | |
2089 } else { | |
2090 reusedConnection = true; | |
2091 } | |
2092 connection.onDetach = () => _activeSockets.remove(connection._socketConn); | |
2093 | |
2094 // Check to see if a proxy server should be used for this connection. | |
2095 _ProxyConfiguration proxyConfiguration = const _ProxyConfiguration.direct(); | |
2096 if (_findProxy != null) { | |
2097 // TODO(sgjesse): Keep a map of these as normally only a few | |
2098 // configuration strings will be used. | |
2099 proxyConfiguration = new _ProxyConfiguration(_findProxy(url)); | |
2100 } | |
2101 | |
2102 // Establish the connection starting with the first proxy configured. | |
2103 _establishConnection(host, | |
2104 port, | |
2105 proxyConfiguration, | |
2106 0, | |
2107 reusedConnection, | |
2108 is_secure); | |
2109 | |
2110 return connection; | |
2111 } | |
2112 | |
2113 void _returnSocketConnection(_SocketConnection socketConn) { | |
2114 // If the HTTP client is being shutdown don't return the connection. | |
2115 if (_shutdown) { | |
2116 socketConn._close(); | |
2117 return; | |
2118 }; | |
2119 | |
2120 // Mark socket as returned to unregister from the old connection. | |
2121 socketConn._markReturned(); | |
2122 | |
2123 String key = _connectionKey(socketConn._host, socketConn._port); | |
2124 | |
2125 // Get or create the connection list for this key. | |
2126 Queue sockets = _openSockets[key]; | |
2127 if (sockets == null) { | |
2128 sockets = new Queue(); | |
2129 _openSockets[key] = sockets; | |
2130 } | |
2131 | |
2132 // If there is currently no eviction timer start one. | |
2133 if (_evictionTimer == null) { | |
2134 void _handleEviction(Timer timer) { | |
2135 DateTime now = new DateTime.now(); | |
2136 List<String> emptyKeys = new List<String>(); | |
2137 _openSockets.forEach( | |
2138 (String key, Queue<_SocketConnection> connections) { | |
2139 // As returned connections are added at the head of the | |
2140 // list remove from the tail. | |
2141 while (!connections.isEmpty) { | |
2142 _SocketConnection socketConn = connections.last; | |
2143 if (socketConn._idleTime(now).inMilliseconds > | |
2144 DEFAULT_EVICTION_TIMEOUT) { | |
2145 connections.removeLast(); | |
2146 socketConn._socket.close(); | |
2147 if (connections.isEmpty) emptyKeys.add(key); | |
2148 } else { | |
2149 break; | |
2150 } | |
2151 } | |
2152 }); | |
2153 | |
2154 // Remove the keys for which here are no more open connections. | |
2155 emptyKeys.forEach((String key) => _openSockets.remove(key)); | |
2156 | |
2157 // If all connections where evicted cancel the eviction timer. | |
2158 if (_openSockets.isEmpty) _cancelEvictionTimer(); | |
2159 } | |
2160 _evictionTimer = new Timer.repeating(const Duration(seconds: 10), | |
2161 _handleEviction); | |
2162 } | |
2163 | |
2164 // Return connection. | |
2165 _activeSockets.remove(socketConn); | |
2166 sockets.addFirst(socketConn); | |
2167 } | |
2168 | |
2169 void _closeSocketConnection(_SocketConnection socketConn) { | |
2170 socketConn._close(); | |
2171 _activeSockets.remove(socketConn); | |
2172 } | |
2173 | |
2174 void _closedSocketConnection(_SocketConnection socketConn) { | |
2175 _activeSockets.remove(socketConn); | |
2176 } | |
2177 | |
2178 _Credentials _findCredentials(Uri url, [_AuthenticationScheme scheme]) { | |
2179 // Look for credentials. | |
2180 _Credentials cr = | |
2181 credentials.reduce(null, (_Credentials prev, _Credentials value) { | |
2182 if (value.applies(url, scheme)) { | |
2183 if (prev == null) return value; | |
2184 return value.uri.path.length > prev.uri.path.length ? value : prev; | |
2185 } else { | |
2186 return prev; | |
2187 } | |
2188 }); | |
2189 return cr; | |
2190 } | |
2191 | |
2192 void _removeCredentials(_Credentials cr) { | |
2193 int index = credentials.indexOf(cr); | |
2194 if (index != -1) { | |
2195 credentials.removeAt(index); | |
2196 } | |
2197 } | |
2198 | |
2199 Function _onOpen; | |
2200 Map<String, Queue<_SocketConnection>> _openSockets; | |
2201 Set<_SocketConnection> _activeSockets; | |
2202 _CloseQueue _closeQueue; | |
2203 List<_Credentials> credentials; | |
2204 Timer _evictionTimer; | |
2205 Function _findProxy; | |
2206 Function _authenticate; | |
2207 bool _sendClientCertificate = false; | |
2208 String _clientCertificate; | |
2209 bool _shutdown; // Has this HTTP client been shutdown? | |
2210 } | |
2211 | |
2212 | 1498 |
2213 class _HttpConnectionInfo implements HttpConnectionInfo { | 1499 class _HttpConnectionInfo implements HttpConnectionInfo { |
| 1500 static _HttpConnectionInfo create(Socket socket) { |
| 1501 if (socket == null) return null; |
| 1502 try { |
| 1503 _HttpConnectionInfo info = new _HttpConnectionInfo._(); |
| 1504 info.remoteHost = socket.remoteHost; |
| 1505 info.remotePort = socket.remotePort; |
| 1506 info.localPort = socket.port; |
| 1507 return info; |
| 1508 } catch (e) { } |
| 1509 return null; |
| 1510 } |
| 1511 |
| 1512 _HttpConnectionInfo._(); |
| 1513 |
2214 String remoteHost; | 1514 String remoteHost; |
2215 int remotePort; | 1515 int remotePort; |
2216 int localPort; | 1516 int localPort; |
2217 } | 1517 } |
2218 | 1518 |
2219 | 1519 |
2220 class _DetachedSocket implements DetachedSocket { | 1520 class _DetachedSocket implements Socket { |
2221 _DetachedSocket(this._socket, this._unparsedData); | 1521 final Stream<List<int>> _incoming; |
2222 Socket get socket => _socket; | 1522 final Socket _socket; |
2223 List<int> get unparsedData => _unparsedData; | 1523 |
2224 Socket _socket; | 1524 _DetachedSocket(this._socket, this._incoming); |
2225 List<int> _unparsedData; | 1525 |
| 1526 StreamSubscription<List<int>> listen(void onData(List<int> event), |
| 1527 {void onError(AsyncError error), |
| 1528 void onDone(), |
| 1529 bool unsubscribeOnError}) { |
| 1530 return _incoming.listen(onData, |
| 1531 onError: onError, |
| 1532 onDone: onDone, |
| 1533 unsubscribeOnError: unsubscribeOnError); |
| 1534 } |
| 1535 |
| 1536 Future<Socket> consume(Stream<List<int>> stream) { |
| 1537 return _socket.consume(stream); |
| 1538 } |
| 1539 |
| 1540 Future<Socket> addStream(Stream<List<int>> stream) { |
| 1541 return _socket.addStream(stream); |
| 1542 } |
| 1543 |
| 1544 void addString(String string, [Encoding encoding = Encoding.UTF_8]) { |
| 1545 return _socket.addString(string, encoding); |
| 1546 } |
| 1547 |
| 1548 void destroy() => _socket.destroy(); |
| 1549 void add(List<int> data) => _socket.add(data); |
| 1550 Future<Socket> close() => _socket.close(); |
2226 } | 1551 } |
2227 | 1552 |
2228 | 1553 |
2229 class _AuthenticationScheme { | 1554 class _AuthenticationScheme { |
2230 static const UNKNOWN = const _AuthenticationScheme(-1); | 1555 static const UNKNOWN = const _AuthenticationScheme(-1); |
2231 static const BASIC = const _AuthenticationScheme(0); | 1556 static const BASIC = const _AuthenticationScheme(0); |
2232 static const DIGEST = const _AuthenticationScheme(1); | 1557 static const DIGEST = const _AuthenticationScheme(1); |
2233 | 1558 |
2234 const _AuthenticationScheme(this._scheme); | 1559 const _AuthenticationScheme(this._scheme); |
2235 | 1560 |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
2293 | 1618 |
2294 _AuthenticationScheme get scheme => _AuthenticationScheme.BASIC; | 1619 _AuthenticationScheme get scheme => _AuthenticationScheme.BASIC; |
2295 | 1620 |
2296 void authorize(_Credentials _, HttpClientRequest request) { | 1621 void authorize(_Credentials _, HttpClientRequest request) { |
2297 // There is no mentioning of username/password encoding in RFC | 1622 // There is no mentioning of username/password encoding in RFC |
2298 // 2617. However there is an open draft for adding an additional | 1623 // 2617. However there is an open draft for adding an additional |
2299 // accept-charset parameter to the WWW-Authenticate and | 1624 // accept-charset parameter to the WWW-Authenticate and |
2300 // Proxy-Authenticate headers, see | 1625 // Proxy-Authenticate headers, see |
2301 // http://tools.ietf.org/html/draft-reschke-basicauth-enc-06. For | 1626 // http://tools.ietf.org/html/draft-reschke-basicauth-enc-06. For |
2302 // now always use UTF-8 encoding. | 1627 // now always use UTF-8 encoding. |
2303 _UTF8Encoder encoder = new _UTF8Encoder(); | |
2304 String auth = | 1628 String auth = |
2305 CryptoUtils.bytesToBase64(encoder.encodeString( | 1629 CryptoUtils.bytesToBase64(_encodeString("$username:$password")); |
2306 "$username:$password")); | |
2307 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); | 1630 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); |
2308 } | 1631 } |
2309 | 1632 |
2310 String username; | 1633 String username; |
2311 String password; | 1634 String password; |
2312 } | 1635 } |
2313 | 1636 |
2314 | 1637 |
2315 class _HttpClientDigestCredentials implements HttpClientDigestCredentials { | 1638 class _HttpClientDigestCredentials implements HttpClientDigestCredentials { |
2316 _HttpClientDigestCredentials(this.username, | 1639 _HttpClientDigestCredentials(this.username, |
2317 this.password); | 1640 this.password); |
2318 | 1641 |
2319 _AuthenticationScheme get scheme => _AuthenticationScheme.DIGEST; | 1642 _AuthenticationScheme get scheme => _AuthenticationScheme.DIGEST; |
2320 | 1643 |
2321 void authorize(_Credentials credentials, HttpClientRequest request) { | 1644 void authorize(_Credentials credentials, HttpClientRequest request) { |
2322 // TODO(sgjesse): Implement!!! | 1645 // TODO(sgjesse): Implement!!! |
2323 throw new UnsupportedError("Digest authentication not yet supported"); | 1646 throw new UnsupportedError("Digest authentication not yet supported"); |
2324 } | 1647 } |
2325 | 1648 |
2326 String username; | 1649 String username; |
2327 String password; | 1650 String password; |
2328 } | 1651 } |
2329 | 1652 |
2330 | 1653 |
2331 | |
2332 class _RedirectInfo implements RedirectInfo { | 1654 class _RedirectInfo implements RedirectInfo { |
2333 const _RedirectInfo(int this.statusCode, | 1655 const _RedirectInfo(int this.statusCode, |
2334 String this.method, | 1656 String this.method, |
2335 Uri this.location); | 1657 Uri this.location); |
2336 final int statusCode; | 1658 final int statusCode; |
2337 final String method; | 1659 final String method; |
2338 final Uri location; | 1660 final Uri location; |
2339 } | 1661 } |
OLD | NEW |