Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1612)

Unified Diff: runtime/bin/socket_patch.dart

Issue 14028017: Remove .writeStream, .consume and rewrite IOSink to correctly implement a (sane) well-defined behav… (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Review comments. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « pkg/scheduled_test/lib/src/scheduled_server/safe_http_server.dart ('k') | sdk/lib/async/stream.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: runtime/bin/socket_patch.dart
diff --git a/runtime/bin/socket_patch.dart b/runtime/bin/socket_patch.dart
index 81e8ab379728d001376dd6c6a8b4179a6f799f6d..0316652c4b3ccc2ae28ef7e13a582a0eae5fb8ac 100644
--- a/runtime/bin/socket_patch.dart
+++ b/runtime/bin/socket_patch.dart
@@ -665,32 +665,13 @@ class _SocketStreamConsumer extends StreamConsumer<List<int>> {
int offset;
List<int> buffer;
bool paused = false;
+ Completer streamCompleter;
_SocketStreamConsumer(this.socket);
- Future<Socket> consume(Stream<List<int>> stream) {
- if (socket._raw != null) {
- subscription = stream.listen(
- (data) {
- assert(!paused);
- assert(buffer == null);
- buffer = data;
- offset = 0;
- write();
- },
- onError: (error) {
- socket._consumerDone(error);
- },
- onDone: () {
- socket._consumerDone();
- },
- unsubscribeOnError: true);
- }
- return socket._doneFuture;
- }
-
Future<Socket> addStream(Stream<List<int>> stream) {
- Completer completer = new Completer<Socket>();
+ socket._ensureRawSocketSubscription();
+ streamCompleter = new Completer<Socket>();
if (socket._raw != null) {
subscription = stream.listen(
(data) {
@@ -701,20 +682,20 @@ class _SocketStreamConsumer extends StreamConsumer<List<int>> {
write();
},
onError: (error) {
- socket._consumerDone(error);
- completer.completeError(error.error, error.stackTrace);
+ socket._consumerDone();
+ done(error);
},
onDone: () {
- completer.complete(socket);
+ done();
},
unsubscribeOnError: true);
}
- return completer.future;
+ return streamCompleter.future;
}
Future<Socket> close() {
socket._consumerDone();
- return completer.future;
+ return new Future.immediate(socket);
}
void write() {
@@ -743,7 +724,20 @@ class _SocketStreamConsumer extends StreamConsumer<List<int>> {
}
} catch (e) {
stop();
- socket._consumerDone(e);
+ socket._consumerDone();
+ done(e);
+ }
+ }
+
+ void done([error]) {
+ if (streamCompleter != null) {
+ var tmp = streamCompleter;
+ streamCompleter = null;
+ if (error != null) {
+ tmp.completeError(error);
+ } else {
+ tmp.complete(socket);
+ }
}
}
@@ -762,8 +756,7 @@ class _Socket extends Stream<List<int>> implements Socket {
StreamController _controller;
bool _controllerClosed = false;
_SocketStreamConsumer _consumer;
- IOSink<Socket> _sink;
- Completer _doneCompleter;
+ IOSink _sink;
var _subscription;
_Socket(RawSocket this._raw) {
@@ -817,15 +810,11 @@ class _Socket extends Stream<List<int>> implements Socket {
void add(List<int> bytes) => _sink.add(bytes);
- Future<Socket> consume(Stream<List<int>> stream) {
- return _sink.consume(stream);
- }
-
- Future<Socket> writeStream(Stream<List<int>> stream) {
- return _sink.writeStream(stream);
+ Future<Socket> addStream(Stream<List<int>> stream) {
+ return _sink.addStream(stream);
}
- close() => _sink.close();
+ Future<Socket> close() => _sink.close();
Future<Socket> get done => _sink.done;
@@ -851,7 +840,7 @@ class _Socket extends Stream<List<int>> implements Socket {
// consumer needs a subscription as they share the error and done
// events from the raw socket.
void _ensureRawSocketSubscription() {
- if (_subscription == null) {
+ if (_subscription == null && _raw != null) {
_subscription = _raw.listen(_onData,
onError: _onError,
onDone: _onDone,
@@ -908,7 +897,7 @@ class _Socket extends Stream<List<int>> implements Socket {
_controllerClosed = true;
_controller.close();
}
- _done();
+ _consumer.done();
}
void _onError(error) {
@@ -917,27 +906,7 @@ class _Socket extends Stream<List<int>> implements Socket {
_controller.addError(error);
_controller.close();
}
- _done(error);
- }
-
- get _doneFuture {
- if (_doneCompleter == null) {
- _ensureRawSocketSubscription();
- _doneCompleter = new Completer();
- }
- return _doneCompleter.future;
- }
-
- void _done([error]) {
- if (_doneCompleter != null) {
- var tmp = _doneCompleter;
- _doneCompleter = null;
- if (error != null) {
- tmp.completeError(error);
- } else {
- tmp.complete(this);
- }
- }
+ _consumer.done(error);
}
int _write(List<int> data, int offset, int length) =>
@@ -953,8 +922,7 @@ class _Socket extends Stream<List<int>> implements Socket {
}
}
- void _consumerDone([error]) {
- _done(error);
+ void _consumerDone() {
if (_raw != null) {
_raw.shutdown(SocketDirection.SEND);
_disableWriteEvent();
« no previous file with comments | « pkg/scheduled_test/lib/src/scheduled_server/safe_http_server.dart ('k') | sdk/lib/async/stream.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698