Index: sdk/lib/io/http_impl.dart |
diff --git a/sdk/lib/io/http_impl.dart b/sdk/lib/io/http_impl.dart |
index 36cfa29575f134f52311cd5a80e938b4f117e585..b586f0974463c417c159ace7a534484078e05322 100644 |
--- a/sdk/lib/io/http_impl.dart |
+++ b/sdk/lib/io/http_impl.dart |
@@ -513,12 +513,12 @@ abstract class _HttpOutboundMessage<T> implements IOSink { |
stream.drain().catchError((_) {}); |
return _headersSink.close(); |
} |
- stream = stream.transform(new _BufferTransformer()); |
+ stream = stream.transform(const _BufferTransformer()); |
if (headers.chunkedTransferEncoding) { |
if (_asGZip) { |
stream = stream.transform(GZIP.encoder); |
} |
- stream = stream.transform(new _ChunkedTransformer()); |
+ stream = stream.transform(const _ChunkedTransformer()); |
} else if (contentLength >= 0) { |
stream = stream.transform( |
new _ContentLengthValidator(contentLength, _uri)); |
@@ -646,39 +646,56 @@ class _HttpOutboundConsumer implements StreamConsumer { |
} |
-class _BufferTransformer extends StreamEventTransformer<List<int>, List<int>> { |
+class _BufferTransformerSink implements EventSink<List<int>> { |
static const int MIN_CHUNK_SIZE = 4 * 1024; |
static const int MAX_BUFFER_SIZE = 16 * 1024; |
final BytesBuilder _builder = new BytesBuilder(); |
+ final EventSink<List<int>> _outSink; |
- void handleData(List<int> data, EventSink<List<int>> sink) { |
+ _BufferTransformerSink(this._outSink); |
+ |
+ void add(List<int> data) { |
// TODO(ajohnsen): Use timeout? |
if (data.length == 0) return; |
if (data.length >= MIN_CHUNK_SIZE) { |
- flush(sink); |
- sink.add(data); |
+ flush(); |
+ _outSink.add(data); |
} else { |
_builder.add(data); |
if (_builder.length >= MAX_BUFFER_SIZE) { |
- flush(sink); |
+ flush(); |
} |
} |
} |
- void handleDone(EventSink<List<int>> sink) { |
- flush(sink); |
- sink.close(); |
+ void addError(Object error, [StackTrace stackTrace]) { |
+ _outSink.addError(error, stackTrace); |
+ } |
+ |
+ void close() { |
+ flush(); |
+ _outSink.close(); |
} |
- void flush(EventSink<List<int>> sink) { |
+ void flush() { |
if (_builder.length > 0) { |
// takeBytes will clear the BytesBuilder. |
- sink.add(_builder.takeBytes()); |
+ _outSink.add(_builder.takeBytes()); |
} |
} |
} |
+class _BufferTransformer implements StreamTransformer<List<int>, List<int>> { |
+ const _BufferTransformer(); |
+ |
+ Stream<List<int>> bind(Stream<List<int>> stream) { |
+ return new Stream<List<int>>.eventTransformed( |
+ stream, |
+ (EventSink outSink) => new _BufferTransformerSink(outSink)); |
+ } |
+} |
+ |
class _HttpResponse extends _HttpOutboundMessage<HttpResponse> |
implements HttpResponse { |
@@ -1025,19 +1042,26 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse> |
} |
-// Transformer that transforms data to HTTP Chunked Encoding. |
-class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> { |
+class _ChunkedTransformerSink implements EventSink<List<int>> { |
+ |
int _pendingFooter = 0; |
+ final EventSink<List<int>> _outSink; |
+ |
+ _ChunkedTransformerSink(this._outSink); |
- void handleData(List<int> data, EventSink<List<int>> sink) { |
- sink.add(_chunkHeader(data.length)); |
- if (data.length > 0) sink.add(data); |
+ void add(List<int> data) { |
+ _outSink.add(_chunkHeader(data.length)); |
+ if (data.length > 0) _outSink.add(data); |
_pendingFooter = 2; |
} |
- void handleDone(EventSink<List<int>> sink) { |
- handleData(const [], sink); |
- sink.close(); |
+ void addError(Object error, [StackTrace stackTrace]) { |
+ _outSink.addError(error, stackTrace); |
+ } |
+ |
+ void close() { |
+ add(const []); |
+ _outSink.close(); |
} |
List<int> _chunkHeader(int length) { |
@@ -1077,40 +1101,68 @@ class _ChunkedTransformer extends StreamEventTransformer<List<int>, List<int>> { |
const [0x30, _CharCode.CR, _CharCode.LF, _CharCode.CR, _CharCode.LF]); |
} |
+// Transformer that transforms data to HTTP Chunked Encoding. |
+class _ChunkedTransformer implements StreamTransformer<List<int>, List<int>> { |
+ const _ChunkedTransformer(); |
+ |
+ Stream<List<int>> bind(Stream<List<int>> stream) { |
+ return new Stream<List<int>>.eventTransformed( |
+ stream, |
+ (EventSink<List<int>> sink) => new _ChunkedTransformerSink(sink)); |
+ } |
+} |
// Transformer that validates the content length. |
class _ContentLengthValidator |
- extends StreamEventTransformer<List<int>, List<int>> { |
+ implements StreamTransformer<List<int>, List<int>>, EventSink<List<int>> { |
final int expectedContentLength; |
final Uri uri; |
int _bytesWritten = 0; |
+ EventSink<List<int>> _outSink; |
+ |
_ContentLengthValidator(int this.expectedContentLength, Uri this.uri); |
- void handleData(List<int> data, EventSink<List<int>> sink) { |
+ Stream<List<int>> bind(Stream<List<int>> stream) { |
+ return new Stream.eventTransformed( |
+ stream, |
+ (EventSink sink) { |
+ if (_outSink != null) { |
+ throw new StateError("Validator transformer already used"); |
+ } |
+ _outSink = sink; |
+ return this; |
+ }); |
+ } |
+ |
+ void add(List<int> data) { |
_bytesWritten += data.length; |
if (_bytesWritten > expectedContentLength) { |
- sink.addError(new HttpException( |
+ _outSink.addError(new HttpException( |
"Content size exceeds specified contentLength. " |
"$_bytesWritten bytes written while expected " |
"$expectedContentLength. " |
"[${new String.fromCharCodes(data)}]", |
uri: uri)); |
- sink.close(); |
+ _outSink.close(); |
} else { |
- sink.add(data); |
+ _outSink.add(data); |
} |
} |
- void handleDone(EventSink<List<int>> sink) { |
+ void addError(Object error, [StackTrace stackTrace]) { |
+ _outSink.addError(error, stackTrace); |
+ } |
+ |
+ void close() { |
if (_bytesWritten < expectedContentLength) { |
- sink.addError(new HttpException( |
+ _outSink.addError(new HttpException( |
"Content size below specified contentLength. " |
" $_bytesWritten bytes written while expected " |
"$expectedContentLength.", |
uri: uri)); |
} |
- sink.close(); |
+ _outSink.close(); |
} |
} |