Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(115)

Side by Side Diff: google_apis/gcm/base/socket_stream.cc

Issue 23684017: [GCM] Initial work to set up directory structure and introduce socket integration (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Self review Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "google_apis/gcm/base/socket_stream.h"
6
7 #include "base/callback.h"
8 #include "base/message_loop/message_loop.h"
9 #include "base/run_loop.h"
10 #include "net/base/io_buffer.h"
11 #include "net/base/net_errors.h"
12 #include "net/socket/stream_socket.h"
13
14 namespace gcm {
15
16 // TODO(zea): consider having dynamically sized buffers if this becomes too
17 // expensive.
18 const uint32 kDefaultBufferSize = 8*1024;
19
20 SocketInputStream::SocketInputStream(base::TimeDelta read_timeout,
21 net::StreamSocket* socket)
22 : socket_(socket),
23 io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)),
24 drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
25 io_buffer_->size())),
26 buffer_pos_(0),
27 buffer_size_(0),
28 limit_(0),
29 backup_bytes_(0),
30 skipped_bytes_(0),
31 last_error_(net::OK),
32 state_(EMPTY),
33 read_timeout_(read_timeout),
34 weak_ptr_factory_(this) {
35 DCHECK(socket->IsConnected());
36 }
37
38 SocketInputStream::~SocketInputStream() {
39 }
40
41 bool SocketInputStream::Next(const void** data, int* size) {
42 if (state_ == CLOSED)
43 return false;
44
45 if (backup_bytes_ > 0) {
46 *size = backup_bytes_;
47 *data = io_buffer_->data() + buffer_pos_ - backup_bytes_;
48 backup_bytes_ = 0;
49 return true;
50 }
51
52 if (limit_ != 0 && buffer_pos_ >= limit_) {
53 DVLOG(1) << "Reached buffer limit, ending read.";
54 return false;
55 }
56
57 if (buffer_size_ == buffer_pos_) {
58 if (!run_loop_.get()) {
59 Refresh(base::Closure());
60
61 // Check if refresh is still pending.
62 if (state_ == READING) {
63 // It's valid for the caller to spin if size is set to 0 (i.e. no new
64 // data when data is expected) and true is returned. Make sure the
65 // RefreshCompletionCallback has a chance to execute by running pending
66 // messages on this message loop.
67 run_loop_.reset(new base::RunLoop());
68 // Post a timeout task. If it runs, it means the read is still
69 // outstanding, but took too long. This is necessary to handle "brutal"
70 // disconnects, that do not properly close the TCP connection.
71 base::MessageLoop::current()->PostDelayedTask(
72 FROM_HERE,
73 run_loop_->QuitClosure(),
74 read_timeout_);
75 run_loop_->Run();
76 run_loop_.reset();
77 }
78 }
79
80 if (buffer_size_ == buffer_pos_) {
81 *size = 0;
82 if (state_ == READING) {
83 // The timeout was hit; the stream must be recreated with a socket
84 // with a new connection.
85 DVLOG(1) << "Socket read timed out, closing input stream.";
86 CloseStream(net::ERR_TIMED_OUT, base::Closure());
87 return false;
88 } else if (state_ == CLOSED) {
89 // An error was encountered and the stream has already been closed.
90 return false;
91 }
92
93 // The refresh was successful, but did not return any data.
94 // TODO(zea): is this a valid situation?
95 NOTREACHED() << "Refresh did not return data.";
96 return false;
97 }
98 }
99
100 *data = io_buffer_->data() + buffer_pos_;
101 *size = buffer_size_ - buffer_pos_;
102 buffer_pos_ = buffer_size_;
103 state_ = EMPTY;
104 DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
105 return true;
106 }
107
108 void SocketInputStream::BackUp(int count) {
109 DCHECK(state_ == READY || state_ == EMPTY);
110 DCHECK_GE(backup_bytes_, 0);
111 DCHECK_LE(backup_bytes_, buffer_pos_);
112 DCHECK_EQ(skipped_bytes_, 0);
113
114 backup_bytes_ += count;
115 state_ = READY;
116 DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
117 << "Current position now at " << buffer_pos_ - backup_bytes_
118 << " of " << buffer_size_;
119 }
120
121 bool SocketInputStream::Skip(int count) {
122 DCHECK_EQ(state_, READY);
123 DCHECK_GT(count, 0);
124 DVLOG(1) << "Skipping " << count << " bytes in stream.";
125
126 if (backup_bytes_ >= count) {
127 // We have more data left over than we're trying to skip. Just chop it.
128 backup_bytes_ -= count;
129 return true;
130 }
131
132 count -= backup_bytes_;
133 backup_bytes_ = 0;
134 skipped_bytes_ += count;
135 state_ = EMPTY;
136
137 return true;
138 }
139
140 int64 SocketInputStream::ByteCount() const {
141 DCHECK_NE(state_, CLOSED);
142 DCHECK_NE(state_, READING);
143 return buffer_size_;
144 }
145
146 void SocketInputStream::Refresh(const base::Closure& callback) {
147 DCHECK_NE(state_, CLOSED);
148 DCHECK_NE(state_, READING);
149 DCHECK_EQ(backup_bytes_, 0);
150
151 if (buffer_pos_ == io_buffer_->size()) {
152 LOG(ERROR) << "Out of buffer space, closing input stream.";
153 CloseStream(net::ERR_UNEXPECTED, callback);
154 return;
155 }
156
157 if (!socket_->IsConnected()) {
158 LOG(ERROR) << "Socket was disconnected, closing input stream";
159 CloseStream(net::ERR_CONNECTION_CLOSED, callback);
160 return;
161 }
162
163 state_ = READING;
164 int read_limit =
165 limit_ == 0 ?
166 drainable_io_buffer_->BytesRemaining() :
167 limit_ - buffer_pos_;
168 read_limit = std::min(read_limit, drainable_io_buffer_->BytesRemaining());
169 DVLOG(1) << "Refreshing input stream, limit of " << read_limit << " bytes.";
170 DCHECK_GT(read_limit, 0);
171 int result = socket_->Read(
172 drainable_io_buffer_,
173 read_limit,
174 base::Bind(&SocketInputStream::RefreshCompletionCallback,
175 weak_ptr_factory_.GetWeakPtr(),
176 callback));
177 if (result != net::ERR_IO_PENDING)
178 RefreshCompletionCallback(callback, result);
179 }
180
181 void SocketInputStream::Reset() {
182 DCHECK_EQ(skipped_bytes_, 0);
183 DVLOG(1) << "Resetting input stream, consumed "
184 << buffer_pos_ - backup_bytes_ << " bytes.";
185 DCHECK_NE(state_, READING);
186 DCHECK_NE(state_, CLOSED);
187
188 char* remaining_data_ptr = &(io_buffer_->data()[buffer_pos_ - backup_bytes_]);
189 int remaining_buffer_size = buffer_size_ - buffer_pos_ + backup_bytes_;
190 ResetInternal();
191
192 if (remaining_buffer_size > 0) {
193 buffer_size_ = remaining_buffer_size;
194 DVLOG(1) << "Have " << buffer_size_ << " bytes remaining, shifting.";
195 // Move any remaining unread data to the start of the buffer;
196 std::memmove(io_buffer_->data(), remaining_data_ptr, buffer_size_);
197 state_ = READY;
198 drainable_io_buffer_->SetOffset(buffer_size_);
199 }
200 }
201
202 void SocketInputStream::SetLimit(int limit) {
203 DCHECK_GE(limit, 0);
204 DCHECK_LT(limit, io_buffer_->size());
205 DVLOG(1) << "Setting read limit to " << limit;
206 DVLOG(1) << " Current pos: " << buffer_pos_;
207 DVLOG(1) << " Current backup: " << backup_bytes_;
208 DVLOG(1) << " Current size: " << buffer_size_;
209 limit_ = limit;
210 }
211
212 int SocketInputStream::GetCurrentPosition() const {
213 DCHECK_EQ(skipped_bytes_, 0);
214 return buffer_pos_ - backup_bytes_;
215 }
216
217 int SocketInputStream::last_error() const {
218 return last_error_;
219 }
220
221 SocketInputStream::State SocketInputStream::state() const {
222 return state_;
223 }
224
225 void SocketInputStream::RefreshCompletionCallback(
226 const base::Closure& callback, int result) {
227 DCHECK_EQ(state_, READING);
228
229 if (run_loop_.get())
230 run_loop_->Quit();
231
232 if (state_ == CLOSED) {
233 // An error occured before the completion callback could complete. Ignore
234 // the result.
235 return;
236 }
237
238 if (result < net::OK) {
239 DVLOG(1) << "Failed to refresh socket: " << result;
240 CloseStream(result, callback);
241 return;
242 }
243
244 if (result == 0) {
245 // No data found, just return.
246 state_ = (buffer_pos_ == buffer_size_ ? EMPTY : READY);
247 return;
248 }
249
250 state_ = READY;
251 buffer_size_ += result;
252 int bytes_to_skip = std::min(skipped_bytes_, result);
253 buffer_pos_ += bytes_to_skip;
254 if (buffer_pos_== buffer_size_)
255 state_ = EMPTY;
256 skipped_bytes_ = std::max(skipped_bytes_ - result, 0);
257 drainable_io_buffer_->SetOffset(buffer_size_);
258 if (!callback.is_null())
259 base::MessageLoop::current()->PostTask(FROM_HERE, callback);
260
261 DVLOG(1) << "Refresh complete with " << result << " new bytes. "
262 << "Current position " << buffer_pos_ - backup_bytes_
263 << " of " << buffer_size_;
264 }
265
266 void SocketInputStream::ResetInternal() {
267 if (run_loop_.get()) {
268 run_loop_->Quit();
269 run_loop_.reset();
270 }
271 weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks.
272 buffer_size_ = 0;
273 buffer_pos_ = 0;
274 backup_bytes_ = 0;
275 skipped_bytes_ = 0;
276 state_ = EMPTY;
277 limit_ = 0;
278
279 last_error_ = net::OK;
280
281 // Reset the offset by creating a new one. Note that DrainableIOBuffers don't
282 // actually allocate their own buffer memory like normal IOBuffers. This will
283 // just reset the pointers to point to the beginning of io_buffer_'s data.
284 drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(),
285 io_buffer_->size());
286 }
287
288 void SocketInputStream::CloseStream(int result, const base::Closure& callback) {
289 ResetInternal();
290 state_ = CLOSED;
291 last_error_ = result;
292
293 if (!callback.is_null())
294 base::MessageLoop::current()->PostTask(FROM_HERE, callback);
295 }
296
297 SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
298 : socket_(socket),
299 io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)),
300 drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
301 io_buffer_->size())),
302 buffer_used_(0),
303 state_(EMPTY),
304 weak_ptr_factory_(this) {
305 DCHECK(socket->IsConnected());
306 }
307
308 SocketOutputStream::~SocketOutputStream() {
309 }
310
311 bool SocketOutputStream::Next(void** data, int* size) {
312 DCHECK_NE(state_, CLOSED);
313 DCHECK_NE(state_, FLUSHING);
314 if (buffer_used_ == io_buffer_->size())
315 return false;
316
317 *data = io_buffer_->data() + buffer_used_;
318 *size = io_buffer_->size() - buffer_used_;
319 buffer_used_ = io_buffer_->size();
320 state_ = READY;
321 return true;
322 }
323
324 void SocketOutputStream::BackUp(int count) {
325 DCHECK_GE(count, 0);
326 if (count > buffer_used_)
327 buffer_used_ = 0;
328 buffer_used_ -= count;
329 DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
330 << buffer_used_ << " bytes used.";
331 }
332
333 int64 SocketOutputStream::ByteCount() const {
334 DCHECK_NE(state_, CLOSED);
335 DCHECK_NE(state_, FLUSHING);
336 return buffer_used_;
337 }
338
339 void SocketOutputStream::Flush(const base::Closure& callback) {
340 DCHECK_EQ(state_, READY);
341 state_ = FLUSHING;
342
343 if (!socket_->IsConnected()) {
344 LOG(ERROR) << "Socket was disconnected, closing output stream";
345 last_error_ = net::ERR_CONNECTION_CLOSED;
346 state_ = CLOSED;
347 if (!callback.is_null())
348 base::MessageLoop::current()->PostTask(FROM_HERE, callback);
349 return;
350 }
351
352 DVLOG(1) << "Flushing " << buffer_used_ << " bytes into socket.";
353 int result = socket_->Write(
354 drainable_io_buffer_,
355 buffer_used_,
356 base::Bind(&SocketOutputStream::FlushCompletionCallback,
357 weak_ptr_factory_.GetWeakPtr(),
358 callback));
359 if (result != net::ERR_IO_PENDING)
360 FlushCompletionCallback(callback, result);
361 }
362
363 SocketOutputStream::State SocketOutputStream::state() const{
364 return state_;
365 }
366
367 int SocketOutputStream::last_error() const {
368 return last_error_;
369 }
370
371 void SocketOutputStream::FlushCompletionCallback(
372 const base::Closure& callback, int result) {
373 DCHECK_EQ(state_, FLUSHING);
374 if (result < net::OK) {
375 LOG(ERROR) << "Failed to flush socket.";
376 last_error_ = result;
377 state_ = CLOSED;
378 if (!callback.is_null())
379 base::MessageLoop::current()->PostTask(FROM_HERE, callback);
380 return;
381 }
382
383 state_ = READY;
384 if (drainable_io_buffer_->BytesConsumed() + result < buffer_used_) {
385 DVLOG(1) << "Partial flush complete. Retrying.";
386 // Only a partial write was completed. Flush again to finish the write.
387 drainable_io_buffer_->SetOffset(
388 drainable_io_buffer_->BytesConsumed() + result);
389 Flush(callback);
390 return;
391 }
392
393 DVLOG(1) << "Socket flush complete.";
394 drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(),
395 io_buffer_->size());
396 state_ = EMPTY;
397 buffer_used_ = 0;
398 if (!callback.is_null())
399 base::MessageLoop::current()->PostTask(FROM_HERE, callback);
400 }
401
402 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698