Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.io; | 5 part of dart.io; |
| 6 | 6 |
| 7 class _HttpIncoming extends Stream<List<int>> { | 7 class _HttpIncoming extends Stream<List<int>> { |
| 8 final int _transferLength; | 8 final int _transferLength; |
| 9 final Completer _dataCompleter = new Completer(); | 9 final Completer _dataCompleter = new Completer(); |
| 10 Stream<List<int>> _stream; | 10 Stream<List<int>> _stream; |
| (...skipping 308 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 319 return new Future.immediate(this); | 319 return new Future.immediate(this); |
| 320 } | 320 } |
| 321 } | 321 } |
| 322 | 322 |
| 323 | 323 |
| 324 abstract class _HttpOutboundMessage<T> implements IOSink { | 324 abstract class _HttpOutboundMessage<T> implements IOSink { |
| 325 // Used to mark when the body should be written. This is used for HEAD | 325 // Used to mark when the body should be written. This is used for HEAD |
| 326 // requests and in error handling. | 326 // requests and in error handling. |
| 327 bool _ignoreBody = false; | 327 bool _ignoreBody = false; |
| 328 bool _headersWritten = false; | 328 bool _headersWritten = false; |
| 329 bool _asGZip = false; | |
| 329 | 330 |
| 330 IOSink _ioSink; | 331 IOSink _headersSink; |
| 332 IOSink _dataSink; | |
| 333 | |
| 331 final _HttpOutgoing _outgoing; | 334 final _HttpOutgoing _outgoing; |
| 332 | 335 |
| 333 final _HttpHeaders headers; | 336 final _HttpHeaders headers; |
| 334 | 337 |
| 335 _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing) | 338 _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing) |
| 336 : _outgoing = outgoing, | 339 : _outgoing = outgoing, |
| 337 _ioSink = new IOSink(outgoing, encoding: Encoding.ASCII), | 340 _headersSink = new IOSink(outgoing, encoding: Encoding.ASCII), |
| 338 headers = new _HttpHeaders(protocolVersion); | 341 headers = new _HttpHeaders(protocolVersion) { |
| 342 _dataSink = new IOSink( | |
| 343 new _HttpOutboundConsumer(_headersSink, _addStream, this)); | |
| 344 } | |
| 339 | 345 |
| 340 int get contentLength => headers.contentLength; | 346 int get contentLength => headers.contentLength; |
| 341 void set contentLength(int contentLength) { | 347 void set contentLength(int contentLength) { |
| 342 headers.contentLength = contentLength; | 348 headers.contentLength = contentLength; |
| 343 } | 349 } |
| 344 | 350 |
| 345 bool get persistentConnection => headers.persistentConnection; | 351 bool get persistentConnection => headers.persistentConnection; |
| 346 void set persistentConnection(bool p) { | 352 void set persistentConnection(bool p) { |
| 347 headers.persistentConnection = p; | 353 headers.persistentConnection = p; |
| 348 } | 354 } |
| (...skipping 20 matching lines...) Expand all Loading... | |
| 369 String string; | 375 String string; |
| 370 if (obj is String) { | 376 if (obj is String) { |
| 371 string = obj; | 377 string = obj; |
| 372 } else { | 378 } else { |
| 373 string = obj.toString(); | 379 string = obj.toString(); |
| 374 if (string is! String) { | 380 if (string is! String) { |
| 375 throw new ArgumentError('toString() did not return a string'); | 381 throw new ArgumentError('toString() did not return a string'); |
| 376 } | 382 } |
| 377 } | 383 } |
| 378 if (string.isEmpty) return; | 384 if (string.isEmpty) return; |
| 379 _ioSink.write(string); | 385 _dataSink.write(string); |
| 380 } | 386 } |
| 381 | 387 |
| 382 void writeAll(Iterable objects, [String separator = ""]) { | 388 void writeAll(Iterable objects, [String separator = ""]) { |
| 383 bool isFirst = true; | 389 bool isFirst = true; |
| 384 for (Object obj in objects) { | 390 for (Object obj in objects) { |
| 385 if (isFirst) { | 391 if (isFirst) { |
| 386 isFirst = false; | 392 isFirst = false; |
| 387 } else { | 393 } else { |
| 388 if (!separator.isEmpty) write(separator); | 394 if (!separator.isEmpty) write(separator); |
| 389 } | 395 } |
| 390 write(obj); | 396 write(obj); |
| 391 } | 397 } |
| 392 } | 398 } |
| 393 | 399 |
| 394 void writeln([Object obj = ""]) { | 400 void writeln([Object obj = ""]) { |
| 395 write(obj); | 401 write(obj); |
| 396 write("\n"); | 402 write("\n"); |
| 397 } | 403 } |
| 398 | 404 |
| 399 void writeCharCode(int charCode) { | 405 void writeCharCode(int charCode) { |
| 400 write(new String.fromCharCode(charCode)); | 406 write(new String.fromCharCode(charCode)); |
| 401 } | 407 } |
| 402 | 408 |
| 403 void add(List<int> data) { | 409 void add(List<int> data) { |
| 404 _writeHeaders(); | 410 _writeHeaders(); |
| 405 if (data.length == 0) return; | 411 if (data.length == 0) return; |
| 406 _ioSink.add(data); | 412 _dataSink.add(data); |
| 407 } | 413 } |
| 408 | 414 |
| 409 void addError(AsyncError error) { | 415 void addError(AsyncError error) { |
| 410 _writeHeaders(); | 416 _writeHeaders(); |
| 411 _ioSink.addError(error); | 417 _dataSink.addError(error); |
| 412 } | |
| 413 | |
| 414 Future<T> consume(Stream<List<int>> stream) { | |
| 415 _writeHeaders(); | |
| 416 return _ioSink.consume(stream); | |
| 417 } | 418 } |
| 418 | 419 |
| 419 Future<T> addStream(Stream<List<int>> stream) { | 420 Future<T> addStream(Stream<List<int>> stream) { |
| 420 _writeHeaders(); | 421 _writeHeaders(); |
| 421 return _ioSink.writeStream(stream).then((_) => this); | 422 return _dataSink.addStream(stream); |
| 422 } | |
| 423 | |
| 424 Future<T> writeStream(Stream<List<int>> stream) { | |
| 425 return addStream(stream); | |
| 426 } | 423 } |
| 427 | 424 |
| 428 Future close() { | 425 Future close() { |
| 429 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and | 426 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and |
| 430 // persistentConnection is not guaranteed to be in sync. | 427 // persistentConnection is not guaranteed to be in sync. |
| 431 if (!_headersWritten && !_ignoreBody && headers.contentLength == -1) { | 428 if (!_headersWritten && !_ignoreBody && headers.contentLength == -1) { |
| 432 // If no body was written, _ignoreBody is false (it's not a HEAD | 429 // If no body was written, _ignoreBody is false (it's not a HEAD |
| 433 // request) and the content-length is unspecified, set contentLength to 0. | 430 // request) and the content-length is unspecified, set contentLength to 0. |
| 434 headers.chunkedTransferEncoding = false; | 431 headers.chunkedTransferEncoding = false; |
| 435 headers.contentLength = 0; | 432 headers.contentLength = 0; |
| 436 } | 433 } |
| 437 _writeHeaders(); | 434 _writeHeaders(); |
| 438 return _ioSink.close(); | 435 return _dataSink.close(); |
| 439 } | 436 } |
| 440 | 437 |
| 441 Future<T> get done { | 438 Future<T> get done => _dataSink.done.then((_) => this); |
| 442 _writeHeaders(); | |
| 443 return _ioSink.done; | |
| 444 } | |
| 445 | 439 |
| 446 void _writeHeaders() { | 440 void _writeHeaders() { |
| 447 if (_headersWritten) return; | 441 if (_headersWritten) return; |
| 448 _headersWritten = true; | 442 _headersWritten = true; |
| 449 _ioSink.encoding = Encoding.ASCII; | |
| 450 headers._synchronize(); // Be sure the 'chunked' option is updated. | 443 headers._synchronize(); // Be sure the 'chunked' option is updated. |
| 451 bool asGZip = false; | |
| 452 bool isServerSide = this is _HttpResponse; | 444 bool isServerSide = this is _HttpResponse; |
| 453 if (isServerSide && headers.chunkedTransferEncoding) { | 445 if (isServerSide && headers.chunkedTransferEncoding) { |
| 454 var response = this; | 446 var response = this; |
| 455 List acceptEncodings = | 447 List acceptEncodings = |
| 456 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; | 448 response._httpRequest.headers[HttpHeaders.ACCEPT_ENCODING]; |
| 457 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; | 449 List contentEncoding = headers[HttpHeaders.CONTENT_ENCODING]; |
| 458 if (acceptEncodings != null && | 450 if (acceptEncodings != null && |
| 459 acceptEncodings | 451 acceptEncodings |
| 460 .expand((list) => list.split(",")) | 452 .expand((list) => list.split(",")) |
| 461 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && | 453 .any((encoding) => encoding.trim().toLowerCase() == "gzip") && |
| 462 contentEncoding == null) { | 454 contentEncoding == null) { |
| 463 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); | 455 headers.set(HttpHeaders.CONTENT_ENCODING, "gzip"); |
| 464 asGZip = true; | 456 _asGZip = true; |
| 465 } | 457 } |
| 466 } | 458 } |
| 467 _writeHeader(); | 459 _writeHeader(); |
| 468 _ioSink = new IOSink(new _HttpOutboundConsumer(_ioSink, _consume, asGZip)); | |
| 469 _ioSink.encoding = encoding; | |
| 470 } | 460 } |
| 471 | 461 |
| 472 Future _consume(IOSink ioSink, Stream<List<int>> stream, bool asGZip) { | 462 Future _addStream(IOSink ioSink, Stream<List<int>> stream) { |
| 473 int contentLength = headers.contentLength; | 463 int contentLength = headers.contentLength; |
| 474 if (_ignoreBody) { | 464 if (_ignoreBody) { |
| 475 ioSink.close(); | 465 stream.fold(null, (x, y) {}).catchError((_) {}); |
| 476 return stream.fold(null, (x, y) {}).then((_) => this); | 466 return ioSink.close().then((_) => this); |
| 477 } | 467 } |
| 478 stream = stream.transform(new _BufferTransformer()); | 468 stream = stream.transform(new _BufferTransformer()); |
| 479 if (headers.chunkedTransferEncoding) { | 469 if (headers.chunkedTransferEncoding) { |
| 480 if (asGZip) { | 470 if (_asGZip) { |
| 481 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); | 471 stream = stream.transform(new ZLibDeflater(gzip: true, level: 6)); |
| 482 } | 472 } |
| 483 stream = stream.transform(new _ChunkedTransformer()); | 473 stream = stream.transform(new _ChunkedTransformer()); |
| 484 } else if (contentLength >= 0) { | 474 } else if (contentLength >= 0) { |
| 485 stream = stream.transform(new _ContentLengthValidator(contentLength)); | 475 stream = stream.transform(new _ContentLengthValidator(contentLength)); |
| 486 } | 476 } |
| 487 return stream.pipe(ioSink).then((_) => this); | 477 return ioSink.addStream(stream).then((_) => this); |
| 488 } | 478 } |
| 489 | 479 |
| 490 void _writeHeader(); // TODO(ajohnsen): Better name. | 480 void _writeHeader(); // TODO(ajohnsen): Better name. |
| 491 } | 481 } |
| 492 | 482 |
| 493 | 483 |
| 494 class _HttpOutboundConsumer implements StreamConsumer { | 484 class _HttpOutboundConsumer implements StreamConsumer { |
| 495 Function _consume; | 485 StreamController _controller; |
| 486 StreamSubscription _subscription; | |
| 487 Function _addStream; | |
| 496 IOSink _ioSink; | 488 IOSink _ioSink; |
| 497 bool _asGZip; | 489 Completer _closeCompleter = new Completer(); |
| 490 Completer _completer; | |
| 491 | |
| 492 _HttpOutboundMessage _outbound; | |
| 498 _HttpOutboundConsumer(IOSink this._ioSink, | 493 _HttpOutboundConsumer(IOSink this._ioSink, |
| 499 Function this._consume, | 494 Function this._addStream, |
| 500 bool this._asGZip); | 495 _HttpOutboundMessage this._outbound); |
| 501 | 496 |
| 502 Future consume(var stream) => _consume(_ioSink, stream, _asGZip); | 497 void _onPause() { |
| 498 if (_controller.isPaused) { | |
| 499 _subscription.pause(); | |
| 500 } else { | |
| 501 _subscription.resume(); | |
| 502 } | |
| 503 } | |
| 504 | |
| 505 void _onListen() { | |
| 506 if (!_controller.hasListener && _subscription != null) { | |
| 507 _subscription.cancel(); | |
| 508 } | |
| 509 } | |
| 510 | |
| 511 _ensureController() { | |
| 512 if (_controller != null) return; | |
| 513 _controller = new StreamController(onPauseStateChange: _onPause, | |
| 514 onSubscriptionStateChange: _onListen); | |
| 515 _addStream(_ioSink, _controller.stream) | |
| 516 .then((v) { | |
|
Søren Gjesse
2013/04/15 06:56:30
v -> _
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
| 517 _done(); | |
| 518 _closeCompleter.complete(_outbound); | |
| 519 }, | |
| 520 onError: (error) { | |
| 521 if (!_done(error)) { | |
| 522 _closeCompleter.completeError(error); | |
| 523 } | |
| 524 }); | |
| 525 } | |
| 526 | |
| 527 bool _done([error]) { | |
| 528 if (_completer == null) return false; | |
| 529 var tmp = _completer; | |
| 530 _completer = null; | |
| 531 if (error != null) { | |
| 532 tmp.completeError(error); | |
| 533 } else { | |
| 534 tmp.complete(_outbound); | |
| 535 } | |
| 536 return true; | |
| 537 } | |
| 503 | 538 |
| 504 Future addStream(var stream) { | 539 Future addStream(var stream) { |
| 505 throw new UnimplementedError("_HttpOutboundConsumer.addStream"); | 540 _ensureController(); |
| 541 _completer = new Completer(); | |
| 542 _subscription = stream.listen( | |
| 543 (data) { | |
| 544 _controller.add(data); | |
| 545 }, | |
| 546 onDone: () { | |
| 547 _done(); | |
| 548 }, | |
| 549 onError: (error) { | |
| 550 _done(error); | |
| 551 }, | |
| 552 unsubscribeOnError: true); | |
| 553 return _completer.future; | |
| 506 } | 554 } |
| 507 | 555 |
| 508 Future close() { | 556 Future close() { |
| 509 throw new UnimplementedError("_HttpOutboundConsumer.close"); | 557 _ensureController(); |
| 558 _controller.close(); | |
| 559 return _closeCompleter.future.then((_) { | |
| 560 return _ioSink.close().then((_) => _outbound); | |
| 561 }); | |
| 510 } | 562 } |
| 511 } | 563 } |
| 512 | 564 |
| 513 | 565 |
| 514 class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> { | 566 class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> { |
| 515 const int MIN_CHUNK_SIZE = 4 * 1024; | 567 const int MIN_CHUNK_SIZE = 4 * 1024; |
| 516 const int MAX_BUFFER_SIZE = 16 * 1024; | 568 const int MAX_BUFFER_SIZE = 16 * 1024; |
| 517 | 569 |
| 518 final _BufferList _buffer = new _BufferList(); | 570 final _BufferList _buffer = new _BufferList(); |
| 519 | 571 |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 531 } | 583 } |
| 532 } | 584 } |
| 533 | 585 |
| 534 void handleDone(EventSink<List<int>> sink) { | 586 void handleDone(EventSink<List<int>> sink) { |
| 535 flush(sink); | 587 flush(sink); |
| 536 sink.close(); | 588 sink.close(); |
| 537 } | 589 } |
| 538 | 590 |
| 539 void flush(EventSink<List<int>> sink) { | 591 void flush(EventSink<List<int>> sink) { |
| 540 if (_buffer.length > 0) { | 592 if (_buffer.length > 0) { |
| 541 sink.add(_buffer.readBytes()); | 593 var data = _buffer.readBytes(); |
|
Søren Gjesse
2013/04/15 06:56:30
Why this change?
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
| 594 sink.add(data); | |
| 542 _buffer.clear(); | 595 _buffer.clear(); |
| 543 } | 596 } |
| 544 } | 597 } |
| 545 } | 598 } |
| 546 | 599 |
| 547 | 600 |
| 548 class _HttpResponse extends _HttpOutboundMessage<HttpResponse> | 601 class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
| 549 implements HttpResponse { | 602 implements HttpResponse { |
| 550 int statusCode = 200; | 603 int statusCode = 200; |
| 551 String _reasonPhrase; | 604 String _reasonPhrase; |
| (...skipping 15 matching lines...) Expand all Loading... | |
| 567 _reasonPhrase = reasonPhrase; | 620 _reasonPhrase = reasonPhrase; |
| 568 } | 621 } |
| 569 | 622 |
| 570 Future<Socket> detachSocket() { | 623 Future<Socket> detachSocket() { |
| 571 if (_headersWritten) throw new StateError("Headers already sent"); | 624 if (_headersWritten) throw new StateError("Headers already sent"); |
| 572 _writeHeaders(); | 625 _writeHeaders(); |
| 573 var future = _httpRequest._httpConnection.detachSocket(); | 626 var future = _httpRequest._httpConnection.detachSocket(); |
| 574 // Close connection so the socket is 'free'. | 627 // Close connection so the socket is 'free'. |
| 575 close(); | 628 close(); |
| 576 done.catchError((_) { | 629 done.catchError((_) { |
| 577 // Catch any error on done, as they automatically will be propegated to | 630 // Catch any error on done, as they automatically will be |
| 578 // the websocket. | 631 // propegated to the websocket. |
|
Søren Gjesse
2013/04/15 06:56:30
propagated
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
| 579 }); | 632 }); |
| 580 return future; | 633 return future; |
| 581 } | 634 } |
| 582 | 635 |
| 583 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; | 636 HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo; |
| 584 | 637 |
| 585 void _writeHeader() { | 638 void _writeHeader() { |
| 586 var buffer = new _BufferList(); | 639 var buffer = new _BufferList(); |
| 587 writeSP() => buffer.add(const [_CharCode.SP]); | 640 writeSP() => buffer.add(const [_CharCode.SP]); |
| 588 writeCRLF() => buffer.add(const [_CharCode.CR, _CharCode.LF]); | 641 writeCRLF() => buffer.add(const [_CharCode.CR, _CharCode.LF]); |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 625 _cookies.forEach((cookie) { | 678 _cookies.forEach((cookie) { |
| 626 headers.add(HttpHeaders.SET_COOKIE, cookie); | 679 headers.add(HttpHeaders.SET_COOKIE, cookie); |
| 627 }); | 680 }); |
| 628 } | 681 } |
| 629 | 682 |
| 630 headers._finalize(); | 683 headers._finalize(); |
| 631 | 684 |
| 632 // Write headers. | 685 // Write headers. |
| 633 headers._write(buffer); | 686 headers._write(buffer); |
| 634 writeCRLF(); | 687 writeCRLF(); |
| 635 _ioSink.add(buffer.readBytes()); | 688 _headersSink.add(buffer.readBytes()); |
| 636 } | 689 } |
| 637 | 690 |
| 638 String _findReasonPhrase(int statusCode) { | 691 String _findReasonPhrase(int statusCode) { |
| 639 if (_reasonPhrase != null) { | 692 if (_reasonPhrase != null) { |
| 640 return _reasonPhrase; | 693 return _reasonPhrase; |
| 641 } | 694 } |
| 642 | 695 |
| 643 switch (statusCode) { | 696 switch (statusCode) { |
| 644 case HttpStatus.CONTINUE: return "Continue"; | 697 case HttpStatus.CONTINUE: return "Continue"; |
| 645 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols"; | 698 case HttpStatus.SWITCHING_PROTOCOLS: return "Switching Protocols"; |
| (...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 724 _HttpClientConnection this._httpClientConnection) | 777 _HttpClientConnection this._httpClientConnection) |
| 725 : super("1.1", outgoing) { | 778 : super("1.1", outgoing) { |
| 726 // GET and HEAD have 'content-length: 0' by default. | 779 // GET and HEAD have 'content-length: 0' by default. |
| 727 if (method == "GET" || method == "HEAD") { | 780 if (method == "GET" || method == "HEAD") { |
| 728 contentLength = 0; | 781 contentLength = 0; |
| 729 } | 782 } |
| 730 } | 783 } |
| 731 | 784 |
| 732 Future<HttpClientResponse> get done { | 785 Future<HttpClientResponse> get done { |
| 733 if (_response == null) { | 786 if (_response == null) { |
| 734 _response = Future.wait([_responseCompleter.future, super.done]) | 787 _response = Future.wait([_responseCompleter.future, |
| 788 super.done]) | |
| 735 .then((list) => list[0]); | 789 .then((list) => list[0]); |
| 736 } | 790 } |
| 737 return _response; | 791 return _response; |
| 738 } | 792 } |
| 739 | 793 |
| 740 Future<HttpClientResponse> consume(Stream<List<int>> stream) { | |
| 741 super.consume(stream); | |
| 742 return done; | |
| 743 } | |
| 744 | |
| 745 Future<HttpClientResponse> close() { | 794 Future<HttpClientResponse> close() { |
| 746 super.close(); | 795 super.close(); |
| 747 return done; | 796 return done; |
| 748 } | 797 } |
| 749 | 798 |
| 750 int get maxRedirects => _maxRedirects; | 799 int get maxRedirects => _maxRedirects; |
| 751 void set maxRedirects(int maxRedirects) { | 800 void set maxRedirects(int maxRedirects) { |
| 752 if (_headersWritten) throw new StateError("Request already sent"); | 801 if (_headersWritten) throw new StateError("Request already sent"); |
| 753 _maxRedirects = maxRedirects; | 802 _maxRedirects = maxRedirects; |
| 754 } | 803 } |
| (...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 830 sb.write(cookies[i].value); | 879 sb.write(cookies[i].value); |
| 831 } | 880 } |
| 832 headers.add(HttpHeaders.COOKIE, sb.toString()); | 881 headers.add(HttpHeaders.COOKIE, sb.toString()); |
| 833 } | 882 } |
| 834 | 883 |
| 835 headers._finalize(); | 884 headers._finalize(); |
| 836 | 885 |
| 837 // Write headers. | 886 // Write headers. |
| 838 headers._write(buffer); | 887 headers._write(buffer); |
| 839 writeCRLF(); | 888 writeCRLF(); |
| 840 _ioSink.add(buffer.readBytes()); | 889 _headersSink.add(buffer.readBytes()); |
| 841 } | 890 } |
| 842 } | 891 } |
| 843 | 892 |
| 844 | 893 |
| 845 // Transformer that transforms data to HTTP Chunked Encoding. | 894 // Transformer that transforms data to HTTP Chunked Encoding. |
| 846 class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> { | 895 class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> { |
| 847 void handleData(List<int> data, EventSink<List<int>> sink) { | 896 void handleData(List<int> data, EventSink<List<int>> sink) { |
| 848 sink.add(_chunkHeader(data.length)); | 897 sink.add(_chunkHeader(data.length)); |
| 849 if (data.length > 0) sink.add(data); | 898 if (data.length > 0) sink.add(data); |
| 850 sink.add(_chunkFooter); | 899 sink.add(_chunkFooter); |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 906 " $_bytesWritten bytes written while expected " | 955 " $_bytesWritten bytes written while expected " |
| 907 "$expectedContentLength."))); | 956 "$expectedContentLength."))); |
| 908 } | 957 } |
| 909 sink.close(); | 958 sink.close(); |
| 910 } | 959 } |
| 911 } | 960 } |
| 912 | 961 |
| 913 | 962 |
| 914 // Extends StreamConsumer as this is an internal type, only used to pipe to. | 963 // Extends StreamConsumer as this is an internal type, only used to pipe to. |
| 915 class _HttpOutgoing implements StreamConsumer<List<int>> { | 964 class _HttpOutgoing implements StreamConsumer<List<int>> { |
| 916 Function _onStream; | 965 final Completer _doneCompleter = new Completer(); |
| 917 final Completer _consumeCompleter = new Completer(); | 966 final StreamConsumer _consumer; |
| 918 | 967 |
| 919 Future onStream(Future callback(Stream<List<int>> stream)) { | 968 _HttpOutgoing(StreamConsumer this._consumer); |
| 920 _onStream = callback; | |
| 921 return _consumeCompleter.future; | |
| 922 } | |
| 923 | |
| 924 Future consume(Stream<List<int>> stream) { | |
| 925 _onStream(stream) | |
| 926 .then((_) => _consumeCompleter.complete(), | |
| 927 onError: _consumeCompleter.completeError); | |
| 928 // Use .then to ensure a Future branch. | |
| 929 return _consumeCompleter.future.then((_) => this); | |
| 930 } | |
| 931 | 969 |
| 932 Future addStream(Stream<List<int>> stream) { | 970 Future addStream(Stream<List<int>> stream) { |
| 933 throw new UnimplementedError("_HttpOutgoing.addStream"); | 971 return _consumer.addStream(stream) |
| 972 .catchError((error) { | |
| 973 _doneCompleter.completeError(error); | |
| 974 throw error; | |
| 975 }); | |
| 934 } | 976 } |
| 935 | 977 |
| 936 Future close() { | 978 Future close() { |
| 937 throw new UnimplementedError("_HttpOutgoing.close"); | 979 _doneCompleter.complete(_consumer); |
| 980 return new Future.immediate(null); | |
| 938 } | 981 } |
| 982 | |
| 983 Future get done => _doneCompleter.future; | |
| 939 } | 984 } |
| 940 | 985 |
| 941 | 986 |
| 942 class _HttpClientConnection { | 987 class _HttpClientConnection { |
| 943 final String key; | 988 final String key; |
| 944 final Socket _socket; | 989 final Socket _socket; |
| 945 final _HttpParser _httpParser; | 990 final _HttpParser _httpParser; |
| 946 StreamSubscription _subscription; | 991 StreamSubscription _subscription; |
| 947 final _HttpClient _httpClient; | 992 final _HttpClient _httpClient; |
| 948 | 993 |
| 949 Completer<_HttpIncoming> _nextResponseCompleter; | 994 Completer<_HttpIncoming> _nextResponseCompleter; |
| 950 Future _streamFuture; | 995 Future _streamFuture; |
| 951 | 996 |
| 952 _HttpClientConnection(String this.key, | 997 _HttpClientConnection(String this.key, |
| 953 Socket this._socket, | 998 Socket this._socket, |
| 954 _HttpClient this._httpClient) | 999 _HttpClient this._httpClient) |
| 955 : _httpParser = new _HttpParser.responseParser() { | 1000 : _httpParser = new _HttpParser.responseParser() { |
| 956 _socket.pipe(_httpParser); | 1001 _socket.pipe(_httpParser); |
| 957 _socket.done.catchError((e) { destroy(); }); | |
| 958 | 1002 |
| 959 // Set up handlers on the parser here, so we are sure to get 'onDone' from | 1003 // Set up handlers on the parser here, so we are sure to get 'onDone' from |
| 960 // the parser. | 1004 // the parser. |
| 961 _subscription = _httpParser.listen( | 1005 _subscription = _httpParser.listen( |
| 962 (incoming) { | 1006 (incoming) { |
| 963 // Only handle one incoming response at the time. Keep the | 1007 // Only handle one incoming response at the time. Keep the |
| 964 // stream paused until the response have been processed. | 1008 // stream paused until the response have been processed. |
| 965 _subscription.pause(); | 1009 _subscription.pause(); |
| 966 // We assume the response is not here, until we have send the request. | 1010 // We assume the response is not here, until we have send the request. |
| 967 assert(_nextResponseCompleter != null); | 1011 assert(_nextResponseCompleter != null); |
| 968 var completer = _nextResponseCompleter; | 1012 var completer = _nextResponseCompleter; |
| 969 _nextResponseCompleter = null; | 1013 _nextResponseCompleter = null; |
| 970 completer.complete(incoming); | 1014 completer.complete(incoming); |
| 971 }, | 1015 }, |
| 972 onError: (error) { | 1016 onError: (error) { |
| 973 if (_nextResponseCompleter != null) { | 1017 if (_nextResponseCompleter != null) { |
| 974 _nextResponseCompleter.completeError(error); | 1018 _nextResponseCompleter.completeError(error); |
| 975 _nextResponseCompleter = null; | 1019 _nextResponseCompleter = null; |
| 976 } | 1020 } |
| 977 }, | 1021 }, |
| 978 onDone: () { | 1022 onDone: () { |
| 979 close(); | 1023 close(); |
| 980 }); | 1024 }); |
| 981 } | 1025 } |
| 982 | 1026 |
| 983 _HttpClientRequest send(Uri uri, int port, String method, bool isDirect) { | 1027 _HttpClientRequest send(Uri uri, int port, String method, bool isDirect) { |
| 984 // Start with pausing the parser. | 1028 // Start with pausing the parser. |
| 985 _subscription.pause(); | 1029 _subscription.pause(); |
| 986 var outgoing = new _HttpOutgoing(); | 1030 var outgoing = new _HttpOutgoing(_socket); |
| 987 // Create new request object, wrapping the outgoing connection. | 1031 // Create new request object, wrapping the outgoing connection. |
| 988 var request = new _HttpClientRequest(outgoing, | 1032 var request = new _HttpClientRequest(outgoing, |
| 989 uri, | 1033 uri, |
| 990 method, | 1034 method, |
| 991 !isDirect, | 1035 !isDirect, |
| 992 _httpClient, | 1036 _httpClient, |
| 993 this); | 1037 this); |
| 994 request.headers.host = uri.domain; | 1038 request.headers.host = uri.domain; |
| 995 request.headers.port = port; | 1039 request.headers.port = port; |
| 996 request.headers.set(HttpHeaders.ACCEPT_ENCODING, "gzip"); | 1040 request.headers.set(HttpHeaders.ACCEPT_ENCODING, "gzip"); |
| 997 if (uri.userInfo != null && !uri.userInfo.isEmpty) { | 1041 if (uri.userInfo != null && !uri.userInfo.isEmpty) { |
| 998 // If the URL contains user information use that for basic | 1042 // If the URL contains user information use that for basic |
| 999 // authorization | 1043 // authorization |
| 1000 String auth = | 1044 String auth = |
| 1001 CryptoUtils.bytesToBase64(_encodeString(uri.userInfo)); | 1045 CryptoUtils.bytesToBase64(_encodeString(uri.userInfo)); |
| 1002 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); | 1046 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); |
| 1003 } else { | 1047 } else { |
| 1004 // Look for credentials. | 1048 // Look for credentials. |
| 1005 _Credentials cr = _httpClient._findCredentials(uri); | 1049 _Credentials cr = _httpClient._findCredentials(uri); |
| 1006 if (cr != null) { | 1050 if (cr != null) { |
| 1007 cr.authorize(request); | 1051 cr.authorize(request); |
| 1008 } | 1052 } |
| 1009 } | 1053 } |
| 1010 // Start sending the request (lazy, delayed until the user provides | 1054 // Start sending the request (lazy, delayed until the user provides |
| 1011 // data). | 1055 // data). |
| 1012 _httpParser.responseToMethod = method; | 1056 _httpParser.responseToMethod = method; |
| 1013 _streamFuture = outgoing.onStream((stream) { | 1057 _streamFuture = outgoing.done |
| 1014 return _socket.writeStream(stream) | 1058 .then((s) { |
| 1015 .then((s) { | 1059 // Request sent, set up response completer. |
| 1016 // Request sent, set up response completer. | 1060 _nextResponseCompleter = new Completer(); |
| 1017 _nextResponseCompleter = new Completer(); | |
| 1018 | 1061 |
| 1019 // Listen for response. | 1062 // Listen for response. |
| 1020 _nextResponseCompleter.future | 1063 _nextResponseCompleter.future |
| 1021 .then((incoming) { | 1064 .then((incoming) { |
| 1022 incoming.dataDone.then((_) { | 1065 incoming.dataDone.then((_) { |
| 1023 if (incoming.headers.persistentConnection && | 1066 if (incoming.headers.persistentConnection && |
| 1024 request.persistentConnection) { | 1067 request.persistentConnection) { |
| 1025 // Return connection, now we are done. | 1068 // Return connection, now we are done. |
| 1026 _httpClient._returnConnection(this); | 1069 _httpClient._returnConnection(this); |
| 1027 _subscription.resume(); | 1070 _subscription.resume(); |
| 1028 } else { | 1071 } else { |
| 1029 destroy(); | |
| 1030 } | |
| 1031 }); | |
| 1032 request._onIncoming(incoming); | |
| 1033 }) | |
| 1034 // If we see a state error, we failed to get the 'first' | |
| 1035 // element. | |
| 1036 // Transform the error to a HttpParserException, for | |
| 1037 // consistency. | |
| 1038 .catchError((error) { | |
| 1039 throw new HttpParserException( | |
| 1040 "Connection closed before data was received"); | |
| 1041 }, test: (error) => error is StateError) | |
| 1042 .catchError((error) { | |
| 1043 // We are done with the socket. | |
| 1044 destroy(); | 1072 destroy(); |
| 1045 request._onError(error); | 1073 } |
| 1046 }); | 1074 }); |
| 1075 request._onIncoming(incoming); | |
| 1076 }) | |
| 1077 // If we see a state error, we failed to get the 'first' | |
| 1078 // element. | |
| 1079 // Transform the error to a HttpParserException, for | |
| 1080 // consistency. | |
| 1081 .catchError((error) { | |
| 1082 throw new HttpParserException( | |
| 1083 "Connection closed before data was received"); | |
| 1084 }, test: (error) => error is StateError) | |
| 1085 .catchError((error) { | |
| 1086 // We are done with the socket. | |
| 1087 destroy(); | |
| 1088 request._onError(error); | |
| 1089 }); | |
| 1047 | 1090 |
| 1048 // Resume the parser now we have a handler. | 1091 // Resume the parser now we have a handler. |
| 1049 _subscription.resume(); | 1092 _subscription.resume(); |
| 1050 return s; | 1093 return s; |
| 1051 }, onError: (e) { | 1094 }, onError: (e) { |
| 1052 destroy(); | 1095 destroy(); |
| 1053 throw e; | 1096 }); |
| 1054 }); | |
| 1055 }); | |
| 1056 return request; | 1097 return request; |
| 1057 } | 1098 } |
| 1058 | 1099 |
| 1059 Future<Socket> detachSocket() { | 1100 Future<Socket> detachSocket() { |
| 1060 return _streamFuture | 1101 return _streamFuture.then( |
| 1061 .then((_) => new _DetachedSocket(_socket, _httpParser.detachIncoming()), | 1102 (_) => new _DetachedSocket(_socket, _httpParser.detachIncoming())); |
| 1062 onError: (_) {}); | |
| 1063 } | 1103 } |
| 1064 | 1104 |
| 1065 void destroy() { | 1105 void destroy() { |
| 1066 _httpClient._connectionClosed(this); | 1106 _httpClient._connectionClosed(this); |
| 1067 _socket.destroy(); | 1107 _socket.destroy(); |
| 1068 } | 1108 } |
| 1069 | 1109 |
| 1070 void close() { | 1110 void close() { |
| 1071 _httpClient._connectionClosed(this); | 1111 _httpClient._connectionClosed(this); |
| 1072 _streamFuture | 1112 _streamFuture |
| 1073 // TODO(ajohnsen): Add timeout. | 1113 // TODO(ajohnsen): Add timeout. |
| 1074 .then((_) => _socket.destroy(), | 1114 .then((_) => _socket.destroy()); |
| 1075 onError: (_) {}); | |
| 1076 } | 1115 } |
| 1077 | 1116 |
| 1078 HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket); | 1117 HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket); |
| 1079 } | 1118 } |
| 1080 | 1119 |
| 1081 class _ConnnectionInfo { | 1120 class _ConnnectionInfo { |
| 1082 _ConnnectionInfo(_HttpClientConnection this.connection, _Proxy this.proxy); | 1121 _ConnnectionInfo(_HttpClientConnection this.connection, _Proxy this.proxy); |
| 1083 final _HttpClientConnection connection; | 1122 final _HttpClientConnection connection; |
| 1084 final _Proxy proxy; | 1123 final _Proxy proxy; |
| 1085 } | 1124 } |
| (...skipping 284 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1370 final Socket _socket; | 1409 final Socket _socket; |
| 1371 final _HttpServer _httpServer; | 1410 final _HttpServer _httpServer; |
| 1372 final _HttpParser _httpParser; | 1411 final _HttpParser _httpParser; |
| 1373 StreamSubscription _subscription; | 1412 StreamSubscription _subscription; |
| 1374 | 1413 |
| 1375 Future _streamFuture; | 1414 Future _streamFuture; |
| 1376 | 1415 |
| 1377 _HttpConnection(Socket this._socket, _HttpServer this._httpServer) | 1416 _HttpConnection(Socket this._socket, _HttpServer this._httpServer) |
| 1378 : _httpParser = new _HttpParser.requestParser() { | 1417 : _httpParser = new _HttpParser.requestParser() { |
| 1379 _socket.pipe(_httpParser); | 1418 _socket.pipe(_httpParser); |
| 1380 _socket.done.catchError((e) => destroy()); | |
| 1381 _subscription = _httpParser.listen( | 1419 _subscription = _httpParser.listen( |
| 1382 (incoming) { | 1420 (incoming) { |
| 1383 // Only handle one incoming request at the time. Keep the | 1421 // Only handle one incoming request at the time. Keep the |
| 1384 // stream paused until the request has been send. | 1422 // stream paused until the request has been send. |
| 1385 _subscription.pause(); | 1423 _subscription.pause(); |
| 1386 _state = _ACTIVE; | 1424 _state = _ACTIVE; |
| 1387 var outgoing = new _HttpOutgoing(); | 1425 var outgoing = new _HttpOutgoing(_socket); |
| 1388 var response = new _HttpResponse(incoming.headers.protocolVersion, | 1426 var response = new _HttpResponse(incoming.headers.protocolVersion, |
| 1389 outgoing); | 1427 outgoing); |
| 1390 var request = new _HttpRequest(response, incoming, _httpServer, this); | 1428 var request = new _HttpRequest(response, incoming, _httpServer, this); |
| 1391 outgoing.onStream((stream) { | 1429 _streamFuture = outgoing.done |
| 1392 return _streamFuture = _socket.writeStream(stream) | 1430 .then((_) { |
| 1393 .then((_) { | 1431 if (_state == _DETACHED) return; |
| 1394 if (_state == _DETACHED) return; | 1432 if (response.persistentConnection && |
| 1395 if (response.persistentConnection && | 1433 request.persistentConnection && |
| 1396 request.persistentConnection && | 1434 incoming.fullBodyRead) { |
| 1397 incoming.fullBodyRead) { | 1435 _state = _IDLE; |
| 1398 _state = _IDLE; | 1436 // Resume the subscription for incoming requests as the |
| 1399 // Resume the subscription for incoming requests as the | 1437 // request is now processed. |
| 1400 // request is now processed. | 1438 _subscription.resume(); |
| 1401 _subscription.resume(); | 1439 } else { |
| 1402 } else { | 1440 // Close socket, keep-alive not used or body sent before |
| 1403 // Close socket, keep-alive not used or body sent before | 1441 // received data was handled. |
| 1404 // received data was handled. | |
| 1405 destroy(); | |
| 1406 } | |
| 1407 }) | |
| 1408 .catchError((e) { | |
| 1409 destroy(); | 1442 destroy(); |
| 1410 throw e; | 1443 } |
| 1411 }); | 1444 }) |
| 1412 }); | 1445 .catchError((e) { |
| 1446 destroy(); | |
| 1447 }); | |
| 1413 response._ignoreBody = request.method == "HEAD"; | 1448 response._ignoreBody = request.method == "HEAD"; |
| 1414 response._httpRequest = request; | 1449 response._httpRequest = request; |
| 1415 _httpServer._handleRequest(request); | 1450 _httpServer._handleRequest(request); |
| 1416 }, | 1451 }, |
| 1417 onDone: () { | 1452 onDone: () { |
| 1418 destroy(); | 1453 destroy(); |
| 1419 }, | 1454 }, |
| 1420 onError: (error) { | 1455 onError: (error) { |
| 1421 _httpServer._handleError(error); | 1456 _httpServer._handleError(error); |
| 1422 destroy(); | 1457 destroy(); |
| (...skipping 261 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1684 void writeCharCode(int charCode) => _socket.writeCharCode(charCode); | 1719 void writeCharCode(int charCode) => _socket.writeCharCode(charCode); |
| 1685 | 1720 |
| 1686 void writeAll(Iterable objects, [String separator = ""]) { | 1721 void writeAll(Iterable objects, [String separator = ""]) { |
| 1687 _socket.writeAll(objects, separator); | 1722 _socket.writeAll(objects, separator); |
| 1688 } | 1723 } |
| 1689 | 1724 |
| 1690 void add(List<int> bytes) => _socket.add(bytes); | 1725 void add(List<int> bytes) => _socket.add(bytes); |
| 1691 | 1726 |
| 1692 void addError(AsyncError error) => _socket.addError(error); | 1727 void addError(AsyncError error) => _socket.addError(error); |
| 1693 | 1728 |
| 1694 Future<Socket> consume(Stream<List<int>> stream) { | |
| 1695 return _socket.consume(stream); | |
| 1696 } | |
| 1697 | |
| 1698 Future<Socket> addStream(Stream<List<int>> stream) { | 1729 Future<Socket> addStream(Stream<List<int>> stream) { |
| 1699 return _socket.addStream(stream); | 1730 return _socket.addStream(stream); |
| 1700 } | 1731 } |
| 1701 | 1732 |
| 1702 Future<Socket> writeStream(Stream<List<int>> stream) { | |
| 1703 return _socket.writeStream(stream); | |
| 1704 } | |
| 1705 | |
| 1706 void destroy() => _socket.destroy(); | 1733 void destroy() => _socket.destroy(); |
| 1707 | 1734 |
| 1708 Future close() => _socket.close(); | 1735 Future close() => _socket.close(); |
| 1709 | 1736 |
| 1710 Future<Socket> get done => _socket.done; | 1737 Future<Socket> get done => _socket.done; |
| 1711 | 1738 |
| 1712 int get port => _socket.port; | 1739 int get port => _socket.port; |
| 1713 | 1740 |
| 1714 String get remoteHost => _socket.remoteHost; | 1741 String get remoteHost => _socket.remoteHost; |
| 1715 | 1742 |
| (...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1826 | 1853 |
| 1827 | 1854 |
| 1828 class _RedirectInfo implements RedirectInfo { | 1855 class _RedirectInfo implements RedirectInfo { |
| 1829 const _RedirectInfo(int this.statusCode, | 1856 const _RedirectInfo(int this.statusCode, |
| 1830 String this.method, | 1857 String this.method, |
| 1831 Uri this.location); | 1858 Uri this.location); |
| 1832 final int statusCode; | 1859 final int statusCode; |
| 1833 final String method; | 1860 final String method; |
| 1834 final Uri location; | 1861 final Uri location; |
| 1835 } | 1862 } |
| OLD | NEW |