| Index: runtime/bin/socket_patch.dart
|
| diff --git a/runtime/bin/socket_patch.dart b/runtime/bin/socket_patch.dart
|
| index 81e8ab379728d001376dd6c6a8b4179a6f799f6d..0316652c4b3ccc2ae28ef7e13a582a0eae5fb8ac 100644
|
| --- a/runtime/bin/socket_patch.dart
|
| +++ b/runtime/bin/socket_patch.dart
|
| @@ -665,32 +665,13 @@ class _SocketStreamConsumer extends StreamConsumer<List<int>> {
|
| int offset;
|
| List<int> buffer;
|
| bool paused = false;
|
| + Completer streamCompleter;
|
|
|
| _SocketStreamConsumer(this.socket);
|
|
|
| - Future<Socket> consume(Stream<List<int>> stream) {
|
| - if (socket._raw != null) {
|
| - subscription = stream.listen(
|
| - (data) {
|
| - assert(!paused);
|
| - assert(buffer == null);
|
| - buffer = data;
|
| - offset = 0;
|
| - write();
|
| - },
|
| - onError: (error) {
|
| - socket._consumerDone(error);
|
| - },
|
| - onDone: () {
|
| - socket._consumerDone();
|
| - },
|
| - unsubscribeOnError: true);
|
| - }
|
| - return socket._doneFuture;
|
| - }
|
| -
|
| Future<Socket> addStream(Stream<List<int>> stream) {
|
| - Completer completer = new Completer<Socket>();
|
| + socket._ensureRawSocketSubscription();
|
| + streamCompleter = new Completer<Socket>();
|
| if (socket._raw != null) {
|
| subscription = stream.listen(
|
| (data) {
|
| @@ -701,20 +682,20 @@ class _SocketStreamConsumer extends StreamConsumer<List<int>> {
|
| write();
|
| },
|
| onError: (error) {
|
| - socket._consumerDone(error);
|
| - completer.completeError(error.error, error.stackTrace);
|
| + socket._consumerDone();
|
| + done(error);
|
| },
|
| onDone: () {
|
| - completer.complete(socket);
|
| + done();
|
| },
|
| unsubscribeOnError: true);
|
| }
|
| - return completer.future;
|
| + return streamCompleter.future;
|
| }
|
|
|
| Future<Socket> close() {
|
| socket._consumerDone();
|
| - return completer.future;
|
| + return new Future.immediate(socket);
|
| }
|
|
|
| void write() {
|
| @@ -743,7 +724,20 @@ class _SocketStreamConsumer extends StreamConsumer<List<int>> {
|
| }
|
| } catch (e) {
|
| stop();
|
| - socket._consumerDone(e);
|
| + socket._consumerDone();
|
| + done(e);
|
| + }
|
| + }
|
| +
|
| + void done([error]) {
|
| + if (streamCompleter != null) {
|
| + var tmp = streamCompleter;
|
| + streamCompleter = null;
|
| + if (error != null) {
|
| + tmp.completeError(error);
|
| + } else {
|
| + tmp.complete(socket);
|
| + }
|
| }
|
| }
|
|
|
| @@ -762,8 +756,7 @@ class _Socket extends Stream<List<int>> implements Socket {
|
| StreamController _controller;
|
| bool _controllerClosed = false;
|
| _SocketStreamConsumer _consumer;
|
| - IOSink<Socket> _sink;
|
| - Completer _doneCompleter;
|
| + IOSink _sink;
|
| var _subscription;
|
|
|
| _Socket(RawSocket this._raw) {
|
| @@ -817,15 +810,11 @@ class _Socket extends Stream<List<int>> implements Socket {
|
|
|
| void add(List<int> bytes) => _sink.add(bytes);
|
|
|
| - Future<Socket> consume(Stream<List<int>> stream) {
|
| - return _sink.consume(stream);
|
| - }
|
| -
|
| - Future<Socket> writeStream(Stream<List<int>> stream) {
|
| - return _sink.writeStream(stream);
|
| + Future<Socket> addStream(Stream<List<int>> stream) {
|
| + return _sink.addStream(stream);
|
| }
|
|
|
| - close() => _sink.close();
|
| + Future<Socket> close() => _sink.close();
|
|
|
| Future<Socket> get done => _sink.done;
|
|
|
| @@ -851,7 +840,7 @@ class _Socket extends Stream<List<int>> implements Socket {
|
| // consumer needs a subscription as they share the error and done
|
| // events from the raw socket.
|
| void _ensureRawSocketSubscription() {
|
| - if (_subscription == null) {
|
| + if (_subscription == null && _raw != null) {
|
| _subscription = _raw.listen(_onData,
|
| onError: _onError,
|
| onDone: _onDone,
|
| @@ -908,7 +897,7 @@ class _Socket extends Stream<List<int>> implements Socket {
|
| _controllerClosed = true;
|
| _controller.close();
|
| }
|
| - _done();
|
| + _consumer.done();
|
| }
|
|
|
| void _onError(error) {
|
| @@ -917,27 +906,7 @@ class _Socket extends Stream<List<int>> implements Socket {
|
| _controller.addError(error);
|
| _controller.close();
|
| }
|
| - _done(error);
|
| - }
|
| -
|
| - get _doneFuture {
|
| - if (_doneCompleter == null) {
|
| - _ensureRawSocketSubscription();
|
| - _doneCompleter = new Completer();
|
| - }
|
| - return _doneCompleter.future;
|
| - }
|
| -
|
| - void _done([error]) {
|
| - if (_doneCompleter != null) {
|
| - var tmp = _doneCompleter;
|
| - _doneCompleter = null;
|
| - if (error != null) {
|
| - tmp.completeError(error);
|
| - } else {
|
| - tmp.complete(this);
|
| - }
|
| - }
|
| + _consumer.done(error);
|
| }
|
|
|
| int _write(List<int> data, int offset, int length) =>
|
| @@ -953,8 +922,7 @@ class _Socket extends Stream<List<int>> implements Socket {
|
| }
|
| }
|
|
|
| - void _consumerDone([error]) {
|
| - _done(error);
|
| + void _consumerDone() {
|
| if (_raw != null) {
|
| _raw.shutdown(SocketDirection.SEND);
|
| _disableWriteEvent();
|
|
|