OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 class _HttpIncoming extends Stream<List<int>> { | 7 class _HttpIncoming extends Stream<List<int>> { |
8 final int _transferLength; | 8 final int _transferLength; |
9 final Completer _dataCompleter = new Completer(); | 9 final Completer _dataCompleter = new Completer(); |
10 Stream<List<int>> _stream; | 10 Stream<List<int>> _stream; |
(...skipping 308 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
319 return new Future.immediate(this); | 319 return new Future.immediate(this); |
320 } | 320 } |
321 } | 321 } |
322 | 322 |
323 | 323 |
324 abstract class _HttpOutboundMessage<T> implements IOSink { | 324 abstract class _HttpOutboundMessage<T> implements IOSink { |
325 // Used to mark when the body should be written. This is used for HEAD | 325 // Used to mark when the body should be written. This is used for HEAD |
326 // requests and in error handling. | 326 // requests and in error handling. |
327 bool _ignoreBody = false; | 327 bool _ignoreBody = false; |
328 bool _headersWritten = false; | 328 bool _headersWritten = false; |
| 329 bool _asGZip = false; |
329 | 330 |
330 IOSink _ioSink; | 331 IOSink _headersSink; |
| 332 IOSink _dataSink; |
| 333 |
331 final _HttpOutgoing _outgoing; | 334 final _HttpOutgoing _outgoing; |
332 | 335 |
333 final _HttpHeaders headers; | 336 final _HttpHeaders headers; |
334 | 337 |
335 _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing) | 338 _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing) |
336 : _outgoing = outgoing, | 339 : _outgoing = outgoing, |
337 _ioSink = new IOSink(outgoing, encoding: Encoding.ASCII), | 340 _headersSink = new IOSink(outgoing, encoding: Encoding.ASCII), |
338 headers = new _HttpHeaders(protocolVersion); | 341 headers = new _HttpHeaders(protocolVersion) { |
| 342 _dataSink = new IOSink( |
| 343 new _HttpOutboundConsumer(_headersSink, _addStream, this)); |
| 344 } |
339 | 345 |
340 int get contentLength => headers.contentLength; | 346 int get contentLength => headers.contentLength; |
341 void set contentLength(int contentLength) { | 347 void set contentLength(int contentLength) { |
342 headers.contentLength = contentLength; | 348 headers.contentLength = contentLength; |
343 } | 349 } |
344 | 350 |
345 bool get persistentConnection => headers.persistentConnection; | 351 bool get persistentConnection => headers.persistentConnection; |
346 void set persistentConnection(bool p) { | 352 void set persistentConnection(bool p) { |
347 headers.persistentConnection = p; | 353 headers.persistentConnection = p; |
348 } | 354 } |
(...skipping 20 matching lines...) Expand all Loading... |
369 String string; | 375 String string; |
370 if (obj is String) { | 376 if (obj is String) { |
371 string = obj; | 377 string = obj; |
372 } else { | 378 } else { |
373 string = obj.toString(); | 379 string = obj.toString(); |
374 if (string is! String) { | 380 if (string is! String) { |
375 throw new ArgumentError('toString() did not return a string'); | 381 throw new ArgumentError('toString() did not return a string'); |
376 } | 382 } |
377 } | 383 } |
378 if (string.isEmpty) return; | 384 if (string.isEmpty) return; |
379 _ioSink.write(string); | 385 _dataSink.write(string); |
380 } | 386 } |
381 | 387 |
382 void writeAll(Iterable objects, [String separator = ""]) { | 388 void writeAll(Iterable objects, [String separator = ""]) { |
383 bool isFirst = true; | 389 bool isFirst = true; |
384 for (Object obj in objects) { | 390 for (Object obj in objects) { |
385 if (isFirst) { | 391 if (isFirst) { |
386 isFirst = false; | 392 isFirst = false; |
387 } else { | 393 } else { |
388 if (!separator.isEmpty) write(separator); | 394 if (!separator.isEmpty) write(separator); |
389 } | 395 } |
390 write(obj); | 396 write(obj); |
391 } | 397 } |
392 } | 398 } |
393 | 399 |
394 void writeln([Object obj = ""]) { | 400 void writeln([Object obj = ""]) { |
395 write(obj); | 401 write(obj); |
396 write("\n"); | 402 write("\n"); |
397 } | 403 } |
398 | 404 |
399 void writeCharCode(int charCode) { | 405 void writeCharCode(int charCode) { |
400 write(new String.fromCharCode(charCode)); | 406 write(new String.fromCharCode(charCode)); |
401 } | 407 } |
402 | 408 |
403 void add(List<int> data) { | 409 void add(List<int> data) { |
404 _writeHeaders(); | 410 _writeHeaders(); |
405 if (data.length == 0) return; | 411 if (data.length == 0) return; |
406 _ioSink.add(data); | 412 _dataSink.add(data); |
407 } | 413 } |
408 | 414 |
409 void addError(AsyncError error) { | 415 void addError(AsyncError error) { |
410 _writeHeaders(); | 416 _writeHeaders(); |
411 _ioSink.addError(error); | 417 _dataSink.addError(error); |
412 } | |
413 | |
414 Future<T> consume(Stream<List<int>> stream) { | |
415 _writeHeaders(); | |
416 return _ioSink.consume(stream); | |
417 } | 418 } |
418 | 419 |
419 Future<T> addStream(Stream<List<int>> stream) { | 420 Future<T> addStream(Stream<List<int>> stream) { |
420 _writeHeaders(); | 421 _writeHeaders(); |
421 return _ioSink.writeStream(stream).then((_) => this); | 422 return _dataSink.addStream(stream); |
422 } | |
423 | |
424 Future<T> writeStream(Stream<List<int>> stream) { | |
425 return addStream(stream); | |
426 } | 423 } |
427 | 424 |
428 Future close() { | 425 Future close() { |
429 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and | 426 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and |
430 // persistentConnection is not guaranteed to be in sync. | 427 // persistentConnection is not guaranteed to be in sync. |
431 if (!_headersWritten && !_ignoreBody && headers.contentLength == -1) { | 428 if (!_headersWritten && !_ignoreBody && headers.contentLength == -1) { |
432 // If no body was written, _ignoreBody is false (it's not a HEAD | 429 // If no body was written, _ignoreBody is false (it's not a HEAD |
433 // request) and the content-length is unspecified, set contentLength to 0. | 430 // request) and the content-length is unspecified, set contentLength to 0. |
434 headers.chunkedTransferEncoding = false; | 431 headers.chunkedTransferEncoding = false; |
435 headers.contentLength = 0; | 432 headers.contentLength = 0; |
436 } | 433 } |
437 _writeHeaders(); | 434 _writeHeaders(); |
438 return _ioSink.close(); | 435 return _dataSink.close(); |
439 } | 436 } |
440 | 437 |
441 Future<T> get done { | 438 Future<T> get done => _dataSink.done.then((_) => this); |
442 _writeHeaders(); | |
443 return _ioSink.done; | |
444 } | |
445 | 439 |
446 void _writeHeaders() { | 440 void _writeHeaders() { |
447 if (_headersWritten) return; | 441 if (_headersWritten) return; |
448 _headersWritten = true; | 442 _headersWritten = true; |
449 _ioSink.encoding = Encoding.ASCII; | |
450 headers._synchronize(); // Be sure the 'chunked' option is updated. | 443 headers._synchronize(); // Be sure the 'chunked' option is updated. |
451 bool asGZip = false; | |
452 bool isServerSide = this is _HttpResponse; | 444 bool isServerSide = this is _HttpResponse; |
453 if (isServerSide && headers.chunkedTransferEncoding) { | 445 if (isServerSide && headers.chunkedTransferEncoding) { |
454 var response = this; | 446 var response = this; |
455 List acceptEncodings = | 447 List acceptEncodings = |
456 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; | 448 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; |
457 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; | 449 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; |
458 if (acceptEncodings != null && | 450 if (acceptEncodings != null && |
459 acceptEncodings | 451 acceptEncodings |
460 .expand((list) => list.split(",")) | 452 .expand((list) => list.split(",")) |
461 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && | 453 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && |
462 contentEncoding == null) { | 454 contentEncoding == null) { |
463 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); | 455 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); |
464 asGZip = true; | 456 _asGZip = true; |
465 } | 457 } |
466 } | 458 } |
467 _writeHeader(); | 459 _writeHeader(); |
468 _ioSink = new IOSink(new _HttpOutboundConsumer(_ioSink, _consume, asGZip)); | |
469 _ioSink.encoding = encoding; | |
470 } | 460 } |
471 | 461 |
472 Future _consume(IOSink ioSink, Stream<List<int>> stream, bool asGZip) { | 462 Future _addStream(IOSink ioSink, Stream<List<int>> stream) { |
473 int contentLength = headers.contentLength; | 463 int contentLength = headers.contentLength; |
474 if (_ignoreBody) { | 464 if (_ignoreBody) { |
475 ioSink.close(); | 465 stream.fold(null, (x, y) {}).catchError((_) {}); |
476 return stream.fold(null, (x, y) {}).then((_) => this); | 466 return ioSink.close().then((_) => this); |
477 } | 467 } |
478 stream = stream.transform(new _BufferTransformer()); | 468 stream = stream.transform(new _BufferTransformer()); |
479 if (headers.chunkedTransferEncoding) { | 469 if (headers.chunkedTransferEncoding) { |
480 if (asGZip) { | 470 if (_asGZip) { |
481 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); | 471 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); |
482 } | 472 } |
483 stream = stream.transform(new _ChunkedTransformer()); | 473 stream = stream.transform(new _ChunkedTransformer()); |
484 } else if (contentLength >= 0) { | 474 } else if (contentLength >= 0) { |
485 stream = stream.transform(new _ContentLengthValidator(contentLength)); | 475 stream = stream.transform(new _ContentLengthValidator(contentLength)); |
486 } | 476 } |
487 return stream.pipe(ioSink).then((_) => this); | 477 return ioSink.addStream(stream).then((_) => this); |
488 } | 478 } |
489 | 479 |
490 void _writeHeader(); // TODO(ajohnsen): Better name. | 480 void _writeHeader(); // TODO(ajohnsen): Better name. |
491 } | 481 } |
492 | 482 |
493 | 483 |
494 class _HttpOutboundConsumer implements StreamConsumer { | 484 class _HttpOutboundConsumer implements StreamConsumer { |
495 Function _consume; | 485 StreamController _controller; |
| 486 StreamSubscription _subscription; |
| 487 Function _addStream; |
496 IOSink _ioSink; | 488 IOSink _ioSink; |
497 bool _asGZip; | 489 Completer _closeCompleter = new Completer(); |
| 490 Completer _completer; |
| 491 |
| 492 _HttpOutboundMessage _outbound; |
498 _HttpOutboundConsumer(IOSink this._ioSink, | 493 _HttpOutboundConsumer(IOSink this._ioSink, |
499 Function this._consume, | 494 Function this._addStream, |
500 bool this._asGZip); | 495 _HttpOutboundMessage this._outbound); |
501 | 496 |
502 Future consume(var stream) => _consume(_ioSink, stream, _asGZip); | 497 void _onPause() { |
| 498 if (_controller.isPaused) { |
| 499 _subscription.pause(); |
| 500 } else { |
| 501 _subscription.resume(); |
| 502 } |
| 503 } |
| 504 |
| 505 void _onListen() { |
| 506 if (!_controller.hasListener && _subscription != null) { |
| 507 _subscription.cancel(); |
| 508 } |
| 509 } |
| 510 |
| 511 _ensureController() { |
| 512 if (_controller != null) return; |
| 513 _controller = new StreamController(onPauseStateChange: _onPause, |
| 514 onSubscriptionStateChange: _onListen); |
| 515 _addStream(_ioSink, _controller.stream) |
| 516 .then((_) { |
| 517 _done(); |
| 518 _closeCompleter.complete(_outbound); |
| 519 }, |
| 520 onError: (error) { |
| 521 if (!_done(error)) { |
| 522 _closeCompleter.completeError(error); |
| 523 } |
| 524 }); |
| 525 } |
| 526 |
| 527 bool _done([error]) { |
| 528 if (_completer == null) return false; |
| 529 var tmp = _completer; |
| 530 _completer = null; |
| 531 if (error != null) { |
| 532 tmp.completeError(error); |
| 533 } else { |
| 534 tmp.complete(_outbound); |
| 535 } |
| 536 return true; |
| 537 } |
503 | 538 |
504 Future addStream(var stream) { | 539 Future addStream(var stream) { |
505 throw new UnimplementedError("_HttpOutboundConsumer.addStream"); | 540 _ensureController(); |
| 541 _completer = new Completer(); |
| 542 _subscription = stream.listen( |
| 543 (data) { |
| 544 _controller.add(data); |
| 545 }, |
| 546 onDone: () { |
| 547 _done(); |
| 548 }, |
| 549 onError: (error) { |
| 550 _done(error); |
| 551 }, |
| 552 unsubscribeOnError: true); |
| 553 return _completer.future; |
506 } | 554 } |
507 | 555 |
508 Future close() { | 556 Future close() { |
509 throw new UnimplementedError("_HttpOutboundConsumer.close"); | 557 _ensureController(); |
| 558 _controller.close(); |
| 559 return _closeCompleter.future.then((_) { |
| 560 return _ioSink.close().then((_) => _outbound); |
| 561 }); |
510 } | 562 } |
511 } | 563 } |
512 | 564 |
513 | 565 |
514 class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> { | 566 class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> { |
515 const int MIN_CHUNK_SIZE = 4 * 1024; | 567 const int MIN_CHUNK_SIZE = 4 * 1024; |
516 const int MAX_BUFFER_SIZE = 16 * 1024; | 568 const int MAX_BUFFER_SIZE = 16 * 1024; |
517 | 569 |
518 final _BufferList _buffer = new _BufferList(); | 570 final _BufferList _buffer = new _BufferList(); |
519 | 571 |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
567 _reasonPhrase = reasonPhrase; | 619 _reasonPhrase = reasonPhrase; |
568 } | 620 } |
569 | 621 |
570 Future<Socket> detachSocket() { | 622 Future<Socket> detachSocket() { |
571 if (_headersWritten) throw new StateError("Headers already sent"); | 623 if (_headersWritten) throw new StateError("Headers already sent"); |
572 _writeHeaders(); | 624 _writeHeaders(); |
573 var future = _httpRequest._httpConnection.detachSocket(); | 625 var future = _httpRequest._httpConnection.detachSocket(); |
574 // Close connection so the socket is 'free'. | 626 // Close connection so the socket is 'free'. |
575 close(); | 627 close(); |
576 done.catchError((_) { | 628 done.catchError((_) { |
577 // Catch any error on done, as they automatically will be propegated to | 629 // Catch any error on done, as they automatically will be |
578 // the websocket. | 630 // propagated to the websocket. |
579 }); | 631 }); |
580 return future; | 632 return future; |
581 } | 633 } |
582 | 634 |
583 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; | 635 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; |
584 | 636 |
585 void _writeHeader() { | 637 void _writeHeader() { |
586 var buffer = new _BufferList(); | 638 var buffer = new _BufferList(); |
587 writeSP() => buffer.add(const [_CharCode.SP]); | 639 writeSP() => buffer.add(const [_CharCode.SP]); |
588 writeCRLF() => buffer.add(const [_CharCode.CR, _CharCode.LF]); | 640 writeCRLF() => buffer.add(const [_CharCode.CR, _CharCode.LF]); |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
625 _cookies.forEach((cookie) { | 677 _cookies.forEach((cookie) { |
626 headers.add(HttpHeaders.SET_COOKIE, cookie); | 678 headers.add(HttpHeaders.SET_COOKIE, cookie); |
627 }); | 679 }); |
628 } | 680 } |
629 | 681 |
630 headers._finalize(); | 682 headers._finalize(); |
631 | 683 |
632 // Write headers. | 684 // Write headers. |
633 headers._write(buffer); | 685 headers._write(buffer); |
634 writeCRLF(); | 686 writeCRLF(); |
635 _ioSink.add(buffer.readBytes()); | 687 _headersSink.add(buffer.readBytes()); |
636 } | 688 } |
637 | 689 |
638 String _findReasonPhrase(int statusCode) { | 690 String _findReasonPhrase(int statusCode) { |
639 if (_reasonPhrase != null) { | 691 if (_reasonPhrase != null) { |
640 return _reasonPhrase; | 692 return _reasonPhrase; |
641 } | 693 } |
642 | 694 |
643 switch (statusCode) { | 695 switch (statusCode) { |
644 case HttpStatus.CONTINUE: return "Continue"; | 696 case HttpStatus.CONTINUE: return "Continue"; |
645 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols"; | 697 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols"; |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
724 _HttpClientConnection this._httpClientConnection) | 776 _HttpClientConnection this._httpClientConnection) |
725 : super("1.1", outgoing) { | 777 : super("1.1", outgoing) { |
726 // GET and HEAD have 'content-length: 0' by default. | 778 // GET and HEAD have 'content-length: 0' by default. |
727 if (method == "GET" || method == "HEAD") { | 779 if (method == "GET" || method == "HEAD") { |
728 contentLength = 0; | 780 contentLength = 0; |
729 } | 781 } |
730 } | 782 } |
731 | 783 |
732 Future<HttpClientResponse> get done { | 784 Future<HttpClientResponse> get done { |
733 if (_response == null) { | 785 if (_response == null) { |
734 _response = Future.wait([_responseCompleter.future, super.done]) | 786 _response = Future.wait([_responseCompleter.future, |
| 787 super.done]) |
735 .then((list) => list[0]); | 788 .then((list) => list[0]); |
736 } | 789 } |
737 return _response; | 790 return _response; |
738 } | 791 } |
739 | 792 |
740 Future<HttpClientResponse> consume(Stream<List<int>> stream) { | |
741 super.consume(stream); | |
742 return done; | |
743 } | |
744 | |
745 Future<HttpClientResponse> close() { | 793 Future<HttpClientResponse> close() { |
746 super.close(); | 794 super.close(); |
747 return done; | 795 return done; |
748 } | 796 } |
749 | 797 |
750 int get maxRedirects => _maxRedirects; | 798 int get maxRedirects => _maxRedirects; |
751 void set maxRedirects(int maxRedirects) { | 799 void set maxRedirects(int maxRedirects) { |
752 if (_headersWritten) throw new StateError("Request already sent"); | 800 if (_headersWritten) throw new StateError("Request already sent"); |
753 _maxRedirects = maxRedirects; | 801 _maxRedirects = maxRedirects; |
754 } | 802 } |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
830 sb.write(cookies[i].value); | 878 sb.write(cookies[i].value); |
831 } | 879 } |
832 headers.add(HttpHeaders.COOKIE, sb.toString()); | 880 headers.add(HttpHeaders.COOKIE, sb.toString()); |
833 } | 881 } |
834 | 882 |
835 headers._finalize(); | 883 headers._finalize(); |
836 | 884 |
837 // Write headers. | 885 // Write headers. |
838 headers._write(buffer); | 886 headers._write(buffer); |
839 writeCRLF(); | 887 writeCRLF(); |
840 _ioSink.add(buffer.readBytes()); | 888 _headersSink.add(buffer.readBytes()); |
841 } | 889 } |
842 } | 890 } |
843 | 891 |
844 | 892 |
845 // Transformer that transforms data to HTTP Chunked Encoding. | 893 // Transformer that transforms data to HTTP Chunked Encoding. |
846 class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> { | 894 class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> { |
847 void handleData(List<int> data, EventSink<List<int>> sink) { | 895 void handleData(List<int> data, EventSink<List<int>> sink) { |
848 sink.add(_chunkHeader(data.length)); | 896 sink.add(_chunkHeader(data.length)); |
849 if (data.length > 0) sink.add(data); | 897 if (data.length > 0) sink.add(data); |
850 sink.add(_chunkFooter); | 898 sink.add(_chunkFooter); |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
906 " $_bytesWritten bytes written while expected " | 954 " $_bytesWritten bytes written while expected " |
907 "$expectedContentLength."))); | 955 "$expectedContentLength."))); |
908 } | 956 } |
909 sink.close(); | 957 sink.close(); |
910 } | 958 } |
911 } | 959 } |
912 | 960 |
913 | 961 |
914 // Extends StreamConsumer as this is an internal type, only used to pipe to. | 962 // Extends StreamConsumer as this is an internal type, only used to pipe to. |
915 class _HttpOutgoing implements StreamConsumer<List<int>> { | 963 class _HttpOutgoing implements StreamConsumer<List<int>> { |
916 Function _onStream; | 964 final Completer _doneCompleter = new Completer(); |
917 final Completer _consumeCompleter = new Completer(); | 965 final StreamConsumer _consumer; |
918 | 966 |
919 Future onStream(Future callback(Stream<List<int>> stream)) { | 967 _HttpOutgoing(StreamConsumer this._consumer); |
920 _onStream = callback; | |
921 return _consumeCompleter.future; | |
922 } | |
923 | |
924 Future consume(Stream<List<int>> stream) { | |
925 _onStream(stream) | |
926 .then((_) => _consumeCompleter.complete(), | |
927 onError: _consumeCompleter.completeError); | |
928 // Use .then to ensure a Future branch. | |
929 return _consumeCompleter.future.then((_) => this); | |
930 } | |
931 | 968 |
932 Future addStream(Stream<List<int>> stream) { | 969 Future addStream(Stream<List<int>> stream) { |
933 throw new UnimplementedError("_HttpOutgoing.addStream"); | 970 return _consumer.addStream(stream) |
| 971 .catchError((error) { |
| 972 _doneCompleter.completeError(error); |
| 973 throw error; |
| 974 }); |
934 } | 975 } |
935 | 976 |
936 Future close() { | 977 Future close() { |
937 throw new UnimplementedError("_HttpOutgoing.close"); | 978 _doneCompleter.complete(_consumer); |
| 979 return new Future.immediate(null); |
938 } | 980 } |
| 981 |
| 982 Future get done => _doneCompleter.future; |
939 } | 983 } |
940 | 984 |
941 | 985 |
942 class _HttpClientConnection { | 986 class _HttpClientConnection { |
943 final String key; | 987 final String key; |
944 final Socket _socket; | 988 final Socket _socket; |
945 final _HttpParser _httpParser; | 989 final _HttpParser _httpParser; |
946 StreamSubscription _subscription; | 990 StreamSubscription _subscription; |
947 final _HttpClient _httpClient; | 991 final _HttpClient _httpClient; |
948 | 992 |
949 Completer<_HttpIncoming> _nextResponseCompleter; | 993 Completer<_HttpIncoming> _nextResponseCompleter; |
950 Future _streamFuture; | 994 Future _streamFuture; |
951 | 995 |
952 _HttpClientConnection(String this.key, | 996 _HttpClientConnection(String this.key, |
953 Socket this._socket, | 997 Socket this._socket, |
954 _HttpClient this._httpClient) | 998 _HttpClient this._httpClient) |
955 : _httpParser = new _HttpParser.responseParser() { | 999 : _httpParser = new _HttpParser.responseParser() { |
956 _socket.pipe(_httpParser); | 1000 _socket.pipe(_httpParser); |
957 _socket.done.catchError((e) { destroy(); }); | |
958 | 1001 |
959 // Set up handlers on the parser here, so we are sure to get 'onDone' from | 1002 // Set up handlers on the parser here, so we are sure to get 'onDone' from |
960 // the parser. | 1003 // the parser. |
961 _subscription = _httpParser.listen( | 1004 _subscription = _httpParser.listen( |
962 (incoming) { | 1005 (incoming) { |
963 // Only handle one incoming response at the time. Keep the | 1006 // Only handle one incoming response at the time. Keep the |
964 // stream paused until the response have been processed. | 1007 // stream paused until the response have been processed. |
965 _subscription.pause(); | 1008 _subscription.pause(); |
966 // We assume the response is not here, until we have send the request. | 1009 // We assume the response is not here, until we have send the request. |
967 assert(_nextResponseCompleter != null); | 1010 assert(_nextResponseCompleter != null); |
968 var completer = _nextResponseCompleter; | 1011 var completer = _nextResponseCompleter; |
969 _nextResponseCompleter = null; | 1012 _nextResponseCompleter = null; |
970 completer.complete(incoming); | 1013 completer.complete(incoming); |
971 }, | 1014 }, |
972 onError: (error) { | 1015 onError: (error) { |
973 if (_nextResponseCompleter != null) { | 1016 if (_nextResponseCompleter != null) { |
974 _nextResponseCompleter.completeError(error); | 1017 _nextResponseCompleter.completeError(error); |
975 _nextResponseCompleter = null; | 1018 _nextResponseCompleter = null; |
976 } | 1019 } |
977 }, | 1020 }, |
978 onDone: () { | 1021 onDone: () { |
979 close(); | 1022 close(); |
980 }); | 1023 }); |
981 } | 1024 } |
982 | 1025 |
983 _HttpClientRequest send(Uri uri, int port, String method, bool isDirect) { | 1026 _HttpClientRequest send(Uri uri, int port, String method, bool isDirect) { |
984 // Start with pausing the parser. | 1027 // Start with pausing the parser. |
985 _subscription.pause(); | 1028 _subscription.pause(); |
986 var outgoing = new _HttpOutgoing(); | 1029 var outgoing = new _HttpOutgoing(_socket); |
987 // Create new request object, wrapping the outgoing connection. | 1030 // Create new request object, wrapping the outgoing connection. |
988 var request = new _HttpClientRequest(outgoing, | 1031 var request = new _HttpClientRequest(outgoing, |
989 uri, | 1032 uri, |
990 method, | 1033 method, |
991 !isDirect, | 1034 !isDirect, |
992 _httpClient, | 1035 _httpClient, |
993 this); | 1036 this); |
994 request.headers.host = uri.domain; | 1037 request.headers.host = uri.domain; |
995 request.headers.port = port; | 1038 request.headers.port = port; |
996 request.headers.set(HttpHeaders.ACCEPT_ENCODING, "gzip"); | 1039 request.headers.set(HttpHeaders.ACCEPT_ENCODING, "gzip"); |
997 if (uri.userInfo != null && !uri.userInfo.isEmpty) { | 1040 if (uri.userInfo != null && !uri.userInfo.isEmpty) { |
998 // If the URL contains user information use that for basic | 1041 // If the URL contains user information use that for basic |
999 // authorization | 1042 // authorization |
1000 String auth = | 1043 String auth = |
1001 CryptoUtils.bytesToBase64(_encodeString(uri.userInfo)); | 1044 CryptoUtils.bytesToBase64(_encodeString(uri.userInfo)); |
1002 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); | 1045 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); |
1003 } else { | 1046 } else { |
1004 // Look for credentials. | 1047 // Look for credentials. |
1005 _Credentials cr = _httpClient._findCredentials(uri); | 1048 _Credentials cr = _httpClient._findCredentials(uri); |
1006 if (cr != null) { | 1049 if (cr != null) { |
1007 cr.authorize(request); | 1050 cr.authorize(request); |
1008 } | 1051 } |
1009 } | 1052 } |
1010 // Start sending the request (lazy, delayed until the user provides | 1053 // Start sending the request (lazy, delayed until the user provides |
1011 // data). | 1054 // data). |
1012 _httpParser.responseToMethod = method; | 1055 _httpParser.responseToMethod = method; |
1013 _streamFuture = outgoing.onStream((stream) { | 1056 _streamFuture = outgoing.done |
1014 return _socket.writeStream(stream) | 1057 .then((s) { |
1015 .then((s) { | 1058 // Request sent, set up response completer. |
1016 // Request sent, set up response completer. | 1059 _nextResponseCompleter = new Completer(); |
1017 _nextResponseCompleter = new Completer(); | |
1018 | 1060 |
1019 // Listen for response. | 1061 // Listen for response. |
1020 _nextResponseCompleter.future | 1062 _nextResponseCompleter.future |
1021 .then((incoming) { | 1063 .then((incoming) { |
1022 incoming.dataDone.then((_) { | 1064 incoming.dataDone.then((_) { |
1023 if (incoming.headers.persistentConnection && | 1065 if (incoming.headers.persistentConnection && |
1024 request.persistentConnection) { | 1066 request.persistentConnection) { |
1025 // Return connection, now we are done. | 1067 // Return connection, now we are done. |
1026 _httpClient._returnConnection(this); | 1068 _httpClient._returnConnection(this); |
1027 _subscription.resume(); | 1069 _subscription.resume(); |
1028 } else { | 1070 } else { |
1029 destroy(); | |
1030 } | |
1031 }); | |
1032 request._onIncoming(incoming); | |
1033 }) | |
1034 // If we see a state error, we failed to get the 'first' | |
1035 // element. | |
1036 // Transform the error to a HttpParserException, for | |
1037 // consistency. | |
1038 .catchError((error) { | |
1039 throw new HttpParserException( | |
1040 "Connection closed before data was received"); | |
1041 }, test: (error) => error is StateError) | |
1042 .catchError((error) { | |
1043 // We are done with the socket. | |
1044 destroy(); | 1071 destroy(); |
1045 request._onError(error); | 1072 } |
1046 }); | 1073 }); |
| 1074 request._onIncoming(incoming); |
| 1075 }) |
| 1076 // If we see a state error, we failed to get the 'first' |
| 1077 // element. |
| 1078 // Transform the error to a HttpParserException, for |
| 1079 // consistency. |
| 1080 .catchError((error) { |
| 1081 throw new HttpParserException( |
| 1082 "Connection closed before data was received"); |
| 1083 }, test: (error) => error is StateError) |
| 1084 .catchError((error) { |
| 1085 // We are done with the socket. |
| 1086 destroy(); |
| 1087 request._onError(error); |
| 1088 }); |
1047 | 1089 |
1048 // Resume the parser now we have a handler. | 1090 // Resume the parser now we have a handler. |
1049 _subscription.resume(); | 1091 _subscription.resume(); |
1050 return s; | 1092 return s; |
1051 }, onError: (e) { | 1093 }, onError: (e) { |
1052 destroy(); | 1094 destroy(); |
1053 throw e; | 1095 }); |
1054 }); | |
1055 }); | |
1056 return request; | 1096 return request; |
1057 } | 1097 } |
1058 | 1098 |
1059 Future<Socket> detachSocket() { | 1099 Future<Socket> detachSocket() { |
1060 return _streamFuture | 1100 return _streamFuture.then( |
1061 .then((_) => new _DetachedSocket(_socket, _httpParser.detachIncoming()), | 1101 (_) => new _DetachedSocket(_socket, _httpParser.detachIncoming())); |
1062 onError: (_) {}); | |
1063 } | 1102 } |
1064 | 1103 |
1065 void destroy() { | 1104 void destroy() { |
1066 _httpClient._connectionClosed(this); | 1105 _httpClient._connectionClosed(this); |
1067 _socket.destroy(); | 1106 _socket.destroy(); |
1068 } | 1107 } |
1069 | 1108 |
1070 void close() { | 1109 void close() { |
1071 _httpClient._connectionClosed(this); | 1110 _httpClient._connectionClosed(this); |
1072 _streamFuture | 1111 _streamFuture |
1073 // TODO(ajohnsen): Add timeout. | 1112 // TODO(ajohnsen): Add timeout. |
1074 .then((_) => _socket.destroy(), | 1113 .then((_) => _socket.destroy()); |
1075 onError: (_) {}); | |
1076 } | 1114 } |
1077 | 1115 |
1078 HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket); | 1116 HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket); |
1079 } | 1117 } |
1080 | 1118 |
1081 class _ConnnectionInfo { | 1119 class _ConnnectionInfo { |
1082 _ConnnectionInfo(_HttpClientConnection this.connection, _Proxy this.proxy); | 1120 _ConnnectionInfo(_HttpClientConnection this.connection, _Proxy this.proxy); |
1083 final _HttpClientConnection connection; | 1121 final _HttpClientConnection connection; |
1084 final _Proxy proxy; | 1122 final _Proxy proxy; |
1085 } | 1123 } |
(...skipping 284 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1370 final Socket _socket; | 1408 final Socket _socket; |
1371 final _HttpServer _httpServer; | 1409 final _HttpServer _httpServer; |
1372 final _HttpParser _httpParser; | 1410 final _HttpParser _httpParser; |
1373 StreamSubscription _subscription; | 1411 StreamSubscription _subscription; |
1374 | 1412 |
1375 Future _streamFuture; | 1413 Future _streamFuture; |
1376 | 1414 |
1377 _HttpConnection(Socket this._socket, _HttpServer this._httpServer) | 1415 _HttpConnection(Socket this._socket, _HttpServer this._httpServer) |
1378 : _httpParser = new _HttpParser.requestParser() { | 1416 : _httpParser = new _HttpParser.requestParser() { |
1379 _socket.pipe(_httpParser); | 1417 _socket.pipe(_httpParser); |
1380 _socket.done.catchError((e) => destroy()); | |
1381 _subscription = _httpParser.listen( | 1418 _subscription = _httpParser.listen( |
1382 (incoming) { | 1419 (incoming) { |
1383 // Only handle one incoming request at the time. Keep the | 1420 // Only handle one incoming request at the time. Keep the |
1384 // stream paused until the request has been send. | 1421 // stream paused until the request has been send. |
1385 _subscription.pause(); | 1422 _subscription.pause(); |
1386 _state = _ACTIVE; | 1423 _state = _ACTIVE; |
1387 var outgoing = new _HttpOutgoing(); | 1424 var outgoing = new _HttpOutgoing(_socket); |
1388 var response = new _HttpResponse(incoming.headers.protocolVersion, | 1425 var response = new _HttpResponse(incoming.headers.protocolVersion, |
1389 outgoing); | 1426 outgoing); |
1390 var request = new _HttpRequest(response, incoming, _httpServer, this); | 1427 var request = new _HttpRequest(response, incoming, _httpServer, this); |
1391 outgoing.onStream((stream) { | 1428 _streamFuture = outgoing.done |
1392 return _streamFuture = _socket.writeStream(stream) | 1429 .then((_) { |
1393 .then((_) { | 1430 if (_state == _DETACHED) return; |
1394 if (_state == _DETACHED) return; | 1431 if (response.persistentConnection && |
1395 if (response.persistentConnection && | 1432 request.persistentConnection && |
1396 request.persistentConnection && | 1433 incoming.fullBodyRead) { |
1397 incoming.fullBodyRead) { | 1434 _state = _IDLE; |
1398 _state = _IDLE; | 1435 // Resume the subscription for incoming requests as the |
1399 // Resume the subscription for incoming requests as the | 1436 // request is now processed. |
1400 // request is now processed. | 1437 _subscription.resume(); |
1401 _subscription.resume(); | 1438 } else { |
1402 } else { | 1439 // Close socket, keep-alive not used or body sent before |
1403 // Close socket, keep-alive not used or body sent before | 1440 // received data was handled. |
1404 // received data was handled. | |
1405 destroy(); | |
1406 } | |
1407 }) | |
1408 .catchError((e) { | |
1409 destroy(); | 1441 destroy(); |
1410 throw e; | 1442 } |
1411 }); | 1443 }) |
1412 }); | 1444 .catchError((e) { |
| 1445 destroy(); |
| 1446 }); |
1413 response._ignoreBody = request.method == "HEAD"; | 1447 response._ignoreBody = request.method == "HEAD"; |
1414 response._httpRequest = request; | 1448 response._httpRequest = request; |
1415 _httpServer._handleRequest(request); | 1449 _httpServer._handleRequest(request); |
1416 }, | 1450 }, |
1417 onDone: () { | 1451 onDone: () { |
1418 destroy(); | 1452 destroy(); |
1419 }, | 1453 }, |
1420 onError: (error) { | 1454 onError: (error) { |
1421 _httpServer._handleError(error); | 1455 _httpServer._handleError(error); |
1422 destroy(); | 1456 destroy(); |
(...skipping 261 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1684 void writeCharCode(int charCode) => _socket.writeCharCode(charCode); | 1718 void writeCharCode(int charCode) => _socket.writeCharCode(charCode); |
1685 | 1719 |
1686 void writeAll(Iterable objects, [String separator = ""]) { | 1720 void writeAll(Iterable objects, [String separator = ""]) { |
1687 _socket.writeAll(objects, separator); | 1721 _socket.writeAll(objects, separator); |
1688 } | 1722 } |
1689 | 1723 |
1690 void add(List<int> bytes) => _socket.add(bytes); | 1724 void add(List<int> bytes) => _socket.add(bytes); |
1691 | 1725 |
1692 void addError(AsyncError error) => _socket.addError(error); | 1726 void addError(AsyncError error) => _socket.addError(error); |
1693 | 1727 |
1694 Future<Socket> consume(Stream<List<int>> stream) { | |
1695 return _socket.consume(stream); | |
1696 } | |
1697 | |
1698 Future<Socket> addStream(Stream<List<int>> stream) { | 1728 Future<Socket> addStream(Stream<List<int>> stream) { |
1699 return _socket.addStream(stream); | 1729 return _socket.addStream(stream); |
1700 } | 1730 } |
1701 | 1731 |
1702 Future<Socket> writeStream(Stream<List<int>> stream) { | |
1703 return _socket.writeStream(stream); | |
1704 } | |
1705 | |
1706 void destroy() => _socket.destroy(); | 1732 void destroy() => _socket.destroy(); |
1707 | 1733 |
1708 Future close() => _socket.close(); | 1734 Future close() => _socket.close(); |
1709 | 1735 |
1710 Future<Socket> get done => _socket.done; | 1736 Future<Socket> get done => _socket.done; |
1711 | 1737 |
1712 int get port => _socket.port; | 1738 int get port => _socket.port; |
1713 | 1739 |
1714 String get remoteHost => _socket.remoteHost; | 1740 String get remoteHost => _socket.remoteHost; |
1715 | 1741 |
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1826 | 1852 |
1827 | 1853 |
1828 class _RedirectInfo implements RedirectInfo { | 1854 class _RedirectInfo implements RedirectInfo { |
1829 const _RedirectInfo(int this.statusCode, | 1855 const _RedirectInfo(int this.statusCode, |
1830 String this.method, | 1856 String this.method, |
1831 Uri this.location); | 1857 Uri this.location); |
1832 final int statusCode; | 1858 final int statusCode; |
1833 final String method; | 1859 final String method; |
1834 final Uri location; | 1860 final Uri location; |
1835 } | 1861 } |
OLD | NEW |