Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(13)

Side by Side Diff: google_apis/gcm/base/socket_stream.cc

Issue 23684017: [GCM] Initial work to set up directory structure and introduce socket integration (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Review + refactor timeout logic into separate component Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "google_apis/gcm/base/socket_stream.h"
6
7 #include "base/callback.h"
8 #include "net/base/io_buffer.h"
9 #include "net/base/net_errors.h"
10 #include "net/socket/stream_socket.h"
11
12 namespace gcm {
13
14 // TODO(zea): consider having dynamically sized buffers if this becomes too
15 // expensive.
16 const uint32 kDefaultBufferSize = 8*1024;
17
18 SocketInputStream::SocketInputStream(net::StreamSocket* socket)
19 : socket_(socket),
20 io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)),
21 drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
22 io_buffer_->size())),
23 buffer_read_pos_(0),
24 buffer_write_pos_(0),
25 backup_bytes_(0),
26 skipped_bytes_(0),
27 last_error_(net::OK),
28 state_(EMPTY),
29 weak_ptr_factory_(this) {
30 DCHECK(socket->IsConnected());
31 }
32
33 SocketInputStream::~SocketInputStream() {
34 }
35
36 bool SocketInputStream::Next(const void** data, int* size) {
37 DCHECK_NE(state_, CLOSED);
38 DCHECK_NE(state_, READING);
39
40 if (state_ == EMPTY) {
41 DVLOG(1) << "No unread data remaining, ending read.";
42 return false;
43 }
44
45 if (backup_bytes_ > 0) {
46 *size = backup_bytes_;
47 *data = io_buffer_->data() + buffer_read_pos_ - backup_bytes_;
48 backup_bytes_ = 0;
49 DCHECK_GT(*size, 0);
50 DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
51 if (backup_bytes_ == 0 && buffer_read_pos_ == buffer_write_pos_)
52 state_ = EMPTY;
53 return true;
54 }
55
56 DCHECK_EQ(state_, READY)
57 << " Input stream must have pending data before reading.";
58 DCHECK_NE(buffer_write_pos_, buffer_read_pos_);
59 *data = io_buffer_->data() + buffer_read_pos_;
60 *size = buffer_write_pos_ - buffer_read_pos_;
61 buffer_read_pos_ = buffer_write_pos_;
62 state_ = EMPTY;
63 DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
64 return true;
65 }
66
67 void SocketInputStream::BackUp(int count) {
68 DCHECK(state_ == READY || state_ == EMPTY);
69 DCHECK_GE(backup_bytes_, 0);
70 DCHECK_LE(backup_bytes_, buffer_read_pos_);
71 DCHECK_EQ(skipped_bytes_, 0);
72
73 backup_bytes_ += count;
74 state_ = READY;
75 DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
76 << "Current position now at " << buffer_read_pos_ - backup_bytes_
77 << " of " << buffer_write_pos_;
78 }
79
80 bool SocketInputStream::Skip(int count) {
81 DCHECK_EQ(state_, READY);
82 DCHECK_GT(count, 0);
83 DVLOG(1) << "Skipping " << count << " bytes in stream.";
84
85 if (backup_bytes_ >= count) {
86 // We have more data left over than we're trying to skip. Just chop it.
87 backup_bytes_ -= count;
88 return true;
89 }
90
91 count -= backup_bytes_;
92 backup_bytes_ = 0;
93 skipped_bytes_ += count;
94 state_ = EMPTY;
95
96 return true;
97 }
98
99 int64 SocketInputStream::ByteCount() const {
100 DCHECK_NE(state_, CLOSED);
101 DCHECK_NE(state_, READING);
102 return buffer_write_pos_;
103 }
104
105 void SocketInputStream::Refresh(const base::Closure& callback,
106 int byte_limit) {
107 DCHECK_NE(state_, CLOSED);
108 DCHECK_NE(state_, READING);
109 DCHECK_GT(byte_limit, 0);
110 DCHECK_LE(byte_limit, drainable_io_buffer_->BytesRemaining());
111
112 if (buffer_write_pos_ + byte_limit > io_buffer_->size()) {
113 LOG(ERROR) << "Out of buffer space, closing input stream.";
114 CloseStream(net::ERR_UNEXPECTED, callback);
115 return;
116 }
117
118 if (!socket_->IsConnected()) {
119 LOG(ERROR) << "Socket was disconnected, closing input stream";
120 CloseStream(net::ERR_CONNECTION_CLOSED, callback);
121 return;
122 }
123
124 state_ = READING;
125
126 DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes.";
127 int result = socket_->Read(
128 drainable_io_buffer_,
129 byte_limit,
130 base::Bind(&SocketInputStream::RefreshCompletionCallback,
131 weak_ptr_factory_.GetWeakPtr(),
132 callback));
133 DVLOG(1) << "Read returned " << result;
134 if (result != net::ERR_IO_PENDING)
135 RefreshCompletionCallback(callback, result);
136 }
137
138 void SocketInputStream::RebuildBuffer() {
139 DCHECK_EQ(skipped_bytes_, 0);
140 DVLOG(1) << "Resetting input stream, consumed "
141 << buffer_read_pos_ - backup_bytes_ << " bytes.";
142 DCHECK_NE(state_, READING);
143 DCHECK_NE(state_, CLOSED);
144
145 int last_read_pos = buffer_read_pos_ - backup_bytes_;
146 char* unread_data_ptr = io_buffer_->data() + last_read_pos;
147 int unread_buffer_size = buffer_write_pos_ - last_read_pos;
148 ResetInternal();
149
150 if (unread_buffer_size > 0) {
151 buffer_write_pos_ = unread_buffer_size;
152 drainable_io_buffer_->SetOffset(buffer_write_pos_);
153 state_ = READY;
154
155 if (last_read_pos != 0) {
156 DVLOG(1) << "Have " << buffer_write_pos_
157 << " unread bytes remaining, shifting.";
158 // Move any remaining unread data to the start of the buffer;
159 std::memmove(io_buffer_->data(), unread_data_ptr, buffer_write_pos_);
160 } else {
161 DVLOG(1) << "Have " << buffer_write_pos_ << " unread bytes remaining.";
162 }
163 }
164 }
165
166 int SocketInputStream::last_error() const {
167 return last_error_;
168 }
169
170 SocketInputStream::State SocketInputStream::state() const {
171 return state_;
172 }
173
174 void SocketInputStream::RefreshCompletionCallback(
175 const base::Closure& callback, int result) {
176 DCHECK_EQ(state_, READING);
177 if (state_ == CLOSED) {
178 // An error occured before the completion callback could complete. Ignore
179 // the result.
180 return;
181 }
182
183 if (result < net::OK) {
184 DVLOG(1) << "Failed to refresh socket: " << result;
185 CloseStream(result, callback);
186 return;
187 }
188 DCHECK_GT(result, 0);
189
190 state_ = READY;
191 buffer_write_pos_ += result;
192 int bytes_to_skip = std::min(skipped_bytes_, result);
193 buffer_read_pos_ += bytes_to_skip;
194 skipped_bytes_ = std::max(skipped_bytes_ - result, 0);
195 drainable_io_buffer_->SetOffset(buffer_write_pos_);
196 if (buffer_read_pos_ - backup_bytes_ == buffer_write_pos_)
197 state_ = EMPTY;
198
199 DVLOG(1) << "Refresh complete with " << result << " new bytes. "
200 << "Current position " << buffer_read_pos_ - backup_bytes_
201 << " of " << buffer_write_pos_ << ".";
202
203 if (!callback.is_null()) callback.Run();
204 }
205
206 void SocketInputStream::ResetInternal() {
207 weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks.
208 buffer_write_pos_ = 0;
209 buffer_read_pos_ = 0;
210 backup_bytes_ = 0;
211 skipped_bytes_ = 0;
212 state_ = EMPTY;
213
214 last_error_ = net::OK;
215
216 // Reset the offset by creating a new one. Note that DrainableIOBuffers don't
217 // actually allocate their own buffer memory like normal IOBuffers. This will
218 // just reset the pointers to point to the beginning of io_buffer_'s data.
219 drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(),
220 io_buffer_->size());
221 }
222
223 void SocketInputStream::CloseStream(int error, const base::Closure& callback) {
224 ResetInternal();
225 state_ = CLOSED;
226 last_error_ = error;
227 LOG(ERROR) << "Closing stream with result " << error;
228 if (!callback.is_null()) callback.Run();
229 }
230
231 SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
232 : socket_(socket),
233 io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)),
234 drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
235 io_buffer_->size())),
236 buffer_used_(0),
237 state_(EMPTY),
238 weak_ptr_factory_(this) {
239 DCHECK(socket->IsConnected());
240 }
241
242 SocketOutputStream::~SocketOutputStream() {
243 }
244
245 bool SocketOutputStream::Next(void** data, int* size) {
246 DCHECK_NE(state_, CLOSED);
247 DCHECK_NE(state_, FLUSHING);
248 if (buffer_used_ == io_buffer_->size())
249 return false;
250
251 *data = io_buffer_->data() + buffer_used_;
252 *size = io_buffer_->size() - buffer_used_;
253 buffer_used_ = io_buffer_->size();
254 state_ = READY;
255 return true;
256 }
257
258 void SocketOutputStream::BackUp(int count) {
259 DCHECK_GE(count, 0);
260 if (count > buffer_used_)
261 buffer_used_ = 0;
262 buffer_used_ -= count;
263 DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
264 << buffer_used_ << " bytes used.";
265 }
266
267 int64 SocketOutputStream::ByteCount() const {
268 DCHECK_NE(state_, CLOSED);
269 DCHECK_NE(state_, FLUSHING);
270 return buffer_used_;
271 }
272
273 void SocketOutputStream::Flush(const base::Closure& callback) {
274 DCHECK_EQ(state_, READY);
275 state_ = FLUSHING;
276
277 if (!socket_->IsConnected()) {
278 LOG(ERROR) << "Socket was disconnected, closing output stream";
279 last_error_ = net::ERR_CONNECTION_CLOSED;
280 state_ = CLOSED;
281 if (!callback.is_null()) callback.Run();
282 return;
283 }
284
285 DVLOG(1) << "Flushing " << buffer_used_ << " bytes into socket.";
286 int result = socket_->Write(
287 drainable_io_buffer_,
288 buffer_used_,
289 base::Bind(&SocketOutputStream::FlushCompletionCallback,
290 weak_ptr_factory_.GetWeakPtr(),
291 callback));
292 DVLOG(1) << "Write returned " << result;
293 if (result != net::ERR_IO_PENDING)
294 FlushCompletionCallback(callback, result);
295 }
296
297 SocketOutputStream::State SocketOutputStream::state() const{
298 return state_;
299 }
300
301 int SocketOutputStream::last_error() const {
302 return last_error_;
303 }
304
305 void SocketOutputStream::FlushCompletionCallback(
306 const base::Closure& callback, int result) {
307 DCHECK_EQ(state_, FLUSHING);
308 if (result < net::OK) {
309 LOG(ERROR) << "Failed to flush socket.";
310 last_error_ = result;
311 state_ = CLOSED;
312 if (!callback.is_null()) callback.Run();
313 return;
314 }
315
316 state_ = READY;
317 if (drainable_io_buffer_->BytesConsumed() + result < buffer_used_) {
318 DVLOG(1) << "Partial flush complete. Retrying.";
319 // Only a partial write was completed. Flush again to finish the write.
320 drainable_io_buffer_->SetOffset(
321 drainable_io_buffer_->BytesConsumed() + result);
322 Flush(callback);
323 return;
324 }
325
326 DVLOG(1) << "Socket flush complete.";
327 drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(),
328 io_buffer_->size());
329 state_ = EMPTY;
330 buffer_used_ = 0;
331 if (!callback.is_null()) callback.Run();
332 }
333
334 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698