OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 class _HttpIncoming extends Stream<List<int>> { | 7 class _HttpIncoming extends Stream<List<int>> { |
8 final int _transferLength; | 8 final int _transferLength; |
9 final Completer _dataCompleter = new Completer(); | 9 final Completer _dataCompleter = new Completer(); |
10 Stream<List<int>> _stream; | 10 Stream<List<int>> _stream; |
(...skipping 308 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
319 return new Future.immediate(this); | 319 return new Future.immediate(this); |
320 } | 320 } |
321 } | 321 } |
322 | 322 |
323 | 323 |
324 abstract class _HttpOutboundMessage<T> implements IOSink { | 324 abstract class _HttpOutboundMessage<T> implements IOSink { |
325 // Used to mark when the body should be written. This is used for HEAD | 325 // Used to mark when the body should be written. This is used for HEAD |
326 // requests and in error handling. | 326 // requests and in error handling. |
327 bool _ignoreBody = false; | 327 bool _ignoreBody = false; |
328 bool _headersWritten = false; | 328 bool _headersWritten = false; |
329 bool _asGZip = false; | |
329 | 330 |
330 IOSink _ioSink; | 331 IOSink _headersSink; |
332 IOSink _dataSink; | |
333 | |
331 final _HttpOutgoing _outgoing; | 334 final _HttpOutgoing _outgoing; |
332 | 335 |
333 final _HttpHeaders headers; | 336 final _HttpHeaders headers; |
334 | 337 |
335 _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing) | 338 _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing) |
336 : _outgoing = outgoing, | 339 : _outgoing = outgoing, |
337 _ioSink = new IOSink(outgoing, encoding: Encoding.ASCII), | 340 _headersSink = new IOSink(outgoing, encoding: Encoding.ASCII), |
338 headers = new _HttpHeaders(protocolVersion); | 341 headers = new _HttpHeaders(protocolVersion) { |
342 _dataSink = new IOSink( | |
343 new _HttpOutboundConsumer(_headersSink, _addStream, this)); | |
344 } | |
339 | 345 |
340 int get contentLength => headers.contentLength; | 346 int get contentLength => headers.contentLength; |
341 void set contentLength(int contentLength) { | 347 void set contentLength(int contentLength) { |
342 headers.contentLength = contentLength; | 348 headers.contentLength = contentLength; |
343 } | 349 } |
344 | 350 |
345 bool get persistentConnection => headers.persistentConnection; | 351 bool get persistentConnection => headers.persistentConnection; |
346 void set persistentConnection(bool p) { | 352 void set persistentConnection(bool p) { |
347 headers.persistentConnection = p; | 353 headers.persistentConnection = p; |
348 } | 354 } |
(...skipping 20 matching lines...) Expand all Loading... | |
369 String string; | 375 String string; |
370 if (obj is String) { | 376 if (obj is String) { |
371 string = obj; | 377 string = obj; |
372 } else { | 378 } else { |
373 string = obj.toString(); | 379 string = obj.toString(); |
374 if (string is! String) { | 380 if (string is! String) { |
375 throw new ArgumentError('toString() did not return a string'); | 381 throw new ArgumentError('toString() did not return a string'); |
376 } | 382 } |
377 } | 383 } |
378 if (string.isEmpty) return; | 384 if (string.isEmpty) return; |
379 _ioSink.write(string); | 385 _dataSink.write(string); |
380 } | 386 } |
381 | 387 |
382 void writeAll(Iterable objects, [String separator = ""]) { | 388 void writeAll(Iterable objects, [String separator = ""]) { |
383 bool isFirst = true; | 389 bool isFirst = true; |
384 for (Object obj in objects) { | 390 for (Object obj in objects) { |
385 if (isFirst) { | 391 if (isFirst) { |
386 isFirst = false; | 392 isFirst = false; |
387 } else { | 393 } else { |
388 if (!separator.isEmpty) write(separator); | 394 if (!separator.isEmpty) write(separator); |
389 } | 395 } |
390 write(obj); | 396 write(obj); |
391 } | 397 } |
392 } | 398 } |
393 | 399 |
394 void writeln([Object obj = ""]) { | 400 void writeln([Object obj = ""]) { |
395 write(obj); | 401 write(obj); |
396 write("\n"); | 402 write("\n"); |
397 } | 403 } |
398 | 404 |
399 void writeCharCode(int charCode) { | 405 void writeCharCode(int charCode) { |
400 write(new String.fromCharCode(charCode)); | 406 write(new String.fromCharCode(charCode)); |
401 } | 407 } |
402 | 408 |
403 void add(List<int> data) { | 409 void add(List<int> data) { |
404 _writeHeaders(); | 410 _writeHeaders(); |
405 if (data.length == 0) return; | 411 if (data.length == 0) return; |
406 _ioSink.add(data); | 412 _dataSink.add(data); |
407 } | 413 } |
408 | 414 |
409 void addError(AsyncError error) { | 415 void addError(AsyncError error) { |
410 _writeHeaders(); | 416 _writeHeaders(); |
411 _ioSink.addError(error); | 417 _dataSink.addError(error); |
412 } | |
413 | |
414 Future<T> consume(Stream<List<int>> stream) { | |
415 _writeHeaders(); | |
416 return _ioSink.consume(stream); | |
417 } | 418 } |
418 | 419 |
419 Future<T> addStream(Stream<List<int>> stream) { | 420 Future<T> addStream(Stream<List<int>> stream) { |
420 _writeHeaders(); | 421 _writeHeaders(); |
421 return _ioSink.writeStream(stream).then((_) => this); | 422 return _dataSink.addStream(stream); |
422 } | |
423 | |
424 Future<T> writeStream(Stream<List<int>> stream) { | |
425 return addStream(stream); | |
426 } | 423 } |
427 | 424 |
428 Future close() { | 425 Future close() { |
429 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and | 426 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and |
430 // persistentConnection is not guaranteed to be in sync. | 427 // persistentConnection is not guaranteed to be in sync. |
431 if (!_headersWritten && !_ignoreBody && headers.contentLength == -1) { | 428 if (!_headersWritten && !_ignoreBody && headers.contentLength == -1) { |
432 // If no body was written, _ignoreBody is false (it's not a HEAD | 429 // If no body was written, _ignoreBody is false (it's not a HEAD |
433 // request) and the content-length is unspecified, set contentLength to 0. | 430 // request) and the content-length is unspecified, set contentLength to 0. |
434 headers.chunkedTransferEncoding = false; | 431 headers.chunkedTransferEncoding = false; |
435 headers.contentLength = 0; | 432 headers.contentLength = 0; |
436 } | 433 } |
437 _writeHeaders(); | 434 _writeHeaders(); |
438 return _ioSink.close(); | 435 return _dataSink.close(); |
439 } | 436 } |
440 | 437 |
441 Future<T> get done { | 438 Future<T> get done => _dataSink.done.then((_) => this); |
442 _writeHeaders(); | |
443 return _ioSink.done; | |
444 } | |
445 | 439 |
446 void _writeHeaders() { | 440 void _writeHeaders() { |
447 if (_headersWritten) return; | 441 if (_headersWritten) return; |
448 _headersWritten = true; | 442 _headersWritten = true; |
449 _ioSink.encoding = Encoding.ASCII; | |
450 headers._synchronize(); // Be sure the 'chunked' option is updated. | 443 headers._synchronize(); // Be sure the 'chunked' option is updated. |
451 bool asGZip = false; | |
452 bool isServerSide = this is _HttpResponse; | 444 bool isServerSide = this is _HttpResponse; |
453 if (isServerSide && headers.chunkedTransferEncoding) { | 445 if (isServerSide && headers.chunkedTransferEncoding) { |
454 var response = this; | 446 var response = this; |
455 List acceptEncodings = | 447 List acceptEncodings = |
456 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; | 448 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; |
457 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; | 449 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; |
458 if (acceptEncodings != null && | 450 if (acceptEncodings != null && |
459 acceptEncodings | 451 acceptEncodings |
460 .expand((list) => list.split(",")) | 452 .expand((list) => list.split(",")) |
461 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && | 453 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && |
462 contentEncoding == null) { | 454 contentEncoding == null) { |
463 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); | 455 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); |
464 asGZip = true; | 456 _asGZip = true; |
465 } | 457 } |
466 } | 458 } |
467 _writeHeader(); | 459 _writeHeader(); |
468 _ioSink = new IOSink(new _HttpOutboundConsumer(_ioSink, _consume, asGZip)); | |
469 _ioSink.encoding = encoding; | |
470 } | 460 } |
471 | 461 |
472 Future _consume(IOSink ioSink, Stream<List<int>> stream, bool asGZip) { | 462 Future _addStream(IOSink ioSink, Stream<List<int>> stream) { |
473 int contentLength = headers.contentLength; | 463 int contentLength = headers.contentLength; |
474 if (_ignoreBody) { | 464 if (_ignoreBody) { |
475 ioSink.close(); | 465 stream.fold(null, (x, y) {}).catchError((_) {}); |
476 return stream.fold(null, (x, y) {}).then((_) => this); | 466 return ioSink.close().then((_) => this); |
477 } | 467 } |
478 stream = stream.transform(new _BufferTransformer()); | 468 stream = stream.transform(new _BufferTransformer()); |
479 if (headers.chunkedTransferEncoding) { | 469 if (headers.chunkedTransferEncoding) { |
480 if (asGZip) { | 470 if (_asGZip) { |
481 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); | 471 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); |
482 } | 472 } |
483 stream = stream.transform(new _ChunkedTransformer()); | 473 stream = stream.transform(new _ChunkedTransformer()); |
484 } else if (contentLength >= 0) { | 474 } else if (contentLength >= 0) { |
485 stream = stream.transform(new _ContentLengthValidator(contentLength)); | 475 stream = stream.transform(new _ContentLengthValidator(contentLength)); |
486 } | 476 } |
487 return stream.pipe(ioSink).then((_) => this); | 477 return ioSink.addStream(stream).then((_) => this); |
488 } | 478 } |
489 | 479 |
490 void _writeHeader(); // TODO(ajohnsen): Better name. | 480 void _writeHeader(); // TODO(ajohnsen): Better name. |
491 } | 481 } |
492 | 482 |
493 | 483 |
494 class _HttpOutboundConsumer implements StreamConsumer { | 484 class _HttpOutboundConsumer implements StreamConsumer { |
495 Function _consume; | 485 StreamController _controller; |
486 StreamSubscription _subscription; | |
487 Function _addStream; | |
496 IOSink _ioSink; | 488 IOSink _ioSink; |
497 bool _asGZip; | 489 Completer _closeCompleter = new Completer(); |
490 Completer _completer; | |
491 | |
492 _HttpOutboundMessage _outbound; | |
498 _HttpOutboundConsumer(IOSink this._ioSink, | 493 _HttpOutboundConsumer(IOSink this._ioSink, |
499 Function this._consume, | 494 Function this._addStream, |
500 bool this._asGZip); | 495 _HttpOutboundMessage this._outbound); |
501 | 496 |
502 Future consume(var stream) => _consume(_ioSink, stream, _asGZip); | 497 void _onPause() { |
498 if (_controller.isPaused) { | |
499 _subscription.pause(); | |
500 } else { | |
501 _subscription.resume(); | |
502 } | |
503 } | |
504 | |
505 void _onListen() { | |
506 if (!_controller.hasListener && _subscription != null) { | |
507 _subscription.cancel(); | |
508 } | |
509 } | |
510 | |
511 _ensureController() { | |
512 if (_controller != null) return; | |
513 _controller = new StreamController(onPauseStateChange: _onPause, | |
514 onSubscriptionStateChange: _onListen); | |
515 _addStream(_ioSink, _controller.stream) | |
516 .then((v) { | |
Søren Gjesse
2013/04/15 06:56:30
v -> _
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
517 _done(); | |
518 _closeCompleter.complete(_outbound); | |
519 }, | |
520 onError: (error) { | |
521 if (!_done(error)) { | |
522 _closeCompleter.completeError(error); | |
523 } | |
524 }); | |
525 } | |
526 | |
527 bool _done([error]) { | |
528 if (_completer == null) return false; | |
529 var tmp = _completer; | |
530 _completer = null; | |
531 if (error != null) { | |
532 tmp.completeError(error); | |
533 } else { | |
534 tmp.complete(_outbound); | |
535 } | |
536 return true; | |
537 } | |
503 | 538 |
504 Future addStream(var stream) { | 539 Future addStream(var stream) { |
505 throw new UnimplementedError("_HttpOutboundConsumer.addStream"); | 540 _ensureController(); |
541 _completer = new Completer(); | |
542 _subscription = stream.listen( | |
543 (data) { | |
544 _controller.add(data); | |
545 }, | |
546 onDone: () { | |
547 _done(); | |
548 }, | |
549 onError: (error) { | |
550 _done(error); | |
551 }, | |
552 unsubscribeOnError: true); | |
553 return _completer.future; | |
506 } | 554 } |
507 | 555 |
508 Future close() { | 556 Future close() { |
509 throw new UnimplementedError("_HttpOutboundConsumer.close"); | 557 _ensureController(); |
558 _controller.close(); | |
559 return _closeCompleter.future.then((_) { | |
560 return _ioSink.close().then((_) => _outbound); | |
561 }); | |
510 } | 562 } |
511 } | 563 } |
512 | 564 |
513 | 565 |
514 class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> { | 566 class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> { |
515 const int MIN_CHUNK_SIZE = 4 * 1024; | 567 const int MIN_CHUNK_SIZE = 4 * 1024; |
516 const int MAX_BUFFER_SIZE = 16 * 1024; | 568 const int MAX_BUFFER_SIZE = 16 * 1024; |
517 | 569 |
518 final _BufferList _buffer = new _BufferList(); | 570 final _BufferList _buffer = new _BufferList(); |
519 | 571 |
(...skipping 11 matching lines...) Expand all Loading... | |
531 } | 583 } |
532 } | 584 } |
533 | 585 |
534 void handleDone(EventSink<List<int>> sink) { | 586 void handleDone(EventSink<List<int>> sink) { |
535 flush(sink); | 587 flush(sink); |
536 sink.close(); | 588 sink.close(); |
537 } | 589 } |
538 | 590 |
539 void flush(EventSink<List<int>> sink) { | 591 void flush(EventSink<List<int>> sink) { |
540 if (_buffer.length > 0) { | 592 if (_buffer.length > 0) { |
541 sink.add(_buffer.readBytes()); | 593 var data = _buffer.readBytes(); |
Søren Gjesse
2013/04/15 06:56:30
Why this change?
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
594 sink.add(data); | |
542 _buffer.clear(); | 595 _buffer.clear(); |
543 } | 596 } |
544 } | 597 } |
545 } | 598 } |
546 | 599 |
547 | 600 |
548 class _HttpResponse extends _HttpOutboundMessage<HttpResponse> | 601 class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
549 implements HttpResponse { | 602 implements HttpResponse { |
550 int statusCode = 200; | 603 int statusCode = 200; |
551 String _reasonPhrase; | 604 String _reasonPhrase; |
(...skipping 15 matching lines...) Expand all Loading... | |
567 _reasonPhrase = reasonPhrase; | 620 _reasonPhrase = reasonPhrase; |
568 } | 621 } |
569 | 622 |
570 Future<Socket> detachSocket() { | 623 Future<Socket> detachSocket() { |
571 if (_headersWritten) throw new StateError("Headers already sent"); | 624 if (_headersWritten) throw new StateError("Headers already sent"); |
572 _writeHeaders(); | 625 _writeHeaders(); |
573 var future = _httpRequest._httpConnection.detachSocket(); | 626 var future = _httpRequest._httpConnection.detachSocket(); |
574 // Close connection so the socket is 'free'. | 627 // Close connection so the socket is 'free'. |
575 close(); | 628 close(); |
576 done.catchError((_) { | 629 done.catchError((_) { |
577 // Catch any error on done, as they automatically will be propegated to | 630 // Catch any error on done, as they automatically will be |
578 // the websocket. | 631 // propegated to the websocket. |
Søren Gjesse
2013/04/15 06:56:30
propagated
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
579 }); | 632 }); |
580 return future; | 633 return future; |
581 } | 634 } |
582 | 635 |
583 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; | 636 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; |
584 | 637 |
585 void _writeHeader() { | 638 void _writeHeader() { |
586 var buffer = new _BufferList(); | 639 var buffer = new _BufferList(); |
587 writeSP() => buffer.add(const [_CharCode.SP]); | 640 writeSP() => buffer.add(const [_CharCode.SP]); |
588 writeCRLF() => buffer.add(const [_CharCode.CR, _CharCode.LF]); | 641 writeCRLF() => buffer.add(const [_CharCode.CR, _CharCode.LF]); |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
625 _cookies.forEach((cookie) { | 678 _cookies.forEach((cookie) { |
626 headers.add(HttpHeaders.SET_COOKIE, cookie); | 679 headers.add(HttpHeaders.SET_COOKIE, cookie); |
627 }); | 680 }); |
628 } | 681 } |
629 | 682 |
630 headers._finalize(); | 683 headers._finalize(); |
631 | 684 |
632 // Write headers. | 685 // Write headers. |
633 headers._write(buffer); | 686 headers._write(buffer); |
634 writeCRLF(); | 687 writeCRLF(); |
635 _ioSink.add(buffer.readBytes()); | 688 _headersSink.add(buffer.readBytes()); |
636 } | 689 } |
637 | 690 |
638 String _findReasonPhrase(int statusCode) { | 691 String _findReasonPhrase(int statusCode) { |
639 if (_reasonPhrase != null) { | 692 if (_reasonPhrase != null) { |
640 return _reasonPhrase; | 693 return _reasonPhrase; |
641 } | 694 } |
642 | 695 |
643 switch (statusCode) { | 696 switch (statusCode) { |
644 case HttpStatus.CONTINUE: return "Continue"; | 697 case HttpStatus.CONTINUE: return "Continue"; |
645 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols"; | 698 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols"; |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
724 _HttpClientConnection this._httpClientConnection) | 777 _HttpClientConnection this._httpClientConnection) |
725 : super("1.1", outgoing) { | 778 : super("1.1", outgoing) { |
726 // GET and HEAD have 'content-length: 0' by default. | 779 // GET and HEAD have 'content-length: 0' by default. |
727 if (method == "GET" || method == "HEAD") { | 780 if (method == "GET" || method == "HEAD") { |
728 contentLength = 0; | 781 contentLength = 0; |
729 } | 782 } |
730 } | 783 } |
731 | 784 |
732 Future<HttpClientResponse> get done { | 785 Future<HttpClientResponse> get done { |
733 if (_response == null) { | 786 if (_response == null) { |
734 _response = Future.wait([_responseCompleter.future, super.done]) | 787 _response = Future.wait([_responseCompleter.future, |
788 super.done]) | |
735 .then((list) => list[0]); | 789 .then((list) => list[0]); |
736 } | 790 } |
737 return _response; | 791 return _response; |
738 } | 792 } |
739 | 793 |
740 Future<HttpClientResponse> consume(Stream<List<int>> stream) { | |
741 super.consume(stream); | |
742 return done; | |
743 } | |
744 | |
745 Future<HttpClientResponse> close() { | 794 Future<HttpClientResponse> close() { |
746 super.close(); | 795 super.close(); |
747 return done; | 796 return done; |
748 } | 797 } |
749 | 798 |
750 int get maxRedirects => _maxRedirects; | 799 int get maxRedirects => _maxRedirects; |
751 void set maxRedirects(int maxRedirects) { | 800 void set maxRedirects(int maxRedirects) { |
752 if (_headersWritten) throw new StateError("Request already sent"); | 801 if (_headersWritten) throw new StateError("Request already sent"); |
753 _maxRedirects = maxRedirects; | 802 _maxRedirects = maxRedirects; |
754 } | 803 } |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
830 sb.write(cookies[i].value); | 879 sb.write(cookies[i].value); |
831 } | 880 } |
832 headers.add(HttpHeaders.COOKIE, sb.toString()); | 881 headers.add(HttpHeaders.COOKIE, sb.toString()); |
833 } | 882 } |
834 | 883 |
835 headers._finalize(); | 884 headers._finalize(); |
836 | 885 |
837 // Write headers. | 886 // Write headers. |
838 headers._write(buffer); | 887 headers._write(buffer); |
839 writeCRLF(); | 888 writeCRLF(); |
840 _ioSink.add(buffer.readBytes()); | 889 _headersSink.add(buffer.readBytes()); |
841 } | 890 } |
842 } | 891 } |
843 | 892 |
844 | 893 |
845 // Transformer that transforms data to HTTP Chunked Encoding. | 894 // Transformer that transforms data to HTTP Chunked Encoding. |
846 class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> { | 895 class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> { |
847 void handleData(List<int> data, EventSink<List<int>> sink) { | 896 void handleData(List<int> data, EventSink<List<int>> sink) { |
848 sink.add(_chunkHeader(data.length)); | 897 sink.add(_chunkHeader(data.length)); |
849 if (data.length > 0) sink.add(data); | 898 if (data.length > 0) sink.add(data); |
850 sink.add(_chunkFooter); | 899 sink.add(_chunkFooter); |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
906 " $_bytesWritten bytes written while expected " | 955 " $_bytesWritten bytes written while expected " |
907 "$expectedContentLength."))); | 956 "$expectedContentLength."))); |
908 } | 957 } |
909 sink.close(); | 958 sink.close(); |
910 } | 959 } |
911 } | 960 } |
912 | 961 |
913 | 962 |
914 // Extends StreamConsumer as this is an internal type, only used to pipe to. | 963 // Extends StreamConsumer as this is an internal type, only used to pipe to. |
915 class _HttpOutgoing implements StreamConsumer<List<int>> { | 964 class _HttpOutgoing implements StreamConsumer<List<int>> { |
916 Function _onStream; | 965 final Completer _doneCompleter = new Completer(); |
917 final Completer _consumeCompleter = new Completer(); | 966 final StreamConsumer _consumer; |
918 | 967 |
919 Future onStream(Future callback(Stream<List<int>> stream)) { | 968 _HttpOutgoing(StreamConsumer this._consumer); |
920 _onStream = callback; | |
921 return _consumeCompleter.future; | |
922 } | |
923 | |
924 Future consume(Stream<List<int>> stream) { | |
925 _onStream(stream) | |
926 .then((_) => _consumeCompleter.complete(), | |
927 onError: _consumeCompleter.completeError); | |
928 // Use .then to ensure a Future branch. | |
929 return _consumeCompleter.future.then((_) => this); | |
930 } | |
931 | 969 |
932 Future addStream(Stream<List<int>> stream) { | 970 Future addStream(Stream<List<int>> stream) { |
933 throw new UnimplementedError("_HttpOutgoing.addStream"); | 971 return _consumer.addStream(stream) |
972 .catchError((error) { | |
973 _doneCompleter.completeError(error); | |
974 throw error; | |
975 }); | |
934 } | 976 } |
935 | 977 |
936 Future close() { | 978 Future close() { |
937 throw new UnimplementedError("_HttpOutgoing.close"); | 979 _doneCompleter.complete(_consumer); |
980 return new Future.immediate(null); | |
938 } | 981 } |
982 | |
983 Future get done => _doneCompleter.future; | |
939 } | 984 } |
940 | 985 |
941 | 986 |
942 class _HttpClientConnection { | 987 class _HttpClientConnection { |
943 final String key; | 988 final String key; |
944 final Socket _socket; | 989 final Socket _socket; |
945 final _HttpParser _httpParser; | 990 final _HttpParser _httpParser; |
946 StreamSubscription _subscription; | 991 StreamSubscription _subscription; |
947 final _HttpClient _httpClient; | 992 final _HttpClient _httpClient; |
948 | 993 |
949 Completer<_HttpIncoming> _nextResponseCompleter; | 994 Completer<_HttpIncoming> _nextResponseCompleter; |
950 Future _streamFuture; | 995 Future _streamFuture; |
951 | 996 |
952 _HttpClientConnection(String this.key, | 997 _HttpClientConnection(String this.key, |
953 Socket this._socket, | 998 Socket this._socket, |
954 _HttpClient this._httpClient) | 999 _HttpClient this._httpClient) |
955 : _httpParser = new _HttpParser.responseParser() { | 1000 : _httpParser = new _HttpParser.responseParser() { |
956 _socket.pipe(_httpParser); | 1001 _socket.pipe(_httpParser); |
957 _socket.done.catchError((e) { destroy(); }); | |
958 | 1002 |
959 // Set up handlers on the parser here, so we are sure to get 'onDone' from | 1003 // Set up handlers on the parser here, so we are sure to get 'onDone' from |
960 // the parser. | 1004 // the parser. |
961 _subscription = _httpParser.listen( | 1005 _subscription = _httpParser.listen( |
962 (incoming) { | 1006 (incoming) { |
963 // Only handle one incoming response at the time. Keep the | 1007 // Only handle one incoming response at the time. Keep the |
964 // stream paused until the response have been processed. | 1008 // stream paused until the response have been processed. |
965 _subscription.pause(); | 1009 _subscription.pause(); |
966 // We assume the response is not here, until we have send the request. | 1010 // We assume the response is not here, until we have send the request. |
967 assert(_nextResponseCompleter != null); | 1011 assert(_nextResponseCompleter != null); |
968 var completer = _nextResponseCompleter; | 1012 var completer = _nextResponseCompleter; |
969 _nextResponseCompleter = null; | 1013 _nextResponseCompleter = null; |
970 completer.complete(incoming); | 1014 completer.complete(incoming); |
971 }, | 1015 }, |
972 onError: (error) { | 1016 onError: (error) { |
973 if (_nextResponseCompleter != null) { | 1017 if (_nextResponseCompleter != null) { |
974 _nextResponseCompleter.completeError(error); | 1018 _nextResponseCompleter.completeError(error); |
975 _nextResponseCompleter = null; | 1019 _nextResponseCompleter = null; |
976 } | 1020 } |
977 }, | 1021 }, |
978 onDone: () { | 1022 onDone: () { |
979 close(); | 1023 close(); |
980 }); | 1024 }); |
981 } | 1025 } |
982 | 1026 |
983 _HttpClientRequest send(Uri uri, int port, String method, bool isDirect) { | 1027 _HttpClientRequest send(Uri uri, int port, String method, bool isDirect) { |
984 // Start with pausing the parser. | 1028 // Start with pausing the parser. |
985 _subscription.pause(); | 1029 _subscription.pause(); |
986 var outgoing = new _HttpOutgoing(); | 1030 var outgoing = new _HttpOutgoing(_socket); |
987 // Create new request object, wrapping the outgoing connection. | 1031 // Create new request object, wrapping the outgoing connection. |
988 var request = new _HttpClientRequest(outgoing, | 1032 var request = new _HttpClientRequest(outgoing, |
989 uri, | 1033 uri, |
990 method, | 1034 method, |
991 !isDirect, | 1035 !isDirect, |
992 _httpClient, | 1036 _httpClient, |
993 this); | 1037 this); |
994 request.headers.host = uri.domain; | 1038 request.headers.host = uri.domain; |
995 request.headers.port = port; | 1039 request.headers.port = port; |
996 request.headers.set(HttpHeaders.ACCEPT_ENCODING, "gzip"); | 1040 request.headers.set(HttpHeaders.ACCEPT_ENCODING, "gzip"); |
997 if (uri.userInfo != null && !uri.userInfo.isEmpty) { | 1041 if (uri.userInfo != null && !uri.userInfo.isEmpty) { |
998 // If the URL contains user information use that for basic | 1042 // If the URL contains user information use that for basic |
999 // authorization | 1043 // authorization |
1000 String auth = | 1044 String auth = |
1001 CryptoUtils.bytesToBase64(_encodeString(uri.userInfo)); | 1045 CryptoUtils.bytesToBase64(_encodeString(uri.userInfo)); |
1002 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); | 1046 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); |
1003 } else { | 1047 } else { |
1004 // Look for credentials. | 1048 // Look for credentials. |
1005 _Credentials cr = _httpClient._findCredentials(uri); | 1049 _Credentials cr = _httpClient._findCredentials(uri); |
1006 if (cr != null) { | 1050 if (cr != null) { |
1007 cr.authorize(request); | 1051 cr.authorize(request); |
1008 } | 1052 } |
1009 } | 1053 } |
1010 // Start sending the request (lazy, delayed until the user provides | 1054 // Start sending the request (lazy, delayed until the user provides |
1011 // data). | 1055 // data). |
1012 _httpParser.responseToMethod = method; | 1056 _httpParser.responseToMethod = method; |
1013 _streamFuture = outgoing.onStream((stream) { | 1057 _streamFuture = outgoing.done |
1014 return _socket.writeStream(stream) | 1058 .then((s) { |
1015 .then((s) { | 1059 // Request sent, set up response completer. |
1016 // Request sent, set up response completer. | 1060 _nextResponseCompleter = new Completer(); |
1017 _nextResponseCompleter = new Completer(); | |
1018 | 1061 |
1019 // Listen for response. | 1062 // Listen for response. |
1020 _nextResponseCompleter.future | 1063 _nextResponseCompleter.future |
1021 .then((incoming) { | 1064 .then((incoming) { |
1022 incoming.dataDone.then((_) { | 1065 incoming.dataDone.then((_) { |
1023 if (incoming.headers.persistentConnection && | 1066 if (incoming.headers.persistentConnection && |
1024 request.persistentConnection) { | 1067 request.persistentConnection) { |
1025 // Return connection, now we are done. | 1068 // Return connection, now we are done. |
1026 _httpClient._returnConnection(this); | 1069 _httpClient._returnConnection(this); |
1027 _subscription.resume(); | 1070 _subscription.resume(); |
1028 } else { | 1071 } else { |
1029 destroy(); | |
1030 } | |
1031 }); | |
1032 request._onIncoming(incoming); | |
1033 }) | |
1034 // If we see a state error, we failed to get the 'first' | |
1035 // element. | |
1036 // Transform the error to a HttpParserException, for | |
1037 // consistency. | |
1038 .catchError((error) { | |
1039 throw new HttpParserException( | |
1040 "Connection closed before data was received"); | |
1041 }, test: (error) => error is StateError) | |
1042 .catchError((error) { | |
1043 // We are done with the socket. | |
1044 destroy(); | 1072 destroy(); |
1045 request._onError(error); | 1073 } |
1046 }); | 1074 }); |
1075 request._onIncoming(incoming); | |
1076 }) | |
1077 // If we see a state error, we failed to get the 'first' | |
1078 // element. | |
1079 // Transform the error to a HttpParserException, for | |
1080 // consistency. | |
1081 .catchError((error) { | |
1082 throw new HttpParserException( | |
1083 "Connection closed before data was received"); | |
1084 }, test: (error) => error is StateError) | |
1085 .catchError((error) { | |
1086 // We are done with the socket. | |
1087 destroy(); | |
1088 request._onError(error); | |
1089 }); | |
1047 | 1090 |
1048 // Resume the parser now we have a handler. | 1091 // Resume the parser now we have a handler. |
1049 _subscription.resume(); | 1092 _subscription.resume(); |
1050 return s; | 1093 return s; |
1051 }, onError: (e) { | 1094 }, onError: (e) { |
1052 destroy(); | 1095 destroy(); |
1053 throw e; | 1096 }); |
1054 }); | |
1055 }); | |
1056 return request; | 1097 return request; |
1057 } | 1098 } |
1058 | 1099 |
1059 Future<Socket> detachSocket() { | 1100 Future<Socket> detachSocket() { |
1060 return _streamFuture | 1101 return _streamFuture.then( |
1061 .then((_) => new _DetachedSocket(_socket, _httpParser.detachIncoming()), | 1102 (_) => new _DetachedSocket(_socket, _httpParser.detachIncoming())); |
1062 onError: (_) {}); | |
1063 } | 1103 } |
1064 | 1104 |
1065 void destroy() { | 1105 void destroy() { |
1066 _httpClient._connectionClosed(this); | 1106 _httpClient._connectionClosed(this); |
1067 _socket.destroy(); | 1107 _socket.destroy(); |
1068 } | 1108 } |
1069 | 1109 |
1070 void close() { | 1110 void close() { |
1071 _httpClient._connectionClosed(this); | 1111 _httpClient._connectionClosed(this); |
1072 _streamFuture | 1112 _streamFuture |
1073 // TODO(ajohnsen): Add timeout. | 1113 // TODO(ajohnsen): Add timeout. |
1074 .then((_) => _socket.destroy(), | 1114 .then((_) => _socket.destroy()); |
1075 onError: (_) {}); | |
1076 } | 1115 } |
1077 | 1116 |
1078 HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket); | 1117 HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket); |
1079 } | 1118 } |
1080 | 1119 |
1081 class _ConnnectionInfo { | 1120 class _ConnnectionInfo { |
1082 _ConnnectionInfo(_HttpClientConnection this.connection, _Proxy this.proxy); | 1121 _ConnnectionInfo(_HttpClientConnection this.connection, _Proxy this.proxy); |
1083 final _HttpClientConnection connection; | 1122 final _HttpClientConnection connection; |
1084 final _Proxy proxy; | 1123 final _Proxy proxy; |
1085 } | 1124 } |
(...skipping 284 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1370 final Socket _socket; | 1409 final Socket _socket; |
1371 final _HttpServer _httpServer; | 1410 final _HttpServer _httpServer; |
1372 final _HttpParser _httpParser; | 1411 final _HttpParser _httpParser; |
1373 StreamSubscription _subscription; | 1412 StreamSubscription _subscription; |
1374 | 1413 |
1375 Future _streamFuture; | 1414 Future _streamFuture; |
1376 | 1415 |
1377 _HttpConnection(Socket this._socket, _HttpServer this._httpServer) | 1416 _HttpConnection(Socket this._socket, _HttpServer this._httpServer) |
1378 : _httpParser = new _HttpParser.requestParser() { | 1417 : _httpParser = new _HttpParser.requestParser() { |
1379 _socket.pipe(_httpParser); | 1418 _socket.pipe(_httpParser); |
1380 _socket.done.catchError((e) => destroy()); | |
1381 _subscription = _httpParser.listen( | 1419 _subscription = _httpParser.listen( |
1382 (incoming) { | 1420 (incoming) { |
1383 // Only handle one incoming request at the time. Keep the | 1421 // Only handle one incoming request at the time. Keep the |
1384 // stream paused until the request has been send. | 1422 // stream paused until the request has been send. |
1385 _subscription.pause(); | 1423 _subscription.pause(); |
1386 _state = _ACTIVE; | 1424 _state = _ACTIVE; |
1387 var outgoing = new _HttpOutgoing(); | 1425 var outgoing = new _HttpOutgoing(_socket); |
1388 var response = new _HttpResponse(incoming.headers.protocolVersion, | 1426 var response = new _HttpResponse(incoming.headers.protocolVersion, |
1389 outgoing); | 1427 outgoing); |
1390 var request = new _HttpRequest(response, incoming, _httpServer, this); | 1428 var request = new _HttpRequest(response, incoming, _httpServer, this); |
1391 outgoing.onStream((stream) { | 1429 _streamFuture = outgoing.done |
1392 return _streamFuture = _socket.writeStream(stream) | 1430 .then((_) { |
1393 .then((_) { | 1431 if (_state == _DETACHED) return; |
1394 if (_state == _DETACHED) return; | 1432 if (response.persistentConnection && |
1395 if (response.persistentConnection && | 1433 request.persistentConnection && |
1396 request.persistentConnection && | 1434 incoming.fullBodyRead) { |
1397 incoming.fullBodyRead) { | 1435 _state = _IDLE; |
1398 _state = _IDLE; | 1436 // Resume the subscription for incoming requests as the |
1399 // Resume the subscription for incoming requests as the | 1437 // request is now processed. |
1400 // request is now processed. | 1438 _subscription.resume(); |
1401 _subscription.resume(); | 1439 } else { |
1402 } else { | 1440 // Close socket, keep-alive not used or body sent before |
1403 // Close socket, keep-alive not used or body sent before | 1441 // received data was handled. |
1404 // received data was handled. | |
1405 destroy(); | |
1406 } | |
1407 }) | |
1408 .catchError((e) { | |
1409 destroy(); | 1442 destroy(); |
1410 throw e; | 1443 } |
1411 }); | 1444 }) |
1412 }); | 1445 .catchError((e) { |
1446 destroy(); | |
1447 }); | |
1413 response._ignoreBody = request.method == "HEAD"; | 1448 response._ignoreBody = request.method == "HEAD"; |
1414 response._httpRequest = request; | 1449 response._httpRequest = request; |
1415 _httpServer._handleRequest(request); | 1450 _httpServer._handleRequest(request); |
1416 }, | 1451 }, |
1417 onDone: () { | 1452 onDone: () { |
1418 destroy(); | 1453 destroy(); |
1419 }, | 1454 }, |
1420 onError: (error) { | 1455 onError: (error) { |
1421 _httpServer._handleError(error); | 1456 _httpServer._handleError(error); |
1422 destroy(); | 1457 destroy(); |
(...skipping 261 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1684 void writeCharCode(int charCode) => _socket.writeCharCode(charCode); | 1719 void writeCharCode(int charCode) => _socket.writeCharCode(charCode); |
1685 | 1720 |
1686 void writeAll(Iterable objects, [String separator = ""]) { | 1721 void writeAll(Iterable objects, [String separator = ""]) { |
1687 _socket.writeAll(objects, separator); | 1722 _socket.writeAll(objects, separator); |
1688 } | 1723 } |
1689 | 1724 |
1690 void add(List<int> bytes) => _socket.add(bytes); | 1725 void add(List<int> bytes) => _socket.add(bytes); |
1691 | 1726 |
1692 void addError(AsyncError error) => _socket.addError(error); | 1727 void addError(AsyncError error) => _socket.addError(error); |
1693 | 1728 |
1694 Future<Socket> consume(Stream<List<int>> stream) { | |
1695 return _socket.consume(stream); | |
1696 } | |
1697 | |
1698 Future<Socket> addStream(Stream<List<int>> stream) { | 1729 Future<Socket> addStream(Stream<List<int>> stream) { |
1699 return _socket.addStream(stream); | 1730 return _socket.addStream(stream); |
1700 } | 1731 } |
1701 | 1732 |
1702 Future<Socket> writeStream(Stream<List<int>> stream) { | |
1703 return _socket.writeStream(stream); | |
1704 } | |
1705 | |
1706 void destroy() => _socket.destroy(); | 1733 void destroy() => _socket.destroy(); |
1707 | 1734 |
1708 Future close() => _socket.close(); | 1735 Future close() => _socket.close(); |
1709 | 1736 |
1710 Future<Socket> get done => _socket.done; | 1737 Future<Socket> get done => _socket.done; |
1711 | 1738 |
1712 int get port => _socket.port; | 1739 int get port => _socket.port; |
1713 | 1740 |
1714 String get remoteHost => _socket.remoteHost; | 1741 String get remoteHost => _socket.remoteHost; |
1715 | 1742 |
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1826 | 1853 |
1827 | 1854 |
1828 class _RedirectInfo implements RedirectInfo { | 1855 class _RedirectInfo implements RedirectInfo { |
1829 const _RedirectInfo(int this.statusCode, | 1856 const _RedirectInfo(int this.statusCode, |
1830 String this.method, | 1857 String this.method, |
1831 Uri this.location); | 1858 Uri this.location); |
1832 final int statusCode; | 1859 final int statusCode; |
1833 final String method; | 1860 final String method; |
1834 final Uri location; | 1861 final Uri location; |
1835 } | 1862 } |
OLD | NEW |