Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(168)

Side by Side Diff: sdk/lib/io/http_impl.dart

Issue 25094002: Adapt streams for additional stackTrace argument. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Remove types in closures. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/io/file_impl.dart ('k') | sdk/lib/io/http_parser.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 class _HttpIncoming extends Stream<List<int>> { 7 class _HttpIncoming extends Stream<List<int>> {
8 final int _transferLength; 8 final int _transferLength;
9 final Completer _dataCompleter = new Completer(); 9 final Completer _dataCompleter = new Completer();
10 Stream<List<int>> _stream; 10 Stream<List<int>> _stream;
(...skipping 19 matching lines...) Expand all
30 // the length of the massage body is not known due to transfer 30 // the length of the massage body is not known due to transfer
31 // codings. 31 // codings.
32 int get transferLength => _transferLength; 32 int get transferLength => _transferLength;
33 33
34 _HttpIncoming(_HttpHeaders this.headers, 34 _HttpIncoming(_HttpHeaders this.headers,
35 int this._transferLength, 35 int this._transferLength,
36 Stream<List<int>> this._stream) { 36 Stream<List<int>> this._stream) {
37 } 37 }
38 38
39 StreamSubscription<List<int>> listen(void onData(List<int> event), 39 StreamSubscription<List<int>> listen(void onData(List<int> event),
40 {void onError(error), 40 {Function onError,
41 void onDone(), 41 void onDone(),
42 bool cancelOnError}) { 42 bool cancelOnError}) {
43 hasSubscriber = true; 43 hasSubscriber = true;
44 return _stream 44 return _stream
45 .handleError((error) { 45 .handleError((error) {
46 throw new HttpException(error.message, uri: uri); 46 throw new HttpException(error.message, uri: uri);
47 }) 47 })
48 .listen(onData, 48 .listen(onData,
49 onError: onError, 49 onError: onError,
50 onDone: onDone, 50 onDone: onDone,
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
104 _session = _httpServer._sessionManager.getSession(sessionId); 104 _session = _httpServer._sessionManager.getSession(sessionId);
105 if (_session != null) { 105 if (_session != null) {
106 _session._markSeen(); 106 _session._markSeen();
107 break; 107 break;
108 } 108 }
109 } 109 }
110 } 110 }
111 } 111 }
112 112
113 StreamSubscription<List<int>> listen(void onData(List<int> event), 113 StreamSubscription<List<int>> listen(void onData(List<int> event),
114 {void onError(error), 114 {Function onError,
115 void onDone(), 115 void onDone(),
116 bool cancelOnError}) { 116 bool cancelOnError}) {
117 return _incoming.listen(onData, 117 return _incoming.listen(onData,
118 onError: onError, 118 onError: onError,
119 onDone: onDone, 119 onDone: onDone,
120 cancelOnError: cancelOnError); 120 cancelOnError: cancelOnError);
121 } 121 }
122 122
123 Uri get uri => _incoming.uri; 123 Uri get uri => _incoming.uri;
124 124
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
231 .then((request) { 231 .then((request) {
232 request._responseRedirects.addAll(this.redirects); 232 request._responseRedirects.addAll(this.redirects);
233 request._responseRedirects.add(new _RedirectInfo(statusCode, 233 request._responseRedirects.add(new _RedirectInfo(statusCode,
234 method, 234 method,
235 url)); 235 url));
236 return request.close(); 236 return request.close();
237 }); 237 });
238 } 238 }
239 239
240 StreamSubscription<List<int>> listen(void onData(List<int> event), 240 StreamSubscription<List<int>> listen(void onData(List<int> event),
241 {void onError(error), 241 {Function onError,
242 void onDone(), 242 void onDone(),
243 bool cancelOnError}) { 243 bool cancelOnError}) {
244 var stream = _incoming; 244 var stream = _incoming;
245 if (headers.value(HttpHeaders.CONTENT_ENCODING) == "gzip") { 245 if (headers.value(HttpHeaders.CONTENT_ENCODING) == "gzip") {
246 stream = stream.transform(GZIP.decoder); 246 stream = stream.transform(GZIP.decoder);
247 } 247 }
248 return stream.listen(onData, 248 return stream.listen(onData,
249 onError: onError, 249 onError: onError,
250 onDone: onDone, 250 onDone: onDone,
251 cancelOnError: cancelOnError); 251 cancelOnError: cancelOnError);
(...skipping 327 matching lines...) Expand 10 before | Expand all | Expand 10 after
579 onPause: () => _subscription.pause(), 579 onPause: () => _subscription.pause(),
580 onResume: () => _subscription.resume(), 580 onResume: () => _subscription.resume(),
581 onListen: () => _subscription.resume(), 581 onListen: () => _subscription.resume(),
582 onCancel: _cancel); 582 onCancel: _cancel);
583 _outbound._addStream(_controller.stream) 583 _outbound._addStream(_controller.stream)
584 .then((_) { 584 .then((_) {
585 _cancel(); 585 _cancel();
586 _done(); 586 _done();
587 _closeCompleter.complete(_outbound); 587 _closeCompleter.complete(_outbound);
588 }, 588 },
589 onError: (error) { 589 onError: (error, [StackTrace stackTrace]) {
590 _socketError = true; 590 _socketError = true;
591 if (_ignoreError(error)) { 591 if (_ignoreError(error)) {
592 _cancel(); 592 _cancel();
593 _done(); 593 _done();
594 _closeCompleter.complete(_outbound); 594 _closeCompleter.complete(_outbound);
595 } else { 595 } else {
596 if (!_done(error)) { 596 if (!_done(error)) {
597 _closeCompleter.completeError(error); 597 _closeCompleter.completeError(error, stackTrace);
598 } 598 }
599 } 599 }
600 }); 600 });
601 } 601 }
602 602
603 bool _done([error]) { 603 bool _done([error, StackTrace stackTrace]) {
604 if (_completer == null) return false; 604 if (_completer == null) return false;
605 if (error != null) { 605 if (error != null) {
606 _completer.completeError(error); 606 _completer.completeError(error, stackTrace);
607 } else { 607 } else {
608 _completer.complete(_outbound); 608 _completer.complete(_outbound);
609 } 609 }
610 _completer = null; 610 _completer = null;
611 return true; 611 return true;
612 } 612 }
613 613
614 Future addStream(var stream) { 614 Future addStream(var stream) {
615 // If we saw a socket error subscribe and then cancel, to ignore any data 615 // If we saw a socket error subscribe and then cancel, to ignore any data
616 // on the stream. 616 // on the stream.
617 if (_socketError) { 617 if (_socketError) {
618 stream.listen(null).cancel(); 618 stream.listen(null).cancel();
619 return new Future.value(_outbound); 619 return new Future.value(_outbound);
620 } 620 }
621 _completer = new Completer(); 621 _completer = new Completer();
622 _subscription = stream.listen( 622 _subscription = stream.listen(
623 (data) { 623 (data) {
624 _controller.add(data); 624 _controller.add(data);
625 }, 625 },
626 onDone: () { 626 onDone: _done,
627 _done(); 627 onError: _done,
628 },
629 onError: (error) {
630 _done(error);
631 },
632 cancelOnError: true); 628 cancelOnError: true);
633 // Pause the first request. 629 // Pause the first request.
634 if (_controller == null) _subscription.pause(); 630 if (_controller == null) _subscription.pause();
635 _ensureController(); 631 _ensureController();
636 return _completer.future; 632 return _completer.future;
637 } 633 }
638 634
639 Future close() { 635 Future close() {
640 Future closeOutbound() { 636 Future closeOutbound() {
641 if (_socketError) return new Future.value(_outbound); 637 if (_socketError) return new Future.value(_outbound);
(...skipping 300 matching lines...) Expand 10 before | Expand all | Expand 10 after
942 } 938 }
943 } else if (response._shouldAuthenticateProxy) { 939 } else if (response._shouldAuthenticateProxy) {
944 future = response._authenticate(true); 940 future = response._authenticate(true);
945 } else if (response._shouldAuthenticate) { 941 } else if (response._shouldAuthenticate) {
946 future = response._authenticate(false); 942 future = response._authenticate(false);
947 } else { 943 } else {
948 future = new Future<HttpClientResponse>.value(response); 944 future = new Future<HttpClientResponse>.value(response);
949 } 945 }
950 future.then( 946 future.then(
951 (v) => _responseCompleter.complete(v), 947 (v) => _responseCompleter.complete(v),
952 onError: (e) { 948 onError: _responseCompleter.completeError);
953 _responseCompleter.completeError(e);
954 });
955 } 949 }
956 950
957 void _onError(error) { 951 void _onError(error, StackTrace stackTrace) {
958 _responseCompleter.completeError(error); 952 _responseCompleter.completeError(error, stackTrace);
959 } 953 }
960 954
961 // Generate the request URI based on the method and proxy. 955 // Generate the request URI based on the method and proxy.
962 String _requestUri() { 956 String _requestUri() {
963 // Generate the request URI starting from the path component. 957 // Generate the request URI starting from the path component.
964 String uriStartingFromPath() { 958 String uriStartingFromPath() {
965 String result = uri.path; 959 String result = uri.path;
966 if (result.length == 0) result = "/"; 960 if (result.length == 0) result = "/";
967 if (uri.query != "") { 961 if (uri.query != "") {
968 if (uri.fragment != "") { 962 if (uri.fragment != "") {
(...skipping 204 matching lines...) Expand 10 before | Expand all | Expand 10 after
1173 // Only handle one incoming response at the time. Keep the 1167 // Only handle one incoming response at the time. Keep the
1174 // stream paused until the response have been processed. 1168 // stream paused until the response have been processed.
1175 _subscription.pause(); 1169 _subscription.pause();
1176 // We assume the response is not here, until we have send the request. 1170 // We assume the response is not here, until we have send the request.
1177 if (_nextResponseCompleter == null) { 1171 if (_nextResponseCompleter == null) {
1178 throw new HttpException("Unexpected response.", uri: _currentUri); 1172 throw new HttpException("Unexpected response.", uri: _currentUri);
1179 } 1173 }
1180 _nextResponseCompleter.complete(incoming); 1174 _nextResponseCompleter.complete(incoming);
1181 _nextResponseCompleter = null; 1175 _nextResponseCompleter = null;
1182 }, 1176 },
1183 onError: (error) { 1177 onError: (error, [StackTrace stackTrace]) {
1184 if (_nextResponseCompleter != null) { 1178 if (_nextResponseCompleter != null) {
1185 _nextResponseCompleter.completeError( 1179 _nextResponseCompleter.completeError(
1186 new HttpException(error.message, uri: _currentUri)); 1180 new HttpException(error.message, uri: _currentUri),
1181 stackTrace);
1187 _nextResponseCompleter = null; 1182 _nextResponseCompleter = null;
1188 } 1183 }
1189 }, 1184 },
1190 onDone: () { 1185 onDone: () {
1191 if (_nextResponseCompleter != null) { 1186 if (_nextResponseCompleter != null) {
1192 _nextResponseCompleter.completeError(new HttpException( 1187 _nextResponseCompleter.completeError(new HttpException(
1193 "Connection closed before response was received", 1188 "Connection closed before response was received",
1194 uri: _currentUri)); 1189 uri: _currentUri));
1195 _nextResponseCompleter = null; 1190 _nextResponseCompleter = null;
1196 } 1191 }
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
1298 } 1293 }
1299 } 1294 }
1300 request._onIncoming(incoming); 1295 request._onIncoming(incoming);
1301 }) 1296 })
1302 // If we see a state error, we failed to get the 'first' 1297 // If we see a state error, we failed to get the 'first'
1303 // element. 1298 // element.
1304 .catchError((error) { 1299 .catchError((error) {
1305 throw new HttpException( 1300 throw new HttpException(
1306 "Connection closed before data was received", uri: uri); 1301 "Connection closed before data was received", uri: uri);
1307 }, test: (error) => error is StateError) 1302 }, test: (error) => error is StateError)
1308 .catchError((error) { 1303 .catchError((error, stackTrace) {
1309 // We are done with the socket. 1304 // We are done with the socket.
1310 destroy(); 1305 destroy();
1311 request._onError(error); 1306 request._onError(error, stackTrace);
1312 }); 1307 });
1313 1308
1314 // Resume the parser now we have a handler. 1309 // Resume the parser now we have a handler.
1315 _subscription.resume(); 1310 _subscription.resume();
1316 return s; 1311 return s;
1317 }, onError: (e) { 1312 }, onError: (e) {
1318 destroy(); 1313 destroy();
1319 }); 1314 });
1320 return request; 1315 return request;
1321 } 1316 }
(...skipping 613 matching lines...) Expand 10 before | Expand all | Expand 10 after
1935 onCancel: close); 1930 onCancel: close);
1936 } 1931 }
1937 1932
1938 _HttpServer.listenOn(ServerSocket this._serverSocket) 1933 _HttpServer.listenOn(ServerSocket this._serverSocket)
1939 : _closeServer = false { 1934 : _closeServer = false {
1940 _controller = new StreamController<HttpRequest>(sync: true, 1935 _controller = new StreamController<HttpRequest>(sync: true,
1941 onCancel: close); 1936 onCancel: close);
1942 } 1937 }
1943 1938
1944 StreamSubscription<HttpRequest> listen(void onData(HttpRequest event), 1939 StreamSubscription<HttpRequest> listen(void onData(HttpRequest event),
1945 {void onError(error), 1940 {Function onError,
1946 void onDone(), 1941 void onDone(),
1947 bool cancelOnError}) { 1942 bool cancelOnError}) {
1948 _serverSocket.listen( 1943 _serverSocket.listen(
1949 (Socket socket) { 1944 (Socket socket) {
1950 socket.setOption(SocketOption.TCP_NODELAY, true); 1945 socket.setOption(SocketOption.TCP_NODELAY, true);
1951 // Accept the client connection. 1946 // Accept the client connection.
1952 _HttpConnection connection = new _HttpConnection(socket, this); 1947 _HttpConnection connection = new _HttpConnection(socket, this);
1953 _connections.add(connection); 1948 _connections.add(connection);
1954 }, 1949 },
1955 onError: _controller.addError, 1950 onError: _controller.addError,
(...skipping 190 matching lines...) Expand 10 before | Expand all | Expand 10 after
2146 } 2141 }
2147 2142
2148 2143
2149 class _DetachedSocket extends Stream<List<int>> implements Socket { 2144 class _DetachedSocket extends Stream<List<int>> implements Socket {
2150 final Stream<List<int>> _incoming; 2145 final Stream<List<int>> _incoming;
2151 final Socket _socket; 2146 final Socket _socket;
2152 2147
2153 _DetachedSocket(this._socket, this._incoming); 2148 _DetachedSocket(this._socket, this._incoming);
2154 2149
2155 StreamSubscription<List<int>> listen(void onData(List<int> event), 2150 StreamSubscription<List<int>> listen(void onData(List<int> event),
2156 {void onError(error), 2151 {Function onError,
2157 void onDone(), 2152 void onDone(),
2158 bool cancelOnError}) { 2153 bool cancelOnError}) {
2159 return _incoming.listen(onData, 2154 return _incoming.listen(onData,
2160 onError: onError, 2155 onError: onError,
2161 onDone: onDone, 2156 onDone: onDone,
2162 cancelOnError: cancelOnError); 2157 cancelOnError: cancelOnError);
2163 } 2158 }
2164 2159
2165 Encoding get encoding => _socket.encoding; 2160 Encoding get encoding => _socket.encoding;
2166 2161
(...skipping 280 matching lines...) Expand 10 before | Expand all | Expand 10 after
2447 final Uri location; 2442 final Uri location;
2448 } 2443 }
2449 2444
2450 String _getHttpVersion() { 2445 String _getHttpVersion() {
2451 var version = Platform.version; 2446 var version = Platform.version;
2452 // Only include major and minor version numbers. 2447 // Only include major and minor version numbers.
2453 int index = version.indexOf('.', version.indexOf('.') + 1); 2448 int index = version.indexOf('.', version.indexOf('.') + 1);
2454 version = version.substring(0, index); 2449 version = version.substring(0, index);
2455 return 'Dart/$version (dart:io)'; 2450 return 'Dart/$version (dart:io)';
2456 } 2451 }
OLDNEW
« no previous file with comments | « sdk/lib/io/file_impl.dart ('k') | sdk/lib/io/http_parser.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698