Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(64)

Side by Side Diff: google_apis/gcm/base/socket_stream.cc

Issue 23684017: [GCM] Initial work to set up directory structure and introduce socket integration (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Make SocketStream asynchronous. Remove custom data provider Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "google_apis/gcm/base/socket_stream.h"
6
7 #include "base/callback.h"
8 #include "base/message_loop/message_loop.h"
9 #include "net/base/io_buffer.h"
10 #include "net/base/net_errors.h"
11 #include "net/socket/stream_socket.h"
12
13 namespace gcm {
14
15 // TODO(zea): consider having dynamically sized buffers if this becomes too
16 // expensive.
17 const uint32 kDefaultBufferSize = 8*1024;
18
19 SocketInputStream::SocketInputStream(base::TimeDelta read_timeout,
20 net::StreamSocket* socket)
21 : socket_(socket),
22 io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)),
23 drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
24 io_buffer_->size())),
25 buffer_pos_(0),
26 buffer_size_(0),
27 limit_(0),
28 backup_bytes_(0),
29 skipped_bytes_(0),
30 last_error_(net::OK),
31 state_(EMPTY),
32 read_timeout_(read_timeout),
33 weak_ptr_factory_(this) {
34 DCHECK(socket->IsConnected());
35 }
36
37 SocketInputStream::~SocketInputStream() {
38 }
39
40 bool SocketInputStream::Next(const void** data, int* size) {
41 DCHECK_NE(state_, CLOSED);
42 DCHECK_NE(state_, READING);
43
44 if (backup_bytes_ > 0) {
45 *size = backup_bytes_;
46 *data = io_buffer_->data() + buffer_pos_ - backup_bytes_;
47 backup_bytes_ = 0;
48 return true;
49 }
50
51 if (limit_ != 0 && buffer_pos_ >= limit_) {
52 DVLOG(1) << "Reached buffer limit, ending read.";
53 return false;
54 }
55
56 DCHECK_EQ(state_, READY)
57 << " Input stream must have pending data before reading.";
58 DCHECK_NE(buffer_size_, buffer_pos_);
59 *data = io_buffer_->data() + buffer_pos_;
60 *size = buffer_size_ - buffer_pos_;
61 buffer_pos_ = buffer_size_;
62 state_ = EMPTY;
63 DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
64 return true;
65 }
66
67 void SocketInputStream::BackUp(int count) {
68 DCHECK(state_ == READY || state_ == EMPTY);
69 DCHECK_GE(backup_bytes_, 0);
70 DCHECK_LE(backup_bytes_, buffer_pos_);
71 DCHECK_EQ(skipped_bytes_, 0);
72
73 backup_bytes_ += count;
74 state_ = READY;
75 DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
76 << "Current position now at " << buffer_pos_ - backup_bytes_
77 << " of " << buffer_size_;
78 }
79
80 bool SocketInputStream::Skip(int count) {
81 DCHECK_EQ(state_, READY);
82 DCHECK_GT(count, 0);
83 DVLOG(1) << "Skipping " << count << " bytes in stream.";
84
85 if (backup_bytes_ >= count) {
86 // We have more data left over than we're trying to skip. Just chop it.
87 backup_bytes_ -= count;
88 return true;
89 }
90
91 count -= backup_bytes_;
92 backup_bytes_ = 0;
93 skipped_bytes_ += count;
94 state_ = EMPTY;
95
96 return true;
97 }
98
99 int64 SocketInputStream::ByteCount() const {
100 DCHECK_NE(state_, CLOSED);
101 DCHECK_NE(state_, READING);
102 return buffer_size_;
103 }
104
105 void SocketInputStream::Refresh(const base::Closure& callback) {
106 DCHECK_NE(state_, CLOSED);
107 DCHECK_NE(state_, READING);
108
109 if (buffer_pos_ - backup_bytes_ == io_buffer_->size()) {
110 LOG(ERROR) << "Out of buffer space, closing input stream.";
111 CloseStream(net::ERR_UNEXPECTED, callback);
112 return;
113 }
114
115 if (!socket_->IsConnected()) {
116 LOG(ERROR) << "Socket was disconnected, closing input stream";
117 CloseStream(net::ERR_CONNECTION_CLOSED, callback);
118 return;
119 }
120
121 state_ = READING;
122 int read_limit =
123 limit_ == 0 ?
124 drainable_io_buffer_->BytesRemaining() :
125 limit_ - buffer_pos_;
126 read_limit = std::min(read_limit, drainable_io_buffer_->BytesRemaining());
127
128 if (read_limit <= buffer_size_ - (buffer_pos_ - backup_bytes_)) {
129 state_ = READY;
130 if (read_timeout_timer_.IsRunning())
131 read_timeout_timer_.Reset();
132 DVLOG(1) << "Already have enough data for read limit.";
133 if (!callback.is_null())
134 base::MessageLoop::current()->PostTask(FROM_HERE, callback);
135 return;
136 }
137
138 DVLOG(1) << "Refreshing input stream, limit of " << read_limit << " bytes.";
139 DCHECK_GT(read_limit, 0);
140 int result = socket_->Read(
141 drainable_io_buffer_,
142 read_limit,
143 base::Bind(&SocketInputStream::RefreshCompletionCallback,
144 weak_ptr_factory_.GetWeakPtr(),
145 callback));
146 DVLOG(1) << "Read returned " << result;
147 if (result != net::ERR_IO_PENDING)
148 RefreshCompletionCallback(callback, result);
149 }
150
151 void SocketInputStream::Reset() {
152 DCHECK_EQ(skipped_bytes_, 0);
153 DVLOG(1) << "Resetting input stream, consumed "
154 << buffer_pos_ - backup_bytes_ << " bytes.";
155 DCHECK_NE(state_, READING);
156 DCHECK_NE(state_, CLOSED);
157
158 int old_buffer_pos = buffer_pos_ - backup_bytes_;
159 char* remaining_data_ptr = &(io_buffer_->data()[old_buffer_pos]);
160 int remaining_buffer_size = buffer_size_ - old_buffer_pos;
161 ResetInternal();
162
163 if (remaining_buffer_size > 0) {
164 buffer_size_ = remaining_buffer_size;
165 drainable_io_buffer_->SetOffset(buffer_size_);
166 state_ = READY;
167
168 if (old_buffer_pos != 0) {
169 DVLOG(1) << "Have " << buffer_size_ << " bytes remaining, shifting.";
170 // Move any remaining unread data to the start of the buffer;
171 std::memmove(io_buffer_->data(), remaining_data_ptr, buffer_size_);
172 } else {
173 DVLOG(1) << "Have " << buffer_size_ << " bytes remaining.";
174 }
175 }
176 }
177
178 void SocketInputStream::GetNextMessage(int limit,
179 const base::Closure& callback) {
180 DCHECK_GE(limit, 0);
181 DCHECK_LT(limit, io_buffer_->size());
182 DVLOG(1) << "Setting read limit to " << limit;
183 DVLOG(1) << " Current pos: " << buffer_pos_;
184 DVLOG(1) << " Current backup: " << backup_bytes_;
185 DVLOG(1) << " Current size: " << buffer_size_;
186 limit_ = limit;
187
188 // Set up the timeout timer. Note that if this does get triggered, the
189 // IOBuffer used in the Read call will remain owned by the socket itself
190 // until the read completes (which may never happen). A timeout must therefore
191 // be a fatal error for the input stream.
192 read_timeout_timer_.Start(
193 FROM_HERE,
194 read_timeout_,
195 base::Bind(&SocketInputStream::RefreshCompletionCallback,
196 weak_ptr_factory_.GetWeakPtr(),
197 callback,
198 net::ERR_TIMED_OUT));
199 Refresh(callback);
200 }
201
202 int SocketInputStream::last_error() const {
203 return last_error_;
204 }
205
206 SocketInputStream::State SocketInputStream::state() const {
207 return state_;
208 }
209
210 void SocketInputStream::RefreshCompletionCallback(
211 const base::Closure& callback, int result) {
212 DCHECK_EQ(state_, READING);
213 // TODO(zea): should the timeout be per read or per message? For now it's
214 // per read, and reset on every read completion (successful or not).
215 if (read_timeout_timer_.IsRunning())
216 read_timeout_timer_.Reset();
217
218 if (state_ == CLOSED) {
219 // An error occured before the completion callback could complete. Ignore
220 // the result.
221 return;
222 }
223
224 if (result < net::OK) {
225 DVLOG(1) << "Failed to refresh socket: " << result;
226 CloseStream(result, callback);
227 return;
228 }
229 DCHECK_GT(result, 0);
230
231 state_ = READY;
232 buffer_size_ += result;
233 int bytes_to_skip = std::min(skipped_bytes_, result);
234 buffer_pos_ += bytes_to_skip;
235 skipped_bytes_ = std::max(skipped_bytes_ - result, 0);
236 drainable_io_buffer_->SetOffset(buffer_size_);
237 if (buffer_pos_ - backup_bytes_ == buffer_size_)
238 state_ = EMPTY;
239
240 DVLOG(1) << "Refresh complete with " << result << " new bytes. "
241 << "Current position " << buffer_pos_ - backup_bytes_
242 << " of " << buffer_size_ << ".";
243
244 // If a limit is set, kick off another refresh to get the rest of the message
245 // data.
246 if (limit_ != 0 && buffer_size_ < limit_) {
247 read_timeout_timer_.Start(
248 FROM_HERE,
249 read_timeout_,
250 base::Bind(&SocketInputStream::RefreshCompletionCallback,
251 weak_ptr_factory_.GetWeakPtr(),
252 callback,
253 net::ERR_TIMED_OUT));
254 Refresh(callback);
255 return;
256 }
257
258 if (!callback.is_null())
259 base::MessageLoop::current()->PostTask(FROM_HERE, callback);
260 }
261
262 void SocketInputStream::ResetInternal() {
263 weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks.
264 buffer_size_ = 0;
265 buffer_pos_ = 0;
266 backup_bytes_ = 0;
267 skipped_bytes_ = 0;
268 state_ = EMPTY;
269 limit_ = 0;
270
271 last_error_ = net::OK;
272
273 // Reset the offset by creating a new one. Note that DrainableIOBuffers don't
274 // actually allocate their own buffer memory like normal IOBuffers. This will
275 // just reset the pointers to point to the beginning of io_buffer_'s data.
276 drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(),
277 io_buffer_->size());
278 }
279
280 void SocketInputStream::CloseStream(int result, const base::Closure& callback) {
281 ResetInternal();
282 state_ = CLOSED;
283 last_error_ = result;
284
285 if (!callback.is_null())
286 base::MessageLoop::current()->PostTask(FROM_HERE, callback);
Ryan Sleevi 2013/09/10 19:35:27 FWIW, net/ code synchronously runs callbacks (and
Nicolas Zea 2013/09/12 22:46:58 Done.
287 }
288
289 SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
290 : socket_(socket),
291 io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)),
292 drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
293 io_buffer_->size())),
294 buffer_used_(0),
295 state_(EMPTY),
296 weak_ptr_factory_(this) {
297 DCHECK(socket->IsConnected());
298 }
299
300 SocketOutputStream::~SocketOutputStream() {
301 }
302
303 bool SocketOutputStream::Next(void** data, int* size) {
304 DCHECK_NE(state_, CLOSED);
305 DCHECK_NE(state_, FLUSHING);
306 if (buffer_used_ == io_buffer_->size())
307 return false;
308
309 *data = io_buffer_->data() + buffer_used_;
310 *size = io_buffer_->size() - buffer_used_;
311 buffer_used_ = io_buffer_->size();
312 state_ = READY;
313 return true;
314 }
315
316 void SocketOutputStream::BackUp(int count) {
317 DCHECK_GE(count, 0);
318 if (count > buffer_used_)
319 buffer_used_ = 0;
320 buffer_used_ -= count;
321 DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
322 << buffer_used_ << " bytes used.";
323 }
324
325 int64 SocketOutputStream::ByteCount() const {
326 DCHECK_NE(state_, CLOSED);
327 DCHECK_NE(state_, FLUSHING);
328 return buffer_used_;
329 }
330
331 void SocketOutputStream::Flush(const base::Closure& callback) {
332 DCHECK_EQ(state_, READY);
333 state_ = FLUSHING;
334
335 if (!socket_->IsConnected()) {
336 LOG(ERROR) << "Socket was disconnected, closing output stream";
337 last_error_ = net::ERR_CONNECTION_CLOSED;
338 state_ = CLOSED;
339 if (!callback.is_null())
340 base::MessageLoop::current()->PostTask(FROM_HERE, callback);
341 return;
342 }
343
344 DVLOG(1) << "Flushing " << buffer_used_ << " bytes into socket.";
345 int result = socket_->Write(
346 drainable_io_buffer_,
347 buffer_used_,
348 base::Bind(&SocketOutputStream::FlushCompletionCallback,
349 weak_ptr_factory_.GetWeakPtr(),
350 callback));
351 DVLOG(1) << "Write returned " << result;
352 if (result != net::ERR_IO_PENDING)
353 FlushCompletionCallback(callback, result);
354 }
355
356 SocketOutputStream::State SocketOutputStream::state() const{
357 return state_;
358 }
359
360 int SocketOutputStream::last_error() const {
361 return last_error_;
362 }
363
364 void SocketOutputStream::FlushCompletionCallback(
365 const base::Closure& callback, int result) {
366 DCHECK_EQ(state_, FLUSHING);
367 if (result < net::OK) {
368 LOG(ERROR) << "Failed to flush socket.";
369 last_error_ = result;
370 state_ = CLOSED;
371 if (!callback.is_null())
372 base::MessageLoop::current()->PostTask(FROM_HERE, callback);
373 return;
374 }
375
376 state_ = READY;
377 if (drainable_io_buffer_->BytesConsumed() + result < buffer_used_) {
378 DVLOG(1) << "Partial flush complete. Retrying.";
379 // Only a partial write was completed. Flush again to finish the write.
380 drainable_io_buffer_->SetOffset(
381 drainable_io_buffer_->BytesConsumed() + result);
382 Flush(callback);
383 return;
384 }
385
386 DVLOG(1) << "Socket flush complete.";
387 drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(),
388 io_buffer_->size());
389 state_ = EMPTY;
390 buffer_used_ = 0;
391 if (!callback.is_null())
392 base::MessageLoop::current()->PostTask(FROM_HERE, callback);
393 }
394
395 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698