Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(196)

Side by Side Diff: sdk/lib/io/http_impl.dart

Issue 14028017: Remove .writeStream, .consume and rewrite IOSink to correctly implement a (sane) well-defined behav… (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Review comments. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/io/http.dart ('k') | sdk/lib/io/http_parser.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 class _HttpIncoming extends Stream<List<int>> { 7 class _HttpIncoming extends Stream<List<int>> {
8 final int _transferLength; 8 final int _transferLength;
9 final Completer _dataCompleter = new Completer(); 9 final Completer _dataCompleter = new Completer();
10 Stream<List<int>> _stream; 10 Stream<List<int>> _stream;
(...skipping 308 matching lines...) Expand 10 before | Expand all | Expand 10 after
319 return new Future.immediate(this); 319 return new Future.immediate(this);
320 } 320 }
321 } 321 }
322 322
323 323
324 abstract class _HttpOutboundMessage<T> implements IOSink { 324 abstract class _HttpOutboundMessage<T> implements IOSink {
325 // Used to mark when the body should be written. This is used for HEAD 325 // Used to mark when the body should be written. This is used for HEAD
326 // requests and in error handling. 326 // requests and in error handling.
327 bool _ignoreBody = false; 327 bool _ignoreBody = false;
328 bool _headersWritten = false; 328 bool _headersWritten = false;
329 bool _asGZip = false;
329 330
330 IOSink _ioSink; 331 IOSink _headersSink;
332 IOSink _dataSink;
333
331 final _HttpOutgoing _outgoing; 334 final _HttpOutgoing _outgoing;
332 335
333 final _HttpHeaders headers; 336 final _HttpHeaders headers;
334 337
335 _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing) 338 _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing)
336 : _outgoing = outgoing, 339 : _outgoing = outgoing,
337 _ioSink = new IOSink(outgoing, encoding: Encoding.ASCII), 340 _headersSink = new IOSink(outgoing, encoding: Encoding.ASCII),
338 headers = new _HttpHeaders(protocolVersion); 341 headers = new _HttpHeaders(protocolVersion) {
342 _dataSink = new IOSink(
343 new _HttpOutboundConsumer(_headersSink, _addStream, this));
344 }
339 345
340 int get contentLength => headers.contentLength; 346 int get contentLength => headers.contentLength;
341 void set contentLength(int contentLength) { 347 void set contentLength(int contentLength) {
342 headers.contentLength = contentLength; 348 headers.contentLength = contentLength;
343 } 349 }
344 350
345 bool get persistentConnection => headers.persistentConnection; 351 bool get persistentConnection => headers.persistentConnection;
346 void set persistentConnection(bool p) { 352 void set persistentConnection(bool p) {
347 headers.persistentConnection = p; 353 headers.persistentConnection = p;
348 } 354 }
(...skipping 20 matching lines...) Expand all
369 String string; 375 String string;
370 if (obj is String) { 376 if (obj is String) {
371 string = obj; 377 string = obj;
372 } else { 378 } else {
373 string = obj.toString(); 379 string = obj.toString();
374 if (string is! String) { 380 if (string is! String) {
375 throw new ArgumentError('toString() did not return a string'); 381 throw new ArgumentError('toString() did not return a string');
376 } 382 }
377 } 383 }
378 if (string.isEmpty) return; 384 if (string.isEmpty) return;
379 _ioSink.write(string); 385 _dataSink.write(string);
380 } 386 }
381 387
382 void writeAll(Iterable objects, [String separator = ""]) { 388 void writeAll(Iterable objects, [String separator = ""]) {
383 bool isFirst = true; 389 bool isFirst = true;
384 for (Object obj in objects) { 390 for (Object obj in objects) {
385 if (isFirst) { 391 if (isFirst) {
386 isFirst = false; 392 isFirst = false;
387 } else { 393 } else {
388 if (!separator.isEmpty) write(separator); 394 if (!separator.isEmpty) write(separator);
389 } 395 }
390 write(obj); 396 write(obj);
391 } 397 }
392 } 398 }
393 399
394 void writeln([Object obj = ""]) { 400 void writeln([Object obj = ""]) {
395 write(obj); 401 write(obj);
396 write("\n"); 402 write("\n");
397 } 403 }
398 404
399 void writeCharCode(int charCode) { 405 void writeCharCode(int charCode) {
400 write(new String.fromCharCode(charCode)); 406 write(new String.fromCharCode(charCode));
401 } 407 }
402 408
403 void add(List<int> data) { 409 void add(List<int> data) {
404 _writeHeaders(); 410 _writeHeaders();
405 if (data.length == 0) return; 411 if (data.length == 0) return;
406 _ioSink.add(data); 412 _dataSink.add(data);
407 } 413 }
408 414
409 void addError(AsyncError error) { 415 void addError(AsyncError error) {
410 _writeHeaders(); 416 _writeHeaders();
411 _ioSink.addError(error); 417 _dataSink.addError(error);
412 }
413
414 Future<T> consume(Stream<List<int>> stream) {
415 _writeHeaders();
416 return _ioSink.consume(stream);
417 } 418 }
418 419
419 Future<T> addStream(Stream<List<int>> stream) { 420 Future<T> addStream(Stream<List<int>> stream) {
420 _writeHeaders(); 421 _writeHeaders();
421 return _ioSink.writeStream(stream).then((_) => this); 422 return _dataSink.addStream(stream);
422 }
423
424 Future<T> writeStream(Stream<List<int>> stream) {
425 return addStream(stream);
426 } 423 }
427 424
428 Future close() { 425 Future close() {
429 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and 426 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and
430 // persistentConnection is not guaranteed to be in sync. 427 // persistentConnection is not guaranteed to be in sync.
431 if (!_headersWritten && !_ignoreBody && headers.contentLength == -1) { 428 if (!_headersWritten && !_ignoreBody && headers.contentLength == -1) {
432 // If no body was written, _ignoreBody is false (it's not a HEAD 429 // If no body was written, _ignoreBody is false (it's not a HEAD
433 // request) and the content-length is unspecified, set contentLength to 0. 430 // request) and the content-length is unspecified, set contentLength to 0.
434 headers.chunkedTransferEncoding = false; 431 headers.chunkedTransferEncoding = false;
435 headers.contentLength = 0; 432 headers.contentLength = 0;
436 } 433 }
437 _writeHeaders(); 434 _writeHeaders();
438 return _ioSink.close(); 435 return _dataSink.close();
439 } 436 }
440 437
441 Future<T> get done { 438 Future<T> get done => _dataSink.done.then((_) => this);
442 _writeHeaders();
443 return _ioSink.done;
444 }
445 439
446 void _writeHeaders() { 440 void _writeHeaders() {
447 if (_headersWritten) return; 441 if (_headersWritten) return;
448 _headersWritten = true; 442 _headersWritten = true;
449 _ioSink.encoding = Encoding.ASCII;
450 headers._synchronize(); // Be sure the 'chunked' option is updated. 443 headers._synchronize(); // Be sure the 'chunked' option is updated.
451 bool asGZip = false;
452 bool isServerSide = this is _HttpResponse; 444 bool isServerSide = this is _HttpResponse;
453 if (isServerSide && headers.chunkedTransferEncoding) { 445 if (isServerSide && headers.chunkedTransferEncoding) {
454 var response = this; 446 var response = this;
455 List acceptEncodings = 447 List acceptEncodings =
456 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; 448 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING];
457 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; 449 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING];
458 if (acceptEncodings != null && 450 if (acceptEncodings != null &&
459 acceptEncodings 451 acceptEncodings
460 .expand((list) => list.split(",")) 452 .expand((list) => list.split(","))
461 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && 453 .any((encoding) => encoding.trim().toLowerCase() == "gzip") &&
462 contentEncoding == null) { 454 contentEncoding == null) {
463 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); 455 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip");
464 asGZip = true; 456 _asGZip = true;
465 } 457 }
466 } 458 }
467 _writeHeader(); 459 _writeHeader();
468 _ioSink = new IOSink(new _HttpOutboundConsumer(_ioSink, _consume, asGZip));
469 _ioSink.encoding = encoding;
470 } 460 }
471 461
472 Future _consume(IOSink ioSink, Stream<List<int>> stream, bool asGZip) { 462 Future _addStream(IOSink ioSink, Stream<List<int>> stream) {
473 int contentLength = headers.contentLength; 463 int contentLength = headers.contentLength;
474 if (_ignoreBody) { 464 if (_ignoreBody) {
475 ioSink.close(); 465 stream.fold(null, (x, y) {}).catchError((_) {});
476 return stream.fold(null, (x, y) {}).then((_) => this); 466 return ioSink.close().then((_) => this);
477 } 467 }
478 stream = stream.transform(new _BufferTransformer()); 468 stream = stream.transform(new _BufferTransformer());
479 if (headers.chunkedTransferEncoding) { 469 if (headers.chunkedTransferEncoding) {
480 if (asGZip) { 470 if (_asGZip) {
481 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); 471 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6));
482 } 472 }
483 stream = stream.transform(new _ChunkedTransformer()); 473 stream = stream.transform(new _ChunkedTransformer());
484 } else if (contentLength >= 0) { 474 } else if (contentLength >= 0) {
485 stream = stream.transform(new _ContentLengthValidator(contentLength)); 475 stream = stream.transform(new _ContentLengthValidator(contentLength));
486 } 476 }
487 return stream.pipe(ioSink).then((_) => this); 477 return ioSink.addStream(stream).then((_) => this);
488 } 478 }
489 479
490 void _writeHeader(); // TODO(ajohnsen): Better name. 480 void _writeHeader(); // TODO(ajohnsen): Better name.
491 } 481 }
492 482
493 483
494 class _HttpOutboundConsumer implements StreamConsumer { 484 class _HttpOutboundConsumer implements StreamConsumer {
495 Function _consume; 485 StreamController _controller;
486 StreamSubscription _subscription;
487 Function _addStream;
496 IOSink _ioSink; 488 IOSink _ioSink;
497 bool _asGZip; 489 Completer _closeCompleter = new Completer();
490 Completer _completer;
491
492 _HttpOutboundMessage _outbound;
498 _HttpOutboundConsumer(IOSink this._ioSink, 493 _HttpOutboundConsumer(IOSink this._ioSink,
499 Function this._consume, 494 Function this._addStream,
500 bool this._asGZip); 495 _HttpOutboundMessage this._outbound);
501 496
502 Future consume(var stream) => _consume(_ioSink, stream, _asGZip); 497 void _onPause() {
498 if (_controller.isPaused) {
499 _subscription.pause();
500 } else {
501 _subscription.resume();
502 }
503 }
504
505 void _onListen() {
506 if (!_controller.hasListener && _subscription != null) {
507 _subscription.cancel();
508 }
509 }
510
511 _ensureController() {
512 if (_controller != null) return;
513 _controller = new StreamController(onPauseStateChange: _onPause,
514 onSubscriptionStateChange: _onListen);
515 _addStream(_ioSink, _controller.stream)
516 .then((_) {
517 _done();
518 _closeCompleter.complete(_outbound);
519 },
520 onError: (error) {
521 if (!_done(error)) {
522 _closeCompleter.completeError(error);
523 }
524 });
525 }
526
527 bool _done([error]) {
528 if (_completer == null) return false;
529 var tmp = _completer;
530 _completer = null;
531 if (error != null) {
532 tmp.completeError(error);
533 } else {
534 tmp.complete(_outbound);
535 }
536 return true;
537 }
503 538
504 Future addStream(var stream) { 539 Future addStream(var stream) {
505 throw new UnimplementedError("_HttpOutboundConsumer.addStream"); 540 _ensureController();
541 _completer = new Completer();
542 _subscription = stream.listen(
543 (data) {
544 _controller.add(data);
545 },
546 onDone: () {
547 _done();
548 },
549 onError: (error) {
550 _done(error);
551 },
552 unsubscribeOnError: true);
553 return _completer.future;
506 } 554 }
507 555
508 Future close() { 556 Future close() {
509 throw new UnimplementedError("_HttpOutboundConsumer.close"); 557 _ensureController();
558 _controller.close();
559 return _closeCompleter.future.then((_) {
560 return _ioSink.close().then((_) => _outbound);
561 });
510 } 562 }
511 } 563 }
512 564
513 565
514 class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> { 566 class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> {
515 const int MIN_CHUNK_SIZE = 4 * 1024; 567 const int MIN_CHUNK_SIZE = 4 * 1024;
516 const int MAX_BUFFER_SIZE = 16 * 1024; 568 const int MAX_BUFFER_SIZE = 16 * 1024;
517 569
518 final _BufferList _buffer = new _BufferList(); 570 final _BufferList _buffer = new _BufferList();
519 571
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
567 _reasonPhrase = reasonPhrase; 619 _reasonPhrase = reasonPhrase;
568 } 620 }
569 621
570 Future<Socket> detachSocket() { 622 Future<Socket> detachSocket() {
571 if (_headersWritten) throw new StateError("Headers already sent"); 623 if (_headersWritten) throw new StateError("Headers already sent");
572 _writeHeaders(); 624 _writeHeaders();
573 var future = _httpRequest._httpConnection.detachSocket(); 625 var future = _httpRequest._httpConnection.detachSocket();
574 // Close connection so the socket is 'free'. 626 // Close connection so the socket is 'free'.
575 close(); 627 close();
576 done.catchError((_) { 628 done.catchError((_) {
577 // Catch any error on done, as they automatically will be propegated to 629 // Catch any error on done, as they automatically will be
578 // the websocket. 630 // propagated to the websocket.
579 }); 631 });
580 return future; 632 return future;
581 } 633 }
582 634
583 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; 635 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo;
584 636
585 void _writeHeader() { 637 void _writeHeader() {
586 var buffer = new _BufferList(); 638 var buffer = new _BufferList();
587 writeSP() => buffer.add(const [_CharCode.SP]); 639 writeSP() => buffer.add(const [_CharCode.SP]);
588 writeCRLF() => buffer.add(const [_CharCode.CR, _CharCode.LF]); 640 writeCRLF() => buffer.add(const [_CharCode.CR, _CharCode.LF]);
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
625 _cookies.forEach((cookie) { 677 _cookies.forEach((cookie) {
626 headers.add(HttpHeaders.SET_COOKIE, cookie); 678 headers.add(HttpHeaders.SET_COOKIE, cookie);
627 }); 679 });
628 } 680 }
629 681
630 headers._finalize(); 682 headers._finalize();
631 683
632 // Write headers. 684 // Write headers.
633 headers._write(buffer); 685 headers._write(buffer);
634 writeCRLF(); 686 writeCRLF();
635 _ioSink.add(buffer.readBytes()); 687 _headersSink.add(buffer.readBytes());
636 } 688 }
637 689
638 String _findReasonPhrase(int statusCode) { 690 String _findReasonPhrase(int statusCode) {
639 if (_reasonPhrase != null) { 691 if (_reasonPhrase != null) {
640 return _reasonPhrase; 692 return _reasonPhrase;
641 } 693 }
642 694
643 switch (statusCode) { 695 switch (statusCode) {
644 case HttpStatus.CONTINUE: return "Continue"; 696 case HttpStatus.CONTINUE: return "Continue";
645 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols"; 697 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols";
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
724 _HttpClientConnection this._httpClientConnection) 776 _HttpClientConnection this._httpClientConnection)
725 : super("1.1", outgoing) { 777 : super("1.1", outgoing) {
726 // GET and HEAD have 'content-length: 0' by default. 778 // GET and HEAD have 'content-length: 0' by default.
727 if (method == "GET" || method == "HEAD") { 779 if (method == "GET" || method == "HEAD") {
728 contentLength = 0; 780 contentLength = 0;
729 } 781 }
730 } 782 }
731 783
732 Future<HttpClientResponse> get done { 784 Future<HttpClientResponse> get done {
733 if (_response == null) { 785 if (_response == null) {
734 _response = Future.wait([_responseCompleter.future, super.done]) 786 _response = Future.wait([_responseCompleter.future,
787 super.done])
735 .then((list) => list[0]); 788 .then((list) => list[0]);
736 } 789 }
737 return _response; 790 return _response;
738 } 791 }
739 792
740 Future<HttpClientResponse> consume(Stream<List<int>> stream) {
741 super.consume(stream);
742 return done;
743 }
744
745 Future<HttpClientResponse> close() { 793 Future<HttpClientResponse> close() {
746 super.close(); 794 super.close();
747 return done; 795 return done;
748 } 796 }
749 797
750 int get maxRedirects => _maxRedirects; 798 int get maxRedirects => _maxRedirects;
751 void set maxRedirects(int maxRedirects) { 799 void set maxRedirects(int maxRedirects) {
752 if (_headersWritten) throw new StateError("Request already sent"); 800 if (_headersWritten) throw new StateError("Request already sent");
753 _maxRedirects = maxRedirects; 801 _maxRedirects = maxRedirects;
754 } 802 }
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
830 sb.write(cookies[i].value); 878 sb.write(cookies[i].value);
831 } 879 }
832 headers.add(HttpHeaders.COOKIE, sb.toString()); 880 headers.add(HttpHeaders.COOKIE, sb.toString());
833 } 881 }
834 882
835 headers._finalize(); 883 headers._finalize();
836 884
837 // Write headers. 885 // Write headers.
838 headers._write(buffer); 886 headers._write(buffer);
839 writeCRLF(); 887 writeCRLF();
840 _ioSink.add(buffer.readBytes()); 888 _headersSink.add(buffer.readBytes());
841 } 889 }
842 } 890 }
843 891
844 892
845 // Transformer that transforms data to HTTP Chunked Encoding. 893 // Transformer that transforms data to HTTP Chunked Encoding.
846 class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> { 894 class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> {
847 void handleData(List<int> data, EventSink<List<int>> sink) { 895 void handleData(List<int> data, EventSink<List<int>> sink) {
848 sink.add(_chunkHeader(data.length)); 896 sink.add(_chunkHeader(data.length));
849 if (data.length > 0) sink.add(data); 897 if (data.length > 0) sink.add(data);
850 sink.add(_chunkFooter); 898 sink.add(_chunkFooter);
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
906 " $_bytesWritten bytes written while expected " 954 " $_bytesWritten bytes written while expected "
907 "$expectedContentLength."))); 955 "$expectedContentLength.")));
908 } 956 }
909 sink.close(); 957 sink.close();
910 } 958 }
911 } 959 }
912 960
913 961
914 // Extends StreamConsumer as this is an internal type, only used to pipe to. 962 // Extends StreamConsumer as this is an internal type, only used to pipe to.
915 class _HttpOutgoing implements StreamConsumer<List<int>> { 963 class _HttpOutgoing implements StreamConsumer<List<int>> {
916 Function _onStream; 964 final Completer _doneCompleter = new Completer();
917 final Completer _consumeCompleter = new Completer(); 965 final StreamConsumer _consumer;
918 966
919 Future onStream(Future callback(Stream<List<int>> stream)) { 967 _HttpOutgoing(StreamConsumer this._consumer);
920 _onStream = callback;
921 return _consumeCompleter.future;
922 }
923
924 Future consume(Stream<List<int>> stream) {
925 _onStream(stream)
926 .then((_) => _consumeCompleter.complete(),
927 onError: _consumeCompleter.completeError);
928 // Use .then to ensure a Future branch.
929 return _consumeCompleter.future.then((_) => this);
930 }
931 968
932 Future addStream(Stream<List<int>> stream) { 969 Future addStream(Stream<List<int>> stream) {
933 throw new UnimplementedError("_HttpOutgoing.addStream"); 970 return _consumer.addStream(stream)
971 .catchError((error) {
972 _doneCompleter.completeError(error);
973 throw error;
974 });
934 } 975 }
935 976
936 Future close() { 977 Future close() {
937 throw new UnimplementedError("_HttpOutgoing.close"); 978 _doneCompleter.complete(_consumer);
979 return new Future.immediate(null);
938 } 980 }
981
982 Future get done => _doneCompleter.future;
939 } 983 }
940 984
941 985
942 class _HttpClientConnection { 986 class _HttpClientConnection {
943 final String key; 987 final String key;
944 final Socket _socket; 988 final Socket _socket;
945 final _HttpParser _httpParser; 989 final _HttpParser _httpParser;
946 StreamSubscription _subscription; 990 StreamSubscription _subscription;
947 final _HttpClient _httpClient; 991 final _HttpClient _httpClient;
948 992
949 Completer<_HttpIncoming> _nextResponseCompleter; 993 Completer<_HttpIncoming> _nextResponseCompleter;
950 Future _streamFuture; 994 Future _streamFuture;
951 995
952 _HttpClientConnection(String this.key, 996 _HttpClientConnection(String this.key,
953 Socket this._socket, 997 Socket this._socket,
954 _HttpClient this._httpClient) 998 _HttpClient this._httpClient)
955 : _httpParser = new _HttpParser.responseParser() { 999 : _httpParser = new _HttpParser.responseParser() {
956 _socket.pipe(_httpParser); 1000 _socket.pipe(_httpParser);
957 _socket.done.catchError((e) { destroy(); });
958 1001
959 // Set up handlers on the parser here, so we are sure to get 'onDone' from 1002 // Set up handlers on the parser here, so we are sure to get 'onDone' from
960 // the parser. 1003 // the parser.
961 _subscription = _httpParser.listen( 1004 _subscription = _httpParser.listen(
962 (incoming) { 1005 (incoming) {
963 // Only handle one incoming response at the time. Keep the 1006 // Only handle one incoming response at the time. Keep the
964 // stream paused until the response have been processed. 1007 // stream paused until the response have been processed.
965 _subscription.pause(); 1008 _subscription.pause();
966 // We assume the response is not here, until we have send the request. 1009 // We assume the response is not here, until we have send the request.
967 assert(_nextResponseCompleter != null); 1010 assert(_nextResponseCompleter != null);
968 var completer = _nextResponseCompleter; 1011 var completer = _nextResponseCompleter;
969 _nextResponseCompleter = null; 1012 _nextResponseCompleter = null;
970 completer.complete(incoming); 1013 completer.complete(incoming);
971 }, 1014 },
972 onError: (error) { 1015 onError: (error) {
973 if (_nextResponseCompleter != null) { 1016 if (_nextResponseCompleter != null) {
974 _nextResponseCompleter.completeError(error); 1017 _nextResponseCompleter.completeError(error);
975 _nextResponseCompleter = null; 1018 _nextResponseCompleter = null;
976 } 1019 }
977 }, 1020 },
978 onDone: () { 1021 onDone: () {
979 close(); 1022 close();
980 }); 1023 });
981 } 1024 }
982 1025
983 _HttpClientRequest send(Uri uri, int port, String method, bool isDirect) { 1026 _HttpClientRequest send(Uri uri, int port, String method, bool isDirect) {
984 // Start with pausing the parser. 1027 // Start with pausing the parser.
985 _subscription.pause(); 1028 _subscription.pause();
986 var outgoing = new _HttpOutgoing(); 1029 var outgoing = new _HttpOutgoing(_socket);
987 // Create new request object, wrapping the outgoing connection. 1030 // Create new request object, wrapping the outgoing connection.
988 var request = new _HttpClientRequest(outgoing, 1031 var request = new _HttpClientRequest(outgoing,
989 uri, 1032 uri,
990 method, 1033 method,
991 !isDirect, 1034 !isDirect,
992 _httpClient, 1035 _httpClient,
993 this); 1036 this);
994 request.headers.host = uri.domain; 1037 request.headers.host = uri.domain;
995 request.headers.port = port; 1038 request.headers.port = port;
996 request.headers.set(HttpHeaders.ACCEPT_ENCODING, "gzip"); 1039 request.headers.set(HttpHeaders.ACCEPT_ENCODING, "gzip");
997 if (uri.userInfo != null && !uri.userInfo.isEmpty) { 1040 if (uri.userInfo != null && !uri.userInfo.isEmpty) {
998 // If the URL contains user information use that for basic 1041 // If the URL contains user information use that for basic
999 // authorization 1042 // authorization
1000 String auth = 1043 String auth =
1001 CryptoUtils.bytesToBase64(_encodeString(uri.userInfo)); 1044 CryptoUtils.bytesToBase64(_encodeString(uri.userInfo));
1002 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); 1045 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
1003 } else { 1046 } else {
1004 // Look for credentials. 1047 // Look for credentials.
1005 _Credentials cr = _httpClient._findCredentials(uri); 1048 _Credentials cr = _httpClient._findCredentials(uri);
1006 if (cr != null) { 1049 if (cr != null) {
1007 cr.authorize(request); 1050 cr.authorize(request);
1008 } 1051 }
1009 } 1052 }
1010 // Start sending the request (lazy, delayed until the user provides 1053 // Start sending the request (lazy, delayed until the user provides
1011 // data). 1054 // data).
1012 _httpParser.responseToMethod = method; 1055 _httpParser.responseToMethod = method;
1013 _streamFuture = outgoing.onStream((stream) { 1056 _streamFuture = outgoing.done
1014 return _socket.writeStream(stream) 1057 .then((s) {
1015 .then((s) { 1058 // Request sent, set up response completer.
1016 // Request sent, set up response completer. 1059 _nextResponseCompleter = new Completer();
1017 _nextResponseCompleter = new Completer();
1018 1060
1019 // Listen for response. 1061 // Listen for response.
1020 _nextResponseCompleter.future 1062 _nextResponseCompleter.future
1021 .then((incoming) { 1063 .then((incoming) {
1022 incoming.dataDone.then((_) { 1064 incoming.dataDone.then((_) {
1023 if (incoming.headers.persistentConnection && 1065 if (incoming.headers.persistentConnection &&
1024 request.persistentConnection) { 1066 request.persistentConnection) {
1025 // Return connection, now we are done. 1067 // Return connection, now we are done.
1026 _httpClient._returnConnection(this); 1068 _httpClient._returnConnection(this);
1027 _subscription.resume(); 1069 _subscription.resume();
1028 } else { 1070 } else {
1029 destroy();
1030 }
1031 });
1032 request._onIncoming(incoming);
1033 })
1034 // If we see a state error, we failed to get the 'first'
1035 // element.
1036 // Transform the error to a HttpParserException, for
1037 // consistency.
1038 .catchError((error) {
1039 throw new HttpParserException(
1040 "Connection closed before data was received");
1041 }, test: (error) => error is StateError)
1042 .catchError((error) {
1043 // We are done with the socket.
1044 destroy(); 1071 destroy();
1045 request._onError(error); 1072 }
1046 }); 1073 });
1074 request._onIncoming(incoming);
1075 })
1076 // If we see a state error, we failed to get the 'first'
1077 // element.
1078 // Transform the error to a HttpParserException, for
1079 // consistency.
1080 .catchError((error) {
1081 throw new HttpParserException(
1082 "Connection closed before data was received");
1083 }, test: (error) => error is StateError)
1084 .catchError((error) {
1085 // We are done with the socket.
1086 destroy();
1087 request._onError(error);
1088 });
1047 1089
1048 // Resume the parser now we have a handler. 1090 // Resume the parser now we have a handler.
1049 _subscription.resume(); 1091 _subscription.resume();
1050 return s; 1092 return s;
1051 }, onError: (e) { 1093 }, onError: (e) {
1052 destroy(); 1094 destroy();
1053 throw e; 1095 });
1054 });
1055 });
1056 return request; 1096 return request;
1057 } 1097 }
1058 1098
1059 Future<Socket> detachSocket() { 1099 Future<Socket> detachSocket() {
1060 return _streamFuture 1100 return _streamFuture.then(
1061 .then((_) => new _DetachedSocket(_socket, _httpParser.detachIncoming()), 1101 (_) => new _DetachedSocket(_socket, _httpParser.detachIncoming()));
1062 onError: (_) {});
1063 } 1102 }
1064 1103
1065 void destroy() { 1104 void destroy() {
1066 _httpClient._connectionClosed(this); 1105 _httpClient._connectionClosed(this);
1067 _socket.destroy(); 1106 _socket.destroy();
1068 } 1107 }
1069 1108
1070 void close() { 1109 void close() {
1071 _httpClient._connectionClosed(this); 1110 _httpClient._connectionClosed(this);
1072 _streamFuture 1111 _streamFuture
1073 // TODO(ajohnsen): Add timeout. 1112 // TODO(ajohnsen): Add timeout.
1074 .then((_) => _socket.destroy(), 1113 .then((_) => _socket.destroy());
1075 onError: (_) {});
1076 } 1114 }
1077 1115
1078 HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket); 1116 HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket);
1079 } 1117 }
1080 1118
1081 class _ConnnectionInfo { 1119 class _ConnnectionInfo {
1082 _ConnnectionInfo(_HttpClientConnection this.connection, _Proxy this.proxy); 1120 _ConnnectionInfo(_HttpClientConnection this.connection, _Proxy this.proxy);
1083 final _HttpClientConnection connection; 1121 final _HttpClientConnection connection;
1084 final _Proxy proxy; 1122 final _Proxy proxy;
1085 } 1123 }
(...skipping 284 matching lines...) Expand 10 before | Expand all | Expand 10 after
1370 final Socket _socket; 1408 final Socket _socket;
1371 final _HttpServer _httpServer; 1409 final _HttpServer _httpServer;
1372 final _HttpParser _httpParser; 1410 final _HttpParser _httpParser;
1373 StreamSubscription _subscription; 1411 StreamSubscription _subscription;
1374 1412
1375 Future _streamFuture; 1413 Future _streamFuture;
1376 1414
1377 _HttpConnection(Socket this._socket, _HttpServer this._httpServer) 1415 _HttpConnection(Socket this._socket, _HttpServer this._httpServer)
1378 : _httpParser = new _HttpParser.requestParser() { 1416 : _httpParser = new _HttpParser.requestParser() {
1379 _socket.pipe(_httpParser); 1417 _socket.pipe(_httpParser);
1380 _socket.done.catchError((e) => destroy());
1381 _subscription = _httpParser.listen( 1418 _subscription = _httpParser.listen(
1382 (incoming) { 1419 (incoming) {
1383 // Only handle one incoming request at the time. Keep the 1420 // Only handle one incoming request at the time. Keep the
1384 // stream paused until the request has been send. 1421 // stream paused until the request has been send.
1385 _subscription.pause(); 1422 _subscription.pause();
1386 _state = _ACTIVE; 1423 _state = _ACTIVE;
1387 var outgoing = new _HttpOutgoing(); 1424 var outgoing = new _HttpOutgoing(_socket);
1388 var response = new _HttpResponse(incoming.headers.protocolVersion, 1425 var response = new _HttpResponse(incoming.headers.protocolVersion,
1389 outgoing); 1426 outgoing);
1390 var request = new _HttpRequest(response, incoming, _httpServer, this); 1427 var request = new _HttpRequest(response, incoming, _httpServer, this);
1391 outgoing.onStream((stream) { 1428 _streamFuture = outgoing.done
1392 return _streamFuture = _socket.writeStream(stream) 1429 .then((_) {
1393 .then((_) { 1430 if (_state == _DETACHED) return;
1394 if (_state == _DETACHED) return; 1431 if (response.persistentConnection &&
1395 if (response.persistentConnection && 1432 request.persistentConnection &&
1396 request.persistentConnection && 1433 incoming.fullBodyRead) {
1397 incoming.fullBodyRead) { 1434 _state = _IDLE;
1398 _state = _IDLE; 1435 // Resume the subscription for incoming requests as the
1399 // Resume the subscription for incoming requests as the 1436 // request is now processed.
1400 // request is now processed. 1437 _subscription.resume();
1401 _subscription.resume(); 1438 } else {
1402 } else { 1439 // Close socket, keep-alive not used or body sent before
1403 // Close socket, keep-alive not used or body sent before 1440 // received data was handled.
1404 // received data was handled.
1405 destroy();
1406 }
1407 })
1408 .catchError((e) {
1409 destroy(); 1441 destroy();
1410 throw e; 1442 }
1411 }); 1443 })
1412 }); 1444 .catchError((e) {
1445 destroy();
1446 });
1413 response._ignoreBody = request.method == "HEAD"; 1447 response._ignoreBody = request.method == "HEAD";
1414 response._httpRequest = request; 1448 response._httpRequest = request;
1415 _httpServer._handleRequest(request); 1449 _httpServer._handleRequest(request);
1416 }, 1450 },
1417 onDone: () { 1451 onDone: () {
1418 destroy(); 1452 destroy();
1419 }, 1453 },
1420 onError: (error) { 1454 onError: (error) {
1421 _httpServer._handleError(error); 1455 _httpServer._handleError(error);
1422 destroy(); 1456 destroy();
(...skipping 261 matching lines...) Expand 10 before | Expand all | Expand 10 after
1684 void writeCharCode(int charCode) => _socket.writeCharCode(charCode); 1718 void writeCharCode(int charCode) => _socket.writeCharCode(charCode);
1685 1719
1686 void writeAll(Iterable objects, [String separator = ""]) { 1720 void writeAll(Iterable objects, [String separator = ""]) {
1687 _socket.writeAll(objects, separator); 1721 _socket.writeAll(objects, separator);
1688 } 1722 }
1689 1723
1690 void add(List<int> bytes) => _socket.add(bytes); 1724 void add(List<int> bytes) => _socket.add(bytes);
1691 1725
1692 void addError(AsyncError error) => _socket.addError(error); 1726 void addError(AsyncError error) => _socket.addError(error);
1693 1727
1694 Future<Socket> consume(Stream<List<int>> stream) {
1695 return _socket.consume(stream);
1696 }
1697
1698 Future<Socket> addStream(Stream<List<int>> stream) { 1728 Future<Socket> addStream(Stream<List<int>> stream) {
1699 return _socket.addStream(stream); 1729 return _socket.addStream(stream);
1700 } 1730 }
1701 1731
1702 Future<Socket> writeStream(Stream<List<int>> stream) {
1703 return _socket.writeStream(stream);
1704 }
1705
1706 void destroy() => _socket.destroy(); 1732 void destroy() => _socket.destroy();
1707 1733
1708 Future close() => _socket.close(); 1734 Future close() => _socket.close();
1709 1735
1710 Future<Socket> get done => _socket.done; 1736 Future<Socket> get done => _socket.done;
1711 1737
1712 int get port => _socket.port; 1738 int get port => _socket.port;
1713 1739
1714 String get remoteHost => _socket.remoteHost; 1740 String get remoteHost => _socket.remoteHost;
1715 1741
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after
1826 1852
1827 1853
1828 class _RedirectInfo implements RedirectInfo { 1854 class _RedirectInfo implements RedirectInfo {
1829 const _RedirectInfo(int this.statusCode, 1855 const _RedirectInfo(int this.statusCode,
1830 String this.method, 1856 String this.method,
1831 Uri this.location); 1857 Uri this.location);
1832 final int statusCode; 1858 final int statusCode;
1833 final String method; 1859 final String method;
1834 final Uri location; 1860 final Uri location;
1835 } 1861 }
OLDNEW
« no previous file with comments | « sdk/lib/io/http.dart ('k') | sdk/lib/io/http_parser.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698