OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 class _HttpIncoming extends Stream<List<int>> { | 7 class _HttpIncoming extends Stream<List<int>> { |
8 final int _transferLength; | 8 final int _transferLength; |
9 final Completer _dataCompleter = new Completer(); | 9 final Completer _dataCompleter = new Completer(); |
10 Stream<List<int>> _stream; | 10 Stream<List<int>> _stream; |
(...skipping 19 matching lines...) Expand all Loading... |
30 // the length of the massage body is not known due to transfer | 30 // the length of the massage body is not known due to transfer |
31 // codings. | 31 // codings. |
32 int get transferLength => _transferLength; | 32 int get transferLength => _transferLength; |
33 | 33 |
34 _HttpIncoming(_HttpHeaders this.headers, | 34 _HttpIncoming(_HttpHeaders this.headers, |
35 int this._transferLength, | 35 int this._transferLength, |
36 Stream<List<int>> this._stream) { | 36 Stream<List<int>> this._stream) { |
37 } | 37 } |
38 | 38 |
39 StreamSubscription<List<int>> listen(void onData(List<int> event), | 39 StreamSubscription<List<int>> listen(void onData(List<int> event), |
40 {void onError(error), | 40 {Function onError, |
41 void onDone(), | 41 void onDone(), |
42 bool cancelOnError}) { | 42 bool cancelOnError}) { |
43 hasSubscriber = true; | 43 hasSubscriber = true; |
44 return _stream | 44 return _stream |
45 .handleError((error) { | 45 .handleError((error) { |
46 throw new HttpException(error.message, uri: uri); | 46 throw new HttpException(error.message, uri: uri); |
47 }) | 47 }) |
48 .listen(onData, | 48 .listen(onData, |
49 onError: onError, | 49 onError: onError, |
50 onDone: onDone, | 50 onDone: onDone, |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
104 _session = _httpServer._sessionManager.getSession(sessionId); | 104 _session = _httpServer._sessionManager.getSession(sessionId); |
105 if (_session != null) { | 105 if (_session != null) { |
106 _session._markSeen(); | 106 _session._markSeen(); |
107 break; | 107 break; |
108 } | 108 } |
109 } | 109 } |
110 } | 110 } |
111 } | 111 } |
112 | 112 |
113 StreamSubscription<List<int>> listen(void onData(List<int> event), | 113 StreamSubscription<List<int>> listen(void onData(List<int> event), |
114 {void onError(error), | 114 {Function onError, |
115 void onDone(), | 115 void onDone(), |
116 bool cancelOnError}) { | 116 bool cancelOnError}) { |
117 return _incoming.listen(onData, | 117 return _incoming.listen(onData, |
118 onError: onError, | 118 onError: onError, |
119 onDone: onDone, | 119 onDone: onDone, |
120 cancelOnError: cancelOnError); | 120 cancelOnError: cancelOnError); |
121 } | 121 } |
122 | 122 |
123 Uri get uri => _incoming.uri; | 123 Uri get uri => _incoming.uri; |
124 | 124 |
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
231 .then((request) { | 231 .then((request) { |
232 request._responseRedirects.addAll(this.redirects); | 232 request._responseRedirects.addAll(this.redirects); |
233 request._responseRedirects.add(new _RedirectInfo(statusCode, | 233 request._responseRedirects.add(new _RedirectInfo(statusCode, |
234 method, | 234 method, |
235 url)); | 235 url)); |
236 return request.close(); | 236 return request.close(); |
237 }); | 237 }); |
238 } | 238 } |
239 | 239 |
240 StreamSubscription<List<int>> listen(void onData(List<int> event), | 240 StreamSubscription<List<int>> listen(void onData(List<int> event), |
241 {void onError(error), | 241 {Function onError, |
242 void onDone(), | 242 void onDone(), |
243 bool cancelOnError}) { | 243 bool cancelOnError}) { |
244 var stream = _incoming; | 244 var stream = _incoming; |
245 if (headers.value(HttpHeaders.CONTENT_ENCODING) == "gzip") { | 245 if (headers.value(HttpHeaders.CONTENT_ENCODING) == "gzip") { |
246 stream = stream.transform(GZIP.decoder); | 246 stream = stream.transform(GZIP.decoder); |
247 } | 247 } |
248 return stream.listen(onData, | 248 return stream.listen(onData, |
249 onError: onError, | 249 onError: onError, |
250 onDone: onDone, | 250 onDone: onDone, |
251 cancelOnError: cancelOnError); | 251 cancelOnError: cancelOnError); |
(...skipping 327 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
579 onPause: () => _subscription.pause(), | 579 onPause: () => _subscription.pause(), |
580 onResume: () => _subscription.resume(), | 580 onResume: () => _subscription.resume(), |
581 onListen: () => _subscription.resume(), | 581 onListen: () => _subscription.resume(), |
582 onCancel: _cancel); | 582 onCancel: _cancel); |
583 _outbound._addStream(_controller.stream) | 583 _outbound._addStream(_controller.stream) |
584 .then((_) { | 584 .then((_) { |
585 _cancel(); | 585 _cancel(); |
586 _done(); | 586 _done(); |
587 _closeCompleter.complete(_outbound); | 587 _closeCompleter.complete(_outbound); |
588 }, | 588 }, |
589 onError: (error) { | 589 onError: (error, [StackTrace stackTrace]) { |
590 _socketError = true; | 590 _socketError = true; |
591 if (_ignoreError(error)) { | 591 if (_ignoreError(error)) { |
592 _cancel(); | 592 _cancel(); |
593 _done(); | 593 _done(); |
594 _closeCompleter.complete(_outbound); | 594 _closeCompleter.complete(_outbound); |
595 } else { | 595 } else { |
596 if (!_done(error)) { | 596 if (!_done(error)) { |
597 _closeCompleter.completeError(error); | 597 _closeCompleter.completeError(error, stackTrace); |
598 } | 598 } |
599 } | 599 } |
600 }); | 600 }); |
601 } | 601 } |
602 | 602 |
603 bool _done([error]) { | 603 bool _done([error, StackTrace stackTrace]) { |
604 if (_completer == null) return false; | 604 if (_completer == null) return false; |
605 if (error != null) { | 605 if (error != null) { |
606 _completer.completeError(error); | 606 _completer.completeError(error, stackTrace); |
607 } else { | 607 } else { |
608 _completer.complete(_outbound); | 608 _completer.complete(_outbound); |
609 } | 609 } |
610 _completer = null; | 610 _completer = null; |
611 return true; | 611 return true; |
612 } | 612 } |
613 | 613 |
614 Future addStream(var stream) { | 614 Future addStream(var stream) { |
615 // If we saw a socket error subscribe and then cancel, to ignore any data | 615 // If we saw a socket error subscribe and then cancel, to ignore any data |
616 // on the stream. | 616 // on the stream. |
617 if (_socketError) { | 617 if (_socketError) { |
618 stream.listen(null).cancel(); | 618 stream.listen(null).cancel(); |
619 return new Future.value(_outbound); | 619 return new Future.value(_outbound); |
620 } | 620 } |
621 _completer = new Completer(); | 621 _completer = new Completer(); |
622 _subscription = stream.listen( | 622 _subscription = stream.listen( |
623 (data) { | 623 (data) { |
624 _controller.add(data); | 624 _controller.add(data); |
625 }, | 625 }, |
626 onDone: () { | 626 onDone: _done, |
627 _done(); | 627 onError: _done, |
628 }, | |
629 onError: (error) { | |
630 _done(error); | |
631 }, | |
632 cancelOnError: true); | 628 cancelOnError: true); |
633 // Pause the first request. | 629 // Pause the first request. |
634 if (_controller == null) _subscription.pause(); | 630 if (_controller == null) _subscription.pause(); |
635 _ensureController(); | 631 _ensureController(); |
636 return _completer.future; | 632 return _completer.future; |
637 } | 633 } |
638 | 634 |
639 Future close() { | 635 Future close() { |
640 Future closeOutbound() { | 636 Future closeOutbound() { |
641 if (_socketError) return new Future.value(_outbound); | 637 if (_socketError) return new Future.value(_outbound); |
(...skipping 300 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
942 } | 938 } |
943 } else if (response._shouldAuthenticateProxy) { | 939 } else if (response._shouldAuthenticateProxy) { |
944 future = response._authenticate(true); | 940 future = response._authenticate(true); |
945 } else if (response._shouldAuthenticate) { | 941 } else if (response._shouldAuthenticate) { |
946 future = response._authenticate(false); | 942 future = response._authenticate(false); |
947 } else { | 943 } else { |
948 future = new Future<HttpClientResponse>.value(response); | 944 future = new Future<HttpClientResponse>.value(response); |
949 } | 945 } |
950 future.then( | 946 future.then( |
951 (v) => _responseCompleter.complete(v), | 947 (v) => _responseCompleter.complete(v), |
952 onError: (e) { | 948 onError: _responseCompleter.completeError); |
953 _responseCompleter.completeError(e); | |
954 }); | |
955 } | 949 } |
956 | 950 |
957 void _onError(error) { | 951 void _onError(error, StackTrace stackTrace) { |
958 _responseCompleter.completeError(error); | 952 _responseCompleter.completeError(error, stackTrace); |
959 } | 953 } |
960 | 954 |
961 // Generate the request URI based on the method and proxy. | 955 // Generate the request URI based on the method and proxy. |
962 String _requestUri() { | 956 String _requestUri() { |
963 // Generate the request URI starting from the path component. | 957 // Generate the request URI starting from the path component. |
964 String uriStartingFromPath() { | 958 String uriStartingFromPath() { |
965 String result = uri.path; | 959 String result = uri.path; |
966 if (result.length == 0) result = "/"; | 960 if (result.length == 0) result = "/"; |
967 if (uri.query != "") { | 961 if (uri.query != "") { |
968 if (uri.fragment != "") { | 962 if (uri.fragment != "") { |
(...skipping 204 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1173 // Only handle one incoming response at the time. Keep the | 1167 // Only handle one incoming response at the time. Keep the |
1174 // stream paused until the response have been processed. | 1168 // stream paused until the response have been processed. |
1175 _subscription.pause(); | 1169 _subscription.pause(); |
1176 // We assume the response is not here, until we have send the request. | 1170 // We assume the response is not here, until we have send the request. |
1177 if (_nextResponseCompleter == null) { | 1171 if (_nextResponseCompleter == null) { |
1178 throw new HttpException("Unexpected response.", uri: _currentUri); | 1172 throw new HttpException("Unexpected response.", uri: _currentUri); |
1179 } | 1173 } |
1180 _nextResponseCompleter.complete(incoming); | 1174 _nextResponseCompleter.complete(incoming); |
1181 _nextResponseCompleter = null; | 1175 _nextResponseCompleter = null; |
1182 }, | 1176 }, |
1183 onError: (error) { | 1177 onError: (error, [StackTrace stackTrace]) { |
1184 if (_nextResponseCompleter != null) { | 1178 if (_nextResponseCompleter != null) { |
1185 _nextResponseCompleter.completeError( | 1179 _nextResponseCompleter.completeError( |
1186 new HttpException(error.message, uri: _currentUri)); | 1180 new HttpException(error.message, uri: _currentUri), |
| 1181 stackTrace); |
1187 _nextResponseCompleter = null; | 1182 _nextResponseCompleter = null; |
1188 } | 1183 } |
1189 }, | 1184 }, |
1190 onDone: () { | 1185 onDone: () { |
1191 if (_nextResponseCompleter != null) { | 1186 if (_nextResponseCompleter != null) { |
1192 _nextResponseCompleter.completeError(new HttpException( | 1187 _nextResponseCompleter.completeError(new HttpException( |
1193 "Connection closed before response was received", | 1188 "Connection closed before response was received", |
1194 uri: _currentUri)); | 1189 uri: _currentUri)); |
1195 _nextResponseCompleter = null; | 1190 _nextResponseCompleter = null; |
1196 } | 1191 } |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1298 } | 1293 } |
1299 } | 1294 } |
1300 request._onIncoming(incoming); | 1295 request._onIncoming(incoming); |
1301 }) | 1296 }) |
1302 // If we see a state error, we failed to get the 'first' | 1297 // If we see a state error, we failed to get the 'first' |
1303 // element. | 1298 // element. |
1304 .catchError((error) { | 1299 .catchError((error) { |
1305 throw new HttpException( | 1300 throw new HttpException( |
1306 "Connection closed before data was received", uri: uri); | 1301 "Connection closed before data was received", uri: uri); |
1307 }, test: (error) => error is StateError) | 1302 }, test: (error) => error is StateError) |
1308 .catchError((error) { | 1303 .catchError((error, stackTrace) { |
1309 // We are done with the socket. | 1304 // We are done with the socket. |
1310 destroy(); | 1305 destroy(); |
1311 request._onError(error); | 1306 request._onError(error, stackTrace); |
1312 }); | 1307 }); |
1313 | 1308 |
1314 // Resume the parser now we have a handler. | 1309 // Resume the parser now we have a handler. |
1315 _subscription.resume(); | 1310 _subscription.resume(); |
1316 return s; | 1311 return s; |
1317 }, onError: (e) { | 1312 }, onError: (e) { |
1318 destroy(); | 1313 destroy(); |
1319 }); | 1314 }); |
1320 return request; | 1315 return request; |
1321 } | 1316 } |
(...skipping 641 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1963 onCancel: close); | 1958 onCancel: close); |
1964 } | 1959 } |
1965 | 1960 |
1966 _HttpServer.listenOn(ServerSocket this._serverSocket) | 1961 _HttpServer.listenOn(ServerSocket this._serverSocket) |
1967 : _closeServer = false { | 1962 : _closeServer = false { |
1968 _controller = new StreamController<HttpRequest>(sync: true, | 1963 _controller = new StreamController<HttpRequest>(sync: true, |
1969 onCancel: close); | 1964 onCancel: close); |
1970 } | 1965 } |
1971 | 1966 |
1972 StreamSubscription<HttpRequest> listen(void onData(HttpRequest event), | 1967 StreamSubscription<HttpRequest> listen(void onData(HttpRequest event), |
1973 {void onError(error), | 1968 {Function onError, |
1974 void onDone(), | 1969 void onDone(), |
1975 bool cancelOnError}) { | 1970 bool cancelOnError}) { |
1976 _serverSocket.listen( | 1971 _serverSocket.listen( |
1977 (Socket socket) { | 1972 (Socket socket) { |
1978 socket.setOption(SocketOption.TCP_NODELAY, true); | 1973 socket.setOption(SocketOption.TCP_NODELAY, true); |
1979 // Accept the client connection. | 1974 // Accept the client connection. |
1980 _HttpConnection connection = new _HttpConnection(socket, this); | 1975 _HttpConnection connection = new _HttpConnection(socket, this); |
1981 _connections.add(connection); | 1976 _connections.add(connection); |
1982 }, | 1977 }, |
1983 onError: _controller.addError, | 1978 onError: _controller.addError, |
(...skipping 190 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
2174 } | 2169 } |
2175 | 2170 |
2176 | 2171 |
2177 class _DetachedSocket extends Stream<List<int>> implements Socket { | 2172 class _DetachedSocket extends Stream<List<int>> implements Socket { |
2178 final Stream<List<int>> _incoming; | 2173 final Stream<List<int>> _incoming; |
2179 final Socket _socket; | 2174 final Socket _socket; |
2180 | 2175 |
2181 _DetachedSocket(this._socket, this._incoming); | 2176 _DetachedSocket(this._socket, this._incoming); |
2182 | 2177 |
2183 StreamSubscription<List<int>> listen(void onData(List<int> event), | 2178 StreamSubscription<List<int>> listen(void onData(List<int> event), |
2184 {void onError(error), | 2179 {Function onError, |
2185 void onDone(), | 2180 void onDone(), |
2186 bool cancelOnError}) { | 2181 bool cancelOnError}) { |
2187 return _incoming.listen(onData, | 2182 return _incoming.listen(onData, |
2188 onError: onError, | 2183 onError: onError, |
2189 onDone: onDone, | 2184 onDone: onDone, |
2190 cancelOnError: cancelOnError); | 2185 cancelOnError: cancelOnError); |
2191 } | 2186 } |
2192 | 2187 |
2193 Encoding get encoding => _socket.encoding; | 2188 Encoding get encoding => _socket.encoding; |
2194 | 2189 |
(...skipping 280 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
2475 final Uri location; | 2470 final Uri location; |
2476 } | 2471 } |
2477 | 2472 |
2478 String _getHttpVersion() { | 2473 String _getHttpVersion() { |
2479 var version = Platform.version; | 2474 var version = Platform.version; |
2480 // Only include major and minor version numbers. | 2475 // Only include major and minor version numbers. |
2481 int index = version.indexOf('.', version.indexOf('.') + 1); | 2476 int index = version.indexOf('.', version.indexOf('.') + 1); |
2482 version = version.substring(0, index); | 2477 version = version.substring(0, index); |
2483 return 'Dart/$version (dart:io)'; | 2478 return 'Dart/$version (dart:io)'; |
2484 } | 2479 } |
OLD | NEW |