Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(163)

Side by Side Diff: sdk/lib/io/http_impl.dart

Issue 13680002: StreamConsumer has an addStream and a close functions. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Update comments. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/io/file_impl.dart ('k') | sdk/lib/io/http_parser.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 class _HttpIncoming extends Stream<List<int>> { 7 class _HttpIncoming extends Stream<List<int>> {
8 final int _transferLength; 8 final int _transferLength;
9 final Completer _dataCompleter = new Completer(); 9 final Completer _dataCompleter = new Completer();
10 Stream<List<int>> _stream; 10 Stream<List<int>> _stream;
(...skipping 389 matching lines...) Expand 10 before | Expand all | Expand 10 after
400 _writeHeaders(); 400 _writeHeaders();
401 if (data.length == 0) return; 401 if (data.length == 0) return;
402 _ioSink.writeBytes(data); 402 _ioSink.writeBytes(data);
403 } 403 }
404 404
405 Future<T> consume(Stream<List<int>> stream) { 405 Future<T> consume(Stream<List<int>> stream) {
406 _writeHeaders(); 406 _writeHeaders();
407 return _ioSink.consume(stream); 407 return _ioSink.consume(stream);
408 } 408 }
409 409
410 Future<T> writeStream(Stream<List<int>> stream) { 410 Future<T> addStream(Stream<List<int>> stream) {
411 _writeHeaders(); 411 _writeHeaders();
412 return _ioSink.writeStream(stream).then((_) => this); 412 return _ioSink.writeStream(stream).then((_) => this);
413 } 413 }
414 414
415 void close() { 415 Future<T> writeStream(Stream<List<int>> stream) {
416 return addStream(stream);
417 }
418
419 Future close() {
416 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and 420 // TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and
417 // persistentConnection is not guaranteed to be in sync. 421 // persistentConnection is not guaranteed to be in sync.
418 if (!_headersWritten && !_ignoreBody && headers.contentLength == -1) { 422 if (!_headersWritten && !_ignoreBody && headers.contentLength == -1) {
419 // If no body was written, _ignoreBody is false (it's not a HEAD 423 // If no body was written, _ignoreBody is false (it's not a HEAD
420 // request) and the content-length is unspecified, set contentLength to 0. 424 // request) and the content-length is unspecified, set contentLength to 0.
421 headers.chunkedTransferEncoding = false; 425 headers.chunkedTransferEncoding = false;
422 headers.contentLength = 0; 426 headers.contentLength = 0;
423 } 427 }
424 _writeHeaders(); 428 _writeHeaders();
425 _ioSink.close(); 429 return _ioSink.close();
426 } 430 }
427 431
428 Future<T> get done { 432 Future<T> get done {
429 _writeHeaders(); 433 _writeHeaders();
430 return _ioSink.done; 434 return _ioSink.done;
431 } 435 }
432 436
433 void _writeHeaders() { 437 void _writeHeaders() {
434 if (_headersWritten) return; 438 if (_headersWritten) return;
435 _headersWritten = true; 439 _headersWritten = true;
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
480 484
481 class _HttpOutboundConsumer implements StreamConsumer { 485 class _HttpOutboundConsumer implements StreamConsumer {
482 Function _consume; 486 Function _consume;
483 IOSink _ioSink; 487 IOSink _ioSink;
484 bool _asGZip; 488 bool _asGZip;
485 _HttpOutboundConsumer(IOSink this._ioSink, 489 _HttpOutboundConsumer(IOSink this._ioSink,
486 Function this._consume, 490 Function this._consume,
487 bool this._asGZip); 491 bool this._asGZip);
488 492
489 Future consume(var stream) => _consume(_ioSink, stream, _asGZip); 493 Future consume(var stream) => _consume(_ioSink, stream, _asGZip);
494
495 Future addStream(var stream) {
496 throw new UnimplementedError("_HttpOutboundConsumer.addStream");
497 }
498
499 Future close() {
500 throw new UnimplementedError("_HttpOutboundConsumer.close");
501 }
490 } 502 }
491 503
492 504
493 class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> { 505 class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> {
494 const int MIN_CHUNK_SIZE = 4 * 1024; 506 const int MIN_CHUNK_SIZE = 4 * 1024;
495 const int MAX_BUFFER_SIZE = 16 * 1024; 507 const int MAX_BUFFER_SIZE = 16 * 1024;
496 508
497 final _BufferList _buffer = new _BufferList(); 509 final _BufferList _buffer = new _BufferList();
498 510
499 void handleData(List<int> data, EventSink<List<int>> sink) { 511 void handleData(List<int> data, EventSink<List<int>> sink) {
(...skipping 387 matching lines...) Expand 10 before | Expand all | Expand 10 after
887 return _consumeCompleter.future; 899 return _consumeCompleter.future;
888 } 900 }
889 901
890 Future consume(Stream<List<int>> stream) { 902 Future consume(Stream<List<int>> stream) {
891 _onStream(stream) 903 _onStream(stream)
892 .then((_) => _consumeCompleter.complete(), 904 .then((_) => _consumeCompleter.complete(),
893 onError: _consumeCompleter.completeError); 905 onError: _consumeCompleter.completeError);
894 // Use .then to ensure a Future branch. 906 // Use .then to ensure a Future branch.
895 return _consumeCompleter.future.then((_) => this); 907 return _consumeCompleter.future.then((_) => this);
896 } 908 }
909
910 Future addStream(Stream<List<int>> stream) {
911 throw new UnimplementedError("_HttpOutgoing.addStream");
912 }
913
914 Future close() {
915 throw new UnimplementedError("_HttpOutgoing.close");
916 }
897 } 917 }
898 918
899 919
900 class _HttpClientConnection { 920 class _HttpClientConnection {
901 final String key; 921 final String key;
902 final Socket _socket; 922 final Socket _socket;
903 final _HttpParser _httpParser; 923 final _HttpParser _httpParser;
904 StreamSubscription _subscription; 924 StreamSubscription _subscription;
905 final _HttpClient _httpClient; 925 final _HttpClient _httpClient;
906 926
(...skipping 739 matching lines...) Expand 10 before | Expand all | Expand 10 after
1646 void writeAll(Iterable objects, [String separator = ""]) { 1666 void writeAll(Iterable objects, [String separator = ""]) {
1647 _socket.writeAll(objects, separator); 1667 _socket.writeAll(objects, separator);
1648 } 1668 }
1649 1669
1650 void writeBytes(List<int> bytes) => _socket.writeBytes(bytes); 1670 void writeBytes(List<int> bytes) => _socket.writeBytes(bytes);
1651 1671
1652 Future<Socket> consume(Stream<List<int>> stream) { 1672 Future<Socket> consume(Stream<List<int>> stream) {
1653 return _socket.consume(stream); 1673 return _socket.consume(stream);
1654 } 1674 }
1655 1675
1676 Future<Socket> addStream(Stream<List<int>> stream) {
1677 return _socket.addStream(stream);
1678 }
1679
1656 Future<Socket> writeStream(Stream<List<int>> stream) { 1680 Future<Socket> writeStream(Stream<List<int>> stream) {
1657 return _socket.writeStream(stream); 1681 return _socket.writeStream(stream);
1658 } 1682 }
1659 1683
1660 void destroy() => _socket.destroy(); 1684 void destroy() => _socket.destroy();
1661 1685
1662 void close() => _socket.close(); 1686 Future close() => _socket.close();
1663 1687
1664 Future<Socket> get done => _socket.done; 1688 Future<Socket> get done => _socket.done;
1665 1689
1666 int get port => _socket.port; 1690 int get port => _socket.port;
1667 1691
1668 String get remoteHost => _socket.remoteHost; 1692 String get remoteHost => _socket.remoteHost;
1669 1693
1670 int get remotePort => _socket.remotePort; 1694 int get remotePort => _socket.remotePort;
1671 1695
1672 bool setOption(SocketOption option, bool enabled) { 1696 bool setOption(SocketOption option, bool enabled) {
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after
1780 1804
1781 1805
1782 class _RedirectInfo implements RedirectInfo { 1806 class _RedirectInfo implements RedirectInfo {
1783 const _RedirectInfo(int this.statusCode, 1807 const _RedirectInfo(int this.statusCode,
1784 String this.method, 1808 String this.method,
1785 Uri this.location); 1809 Uri this.location);
1786 final int statusCode; 1810 final int statusCode;
1787 final String method; 1811 final String method;
1788 final Uri location; 1812 final Uri location;
1789 } 1813 }
OLDNEW
« no previous file with comments | « sdk/lib/io/file_impl.dart ('k') | sdk/lib/io/http_parser.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698