| Index: sdk/lib/io/http_impl.dart
|
| diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart
|
| index 36cfa29575f134f52311cd5a80e938b4f117e585..b586f0974463c417c159ace7a534484078e05322 100644
|
| --- a/sdk/lib/io/http_impl.dart
|
| +++ b/sdk/lib/io/http_impl.dart
|
| @@ -513,12 +513,12 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
|
| stream.drain().catchError((_) {});
|
| return _headersSink.close();
|
| }
|
| - stream = stream.transform(new _BufferTransformer());
|
| + stream = stream.transform(const _BufferTransformer());
|
| if (headers.chunkedTransferEncoding) {
|
| if (_asGZip) {
|
| stream = stream.transform(GZIP.encoder);
|
| }
|
| - stream = stream.transform(new _ChunkedTransformer());
|
| + stream = stream.transform(const _ChunkedTransformer());
|
| } else if (contentLength >= 0) {
|
| stream = stream.transform(
|
| new _ContentLengthValidator(contentLength, _uri));
|
| @@ -646,39 +646,56 @@ class _HttpOutboundConsumer implements StreamConsumer {
|
| }
|
|
|
|
|
| -class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> {
|
| +class _BufferTransformerSink implements EventSink<List<int>> {
|
| static const int MIN_CHUNK_SIZE = 4 * 1024;
|
| static const int MAX_BUFFER_SIZE = 16 * 1024;
|
|
|
| final BytesBuilder _builder = new BytesBuilder();
|
| + final EventSink<List<int>> _outSink;
|
|
|
| - void handleData(List<int> data, EventSink<List<int>> sink) {
|
| + _BufferTransformerSink(this._outSink);
|
| +
|
| + void add(List<int> data) {
|
| // TODO(ajohnsen): Use timeout?
|
| if (data.length == 0) return;
|
| if (data.length >= MIN_CHUNK_SIZE) {
|
| - flush(sink);
|
| - sink.add(data);
|
| + flush();
|
| + _outSink.add(data);
|
| } else {
|
| _builder.add(data);
|
| if (_builder.length >= MAX_BUFFER_SIZE) {
|
| - flush(sink);
|
| + flush();
|
| }
|
| }
|
| }
|
|
|
| - void handleDone(EventSink<List<int>> sink) {
|
| - flush(sink);
|
| - sink.close();
|
| + void addError(Object error, [StackTrace stackTrace]) {
|
| + _outSink.addError(error, stackTrace);
|
| + }
|
| +
|
| + void close() {
|
| + flush();
|
| + _outSink.close();
|
| }
|
|
|
| - void flush(EventSink<List<int>> sink) {
|
| + void flush() {
|
| if (_builder.length > 0) {
|
| // takeBytes will clear the BytesBuilder.
|
| - sink.add(_builder.takeBytes());
|
| + _outSink.add(_builder.takeBytes());
|
| }
|
| }
|
| }
|
|
|
| +class _BufferTransformer implements StreamTransformer<List<int>, List<int>> {
|
| + const _BufferTransformer();
|
| +
|
| + Stream<List<int>> bind(Stream<List<int>> stream) {
|
| + return new Stream<List<int>>.eventTransformed(
|
| + stream,
|
| + (EventSink outSink) => new _BufferTransformerSink(outSink));
|
| + }
|
| +}
|
| +
|
|
|
| class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
|
| implements HttpResponse {
|
| @@ -1025,19 +1042,26 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse>
|
| }
|
|
|
|
|
| -// Transformer that transforms data to HTTP Chunked Encoding.
|
| -class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> {
|
| +class _ChunkedTransformerSink implements EventSink<List<int>> {
|
| +
|
| int _pendingFooter = 0;
|
| + final EventSink<List<int>> _outSink;
|
| +
|
| + _ChunkedTransformerSink(this._outSink);
|
|
|
| - void handleData(List<int> data, EventSink<List<int>> sink) {
|
| - sink.add(_chunkHeader(data.length));
|
| - if (data.length > 0) sink.add(data);
|
| + void add(List<int> data) {
|
| + _outSink.add(_chunkHeader(data.length));
|
| + if (data.length > 0) _outSink.add(data);
|
| _pendingFooter = 2;
|
| }
|
|
|
| - void handleDone(EventSink<List<int>> sink) {
|
| - handleData(const [], sink);
|
| - sink.close();
|
| + void addError(Object error, [StackTrace stackTrace]) {
|
| + _outSink.addError(error, stackTrace);
|
| + }
|
| +
|
| + void close() {
|
| + add(const []);
|
| + _outSink.close();
|
| }
|
|
|
| List<int> _chunkHeader(int length) {
|
| @@ -1077,40 +1101,68 @@ class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> {
|
| const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF]);
|
| }
|
|
|
| +// Transformer that transforms data to HTTP Chunked Encoding.
|
| +class _ChunkedTransformer implements StreamTransformer<List<int>, List<int>> {
|
| + const _ChunkedTransformer();
|
| +
|
| + Stream<List<int>> bind(Stream<List<int>> stream) {
|
| + return new Stream<List<int>>.eventTransformed(
|
| + stream,
|
| + (EventSink<List<int>> sink) => new _ChunkedTransformerSink(sink));
|
| + }
|
| +}
|
|
|
| // Transformer that validates the content length.
|
| class _ContentLengthValidator
|
| - extends StreamEventTransformer<List<int>, List<int>> {
|
| + implements StreamTransformer<List<int>, List<int>>, EventSink<List<int>> {
|
| final int expectedContentLength;
|
| final Uri uri;
|
| int _bytesWritten = 0;
|
|
|
| + EventSink<List<int>> _outSink;
|
| +
|
| _ContentLengthValidator(int this.expectedContentLength, Uri this.uri);
|
|
|
| - void handleData(List<int> data, EventSink<List<int>> sink) {
|
| + Stream<List<int>> bind(Stream<List<int>> stream) {
|
| + return new Stream.eventTransformed(
|
| + stream,
|
| + (EventSink sink) {
|
| + if (_outSink != null) {
|
| + throw new StateError("Validator transformer already used");
|
| + }
|
| + _outSink = sink;
|
| + return this;
|
| + });
|
| + }
|
| +
|
| + void add(List<int> data) {
|
| _bytesWritten += data.length;
|
| if (_bytesWritten > expectedContentLength) {
|
| - sink.addError(new HttpException(
|
| + _outSink.addError(new HttpException(
|
| "Content size exceeds specified contentLength. "
|
| "$_bytesWritten bytes written while expected "
|
| "$expectedContentLength. "
|
| "[${new String.fromCharCodes(data)}]",
|
| uri: uri));
|
| - sink.close();
|
| + _outSink.close();
|
| } else {
|
| - sink.add(data);
|
| + _outSink.add(data);
|
| }
|
| }
|
|
|
| - void handleDone(EventSink<List<int>> sink) {
|
| + void addError(Object error, [StackTrace stackTrace]) {
|
| + _outSink.addError(error, stackTrace);
|
| + }
|
| +
|
| + void close() {
|
| if (_bytesWritten < expectedContentLength) {
|
| - sink.addError(new HttpException(
|
| + _outSink.addError(new HttpException(
|
| "Content size below specified contentLength. "
|
| " $_bytesWritten bytes written while expected "
|
| "$expectedContentLength.",
|
| uri: uri));
|
| }
|
| - sink.close();
|
| + _outSink.close();
|
| }
|
| }
|
|
|
|
|