OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "google_apis/gcm/base/socket_stream.h" | |
6 | |
7 #include "base/callback.h" | |
8 #include "net/base/io_buffer.h" | |
9 #include "net/base/net_errors.h" | |
10 #include "net/socket/stream_socket.h" | |
11 | |
12 namespace gcm { | |
13 | |
14 // TODO(zea): consider having dynamically sized buffers if this becomes too | |
akalin
2013/10/04 18:37:04
dynamically sized -> dynamically-sized
Nicolas Zea
2013/10/04 20:55:28
Done.
| |
15 // expensive. | |
16 const uint32 kDefaultBufferSize = 8*1024; | |
akalin
2013/10/04 18:37:04
put this in anon namespace?
Nicolas Zea
2013/10/04 20:55:28
Done.
| |
17 | |
18 SocketInputStream::SocketInputStream(net::StreamSocket* socket) | |
19 : socket_(socket), | |
20 io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)), | |
21 drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(), | |
22 io_buffer_->size())), | |
23 buffer_read_pos_(0), | |
24 buffer_write_pos_(0), | |
25 backup_bytes_(0), | |
26 skipped_bytes_(0), | |
27 last_error_(net::OK), | |
28 state_(EMPTY), | |
29 weak_ptr_factory_(this) { | |
30 DCHECK(socket->IsConnected()); | |
31 } | |
32 | |
33 SocketInputStream::~SocketInputStream() { | |
34 } | |
35 | |
36 bool SocketInputStream::Next(const void** data, int* size) { | |
37 DCHECK_NE(state_, CLOSED); | |
38 DCHECK_NE(state_, READING); | |
39 | |
40 if (state_ == EMPTY) { | |
41 DVLOG(1) << "No unread data remaining, ending read."; | |
42 return false; | |
43 } | |
44 | |
45 if (backup_bytes_ > 0) { | |
akalin
2013/10/04 18:37:04
DCHECK backup_bytes_ isn't too big?
Nicolas Zea
2013/10/04 20:55:28
n/a now
| |
46 *size = backup_bytes_; | |
akalin
2013/10/04 18:37:04
put *size assignment after *data assignment
Nicolas Zea
2013/10/04 20:55:28
n/a now
| |
47 *data = io_buffer_->data() + buffer_read_pos_ - backup_bytes_; | |
48 backup_bytes_ = 0; | |
49 DCHECK_GT(*size, 0); | |
akalin
2013/10/04 18:37:04
i'm all for copious DCHECKs but this seems pretty
Nicolas Zea
2013/10/04 20:55:28
n/a now
| |
50 DVLOG(1) << "Consuming " << *size << " bytes in input buffer."; | |
51 if (backup_bytes_ == 0 && buffer_read_pos_ == buffer_write_pos_) | |
52 state_ = EMPTY; | |
53 return true; | |
54 } | |
55 | |
56 DCHECK_EQ(state_, READY) | |
57 << " Input stream must have pending data before reading."; | |
58 DCHECK_NE(buffer_write_pos_, buffer_read_pos_); | |
59 *data = io_buffer_->data() + buffer_read_pos_; | |
60 *size = buffer_write_pos_ - buffer_read_pos_; | |
61 buffer_read_pos_ = buffer_write_pos_; | |
62 state_ = EMPTY; | |
63 DVLOG(1) << "Consuming " << *size << " bytes in input buffer."; | |
64 return true; | |
65 } | |
66 | |
67 void SocketInputStream::BackUp(int count) { | |
68 DCHECK(state_ == READY || state_ == EMPTY); | |
69 DCHECK_GE(count, 0); | |
70 DCHECK_GE(backup_bytes_, 0); | |
71 DCHECK_LE(backup_bytes_, buffer_read_pos_); | |
72 DCHECK_EQ(skipped_bytes_, 0); | |
73 | |
74 backup_bytes_ += count; | |
75 state_ = READY; | |
76 DVLOG(1) << "Backing up " << count << " bytes in input buffer. " | |
77 << "Current position now at " << buffer_read_pos_ - backup_bytes_ | |
78 << " of " << buffer_write_pos_; | |
79 } | |
80 | |
81 bool SocketInputStream::Skip(int count) { | |
82 DCHECK_EQ(state_, READY); | |
83 DCHECK_GT(count, 0); | |
84 DVLOG(1) << "Skipping " << count << " bytes in stream."; | |
85 | |
86 if (backup_bytes_ >= count) { | |
87 // We have more data left over than we're trying to skip. Just chop it. | |
88 backup_bytes_ -= count; | |
89 return true; | |
90 } | |
91 | |
92 count -= backup_bytes_; | |
93 backup_bytes_ = 0; | |
94 skipped_bytes_ += count; | |
95 state_ = EMPTY; | |
96 | |
97 return true; | |
98 } | |
99 | |
100 int64 SocketInputStream::ByteCount() const { | |
101 DCHECK_NE(state_, CLOSED); | |
102 DCHECK_NE(state_, READING); | |
103 return buffer_write_pos_ - buffer_read_pos_ + backup_bytes_; | |
104 } | |
105 | |
106 void SocketInputStream::Refresh(const base::Closure& callback, | |
107 int byte_limit) { | |
108 DCHECK_NE(state_, CLOSED); | |
109 DCHECK_NE(state_, READING); | |
110 DCHECK_GT(byte_limit, 0); | |
111 DCHECK_LE(byte_limit, drainable_io_buffer_->BytesRemaining()); | |
112 | |
113 if (buffer_write_pos_ + byte_limit > io_buffer_->size()) { | |
114 LOG(ERROR) << "Out of buffer space, closing input stream."; | |
115 CloseStream(net::ERR_UNEXPECTED, callback); | |
116 return; | |
117 } | |
118 | |
119 if (!socket_->IsConnected()) { | |
120 LOG(ERROR) << "Socket was disconnected, closing input stream"; | |
121 CloseStream(net::ERR_CONNECTION_CLOSED, callback); | |
122 return; | |
123 } | |
124 | |
125 state_ = READING; | |
126 | |
127 DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes."; | |
128 int result = socket_->Read( | |
129 drainable_io_buffer_, | |
130 byte_limit, | |
131 base::Bind(&SocketInputStream::RefreshCompletionCallback, | |
132 weak_ptr_factory_.GetWeakPtr(), | |
133 callback)); | |
134 DVLOG(1) << "Read returned " << result; | |
135 if (result != net::ERR_IO_PENDING) | |
136 RefreshCompletionCallback(callback, result); | |
137 } | |
138 | |
139 void SocketInputStream::RebuildBuffer() { | |
140 DCHECK_EQ(skipped_bytes_, 0); | |
141 DVLOG(1) << "Resetting input stream, consumed " | |
142 << buffer_read_pos_ - backup_bytes_ << " bytes."; | |
143 DCHECK_NE(state_, READING); | |
144 DCHECK_NE(state_, CLOSED); | |
145 | |
146 int last_read_pos = buffer_read_pos_ - backup_bytes_; | |
147 char* unread_data_ptr = io_buffer_->data() + last_read_pos; | |
148 int unread_buffer_size = buffer_write_pos_ - last_read_pos; | |
149 ResetInternal(); | |
150 | |
151 if (unread_buffer_size > 0) { | |
152 buffer_write_pos_ = unread_buffer_size; | |
153 drainable_io_buffer_->SetOffset(buffer_write_pos_); | |
154 state_ = READY; | |
155 | |
156 if (last_read_pos != 0) { | |
157 DVLOG(1) << "Have " << buffer_write_pos_ | |
158 << " unread bytes remaining, shifting."; | |
159 // Move any remaining unread data to the start of the buffer; | |
160 std::memmove(io_buffer_->data(), unread_data_ptr, buffer_write_pos_); | |
161 } else { | |
162 DVLOG(1) << "Have " << buffer_write_pos_ << " unread bytes remaining."; | |
163 } | |
164 } | |
165 } | |
166 | |
167 int SocketInputStream::last_error() const { | |
168 return last_error_; | |
169 } | |
170 | |
171 SocketInputStream::State SocketInputStream::state() const { | |
172 return state_; | |
173 } | |
174 | |
175 void SocketInputStream::RefreshCompletionCallback( | |
176 const base::Closure& callback, int result) { | |
177 DCHECK_EQ(state_, READING); | |
178 if (state_ == CLOSED) { | |
179 // An error occured before the completion callback could complete. Ignore | |
180 // the result. | |
181 return; | |
182 } | |
183 | |
184 if (result < net::OK) { | |
185 DVLOG(1) << "Failed to refresh socket: " << result; | |
186 CloseStream(result, callback); | |
187 return; | |
188 } | |
189 DCHECK_GT(result, 0); | |
190 | |
191 state_ = READY; | |
192 buffer_write_pos_ += result; | |
193 int bytes_to_skip = std::min(skipped_bytes_, result); | |
194 buffer_read_pos_ += bytes_to_skip; | |
195 skipped_bytes_ = std::max(skipped_bytes_ - result, 0); | |
196 drainable_io_buffer_->SetOffset(buffer_write_pos_); | |
197 if (buffer_read_pos_ - backup_bytes_ == buffer_write_pos_) | |
198 state_ = EMPTY; | |
199 | |
200 DVLOG(1) << "Refresh complete with " << result << " new bytes. " | |
201 << "Current position " << buffer_read_pos_ - backup_bytes_ | |
202 << " of " << buffer_write_pos_ << "."; | |
203 | |
204 if (!callback.is_null()) callback.Run(); | |
akalin
2013/10/04 18:37:04
newline before callback.Run()
Nicolas Zea
2013/10/04 20:55:28
Done.
| |
205 } | |
206 | |
207 void SocketInputStream::ResetInternal() { | |
208 weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks. | |
209 buffer_write_pos_ = 0; | |
210 buffer_read_pos_ = 0; | |
211 backup_bytes_ = 0; | |
212 skipped_bytes_ = 0; | |
213 state_ = EMPTY; | |
214 | |
215 last_error_ = net::OK; | |
216 | |
217 // Reset the offset by creating a new one. Note that DrainableIOBuffers don't | |
218 // actually allocate their own buffer memory like normal IOBuffers. This will | |
219 // just reset the pointers to point to the beginning of io_buffer_'s data. | |
220 drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(), | |
221 io_buffer_->size()); | |
222 } | |
223 | |
224 void SocketInputStream::CloseStream(int error, const base::Closure& callback) { | |
225 ResetInternal(); | |
226 state_ = CLOSED; | |
227 last_error_ = error; | |
228 LOG(ERROR) << "Closing stream with result " << error; | |
229 if (!callback.is_null()) callback.Run(); | |
230 } | |
231 | |
232 SocketOutputStream::SocketOutputStream(net::StreamSocket* socket) | |
akalin
2013/10/04 18:37:04
(blanket comment) apply above comments on SocketIn
Nicolas Zea
2013/10/04 20:55:28
Done.
| |
233 : socket_(socket), | |
234 io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)), | |
235 drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(), | |
236 io_buffer_->size())), | |
237 buffer_used_(0), | |
238 state_(EMPTY), | |
239 weak_ptr_factory_(this) { | |
240 DCHECK(socket->IsConnected()); | |
241 } | |
242 | |
243 SocketOutputStream::~SocketOutputStream() { | |
244 } | |
245 | |
246 bool SocketOutputStream::Next(void** data, int* size) { | |
247 DCHECK_NE(state_, CLOSED); | |
248 DCHECK_NE(state_, FLUSHING); | |
249 if (buffer_used_ == io_buffer_->size()) | |
250 return false; | |
251 | |
252 *data = io_buffer_->data() + buffer_used_; | |
253 *size = io_buffer_->size() - buffer_used_; | |
254 buffer_used_ = io_buffer_->size(); | |
255 state_ = READY; | |
256 return true; | |
257 } | |
258 | |
259 void SocketOutputStream::BackUp(int count) { | |
260 DCHECK_GE(count, 0); | |
261 if (count > buffer_used_) | |
262 buffer_used_ = 0; | |
263 buffer_used_ -= count; | |
264 DVLOG(1) << "Backing up " << count << " bytes in output buffer. " | |
265 << buffer_used_ << " bytes used."; | |
266 } | |
267 | |
268 int64 SocketOutputStream::ByteCount() const { | |
269 DCHECK_NE(state_, CLOSED); | |
270 DCHECK_NE(state_, FLUSHING); | |
271 return buffer_used_; | |
272 } | |
273 | |
274 void SocketOutputStream::Flush(const base::Closure& callback) { | |
275 DCHECK_EQ(state_, READY); | |
276 state_ = FLUSHING; | |
277 | |
278 if (!socket_->IsConnected()) { | |
279 LOG(ERROR) << "Socket was disconnected, closing output stream"; | |
280 last_error_ = net::ERR_CONNECTION_CLOSED; | |
281 state_ = CLOSED; | |
282 if (!callback.is_null()) callback.Run(); | |
283 return; | |
284 } | |
285 | |
286 DVLOG(1) << "Flushing " << buffer_used_ << " bytes into socket."; | |
287 int result = socket_->Write( | |
288 drainable_io_buffer_, | |
289 buffer_used_, | |
290 base::Bind(&SocketOutputStream::FlushCompletionCallback, | |
291 weak_ptr_factory_.GetWeakPtr(), | |
292 callback)); | |
293 DVLOG(1) << "Write returned " << result; | |
294 if (result != net::ERR_IO_PENDING) | |
295 FlushCompletionCallback(callback, result); | |
296 } | |
297 | |
298 SocketOutputStream::State SocketOutputStream::state() const{ | |
299 return state_; | |
300 } | |
301 | |
302 int SocketOutputStream::last_error() const { | |
303 return last_error_; | |
304 } | |
305 | |
306 void SocketOutputStream::FlushCompletionCallback( | |
307 const base::Closure& callback, int result) { | |
308 DCHECK_EQ(state_, FLUSHING); | |
309 if (result < net::OK) { | |
310 LOG(ERROR) << "Failed to flush socket."; | |
311 last_error_ = result; | |
312 state_ = CLOSED; | |
313 if (!callback.is_null()) callback.Run(); | |
314 return; | |
315 } | |
316 | |
317 state_ = READY; | |
318 if (drainable_io_buffer_->BytesConsumed() + result < buffer_used_) { | |
319 DVLOG(1) << "Partial flush complete. Retrying."; | |
320 // Only a partial write was completed. Flush again to finish the write. | |
321 drainable_io_buffer_->SetOffset( | |
322 drainable_io_buffer_->BytesConsumed() + result); | |
323 Flush(callback); | |
324 return; | |
325 } | |
326 | |
327 DVLOG(1) << "Socket flush complete."; | |
328 drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(), | |
329 io_buffer_->size()); | |
330 state_ = EMPTY; | |
331 buffer_used_ = 0; | |
332 if (!callback.is_null()) callback.Run(); | |
333 } | |
334 | |
335 } // namespace gcm | |
OLD | NEW |