Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(81)

Side by Side Diff: runtime/bin/socket_stream.dart

Issue 8437090: Change the handling of closing sockets (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Addressed review comments by ager@ Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/bin/socket_impl.dart ('k') | tests/standalone/src/EchoServerStreamTest.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 class SocketInputStream implements InputStream { 5 class SocketInputStream implements InputStream {
6 SocketInputStream(Socket socket) { 6 SocketInputStream(Socket socket) {
7 _socket = socket; 7 _socket = socket;
8 } 8 }
9 9
10 List<int> read([int len]) { 10 List<int> read([int len]) {
(...skipping 18 matching lines...) Expand all
29 } 29 }
30 30
31 int readInto(List<int> buffer, int offset, int len) { 31 int readInto(List<int> buffer, int offset, int len) {
32 if (offset === null) offset = 0; 32 if (offset === null) offset = 0;
33 if (len === null) len = buffer.length; 33 if (len === null) len = buffer.length;
34 if (offset < 0) throw new StreamException("Illegal offset $offset"); 34 if (offset < 0) throw new StreamException("Illegal offset $offset");
35 if (len < 0) throw new StreamException("Illegal length $len"); 35 if (len < 0) throw new StreamException("Illegal length $len");
36 return _socket.readList(buffer, offset, len); 36 return _socket.readList(buffer, offset, len);
37 } 37 }
38 38
39 int available() { 39 int available() => _socket.available();
40 return _socket.available();
41 }
42 40
43 void set dataHandler(void callback()) { 41 void set dataHandler(void callback()) {
44 _socket.dataHandler = callback; 42 _socket.dataHandler = callback;
45 } 43 }
46 44
47 void set closeHandler(void callback()) { 45 void set closeHandler(void callback()) {
48 _socket.closeHandler = callback; 46 _socket.closeHandler = callback;
49 } 47 }
50 48
51 void set errorHandler(void callback()) { 49 void set errorHandler(void callback()) {
52 _socket.errorHandler = callback; 50 _socket.errorHandler = callback;
53 } 51 }
54 52
55 Socket _socket; 53 Socket _socket;
56 } 54 }
57 55
58 56
59 class _BufferList2 { 57 class _BufferList2 {
60 _BufferList2() { 58 _BufferList2() {
61 clear(); 59 clear();
62 } 60 }
63 61
64 // Adds a new buffer to the list possibly with an offset of the 62 // Adds a new buffer to the list possibly with an offset of the
65 // first byte of interest. The offset can only be specified if the 63 // first byte of interest. The offset can only be specified if the
66 // buffer list is empty. 64 // buffer list is empty.
67 void add(List<int> buffer, [int offset = 0]) { 65 void add(List<int> buffer, [int offset = 0]) {
68 assert(offset == 0 || _buffers.isEmpty()); 66 assert(offset == 0 || _buffers.isEmpty());
69 _buffers.addLast(buffer); 67 _buffers.addLast(buffer);
70 _length += buffer.length; 68 _length += buffer.length - offset;
71 if (offset != 0) _index = offset; 69 if (offset != 0) _index = offset;
72 } 70 }
73 71
74 List<int> get first() => _buffers.first(); 72 List<int> get first() => _buffers.first();
75 int get index() => _index; 73 int get index() => _index;
76 74
77 void removeBytes(int count) { 75 void removeBytes(int count) {
78 int firstRemaining = first.length - _index; 76 int firstRemaining = first.length - _index;
79 assert(count <= firstRemaining); 77 assert(count <= firstRemaining);
80 if (count == firstRemaining) { 78 if (count == firstRemaining) {
(...skipping 29 matching lines...) Expand all
110 } 108 }
111 109
112 bool write(List<int> buffer) { 110 bool write(List<int> buffer) {
113 return _write(buffer, 0, buffer.length, false); 111 return _write(buffer, 0, buffer.length, false);
114 } 112 }
115 113
116 bool writeFrom(List<int> buffer, [int offset = 0, int len]) { 114 bool writeFrom(List<int> buffer, [int offset = 0, int len]) {
117 return _write(buffer, offset, (len == null) ? buffer.length : len, true); 115 return _write(buffer, offset, (len == null) ? buffer.length : len, true);
118 } 116 }
119 117
120 void end() { 118 void close() {
121 if (_ending || _ended) throw new StreamException("Stream ended"); 119 if (!_pendingWrites.isEmpty()) {
122 _ending = true; 120 // Mark the socket for close when all data is written.
123 if (_pendingWrites.isEmpty()) { 121 _closing = true;
124 close(); 122 _socket.writeHandler = _writeHandler;
123 } else {
124 // Close the socket for writing.
125 _socket._closeWrite();
126 _closed = true;
125 } 127 }
126 } 128 }
127 129
128 void close() { 130 void destroy() {
129 _socket.writeHandler = null; 131 _socket.writeHandler = null;
130 _pendingWrites.clear(); 132 _pendingWrites.clear();
131 _socket.close(); 133 _socket.close();
132 _ended = true; 134 _closed = true;
133 } 135 }
134 136
135 void set noPendingWriteHandler(void callback()) { 137 void set noPendingWriteHandler(void callback()) {
136 _noPendingWriteHandler = callback; 138 _noPendingWriteHandler = callback;
137 _socket.writeHandler = _writeHandler; 139 _socket.writeHandler = _writeHandler;
138 } 140 }
139 141
140 void set closeHandler(void callback()) { 142 void set closeHandler(void callback()) {
141 _socket.closeHandler = callback; 143 _socket.closeHandler = callback;
142 } 144 }
143 145
144 void set errorHandler(void callback()) { 146 void set errorHandler(void callback()) {
145 _streamErrorHandler = callback; 147 _streamErrorHandler = callback;
146 } 148 }
147 149
148 bool _write(List<int> buffer, int offset, int len, bool copyBuffer) { 150 bool _write(List<int> buffer, int offset, int len, bool copyBuffer) {
149 if (_ending || _ended) throw new StreamException("Stream ended"); 151 if (_closing || _closed) throw new StreamException("Stream closed");
150 if (len == null) len = buffer.length; 152 if (len == null) len = buffer.length;
151 int bytesWritten = 0; 153 int bytesWritten = 0;
152 if (_pendingWrites.isEmpty()) { 154 if (_pendingWrites.isEmpty()) {
153 // If nothing is buffered write as much as possible and buffer 155 // If nothing is buffered write as much as possible and buffer
154 // the rest. 156 // the rest.
155 bytesWritten = _socket.writeList(buffer, offset, len); 157 bytesWritten = _socket.writeList(buffer, offset, len);
156 if (bytesWritten == len) return true; 158 if (bytesWritten == len) return true;
157 } 159 }
158 160
159 // Place remaining data on the pending writes queue. 161 // Place remaining data on the pending writes queue.
160 if (copyBuffer) { 162 if (copyBuffer) {
161 List<int> newBuffer = 163 List<int> newBuffer =
162 buffer.getRange(offset + bytesWritten, buffer.length); 164 buffer.getRange(offset + bytesWritten, buffer.length);
163 _pendingWrites.add(newBuffer); 165 _pendingWrites.add(newBuffer);
164 } else { 166 } else {
165 _pendingWrites.add(buffer, bytesWritten); 167 _pendingWrites.add(buffer, bytesWritten);
166 } 168 }
167 } 169 }
168 170
169 void _writeHandler() { 171 void _writeHandler() {
170 _socket.writeHandler = _writeHandler;
171 // Write as much buffered data to the socket as possible. 172 // Write as much buffered data to the socket as possible.
172 while (!_pendingWrites.isEmpty()) { 173 while (!_pendingWrites.isEmpty()) {
173 List<int> buffer = _pendingWrites.first; 174 List<int> buffer = _pendingWrites.first;
174 int offset = _pendingWrites.index; 175 int offset = _pendingWrites.index;
175 int bytesToWrite = buffer.length - offset; 176 int bytesToWrite = buffer.length - offset;
176 int bytesWritten = _socket.writeList(buffer, offset, bytesToWrite); 177 int bytesWritten = _socket.writeList(buffer, offset, bytesToWrite);
177 _pendingWrites.removeBytes(bytesWritten); 178 _pendingWrites.removeBytes(bytesWritten);
178 if (bytesWritten < bytesToWrite) return; 179 if (bytesWritten < bytesToWrite) {
180 _socket.writeHandler = _writeHandler;
181 return;
182 }
179 } 183 }
180 184
181 // All buffered data was written. 185 // All buffered data was written.
182 if (_ending) { 186 if (_closing) {
183 _socket.close(); 187 _socket._closeWrite();
184 _ended = true; 188 _closed = true;
185 } else { 189 } else {
186 if (_noPendingWriteHandler != null) _noPendingWriteHandler(); 190 if (_noPendingWriteHandler != null) _noPendingWriteHandler();
187 } 191 }
188 } 192 }
189 193
190 void _errorHandler() { 194 void _errorHandler() {
191 close(); 195 close();
192 if (_streamErrorHandler != null) _streamErrorHandler(); 196 if (_streamErrorHandler != null) _streamErrorHandler();
193 } 197 }
194 198
195 Socket _socket; 199 Socket _socket;
196 _BufferList2 _pendingWrites; 200 _BufferList2 _pendingWrites;
197 bool _ending = false;
198 bool _ended = false;
199 var _noPendingWriteHandler; 201 var _noPendingWriteHandler;
200 var _streamErrorHandler; 202 var _streamErrorHandler;
203 bool _closing = false;
204 bool _closed = false;
201 } 205 }
OLDNEW
« no previous file with comments | « runtime/bin/socket_impl.dart ('k') | tests/standalone/src/EchoServerStreamTest.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698