Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(260)

Side by Side Diff: lib/src/copy/io_sink.dart

Issue 1225403008: Bring in latest dart:io WebSocket code. (Closed) Base URL: git@github.com:dart-lang/http_parser@master
Patch Set: pubspec + changelog Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/copy/bytes_builder.dart ('k') | lib/src/copy/web_socket.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 // The following code is copied from sdk/lib/io/io_sink.dart. The "dart:io"
6 // implementation isn't used directly to support non-"dart:io" applications.
7 //
8 // Because it's copied directly, only modifications necessary to support the
9 // desired public API and to remove "dart:io" dependencies have been made.
10 //
11 // This is up-to-date as of sdk revision
12 // 86227840d75d974feb238f8b3c59c038b99c05cf.
13 library http_parser.copy.io_sink;
14
15 import 'dart:async';
16
17 class StreamSinkImpl<T> implements StreamSink<T> {
18 final StreamConsumer<T> _target;
19 Completer _doneCompleter = new Completer();
20 Future _doneFuture;
21 StreamController<T> _controllerInstance;
22 Completer _controllerCompleter;
23 bool _isClosed = false;
24 bool _isBound = false;
25 bool _hasError = false;
26
27 StreamSinkImpl(this._target) {
28 _doneFuture = _doneCompleter.future;
29 }
30
31 void add(T data) {
32 if (_isClosed) return;
33 _controller.add(data);
34 }
35
36 void addError(error, [StackTrace stackTrace]) {
37 _controller.addError(error, stackTrace);
38 }
39
40 Future addStream(Stream<T> stream) {
41 if (_isBound) {
42 throw new StateError("StreamSink is already bound to a stream");
43 }
44 _isBound = true;
45 if (_hasError) return done;
46 // Wait for any sync operations to complete.
47 Future targetAddStream() {
48 return _target.addStream(stream)
49 .whenComplete(() {
50 _isBound = false;
51 });
52 }
53 if (_controllerInstance == null) return targetAddStream();
54 var future = _controllerCompleter.future;
55 _controllerInstance.close();
56 return future.then((_) => targetAddStream());
57 }
58
59 Future flush() {
60 if (_isBound) {
61 throw new StateError("StreamSink is bound to a stream");
62 }
63 if (_controllerInstance == null) return new Future.value(this);
64 // Adding an empty stream-controller will return a future that will complete
65 // when all data is done.
66 _isBound = true;
67 var future = _controllerCompleter.future;
68 _controllerInstance.close();
69 return future.whenComplete(() {
70 _isBound = false;
71 });
72 }
73
74 Future close() {
75 if (_isBound) {
76 throw new StateError("StreamSink is bound to a stream");
77 }
78 if (!_isClosed) {
79 _isClosed = true;
80 if (_controllerInstance != null) {
81 _controllerInstance.close();
82 } else {
83 _closeTarget();
84 }
85 }
86 return done;
87 }
88
89 void _closeTarget() {
90 _target.close().then(_completeDoneValue, onError: _completeDoneError);
91 }
92
93 Future get done => _doneFuture;
94
95 void _completeDoneValue(value) {
96 if (_doneCompleter == null) return;
97 _doneCompleter.complete(value);
98 _doneCompleter = null;
99 }
100
101 void _completeDoneError(error, StackTrace stackTrace) {
102 if (_doneCompleter == null) return;
103 _hasError = true;
104 _doneCompleter.completeError(error, stackTrace);
105 _doneCompleter = null;
106 }
107
108 StreamController<T> get _controller {
109 if (_isBound) {
110 throw new StateError("StreamSink is bound to a stream");
111 }
112 if (_isClosed) {
113 throw new StateError("StreamSink is closed");
114 }
115 if (_controllerInstance == null) {
116 _controllerInstance = new StreamController<T>(sync: true);
117 _controllerCompleter = new Completer();
118 _target.addStream(_controller.stream)
119 .then(
120 (_) {
121 if (_isBound) {
122 // A new stream takes over - forward values to that stream.
123 _controllerCompleter.complete(this);
124 _controllerCompleter = null;
125 _controllerInstance = null;
126 } else {
127 // No new stream, .close was called. Close _target.
128 _closeTarget();
129 }
130 },
131 onError: (error, stackTrace) {
132 if (_isBound) {
133 // A new stream takes over - forward errors to that stream.
134 _controllerCompleter.completeError(error, stackTrace);
135 _controllerCompleter = null;
136 _controllerInstance = null;
137 } else {
138 // No new stream. No need to close target, as it have already
139 // failed.
140 _completeDoneError(error, stackTrace);
141 }
142 });
143 }
144 return _controllerInstance;
145 }
146 }
147
OLDNEW
« no previous file with comments | « lib/src/copy/bytes_builder.dart ('k') | lib/src/copy/web_socket.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698