Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(776)

Side by Side Diff: sdk/lib/io/http_impl.dart

Issue 36643002: Don't close existing connection on HttpServer close (and stream cancel). (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Always close idle connections. Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 class _HttpIncoming extends Stream<List<int>> { 7 class _HttpIncoming extends Stream<List<int>> {
8 final int _transferLength; 8 final int _transferLength;
9 final Completer _dataCompleter = new Completer(); 9 final Completer _dataCompleter = new Completer();
10 Stream<List<int>> _stream; 10 Stream<List<int>> _stream;
(...skipping 520 matching lines...) Expand 10 before | Expand all | Expand 10 after
531 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and 531 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and
532 // persistentConnection is not guaranteed to be in sync. 532 // persistentConnection is not guaranteed to be in sync.
533 if (!_headersWritten) { 533 if (!_headersWritten) {
534 if (!_ignoreBody && headers.contentLength == -1) { 534 if (!_ignoreBody && headers.contentLength == -1) {
535 // If no body was written, _ignoreBody is false (it's not a HEAD 535 // If no body was written, _ignoreBody is false (it's not a HEAD
536 // request) and the content-length is unspecified, set contentLength to 536 // request) and the content-length is unspecified, set contentLength to
537 // 0. 537 // 0.
538 headers.chunkedTransferEncoding = false; 538 headers.chunkedTransferEncoding = false;
539 headers.contentLength = 0; 539 headers.contentLength = 0;
540 } else if (!_ignoreBody && headers.contentLength > 0) { 540 } else if (!_ignoreBody && headers.contentLength > 0) {
541 _headersSink.close().catchError((_) {}); 541 _headersSink.addError(new HttpException(
542 return new Future.error(new HttpException(
543 "No content while contentLength was specified to be greater " 542 "No content while contentLength was specified to be greater "
544 " than 0: ${headers.contentLength}.", 543 "than 0: ${headers.contentLength}.",
545 uri: _uri)); 544 uri: _uri));
545 return _headersSink.done;
546 } 546 }
547 } 547 }
548 return _writeHeaders().then((_) => _headersSink.close()); 548 return _writeHeaders().then((_) => _headersSink.close());
549 } 549 }
550 550
551 void _writeHeader(); // TODO(ajohnsen): Better name. 551 void _writeHeader(); // TODO(ajohnsen): Better name.
552 } 552 }
553 553
554 554
555 class _HttpOutboundConsumer implements StreamConsumer { 555 class _HttpOutboundConsumer implements StreamConsumer {
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
613 613
614 Future addStream(var stream) { 614 Future addStream(var stream) {
615 // If we saw a socket error subscribe and then cancel, to ignore any data 615 // If we saw a socket error subscribe and then cancel, to ignore any data
616 // on the stream. 616 // on the stream.
617 if (_socketError) { 617 if (_socketError) {
618 stream.listen(null).cancel(); 618 stream.listen(null).cancel();
619 return new Future.value(_outbound); 619 return new Future.value(_outbound);
620 } 620 }
621 _completer = new Completer(); 621 _completer = new Completer();
622 _subscription = stream.listen( 622 _subscription = stream.listen(
623 (data) { 623 (data) => _controller.add(data),
624 _controller.add(data);
625 },
626 onDone: _done, 624 onDone: _done,
627 onError: _done, 625 onError: (e, s) => _controller.addError(e, s),
628 cancelOnError: true); 626 cancelOnError: true);
629 // Pause the first request. 627 // Pause the first request.
630 if (_controller == null) _subscription.pause(); 628 if (_controller == null) _subscription.pause();
631 _ensureController(); 629 _ensureController();
632 return _completer.future; 630 return _completer.future;
633 } 631 }
634 632
635 Future close() { 633 Future close() {
636 Future closeOutbound() { 634 Future closeOutbound() {
637 if (_socketError) return new Future.value(_outbound); 635 if (_socketError) return new Future.value(_outbound);
(...skipping 1233 matching lines...) Expand 10 before | Expand all | Expand 10 after
1871 incoming.headers.protocolVersion, 1869 incoming.headers.protocolVersion,
1872 outgoing, 1870 outgoing,
1873 _httpServer.serverHeader); 1871 _httpServer.serverHeader);
1874 var request = new _HttpRequest(response, incoming, _httpServer, this); 1872 var request = new _HttpRequest(response, incoming, _httpServer, this);
1875 _streamFuture = outgoing.done 1873 _streamFuture = outgoing.done
1876 .then((_) { 1874 .then((_) {
1877 response.deadline = null; 1875 response.deadline = null;
1878 if (_state == _DETACHED) return; 1876 if (_state == _DETACHED) return;
1879 if (response.persistentConnection && 1877 if (response.persistentConnection &&
1880 request.persistentConnection && 1878 request.persistentConnection &&
1881 incoming.fullBodyRead) { 1879 incoming.fullBodyRead &&
1880 !_httpParser.upgrade) {
1882 _state = _IDLE; 1881 _state = _IDLE;
1883 _startTimeout(); 1882 _startTimeout();
1884 // Resume the subscription for incoming requests as the 1883 // Resume the subscription for incoming requests as the
1885 // request is now processed. 1884 // request is now processed.
1886 _subscription.resume(); 1885 _subscription.resume();
1887 } else { 1886 } else {
1888 // Close socket, keep-alive not used or body sent before 1887 // Close socket, keep-alive not used or body sent before
1889 // received data was handled. 1888 // received data was handled.
1890 destroy(); 1889 destroy();
1891 } 1890 }
1892 }) 1891 })
1893 .catchError((e) { 1892 .catchError((e) {
1894 destroy(); 1893 destroy();
1895 }); 1894 });
1896 response._ignoreBody = request.method == "HEAD"; 1895 response._ignoreBody = request.method == "HEAD";
1897 response._httpRequest = request; 1896 response._httpRequest = request;
1898 _httpServer._handleRequest(request); 1897 _httpServer._handleRequest(request, this);
1899 }, 1898 },
1900 onDone: () { 1899 onDone: () {
1901 destroy(); 1900 destroy();
1902 }, 1901 },
1903 onError: (error) { 1902 onError: (error) {
1904 // Ignore failed requests that was closed before headers was received. 1903 // Ignore failed requests that was closed before headers was received.
1905 destroy(); 1904 destroy();
1906 }); 1905 });
1907 } 1906 }
1908 1907
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after
2000 _connections.add(connection); 1999 _connections.add(connection);
2001 }, 2000 },
2002 onError: _controller.addError, 2001 onError: _controller.addError,
2003 onDone: _controller.close); 2002 onDone: _controller.close);
2004 return _controller.stream.listen(onData, 2003 return _controller.stream.listen(onData,
2005 onError: onError, 2004 onError: onError,
2006 onDone: onDone, 2005 onDone: onDone,
2007 cancelOnError: cancelOnError); 2006 cancelOnError: cancelOnError);
2008 } 2007 }
2009 2008
2010 Future close() { 2009 Future close({bool force: false}) {
2011 closed = true; 2010 closed = true;
2012 Future result; 2011 Future result;
2013 if (_serverSocket != null && _closeServer) { 2012 if (_serverSocket != null && _closeServer) {
2014 result = _serverSocket.close(); 2013 result = _serverSocket.close();
2015 } else { 2014 } else {
2016 result = new Future.value(); 2015 result = new Future.value();
2017 } 2016 }
2018 if (_sessionManagerInstance != null) { 2017 if (force) {
2018 for (var c in _connections.toList()) {
2019 c.destroy();
2020 }
2021 assert(_connections.isEmpty);
2022 } else {
2023 for (var c in _connections.where((c) => c._isIdle).toList()) {
2024 c.destroy();
2025 }
2026 }
2027 _maybeCloseSessionManager();
2028 return result;
2029 }
2030
2031 void _maybeCloseSessionManager() {
2032 if (closed &&
2033 _connections.isEmpty &&
2034 _sessionManagerInstance != null) {
2019 _sessionManagerInstance.close(); 2035 _sessionManagerInstance.close();
2020 _sessionManagerInstance = null; 2036 _sessionManagerInstance = null;
2021 } 2037 }
2022 for (_HttpConnection connection in _connections.toList()) {
2023 connection.destroy();
2024 }
2025 _connections.clear();
2026 return result;
2027 } 2038 }
2028 2039
2029 int get port { 2040 int get port {
2030 if (closed) throw new HttpException("HttpServer is not bound to a socket"); 2041 if (closed) throw new HttpException("HttpServer is not bound to a socket");
2031 return _serverSocket.port; 2042 return _serverSocket.port;
2032 } 2043 }
2033 2044
2034 InternetAddress get address { 2045 InternetAddress get address {
2035 if (closed) throw new HttpException("HttpServer is not bound to a socket"); 2046 if (closed) throw new HttpException("HttpServer is not bound to a socket");
2036 return _serverSocket.address; 2047 return _serverSocket.address;
2037 } 2048 }
2038 2049
2039 set sessionTimeout(int timeout) { 2050 set sessionTimeout(int timeout) {
2040 _sessionManager.sessionTimeout = timeout; 2051 _sessionManager.sessionTimeout = timeout;
2041 } 2052 }
2042 2053
2043 void _handleRequest(HttpRequest request) { 2054 void _handleRequest(HttpRequest request, _HttpConnection connection) {
2055 if (closed) {
2056 connection.destroy();
2057 return;
2058 }
2044 _controller.add(request); 2059 _controller.add(request);
2045 } 2060 }
2046 2061
2047 void _handleError(error) { 2062 void _handleError(error) {
2048 if (!closed) _controller.addError(error); 2063 if (!closed) _controller.addError(error);
2049 } 2064 }
2050 2065
2051 void _connectionClosed(_HttpConnection connection) { 2066 void _connectionClosed(_HttpConnection connection) {
2052 _connections.remove(connection); 2067 _connections.remove(connection);
2068 _maybeCloseSessionManager();
2053 } 2069 }
2054 2070
2055 _HttpSessionManager get _sessionManager { 2071 _HttpSessionManager get _sessionManager {
2056 // Lazy init. 2072 // Lazy init.
2057 if (_sessionManagerInstance == null) { 2073 if (_sessionManagerInstance == null) {
2058 _sessionManagerInstance = new _HttpSessionManager(); 2074 _sessionManagerInstance = new _HttpSessionManager();
2059 } 2075 }
2060 return _sessionManagerInstance; 2076 return _sessionManagerInstance;
2061 } 2077 }
2062 2078
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
2169 final String username; 2185 final String username;
2170 final String password; 2186 final String password;
2171 final bool isDirect; 2187 final bool isDirect;
2172 } 2188 }
2173 2189
2174 2190
2175 class _HttpConnectionInfo implements HttpConnectionInfo { 2191 class _HttpConnectionInfo implements HttpConnectionInfo {
2176 static _HttpConnectionInfo create(Socket socket) { 2192 static _HttpConnectionInfo create(Socket socket) {
2177 if (socket == null) return null; 2193 if (socket == null) return null;
2178 try { 2194 try {
2179 _HttpConnectionInfo info = new _HttpConnectionInfo._(); 2195 _HttpConnectionInfo info = new _HttpConnectionInfo();
2180 info.remoteHost = socket.remoteHost; 2196 info.remoteHost = socket.remoteHost;
2181 info.remotePort = socket.remotePort; 2197 info.remotePort = socket.remotePort;
2182 info.localPort = socket.port; 2198 info.localPort = socket.port;
2183 return info; 2199 return info;
2184 } catch (e) { } 2200 } catch (e) { }
2185 return null; 2201 return null;
2186 } 2202 }
2187 2203
2188 _HttpConnectionInfo._();
2189
2190 String remoteHost; 2204 String remoteHost;
2191 int remotePort; 2205 int remotePort;
2192 int localPort; 2206 int localPort;
2193 } 2207 }
2194 2208
2195 2209
2196 class _DetachedSocket extends Stream<List<int>> implements Socket { 2210 class _DetachedSocket extends Stream<List<int>> implements Socket {
2197 final Stream<List<int>> _incoming; 2211 final Stream<List<int>> _incoming;
2198 final Socket _socket; 2212 final Socket _socket;
2199 2213
(...skipping 294 matching lines...) Expand 10 before | Expand all | Expand 10 after
2494 final Uri location; 2508 final Uri location;
2495 } 2509 }
2496 2510
2497 String _getHttpVersion() { 2511 String _getHttpVersion() {
2498 var version = Platform.version; 2512 var version = Platform.version;
2499 // Only include major and minor version numbers. 2513 // Only include major and minor version numbers.
2500 int index = version.indexOf('.', version.indexOf('.') + 1); 2514 int index = version.indexOf('.', version.indexOf('.') + 1);
2501 version = version.substring(0, index); 2515 version = version.substring(0, index);
2502 return 'Dart/$version (dart:io)'; 2516 return 'Dart/$version (dart:io)';
2503 } 2517 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698