Index: runtime/bin/socket_patch.dart |
diff --git a/runtime/bin/socket_patch.dart b/runtime/bin/socket_patch.dart |
index 81e8ab379728d001376dd6c6a8b4179a6f799f6d..0316652c4b3ccc2ae28ef7e13a582a0eae5fb8ac 100644 |
--- a/runtime/bin/socket_patch.dart |
+++ b/runtime/bin/socket_patch.dart |
@@ -665,32 +665,13 @@ class _SocketStreamConsumer extends StreamConsumer<List<int>> { |
int offset; |
List<int> buffer; |
bool paused = false; |
+ Completer streamCompleter; |
_SocketStreamConsumer(this.socket); |
- Future<Socket> consume(Stream<List<int>> stream) { |
- if (socket._raw != null) { |
- subscription = stream.listen( |
- (data) { |
- assert(!paused); |
- assert(buffer == null); |
- buffer = data; |
- offset = 0; |
- write(); |
- }, |
- onError: (error) { |
- socket._consumerDone(error); |
- }, |
- onDone: () { |
- socket._consumerDone(); |
- }, |
- unsubscribeOnError: true); |
- } |
- return socket._doneFuture; |
- } |
- |
Future<Socket> addStream(Stream<List<int>> stream) { |
- Completer completer = new Completer<Socket>(); |
+ socket._ensureRawSocketSubscription(); |
+ streamCompleter = new Completer<Socket>(); |
if (socket._raw != null) { |
subscription = stream.listen( |
(data) { |
@@ -701,20 +682,20 @@ class _SocketStreamConsumer extends StreamConsumer<List<int>> { |
write(); |
}, |
onError: (error) { |
- socket._consumerDone(error); |
- completer.completeError(error.error, error.stackTrace); |
+ socket._consumerDone(); |
+ done(error); |
}, |
onDone: () { |
- completer.complete(socket); |
+ done(); |
}, |
unsubscribeOnError: true); |
} |
- return completer.future; |
+ return streamCompleter.future; |
} |
Future<Socket> close() { |
socket._consumerDone(); |
- return completer.future; |
+ return new Future.immediate(socket); |
} |
void write() { |
@@ -743,7 +724,20 @@ class _SocketStreamConsumer extends StreamConsumer<List<int>> { |
} |
} catch (e) { |
stop(); |
- socket._consumerDone(e); |
+ socket._consumerDone(); |
+ done(e); |
+ } |
+ } |
+ |
+ void done([error]) { |
+ if (streamCompleter != null) { |
+ var tmp = streamCompleter; |
+ streamCompleter = null; |
+ if (error != null) { |
+ tmp.completeError(error); |
+ } else { |
+ tmp.complete(socket); |
+ } |
} |
} |
@@ -762,8 +756,7 @@ class _Socket extends Stream<List<int>> implements Socket { |
StreamController _controller; |
bool _controllerClosed = false; |
_SocketStreamConsumer _consumer; |
- IOSink<Socket> _sink; |
- Completer _doneCompleter; |
+ IOSink _sink; |
var _subscription; |
_Socket(RawSocket this._raw) { |
@@ -817,15 +810,11 @@ class _Socket extends Stream<List<int>> implements Socket { |
void add(List<int> bytes) => _sink.add(bytes); |
- Future<Socket> consume(Stream<List<int>> stream) { |
- return _sink.consume(stream); |
- } |
- |
- Future<Socket> writeStream(Stream<List<int>> stream) { |
- return _sink.writeStream(stream); |
+ Future<Socket> addStream(Stream<List<int>> stream) { |
+ return _sink.addStream(stream); |
} |
- close() => _sink.close(); |
+ Future<Socket> close() => _sink.close(); |
Future<Socket> get done => _sink.done; |
@@ -851,7 +840,7 @@ class _Socket extends Stream<List<int>> implements Socket { |
// consumer needs a subscription as they share the error and done |
// events from the raw socket. |
void _ensureRawSocketSubscription() { |
- if (_subscription == null) { |
+ if (_subscription == null && _raw != null) { |
_subscription = _raw.listen(_onData, |
onError: _onError, |
onDone: _onDone, |
@@ -908,7 +897,7 @@ class _Socket extends Stream<List<int>> implements Socket { |
_controllerClosed = true; |
_controller.close(); |
} |
- _done(); |
+ _consumer.done(); |
} |
void _onError(error) { |
@@ -917,27 +906,7 @@ class _Socket extends Stream<List<int>> implements Socket { |
_controller.addError(error); |
_controller.close(); |
} |
- _done(error); |
- } |
- |
- get _doneFuture { |
- if (_doneCompleter == null) { |
- _ensureRawSocketSubscription(); |
- _doneCompleter = new Completer(); |
- } |
- return _doneCompleter.future; |
- } |
- |
- void _done([error]) { |
- if (_doneCompleter != null) { |
- var tmp = _doneCompleter; |
- _doneCompleter = null; |
- if (error != null) { |
- tmp.completeError(error); |
- } else { |
- tmp.complete(this); |
- } |
- } |
+ _consumer.done(error); |
} |
int _write(List<int> data, int offset, int length) => |
@@ -953,8 +922,7 @@ class _Socket extends Stream<List<int>> implements Socket { |
} |
} |
- void _consumerDone([error]) { |
- _done(error); |
+ void _consumerDone() { |
if (_raw != null) { |
_raw.shutdown(SocketDirection.SEND); |
_disableWriteEvent(); |