Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(378)

Side by Side Diff: sdk/lib/io/http_impl.dart

Issue 14028017: Remove .writeStream, .consume and rewrite IOSink to correctly implement a (sane) well-defined behav… (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Add new test file. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 class _HttpIncoming extends Stream<List<int>> { 7 class _HttpIncoming extends Stream<List<int>> {
8 final int _transferLength; 8 final int _transferLength;
9 final Completer _dataCompleter = new Completer(); 9 final Completer _dataCompleter = new Completer();
10 Stream<List<int>> _stream; 10 Stream<List<int>> _stream;
(...skipping 308 matching lines...) Expand 10 before | Expand all | Expand 10 after
319 return new Future.immediate(this); 319 return new Future.immediate(this);
320 } 320 }
321 } 321 }
322 322
323 323
324 abstract class _HttpOutboundMessage<T> implements IOSink { 324 abstract class _HttpOutboundMessage<T> implements IOSink {
325 // Used to mark when the body should be written. This is used for HEAD 325 // Used to mark when the body should be written. This is used for HEAD
326 // requests and in error handling. 326 // requests and in error handling.
327 bool _ignoreBody = false; 327 bool _ignoreBody = false;
328 bool _headersWritten = false; 328 bool _headersWritten = false;
329 bool _asGZip = false;
329 330
330 IOSink _ioSink; 331 IOSink _headersSink;
332 IOSink _dataSink;
333
331 final _HttpOutgoing _outgoing; 334 final _HttpOutgoing _outgoing;
332 335
333 final _HttpHeaders headers; 336 final _HttpHeaders headers;
334 337
335 _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing) 338 _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing)
336 : _outgoing = outgoing, 339 : _outgoing = outgoing,
337 _ioSink = new IOSink(outgoing, encoding: Encoding.ASCII), 340 _headersSink = new IOSink(outgoing, encoding: Encoding.ASCII),
338 headers = new _HttpHeaders(protocolVersion); 341 headers = new _HttpHeaders(protocolVersion) {
342 _dataSink = new IOSink(
343 new _HttpOutboundConsumer(_headersSink, _addStream, this));
344 }
339 345
340 int get contentLength => headers.contentLength; 346 int get contentLength => headers.contentLength;
341 void set contentLength(int contentLength) { 347 void set contentLength(int contentLength) {
342 headers.contentLength = contentLength; 348 headers.contentLength = contentLength;
343 } 349 }
344 350
345 bool get persistentConnection => headers.persistentConnection; 351 bool get persistentConnection => headers.persistentConnection;
346 void set persistentConnection(bool p) { 352 void set persistentConnection(bool p) {
347 headers.persistentConnection = p; 353 headers.persistentConnection = p;
348 } 354 }
(...skipping 20 matching lines...) Expand all
369 String string; 375 String string;
370 if (obj is String) { 376 if (obj is String) {
371 string = obj; 377 string = obj;
372 } else { 378 } else {
373 string = obj.toString(); 379 string = obj.toString();
374 if (string is! String) { 380 if (string is! String) {
375 throw new ArgumentError('toString() did not return a string'); 381 throw new ArgumentError('toString() did not return a string');
376 } 382 }
377 } 383 }
378 if (string.isEmpty) return; 384 if (string.isEmpty) return;
379 _ioSink.write(string); 385 _dataSink.write(string);
380 } 386 }
381 387
382 void writeAll(Iterable objects, [String separator = ""]) { 388 void writeAll(Iterable objects, [String separator = ""]) {
383 bool isFirst = true; 389 bool isFirst = true;
384 for (Object obj in objects) { 390 for (Object obj in objects) {
385 if (isFirst) { 391 if (isFirst) {
386 isFirst = false; 392 isFirst = false;
387 } else { 393 } else {
388 if (!separator.isEmpty) write(separator); 394 if (!separator.isEmpty) write(separator);
389 } 395 }
390 write(obj); 396 write(obj);
391 } 397 }
392 } 398 }
393 399
394 void writeln([Object obj = ""]) { 400 void writeln([Object obj = ""]) {
395 write(obj); 401 write(obj);
396 write("\n"); 402 write("\n");
397 } 403 }
398 404
399 void writeCharCode(int charCode) { 405 void writeCharCode(int charCode) {
400 write(new String.fromCharCode(charCode)); 406 write(new String.fromCharCode(charCode));
401 } 407 }
402 408
403 void add(List<int> data) { 409 void add(List<int> data) {
404 _writeHeaders(); 410 _writeHeaders();
405 if (data.length == 0) return; 411 if (data.length == 0) return;
406 _ioSink.add(data); 412 _dataSink.add(data);
407 } 413 }
408 414
409 void addError(AsyncError error) { 415 void addError(AsyncError error) {
410 _writeHeaders(); 416 _writeHeaders();
411 _ioSink.addError(error); 417 _dataSink.addError(error);
412 }
413
414 Future<T> consume(Stream<List<int>> stream) {
415 _writeHeaders();
416 return _ioSink.consume(stream);
417 } 418 }
418 419
419 Future<T> addStream(Stream<List<int>> stream) { 420 Future<T> addStream(Stream<List<int>> stream) {
420 _writeHeaders(); 421 _writeHeaders();
421 return _ioSink.writeStream(stream).then((_) => this); 422 return _dataSink.addStream(stream);
422 }
423
424 Future<T> writeStream(Stream<List<int>> stream) {
425 return addStream(stream);
426 } 423 }
427 424
428 Future close() { 425 Future close() {
429 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and 426 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and
430 // persistentConnection is not guaranteed to be in sync. 427 // persistentConnection is not guaranteed to be in sync.
431 if (!_headersWritten && !_ignoreBody && headers.contentLength == -1) { 428 if (!_headersWritten && !_ignoreBody && headers.contentLength == -1) {
432 // If no body was written, _ignoreBody is false (it's not a HEAD 429 // If no body was written, _ignoreBody is false (it's not a HEAD
433 // request) and the content-length is unspecified, set contentLength to 0. 430 // request) and the content-length is unspecified, set contentLength to 0.
434 headers.chunkedTransferEncoding = false; 431 headers.chunkedTransferEncoding = false;
435 headers.contentLength = 0; 432 headers.contentLength = 0;
436 } 433 }
437 _writeHeaders(); 434 _writeHeaders();
438 return _ioSink.close(); 435 return _dataSink.close();
439 } 436 }
440 437
441 Future<T> get done { 438 Future<T> get done => _dataSink.done.then((_) => this);
442 _writeHeaders();
443 return _ioSink.done;
444 }
445 439
446 void _writeHeaders() { 440 void _writeHeaders() {
447 if (_headersWritten) return; 441 if (_headersWritten) return;
448 _headersWritten = true; 442 _headersWritten = true;
449 _ioSink.encoding = Encoding.ASCII;
450 headers._synchronize(); // Be sure the 'chunked' option is updated. 443 headers._synchronize(); // Be sure the 'chunked' option is updated.
451 bool asGZip = false;
452 bool isServerSide = this is _HttpResponse; 444 bool isServerSide = this is _HttpResponse;
453 if (isServerSide && headers.chunkedTransferEncoding) { 445 if (isServerSide && headers.chunkedTransferEncoding) {
454 var response = this; 446 var response = this;
455 List acceptEncodings = 447 List acceptEncodings =
456 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; 448 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING];
457 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; 449 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING];
458 if (acceptEncodings != null && 450 if (acceptEncodings != null &&
459 acceptEncodings 451 acceptEncodings
460 .expand((list) => list.split(",")) 452 .expand((list) => list.split(","))
461 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && 453 .any((encoding) => encoding.trim().toLowerCase() == "gzip") &&
462 contentEncoding == null) { 454 contentEncoding == null) {
463 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); 455 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip");
464 asGZip = true; 456 _asGZip = true;
465 } 457 }
466 } 458 }
467 _writeHeader(); 459 _writeHeader();
468 _ioSink = new IOSink(new _HttpOutboundConsumer(_ioSink, _consume, asGZip));
469 _ioSink.encoding = encoding;
470 } 460 }
471 461
472 Future _consume(IOSink ioSink, Stream<List<int>> stream, bool asGZip) { 462 Future _addStream(IOSink ioSink, Stream<List<int>> stream) {
473 int contentLength = headers.contentLength; 463 int contentLength = headers.contentLength;
474 if (_ignoreBody) { 464 if (_ignoreBody) {
475 ioSink.close(); 465 stream.fold(null, (x, y) {}).catchError((_) {});
476 return stream.fold(null, (x, y) {}).then((_) => this); 466 return ioSink.close().then((_) => this);
477 } 467 }
478 stream = stream.transform(new _BufferTransformer()); 468 stream = stream.transform(new _BufferTransformer());
479 if (headers.chunkedTransferEncoding) { 469 if (headers.chunkedTransferEncoding) {
480 if (asGZip) { 470 if (_asGZip) {
481 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); 471 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6));
482 } 472 }
483 stream = stream.transform(new _ChunkedTransformer()); 473 stream = stream.transform(new _ChunkedTransformer());
484 } else if (contentLength >= 0) { 474 } else if (contentLength >= 0) {
485 stream = stream.transform(new _ContentLengthValidator(contentLength)); 475 stream = stream.transform(new _ContentLengthValidator(contentLength));
486 } 476 }
487 return stream.pipe(ioSink).then((_) => this); 477 return ioSink.addStream(stream).then((_) => this);
488 } 478 }
489 479
490 void _writeHeader(); // TODO(ajohnsen): Better name. 480 void _writeHeader(); // TODO(ajohnsen): Better name.
491 } 481 }
492 482
493 483
494 class _HttpOutboundConsumer implements StreamConsumer { 484 class _HttpOutboundConsumer implements StreamConsumer {
495 Function _consume; 485 StreamController _controller;
486 StreamSubscription _subscription;
487 Function _addStream;
496 IOSink _ioSink; 488 IOSink _ioSink;
497 bool _asGZip; 489 Completer _closeCompleter = new Completer();
490 Completer _completer;
491
492 _HttpOutboundMessage _outbound;
498 _HttpOutboundConsumer(IOSink this._ioSink, 493 _HttpOutboundConsumer(IOSink this._ioSink,
499 Function this._consume, 494 Function this._addStream,
500 bool this._asGZip); 495 _HttpOutboundMessage this._outbound);
501 496
502 Future consume(var stream) => _consume(_ioSink, stream, _asGZip); 497 void _onPause() {
498 if (_controller.isPaused) {
499 _subscription.pause();
500 } else {
501 _subscription.resume();
502 }
503 }
504
505 void _onListen() {
506 if (!_controller.hasListener && _subscription != null) {
507 _subscription.cancel();
508 }
509 }
510
511 _ensureController() {
512 if (_controller != null) return;
513 _controller = new StreamController(onPauseStateChange: _onPause,
514 onSubscriptionStateChange: _onListen);
515 _addStream(_ioSink, _controller.stream)
516 .then((v) {
Søren Gjesse 2013/04/15 06:56:30 v -> _
Anders Johnsen 2013/04/15 07:35:20 Done.
517 _done();
518 _closeCompleter.complete(_outbound);
519 },
520 onError: (error) {
521 if (!_done(error)) {
522 _closeCompleter.completeError(error);
523 }
524 });
525 }
526
527 bool _done([error]) {
528 if (_completer == null) return false;
529 var tmp = _completer;
530 _completer = null;
531 if (error != null) {
532 tmp.completeError(error);
533 } else {
534 tmp.complete(_outbound);
535 }
536 return true;
537 }
503 538
504 Future addStream(var stream) { 539 Future addStream(var stream) {
505 throw new UnimplementedError("_HttpOutboundConsumer.addStream"); 540 _ensureController();
541 _completer = new Completer();
542 _subscription = stream.listen(
543 (data) {
544 _controller.add(data);
545 },
546 onDone: () {
547 _done();
548 },
549 onError: (error) {
550 _done(error);
551 },
552 unsubscribeOnError: true);
553 return _completer.future;
506 } 554 }
507 555
508 Future close() { 556 Future close() {
509 throw new UnimplementedError("_HttpOutboundConsumer.close"); 557 _ensureController();
558 _controller.close();
559 return _closeCompleter.future.then((_) {
560 return _ioSink.close().then((_) => _outbound);
561 });
510 } 562 }
511 } 563 }
512 564
513 565
514 class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> { 566 class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> {
515 const int MIN_CHUNK_SIZE = 4 * 1024; 567 const int MIN_CHUNK_SIZE = 4 * 1024;
516 const int MAX_BUFFER_SIZE = 16 * 1024; 568 const int MAX_BUFFER_SIZE = 16 * 1024;
517 569
518 final _BufferList _buffer = new _BufferList(); 570 final _BufferList _buffer = new _BufferList();
519 571
(...skipping 11 matching lines...) Expand all
531 } 583 }
532 } 584 }
533 585
534 void handleDone(EventSink<List<int>> sink) { 586 void handleDone(EventSink<List<int>> sink) {
535 flush(sink); 587 flush(sink);
536 sink.close(); 588 sink.close();
537 } 589 }
538 590
539 void flush(EventSink<List<int>> sink) { 591 void flush(EventSink<List<int>> sink) {
540 if (_buffer.length > 0) { 592 if (_buffer.length > 0) {
541 sink.add(_buffer.readBytes()); 593 var data = _buffer.readBytes();
Søren Gjesse 2013/04/15 06:56:30 Why this change?
Anders Johnsen 2013/04/15 07:35:20 Done.
594 sink.add(data);
542 _buffer.clear(); 595 _buffer.clear();
543 } 596 }
544 } 597 }
545 } 598 }
546 599
547 600
548 class _HttpResponse extends _HttpOutboundMessage<HttpResponse> 601 class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
549 implements HttpResponse { 602 implements HttpResponse {
550 int statusCode = 200; 603 int statusCode = 200;
551 String _reasonPhrase; 604 String _reasonPhrase;
(...skipping 15 matching lines...) Expand all
567 _reasonPhrase = reasonPhrase; 620 _reasonPhrase = reasonPhrase;
568 } 621 }
569 622
570 Future<Socket> detachSocket() { 623 Future<Socket> detachSocket() {
571 if (_headersWritten) throw new StateError("Headers already sent"); 624 if (_headersWritten) throw new StateError("Headers already sent");
572 _writeHeaders(); 625 _writeHeaders();
573 var future = _httpRequest._httpConnection.detachSocket(); 626 var future = _httpRequest._httpConnection.detachSocket();
574 // Close connection so the socket is 'free'. 627 // Close connection so the socket is 'free'.
575 close(); 628 close();
576 done.catchError((_) { 629 done.catchError((_) {
577 // Catch any error on done, as they automatically will be propegated to 630 // Catch any error on done, as they automatically will be
578 // the websocket. 631 // propegated to the websocket.
Søren Gjesse 2013/04/15 06:56:30 propagated
Anders Johnsen 2013/04/15 07:35:20 Done.
579 }); 632 });
580 return future; 633 return future;
581 } 634 }
582 635
583 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; 636 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo;
584 637
585 void _writeHeader() { 638 void _writeHeader() {
586 var buffer = new _BufferList(); 639 var buffer = new _BufferList();
587 writeSP() => buffer.add(const [_CharCode.SP]); 640 writeSP() => buffer.add(const [_CharCode.SP]);
588 writeCRLF() => buffer.add(const [_CharCode.CR, _CharCode.LF]); 641 writeCRLF() => buffer.add(const [_CharCode.CR, _CharCode.LF]);
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
625 _cookies.forEach((cookie) { 678 _cookies.forEach((cookie) {
626 headers.add(HttpHeaders.SET_COOKIE, cookie); 679 headers.add(HttpHeaders.SET_COOKIE, cookie);
627 }); 680 });
628 } 681 }
629 682
630 headers._finalize(); 683 headers._finalize();
631 684
632 // Write headers. 685 // Write headers.
633 headers._write(buffer); 686 headers._write(buffer);
634 writeCRLF(); 687 writeCRLF();
635 _ioSink.add(buffer.readBytes()); 688 _headersSink.add(buffer.readBytes());
636 } 689 }
637 690
638 String _findReasonPhrase(int statusCode) { 691 String _findReasonPhrase(int statusCode) {
639 if (_reasonPhrase != null) { 692 if (_reasonPhrase != null) {
640 return _reasonPhrase; 693 return _reasonPhrase;
641 } 694 }
642 695
643 switch (statusCode) { 696 switch (statusCode) {
644 case HttpStatus.CONTINUE: return "Continue"; 697 case HttpStatus.CONTINUE: return "Continue";
645 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols"; 698 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols";
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
724 _HttpClientConnection this._httpClientConnection) 777 _HttpClientConnection this._httpClientConnection)
725 : super("1.1", outgoing) { 778 : super("1.1", outgoing) {
726 // GET and HEAD have 'content-length: 0' by default. 779 // GET and HEAD have 'content-length: 0' by default.
727 if (method == "GET" || method == "HEAD") { 780 if (method == "GET" || method == "HEAD") {
728 contentLength = 0; 781 contentLength = 0;
729 } 782 }
730 } 783 }
731 784
732 Future<HttpClientResponse> get done { 785 Future<HttpClientResponse> get done {
733 if (_response == null) { 786 if (_response == null) {
734 _response = Future.wait([_responseCompleter.future, super.done]) 787 _response = Future.wait([_responseCompleter.future,
788 super.done])
735 .then((list) => list[0]); 789 .then((list) => list[0]);
736 } 790 }
737 return _response; 791 return _response;
738 } 792 }
739 793
740 Future<HttpClientResponse> consume(Stream<List<int>> stream) {
741 super.consume(stream);
742 return done;
743 }
744
745 Future<HttpClientResponse> close() { 794 Future<HttpClientResponse> close() {
746 super.close(); 795 super.close();
747 return done; 796 return done;
748 } 797 }
749 798
750 int get maxRedirects => _maxRedirects; 799 int get maxRedirects => _maxRedirects;
751 void set maxRedirects(int maxRedirects) { 800 void set maxRedirects(int maxRedirects) {
752 if (_headersWritten) throw new StateError("Request already sent"); 801 if (_headersWritten) throw new StateError("Request already sent");
753 _maxRedirects = maxRedirects; 802 _maxRedirects = maxRedirects;
754 } 803 }
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
830 sb.write(cookies[i].value); 879 sb.write(cookies[i].value);
831 } 880 }
832 headers.add(HttpHeaders.COOKIE, sb.toString()); 881 headers.add(HttpHeaders.COOKIE, sb.toString());
833 } 882 }
834 883
835 headers._finalize(); 884 headers._finalize();
836 885
837 // Write headers. 886 // Write headers.
838 headers._write(buffer); 887 headers._write(buffer);
839 writeCRLF(); 888 writeCRLF();
840 _ioSink.add(buffer.readBytes()); 889 _headersSink.add(buffer.readBytes());
841 } 890 }
842 } 891 }
843 892
844 893
845 // Transformer that transforms data to HTTP Chunked Encoding. 894 // Transformer that transforms data to HTTP Chunked Encoding.
846 class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> { 895 class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> {
847 void handleData(List<int> data, EventSink<List<int>> sink) { 896 void handleData(List<int> data, EventSink<List<int>> sink) {
848 sink.add(_chunkHeader(data.length)); 897 sink.add(_chunkHeader(data.length));
849 if (data.length > 0) sink.add(data); 898 if (data.length > 0) sink.add(data);
850 sink.add(_chunkFooter); 899 sink.add(_chunkFooter);
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
906 " $_bytesWritten bytes written while expected " 955 " $_bytesWritten bytes written while expected "
907 "$expectedContentLength."))); 956 "$expectedContentLength.")));
908 } 957 }
909 sink.close(); 958 sink.close();
910 } 959 }
911 } 960 }
912 961
913 962
914 // Extends StreamConsumer as this is an internal type, only used to pipe to. 963 // Extends StreamConsumer as this is an internal type, only used to pipe to.
915 class _HttpOutgoing implements StreamConsumer<List<int>> { 964 class _HttpOutgoing implements StreamConsumer<List<int>> {
916 Function _onStream; 965 final Completer _doneCompleter = new Completer();
917 final Completer _consumeCompleter = new Completer(); 966 final StreamConsumer _consumer;
918 967
919 Future onStream(Future callback(Stream<List<int>> stream)) { 968 _HttpOutgoing(StreamConsumer this._consumer);
920 _onStream = callback;
921 return _consumeCompleter.future;
922 }
923
924 Future consume(Stream<List<int>> stream) {
925 _onStream(stream)
926 .then((_) => _consumeCompleter.complete(),
927 onError: _consumeCompleter.completeError);
928 // Use .then to ensure a Future branch.
929 return _consumeCompleter.future.then((_) => this);
930 }
931 969
932 Future addStream(Stream<List<int>> stream) { 970 Future addStream(Stream<List<int>> stream) {
933 throw new UnimplementedError("_HttpOutgoing.addStream"); 971 return _consumer.addStream(stream)
972 .catchError((error) {
973 _doneCompleter.completeError(error);
974 throw error;
975 });
934 } 976 }
935 977
936 Future close() { 978 Future close() {
937 throw new UnimplementedError("_HttpOutgoing.close"); 979 _doneCompleter.complete(_consumer);
980 return new Future.immediate(null);
938 } 981 }
982
983 Future get done => _doneCompleter.future;
939 } 984 }
940 985
941 986
942 class _HttpClientConnection { 987 class _HttpClientConnection {
943 final String key; 988 final String key;
944 final Socket _socket; 989 final Socket _socket;
945 final _HttpParser _httpParser; 990 final _HttpParser _httpParser;
946 StreamSubscription _subscription; 991 StreamSubscription _subscription;
947 final _HttpClient _httpClient; 992 final _HttpClient _httpClient;
948 993
949 Completer<_HttpIncoming> _nextResponseCompleter; 994 Completer<_HttpIncoming> _nextResponseCompleter;
950 Future _streamFuture; 995 Future _streamFuture;
951 996
952 _HttpClientConnection(String this.key, 997 _HttpClientConnection(String this.key,
953 Socket this._socket, 998 Socket this._socket,
954 _HttpClient this._httpClient) 999 _HttpClient this._httpClient)
955 : _httpParser = new _HttpParser.responseParser() { 1000 : _httpParser = new _HttpParser.responseParser() {
956 _socket.pipe(_httpParser); 1001 _socket.pipe(_httpParser);
957 _socket.done.catchError((e) { destroy(); });
958 1002
959 // Set up handlers on the parser here, so we are sure to get 'onDone' from 1003 // Set up handlers on the parser here, so we are sure to get 'onDone' from
960 // the parser. 1004 // the parser.
961 _subscription = _httpParser.listen( 1005 _subscription = _httpParser.listen(
962 (incoming) { 1006 (incoming) {
963 // Only handle one incoming response at the time. Keep the 1007 // Only handle one incoming response at the time. Keep the
964 // stream paused until the response have been processed. 1008 // stream paused until the response have been processed.
965 _subscription.pause(); 1009 _subscription.pause();
966 // We assume the response is not here, until we have send the request. 1010 // We assume the response is not here, until we have send the request.
967 assert(_nextResponseCompleter != null); 1011 assert(_nextResponseCompleter != null);
968 var completer = _nextResponseCompleter; 1012 var completer = _nextResponseCompleter;
969 _nextResponseCompleter = null; 1013 _nextResponseCompleter = null;
970 completer.complete(incoming); 1014 completer.complete(incoming);
971 }, 1015 },
972 onError: (error) { 1016 onError: (error) {
973 if (_nextResponseCompleter != null) { 1017 if (_nextResponseCompleter != null) {
974 _nextResponseCompleter.completeError(error); 1018 _nextResponseCompleter.completeError(error);
975 _nextResponseCompleter = null; 1019 _nextResponseCompleter = null;
976 } 1020 }
977 }, 1021 },
978 onDone: () { 1022 onDone: () {
979 close(); 1023 close();
980 }); 1024 });
981 } 1025 }
982 1026
983 _HttpClientRequest send(Uri uri, int port, String method, bool isDirect) { 1027 _HttpClientRequest send(Uri uri, int port, String method, bool isDirect) {
984 // Start with pausing the parser. 1028 // Start with pausing the parser.
985 _subscription.pause(); 1029 _subscription.pause();
986 var outgoing = new _HttpOutgoing(); 1030 var outgoing = new _HttpOutgoing(_socket);
987 // Create new request object, wrapping the outgoing connection. 1031 // Create new request object, wrapping the outgoing connection.
988 var request = new _HttpClientRequest(outgoing, 1032 var request = new _HttpClientRequest(outgoing,
989 uri, 1033 uri,
990 method, 1034 method,
991 !isDirect, 1035 !isDirect,
992 _httpClient, 1036 _httpClient,
993 this); 1037 this);
994 request.headers.host = uri.domain; 1038 request.headers.host = uri.domain;
995 request.headers.port = port; 1039 request.headers.port = port;
996 request.headers.set(HttpHeaders.ACCEPT_ENCODING, "gzip"); 1040 request.headers.set(HttpHeaders.ACCEPT_ENCODING, "gzip");
997 if (uri.userInfo != null && !uri.userInfo.isEmpty) { 1041 if (uri.userInfo != null && !uri.userInfo.isEmpty) {
998 // If the URL contains user information use that for basic 1042 // If the URL contains user information use that for basic
999 // authorization 1043 // authorization
1000 String auth = 1044 String auth =
1001 CryptoUtils.bytesToBase64(_encodeString(uri.userInfo)); 1045 CryptoUtils.bytesToBase64(_encodeString(uri.userInfo));
1002 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); 1046 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
1003 } else { 1047 } else {
1004 // Look for credentials. 1048 // Look for credentials.
1005 _Credentials cr = _httpClient._findCredentials(uri); 1049 _Credentials cr = _httpClient._findCredentials(uri);
1006 if (cr != null) { 1050 if (cr != null) {
1007 cr.authorize(request); 1051 cr.authorize(request);
1008 } 1052 }
1009 } 1053 }
1010 // Start sending the request (lazy, delayed until the user provides 1054 // Start sending the request (lazy, delayed until the user provides
1011 // data). 1055 // data).
1012 _httpParser.responseToMethod = method; 1056 _httpParser.responseToMethod = method;
1013 _streamFuture = outgoing.onStream((stream) { 1057 _streamFuture = outgoing.done
1014 return _socket.writeStream(stream) 1058 .then((s) {
1015 .then((s) { 1059 // Request sent, set up response completer.
1016 // Request sent, set up response completer. 1060 _nextResponseCompleter = new Completer();
1017 _nextResponseCompleter = new Completer();
1018 1061
1019 // Listen for response. 1062 // Listen for response.
1020 _nextResponseCompleter.future 1063 _nextResponseCompleter.future
1021 .then((incoming) { 1064 .then((incoming) {
1022 incoming.dataDone.then((_) { 1065 incoming.dataDone.then((_) {
1023 if (incoming.headers.persistentConnection && 1066 if (incoming.headers.persistentConnection &&
1024 request.persistentConnection) { 1067 request.persistentConnection) {
1025 // Return connection, now we are done. 1068 // Return connection, now we are done.
1026 _httpClient._returnConnection(this); 1069 _httpClient._returnConnection(this);
1027 _subscription.resume(); 1070 _subscription.resume();
1028 } else { 1071 } else {
1029 destroy();
1030 }
1031 });
1032 request._onIncoming(incoming);
1033 })
1034 // If we see a state error, we failed to get the 'first'
1035 // element.
1036 // Transform the error to a HttpParserException, for
1037 // consistency.
1038 .catchError((error) {
1039 throw new HttpParserException(
1040 "Connection closed before data was received");
1041 }, test: (error) => error is StateError)
1042 .catchError((error) {
1043 // We are done with the socket.
1044 destroy(); 1072 destroy();
1045 request._onError(error); 1073 }
1046 }); 1074 });
1075 request._onIncoming(incoming);
1076 })
1077 // If we see a state error, we failed to get the 'first'
1078 // element.
1079 // Transform the error to a HttpParserException, for
1080 // consistency.
1081 .catchError((error) {
1082 throw new HttpParserException(
1083 "Connection closed before data was received");
1084 }, test: (error) => error is StateError)
1085 .catchError((error) {
1086 // We are done with the socket.
1087 destroy();
1088 request._onError(error);
1089 });
1047 1090
1048 // Resume the parser now we have a handler. 1091 // Resume the parser now we have a handler.
1049 _subscription.resume(); 1092 _subscription.resume();
1050 return s; 1093 return s;
1051 }, onError: (e) { 1094 }, onError: (e) {
1052 destroy(); 1095 destroy();
1053 throw e; 1096 });
1054 });
1055 });
1056 return request; 1097 return request;
1057 } 1098 }
1058 1099
1059 Future<Socket> detachSocket() { 1100 Future<Socket> detachSocket() {
1060 return _streamFuture 1101 return _streamFuture.then(
1061 .then((_) => new _DetachedSocket(_socket, _httpParser.detachIncoming()), 1102 (_) => new _DetachedSocket(_socket, _httpParser.detachIncoming()));
1062 onError: (_) {});
1063 } 1103 }
1064 1104
1065 void destroy() { 1105 void destroy() {
1066 _httpClient._connectionClosed(this); 1106 _httpClient._connectionClosed(this);
1067 _socket.destroy(); 1107 _socket.destroy();
1068 } 1108 }
1069 1109
1070 void close() { 1110 void close() {
1071 _httpClient._connectionClosed(this); 1111 _httpClient._connectionClosed(this);
1072 _streamFuture 1112 _streamFuture
1073 // TODO(ajohnsen): Add timeout. 1113 // TODO(ajohnsen): Add timeout.
1074 .then((_) => _socket.destroy(), 1114 .then((_) => _socket.destroy());
1075 onError: (_) {});
1076 } 1115 }
1077 1116
1078 HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket); 1117 HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket);
1079 } 1118 }
1080 1119
1081 class _ConnnectionInfo { 1120 class _ConnnectionInfo {
1082 _ConnnectionInfo(_HttpClientConnection this.connection, _Proxy this.proxy); 1121 _ConnnectionInfo(_HttpClientConnection this.connection, _Proxy this.proxy);
1083 final _HttpClientConnection connection; 1122 final _HttpClientConnection connection;
1084 final _Proxy proxy; 1123 final _Proxy proxy;
1085 } 1124 }
(...skipping 284 matching lines...) Expand 10 before | Expand all | Expand 10 after
1370 final Socket _socket; 1409 final Socket _socket;
1371 final _HttpServer _httpServer; 1410 final _HttpServer _httpServer;
1372 final _HttpParser _httpParser; 1411 final _HttpParser _httpParser;
1373 StreamSubscription _subscription; 1412 StreamSubscription _subscription;
1374 1413
1375 Future _streamFuture; 1414 Future _streamFuture;
1376 1415
1377 _HttpConnection(Socket this._socket, _HttpServer this._httpServer) 1416 _HttpConnection(Socket this._socket, _HttpServer this._httpServer)
1378 : _httpParser = new _HttpParser.requestParser() { 1417 : _httpParser = new _HttpParser.requestParser() {
1379 _socket.pipe(_httpParser); 1418 _socket.pipe(_httpParser);
1380 _socket.done.catchError((e) => destroy());
1381 _subscription = _httpParser.listen( 1419 _subscription = _httpParser.listen(
1382 (incoming) { 1420 (incoming) {
1383 // Only handle one incoming request at the time. Keep the 1421 // Only handle one incoming request at the time. Keep the
1384 // stream paused until the request has been send. 1422 // stream paused until the request has been send.
1385 _subscription.pause(); 1423 _subscription.pause();
1386 _state = _ACTIVE; 1424 _state = _ACTIVE;
1387 var outgoing = new _HttpOutgoing(); 1425 var outgoing = new _HttpOutgoing(_socket);
1388 var response = new _HttpResponse(incoming.headers.protocolVersion, 1426 var response = new _HttpResponse(incoming.headers.protocolVersion,
1389 outgoing); 1427 outgoing);
1390 var request = new _HttpRequest(response, incoming, _httpServer, this); 1428 var request = new _HttpRequest(response, incoming, _httpServer, this);
1391 outgoing.onStream((stream) { 1429 _streamFuture = outgoing.done
1392 return _streamFuture = _socket.writeStream(stream) 1430 .then((_) {
1393 .then((_) { 1431 if (_state == _DETACHED) return;
1394 if (_state == _DETACHED) return; 1432 if (response.persistentConnection &&
1395 if (response.persistentConnection && 1433 request.persistentConnection &&
1396 request.persistentConnection && 1434 incoming.fullBodyRead) {
1397 incoming.fullBodyRead) { 1435 _state = _IDLE;
1398 _state = _IDLE; 1436 // Resume the subscription for incoming requests as the
1399 // Resume the subscription for incoming requests as the 1437 // request is now processed.
1400 // request is now processed. 1438 _subscription.resume();
1401 _subscription.resume(); 1439 } else {
1402 } else { 1440 // Close socket, keep-alive not used or body sent before
1403 // Close socket, keep-alive not used or body sent before 1441 // received data was handled.
1404 // received data was handled.
1405 destroy();
1406 }
1407 })
1408 .catchError((e) {
1409 destroy(); 1442 destroy();
1410 throw e; 1443 }
1411 }); 1444 })
1412 }); 1445 .catchError((e) {
1446 destroy();
1447 });
1413 response._ignoreBody = request.method == "HEAD"; 1448 response._ignoreBody = request.method == "HEAD";
1414 response._httpRequest = request; 1449 response._httpRequest = request;
1415 _httpServer._handleRequest(request); 1450 _httpServer._handleRequest(request);
1416 }, 1451 },
1417 onDone: () { 1452 onDone: () {
1418 destroy(); 1453 destroy();
1419 }, 1454 },
1420 onError: (error) { 1455 onError: (error) {
1421 _httpServer._handleError(error); 1456 _httpServer._handleError(error);
1422 destroy(); 1457 destroy();
(...skipping 261 matching lines...) Expand 10 before | Expand all | Expand 10 after
1684 void writeCharCode(int charCode) => _socket.writeCharCode(charCode); 1719 void writeCharCode(int charCode) => _socket.writeCharCode(charCode);
1685 1720
1686 void writeAll(Iterable objects, [String separator = ""]) { 1721 void writeAll(Iterable objects, [String separator = ""]) {
1687 _socket.writeAll(objects, separator); 1722 _socket.writeAll(objects, separator);
1688 } 1723 }
1689 1724
1690 void add(List<int> bytes) => _socket.add(bytes); 1725 void add(List<int> bytes) => _socket.add(bytes);
1691 1726
1692 void addError(AsyncError error) => _socket.addError(error); 1727 void addError(AsyncError error) => _socket.addError(error);
1693 1728
1694 Future<Socket> consume(Stream<List<int>> stream) {
1695 return _socket.consume(stream);
1696 }
1697
1698 Future<Socket> addStream(Stream<List<int>> stream) { 1729 Future<Socket> addStream(Stream<List<int>> stream) {
1699 return _socket.addStream(stream); 1730 return _socket.addStream(stream);
1700 } 1731 }
1701 1732
1702 Future<Socket> writeStream(Stream<List<int>> stream) {
1703 return _socket.writeStream(stream);
1704 }
1705
1706 void destroy() => _socket.destroy(); 1733 void destroy() => _socket.destroy();
1707 1734
1708 Future close() => _socket.close(); 1735 Future close() => _socket.close();
1709 1736
1710 Future<Socket> get done => _socket.done; 1737 Future<Socket> get done => _socket.done;
1711 1738
1712 int get port => _socket.port; 1739 int get port => _socket.port;
1713 1740
1714 String get remoteHost => _socket.remoteHost; 1741 String get remoteHost => _socket.remoteHost;
1715 1742
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after
1826 1853
1827 1854
1828 class _RedirectInfo implements RedirectInfo { 1855 class _RedirectInfo implements RedirectInfo {
1829 const _RedirectInfo(int this.statusCode, 1856 const _RedirectInfo(int this.statusCode,
1830 String this.method, 1857 String this.method,
1831 Uri this.location); 1858 Uri this.location);
1832 final int statusCode; 1859 final int statusCode;
1833 final String method; 1860 final String method;
1834 final Uri location; 1861 final Uri location;
1835 } 1862 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698