Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(290)

Side by Side Diff: google_apis/gcm/base/socket_stream.cc

Issue 23684017: [GCM] Initial work to set up directory structure and introduce socket integration (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: address comments Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "google_apis/gcm/base/socket_stream.h"
6
7 #include "base/callback.h"
8 #include "net/base/io_buffer.h"
9 #include "net/base/net_errors.h"
10 #include "net/socket/stream_socket.h"
11
12 namespace gcm {
13
14 // TODO(zea): consider having dynamically sized buffers if this becomes too
akalin 2013/10/04 18:37:04 dynamically sized -> dynamically-sized
Nicolas Zea 2013/10/04 20:55:28 Done.
15 // expensive.
16 const uint32 kDefaultBufferSize = 8*1024;
akalin 2013/10/04 18:37:04 put this in anon namespace?
Nicolas Zea 2013/10/04 20:55:28 Done.
17
18 SocketInputStream::SocketInputStream(net::StreamSocket* socket)
19 : socket_(socket),
20 io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)),
21 drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
22 io_buffer_->size())),
23 buffer_read_pos_(0),
24 buffer_write_pos_(0),
25 backup_bytes_(0),
26 skipped_bytes_(0),
27 last_error_(net::OK),
28 state_(EMPTY),
29 weak_ptr_factory_(this) {
30 DCHECK(socket->IsConnected());
31 }
32
33 SocketInputStream::~SocketInputStream() {
34 }
35
36 bool SocketInputStream::Next(const void** data, int* size) {
37 DCHECK_NE(state_, CLOSED);
38 DCHECK_NE(state_, READING);
39
40 if (state_ == EMPTY) {
41 DVLOG(1) << "No unread data remaining, ending read.";
42 return false;
43 }
44
45 if (backup_bytes_ > 0) {
akalin 2013/10/04 18:37:04 DCHECK backup_bytes_ isn't too big?
Nicolas Zea 2013/10/04 20:55:28 n/a now
46 *size = backup_bytes_;
akalin 2013/10/04 18:37:04 put *size assignment after *data assignment
Nicolas Zea 2013/10/04 20:55:28 n/a now
47 *data = io_buffer_->data() + buffer_read_pos_ - backup_bytes_;
48 backup_bytes_ = 0;
49 DCHECK_GT(*size, 0);
akalin 2013/10/04 18:37:04 i'm all for copious DCHECKs but this seems pretty
Nicolas Zea 2013/10/04 20:55:28 n/a now
50 DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
51 if (backup_bytes_ == 0 && buffer_read_pos_ == buffer_write_pos_)
52 state_ = EMPTY;
53 return true;
54 }
55
56 DCHECK_EQ(state_, READY)
57 << " Input stream must have pending data before reading.";
58 DCHECK_NE(buffer_write_pos_, buffer_read_pos_);
59 *data = io_buffer_->data() + buffer_read_pos_;
60 *size = buffer_write_pos_ - buffer_read_pos_;
61 buffer_read_pos_ = buffer_write_pos_;
62 state_ = EMPTY;
63 DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
64 return true;
65 }
66
67 void SocketInputStream::BackUp(int count) {
68 DCHECK(state_ == READY || state_ == EMPTY);
69 DCHECK_GE(count, 0);
70 DCHECK_GE(backup_bytes_, 0);
71 DCHECK_LE(backup_bytes_, buffer_read_pos_);
72 DCHECK_EQ(skipped_bytes_, 0);
73
74 backup_bytes_ += count;
75 state_ = READY;
76 DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
77 << "Current position now at " << buffer_read_pos_ - backup_bytes_
78 << " of " << buffer_write_pos_;
79 }
80
81 bool SocketInputStream::Skip(int count) {
82 DCHECK_EQ(state_, READY);
83 DCHECK_GT(count, 0);
84 DVLOG(1) << "Skipping " << count << " bytes in stream.";
85
86 if (backup_bytes_ >= count) {
87 // We have more data left over than we're trying to skip. Just chop it.
88 backup_bytes_ -= count;
89 return true;
90 }
91
92 count -= backup_bytes_;
93 backup_bytes_ = 0;
94 skipped_bytes_ += count;
95 state_ = EMPTY;
96
97 return true;
98 }
99
100 int64 SocketInputStream::ByteCount() const {
101 DCHECK_NE(state_, CLOSED);
102 DCHECK_NE(state_, READING);
103 return buffer_write_pos_ - buffer_read_pos_ + backup_bytes_;
104 }
105
106 void SocketInputStream::Refresh(const base::Closure& callback,
107 int byte_limit) {
108 DCHECK_NE(state_, CLOSED);
109 DCHECK_NE(state_, READING);
110 DCHECK_GT(byte_limit, 0);
111 DCHECK_LE(byte_limit, drainable_io_buffer_->BytesRemaining());
112
113 if (buffer_write_pos_ + byte_limit > io_buffer_->size()) {
114 LOG(ERROR) << "Out of buffer space, closing input stream.";
115 CloseStream(net::ERR_UNEXPECTED, callback);
116 return;
117 }
118
119 if (!socket_->IsConnected()) {
120 LOG(ERROR) << "Socket was disconnected, closing input stream";
121 CloseStream(net::ERR_CONNECTION_CLOSED, callback);
122 return;
123 }
124
125 state_ = READING;
126
127 DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes.";
128 int result = socket_->Read(
129 drainable_io_buffer_,
130 byte_limit,
131 base::Bind(&SocketInputStream::RefreshCompletionCallback,
132 weak_ptr_factory_.GetWeakPtr(),
133 callback));
134 DVLOG(1) << "Read returned " << result;
135 if (result != net::ERR_IO_PENDING)
136 RefreshCompletionCallback(callback, result);
137 }
138
139 void SocketInputStream::RebuildBuffer() {
140 DCHECK_EQ(skipped_bytes_, 0);
141 DVLOG(1) << "Resetting input stream, consumed "
142 << buffer_read_pos_ - backup_bytes_ << " bytes.";
143 DCHECK_NE(state_, READING);
144 DCHECK_NE(state_, CLOSED);
145
146 int last_read_pos = buffer_read_pos_ - backup_bytes_;
147 char* unread_data_ptr = io_buffer_->data() + last_read_pos;
148 int unread_buffer_size = buffer_write_pos_ - last_read_pos;
149 ResetInternal();
150
151 if (unread_buffer_size > 0) {
152 buffer_write_pos_ = unread_buffer_size;
153 drainable_io_buffer_->SetOffset(buffer_write_pos_);
154 state_ = READY;
155
156 if (last_read_pos != 0) {
157 DVLOG(1) << "Have " << buffer_write_pos_
158 << " unread bytes remaining, shifting.";
159 // Move any remaining unread data to the start of the buffer;
160 std::memmove(io_buffer_->data(), unread_data_ptr, buffer_write_pos_);
161 } else {
162 DVLOG(1) << "Have " << buffer_write_pos_ << " unread bytes remaining.";
163 }
164 }
165 }
166
167 int SocketInputStream::last_error() const {
168 return last_error_;
169 }
170
171 SocketInputStream::State SocketInputStream::state() const {
172 return state_;
173 }
174
175 void SocketInputStream::RefreshCompletionCallback(
176 const base::Closure& callback, int result) {
177 DCHECK_EQ(state_, READING);
178 if (state_ == CLOSED) {
179 // An error occured before the completion callback could complete. Ignore
180 // the result.
181 return;
182 }
183
184 if (result < net::OK) {
185 DVLOG(1) << "Failed to refresh socket: " << result;
186 CloseStream(result, callback);
187 return;
188 }
189 DCHECK_GT(result, 0);
190
191 state_ = READY;
192 buffer_write_pos_ += result;
193 int bytes_to_skip = std::min(skipped_bytes_, result);
194 buffer_read_pos_ += bytes_to_skip;
195 skipped_bytes_ = std::max(skipped_bytes_ - result, 0);
196 drainable_io_buffer_->SetOffset(buffer_write_pos_);
197 if (buffer_read_pos_ - backup_bytes_ == buffer_write_pos_)
198 state_ = EMPTY;
199
200 DVLOG(1) << "Refresh complete with " << result << " new bytes. "
201 << "Current position " << buffer_read_pos_ - backup_bytes_
202 << " of " << buffer_write_pos_ << ".";
203
204 if (!callback.is_null()) callback.Run();
akalin 2013/10/04 18:37:04 newline before callback.Run()
Nicolas Zea 2013/10/04 20:55:28 Done.
205 }
206
207 void SocketInputStream::ResetInternal() {
208 weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks.
209 buffer_write_pos_ = 0;
210 buffer_read_pos_ = 0;
211 backup_bytes_ = 0;
212 skipped_bytes_ = 0;
213 state_ = EMPTY;
214
215 last_error_ = net::OK;
216
217 // Reset the offset by creating a new one. Note that DrainableIOBuffers don't
218 // actually allocate their own buffer memory like normal IOBuffers. This will
219 // just reset the pointers to point to the beginning of io_buffer_'s data.
220 drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(),
221 io_buffer_->size());
222 }
223
224 void SocketInputStream::CloseStream(int error, const base::Closure& callback) {
225 ResetInternal();
226 state_ = CLOSED;
227 last_error_ = error;
228 LOG(ERROR) << "Closing stream with result " << error;
229 if (!callback.is_null()) callback.Run();
230 }
231
232 SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
akalin 2013/10/04 18:37:04 (blanket comment) apply above comments on SocketIn
Nicolas Zea 2013/10/04 20:55:28 Done.
233 : socket_(socket),
234 io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)),
235 drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
236 io_buffer_->size())),
237 buffer_used_(0),
238 state_(EMPTY),
239 weak_ptr_factory_(this) {
240 DCHECK(socket->IsConnected());
241 }
242
243 SocketOutputStream::~SocketOutputStream() {
244 }
245
246 bool SocketOutputStream::Next(void** data, int* size) {
247 DCHECK_NE(state_, CLOSED);
248 DCHECK_NE(state_, FLUSHING);
249 if (buffer_used_ == io_buffer_->size())
250 return false;
251
252 *data = io_buffer_->data() + buffer_used_;
253 *size = io_buffer_->size() - buffer_used_;
254 buffer_used_ = io_buffer_->size();
255 state_ = READY;
256 return true;
257 }
258
259 void SocketOutputStream::BackUp(int count) {
260 DCHECK_GE(count, 0);
261 if (count > buffer_used_)
262 buffer_used_ = 0;
263 buffer_used_ -= count;
264 DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
265 << buffer_used_ << " bytes used.";
266 }
267
268 int64 SocketOutputStream::ByteCount() const {
269 DCHECK_NE(state_, CLOSED);
270 DCHECK_NE(state_, FLUSHING);
271 return buffer_used_;
272 }
273
274 void SocketOutputStream::Flush(const base::Closure& callback) {
275 DCHECK_EQ(state_, READY);
276 state_ = FLUSHING;
277
278 if (!socket_->IsConnected()) {
279 LOG(ERROR) << "Socket was disconnected, closing output stream";
280 last_error_ = net::ERR_CONNECTION_CLOSED;
281 state_ = CLOSED;
282 if (!callback.is_null()) callback.Run();
283 return;
284 }
285
286 DVLOG(1) << "Flushing " << buffer_used_ << " bytes into socket.";
287 int result = socket_->Write(
288 drainable_io_buffer_,
289 buffer_used_,
290 base::Bind(&SocketOutputStream::FlushCompletionCallback,
291 weak_ptr_factory_.GetWeakPtr(),
292 callback));
293 DVLOG(1) << "Write returned " << result;
294 if (result != net::ERR_IO_PENDING)
295 FlushCompletionCallback(callback, result);
296 }
297
298 SocketOutputStream::State SocketOutputStream::state() const{
299 return state_;
300 }
301
302 int SocketOutputStream::last_error() const {
303 return last_error_;
304 }
305
306 void SocketOutputStream::FlushCompletionCallback(
307 const base::Closure& callback, int result) {
308 DCHECK_EQ(state_, FLUSHING);
309 if (result < net::OK) {
310 LOG(ERROR) << "Failed to flush socket.";
311 last_error_ = result;
312 state_ = CLOSED;
313 if (!callback.is_null()) callback.Run();
314 return;
315 }
316
317 state_ = READY;
318 if (drainable_io_buffer_->BytesConsumed() + result < buffer_used_) {
319 DVLOG(1) << "Partial flush complete. Retrying.";
320 // Only a partial write was completed. Flush again to finish the write.
321 drainable_io_buffer_->SetOffset(
322 drainable_io_buffer_->BytesConsumed() + result);
323 Flush(callback);
324 return;
325 }
326
327 DVLOG(1) << "Socket flush complete.";
328 drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(),
329 io_buffer_->size());
330 state_ = EMPTY;
331 buffer_used_ = 0;
332 if (!callback.is_null()) callback.Run();
333 }
334
335 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698