Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(263)

Side by Side Diff: google_apis/gcm/base/socket_stream.cc

Issue 23684017: [GCM] Initial work to set up directory structure and introduce socket integration (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Address comments Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "google_apis/gcm/base/socket_stream.h"
6
7 #include "base/callback.h"
8 #include "base/message_loop/message_loop.h"
9 #include "net/base/io_buffer.h"
10 #include "net/socket/stream_socket.h"
11
12 namespace gcm {
13
14 namespace {
15
16 // TODO(zea): consider having dynamically-sized buffers if this becomes too
17 // expensive.
18 const uint32 kDefaultBufferSize = 8*1024;
19
20 } // namespace
21
22 SocketInputStream::SocketInputStream(net::StreamSocket* socket)
23 : socket_(socket),
24 io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)),
akalin 2013/10/07 23:00:08 plain IOBuffer here?
Nicolas Zea 2013/10/09 00:44:22 Done.
25 drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
26 kDefaultBufferSize)),
27 buffer_read_pos_(0),
28 last_error_(net::OK),
29 state_(EMPTY),
30 weak_ptr_factory_(this) {
31 DCHECK(socket->IsConnected());
32 }
33
34 SocketInputStream::~SocketInputStream() {
35 }
36
37 bool SocketInputStream::Next(const void** data, int* size) {
38 DCHECK_NE(state_, CLOSED);
39 DCHECK_NE(state_, READING);
40
41 if (state_ == EMPTY) {
42 DVLOG(1) << "No unread data remaining, ending read.";
43 return false;
44 }
45
46 DCHECK_EQ(state_, READY)
47 << " Input stream must have pending data before reading.";
48 DCHECK_NE(drainable_io_buffer_->BytesConsumed(), buffer_read_pos_);
49 *data = io_buffer_->data() + buffer_read_pos_;
50 *size = drainable_io_buffer_->BytesConsumed() - buffer_read_pos_;
51 buffer_read_pos_ = drainable_io_buffer_->BytesConsumed();
52 state_ = EMPTY;
53 DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
54 return true;
55 }
56
57 void SocketInputStream::BackUp(int count) {
58 DCHECK(state_ == READY || state_ == EMPTY);
59 DCHECK_GE(count, 0);
60 DCHECK_LE(count, buffer_read_pos_);
61
62 buffer_read_pos_ -= count;
63 state_ = READY;
64 DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
65 << "Current position now at " << buffer_read_pos_
66 << " of " << drainable_io_buffer_->BytesConsumed();
67 }
68
69 bool SocketInputStream::Skip(int count) {
70 NOTIMPLEMENTED();
71 return false;
72 }
73
74 int64 SocketInputStream::ByteCount() const {
75 DCHECK_NE(state_, CLOSED);
76 DCHECK_NE(state_, READING);
77 return drainable_io_buffer_->BytesConsumed() - buffer_read_pos_;
78 }
79
80 void SocketInputStream::Refresh(const base::Closure& callback,
81 int byte_limit) {
82 DCHECK_NE(state_, CLOSED);
83 DCHECK_NE(state_, READING);
84 DCHECK_GT(byte_limit, 0);
85 DCHECK_LE(byte_limit, drainable_io_buffer_->BytesRemaining());
86
87 if (drainable_io_buffer_->BytesRemaining() < byte_limit) {
akalin 2013/10/07 23:00:08 can you reverse the order of this comparison? (to
Nicolas Zea 2013/10/09 00:44:22 Done.
88 LOG(ERROR) << "Out of buffer space, closing input stream.";
89 CloseStream(net::ERR_UNEXPECTED, callback);
90 return;
91 }
92
93 if (!socket_->IsConnected()) {
94 LOG(ERROR) << "Socket was disconnected, closing input stream";
95 CloseStream(net::ERR_CONNECTION_CLOSED, callback);
96 return;
97 }
98
99 state_ = READING;
100
101 DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes.";
102 int result = socket_->Read(
103 drainable_io_buffer_,
104 byte_limit,
105 base::Bind(&SocketInputStream::RefreshCompletionCallback,
106 weak_ptr_factory_.GetWeakPtr(),
107 callback));
108 DVLOG(1) << "Read returned " << result;
109 if (result != net::ERR_IO_PENDING) {
110 base::MessageLoop::current()->PostTask(
111 FROM_HERE,
112 base::Bind(&SocketInputStream::RefreshCompletionCallback,
113 weak_ptr_factory_.GetWeakPtr(),
114 callback,
115 result));
116 }
117 }
118
119 void SocketInputStream::RebuildBuffer() {
120 DVLOG(1) << "Resetting input stream, consumed "
121 << buffer_read_pos_ << " bytes.";
122 DCHECK_NE(state_, READING);
123 DCHECK_NE(state_, CLOSED);
124
125 int last_read_pos = buffer_read_pos_;
126 char* unread_data_ptr = io_buffer_->data() + last_read_pos;
127 int unread_buffer_size =
128 drainable_io_buffer_->BytesConsumed() - last_read_pos;
129 ResetInternal();
130
131 if (unread_buffer_size > 0) {
132 drainable_io_buffer_->SetOffset(unread_buffer_size);
akalin 2013/10/07 23:00:08 i feel like you want to call SetOffset even when u
Nicolas Zea 2013/10/09 00:44:22 ResetInternal builds a new drainable_io_buffer_, w
akalin 2013/10/10 08:47:40 forgot to address this? (now that we're not buildi
Nicolas Zea 2013/10/11 01:14:30 n/a in newest version I believe.
133 state_ = READY;
134
135 if (last_read_pos != 0) {
136 DVLOG(1) << "Have " << unread_buffer_size
137 << " unread bytes remaining, shifting.";
138 // Move any remaining unread data to the start of the buffer;
139 std::memmove(io_buffer_->data(), unread_data_ptr, unread_buffer_size);
140 } else {
141 DVLOG(1) << "Have " << unread_buffer_size << " unread bytes remaining.";
142 }
143 }
144 }
145
146 net::Error SocketInputStream::last_error() const {
147 return last_error_;
148 }
149
150 SocketInputStream::State SocketInputStream::state() const {
151 return state_;
152 }
153
154 void SocketInputStream::RefreshCompletionCallback(
155 const base::Closure& callback, int result) {
156 DCHECK_EQ(state_, READING);
157 if (state_ == CLOSED) {
158 // An error occured before the completion callback could complete. Ignore
159 // the result.
160 return;
161 }
162
163 if (result < net::OK) {
164 DVLOG(1) << "Failed to refresh socket: " << result;
165 CloseStream(static_cast<net::Error>(result), callback);
166 return;
167 }
168 DCHECK_GT(result, 0);
169
170 state_ = READY;
171 drainable_io_buffer_->DidConsume(result);
172 if (buffer_read_pos_ == drainable_io_buffer_->BytesConsumed())
173 state_ = EMPTY;
174
175 DVLOG(1) << "Refresh complete with " << result << " new bytes. "
176 << "Current position " << buffer_read_pos_
177 << " of " << drainable_io_buffer_->BytesConsumed() << ".";
178
179 if (!callback.is_null())
180 callback.Run();
181 }
182
183 void SocketInputStream::ResetInternal() {
184 weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks.
185 buffer_read_pos_ = 0;
186 state_ = EMPTY;
187
188 last_error_ = net::OK;
189
190 // Reset the offset by creating a new one. Note that DrainableIOBuffers don't
191 // actually allocate their own buffer memory like normal IOBuffers. This will
192 // just reset the pointers to point to the beginning of io_buffer_'s data.
193 drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(),
akalin 2013/10/07 23:00:08 use setoffset here?
Nicolas Zea 2013/10/09 00:44:22 SetOffset (and DrainableIOBuffer in general) only
akalin 2013/10/09 22:04:28 Huh, are you sure? I'm looking at DrainableIOBuffe
Nicolas Zea 2013/10/09 23:24:49 Huh, you're right. I completely misread that code
194 kDefaultBufferSize);
195 }
196
197 void SocketInputStream::CloseStream(net::Error error,
198 const base::Closure& callback) {
199 ResetInternal();
200 state_ = CLOSED;
201 last_error_ = error;
202 LOG(ERROR) << "Closing stream with result " << error;
203 if (!callback.is_null())
204 callback.Run();
205 }
206
207 SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
208 : socket_(socket),
209 io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)),
210 drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
211 kDefaultBufferSize)),
212 buffer_used_(0),
213 state_(EMPTY),
214 weak_ptr_factory_(this) {
215 DCHECK(socket->IsConnected());
216 }
217
218 SocketOutputStream::~SocketOutputStream() {
219 }
220
221 bool SocketOutputStream::Next(void** data, int* size) {
222 DCHECK_NE(state_, CLOSED);
223 DCHECK_NE(state_, FLUSHING);
224 if (buffer_used_ == drainable_io_buffer_->size())
225 return false;
226
227 *data = drainable_io_buffer_->data() + buffer_used_;
228 *size = drainable_io_buffer_->size() - buffer_used_;
229 buffer_used_ = drainable_io_buffer_->size();
230 state_ = READY;
231 return true;
232 }
233
234 void SocketOutputStream::BackUp(int count) {
235 DCHECK_GE(count, 0);
236 if (count > buffer_used_)
237 buffer_used_ = 0;
238 buffer_used_ -= count;
239 DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
240 << buffer_used_ << " bytes used.";
241 }
242
243 int64 SocketOutputStream::ByteCount() const {
244 DCHECK_NE(state_, CLOSED);
245 DCHECK_NE(state_, FLUSHING);
246 return buffer_used_;
247 }
248
249 void SocketOutputStream::Flush(const base::Closure& callback) {
250 DCHECK_EQ(state_, READY);
251 state_ = FLUSHING;
252
253 if (!socket_->IsConnected()) {
254 LOG(ERROR) << "Socket was disconnected, closing output stream";
255 last_error_ = net::ERR_CONNECTION_CLOSED;
256 state_ = CLOSED;
257 if (!callback.is_null())
258 callback.Run();
259 return;
260 }
261
262 DVLOG(1) << "Flushing " << buffer_used_ << " bytes into socket.";
263 int result = socket_->Write(
264 drainable_io_buffer_,
265 buffer_used_,
266 base::Bind(&SocketOutputStream::FlushCompletionCallback,
267 weak_ptr_factory_.GetWeakPtr(),
268 callback));
269 DVLOG(1) << "Write returned " << result;
270 if (result != net::ERR_IO_PENDING) {
271 base::MessageLoop::current()->PostTask(
272 FROM_HERE,
273 base::Bind(&SocketOutputStream::FlushCompletionCallback,
274 weak_ptr_factory_.GetWeakPtr(),
275 callback,
276 result));
277 }
278 }
279
280 SocketOutputStream::State SocketOutputStream::state() const{
281 return state_;
282 }
283
284 net::Error SocketOutputStream::last_error() const {
285 return last_error_;
286 }
287
288 void SocketOutputStream::FlushCompletionCallback(
289 const base::Closure& callback, int result) {
290 DCHECK_EQ(state_, FLUSHING);
291 if (result < net::OK) {
292 LOG(ERROR) << "Failed to flush socket.";
293 last_error_ = static_cast<net::Error>(result);
294 state_ = CLOSED;
295 if (!callback.is_null())
296 callback.Run();
297 return;
298 }
299
300 state_ = READY;
301 if (drainable_io_buffer_->BytesConsumed() + result < buffer_used_) {
302 DVLOG(1) << "Partial flush complete. Retrying.";
303 // Only a partial write was completed. Flush again to finish the write.
304 drainable_io_buffer_->DidConsume(result);
305 Flush(callback);
306 return;
307 }
308
309 DVLOG(1) << "Socket flush complete.";
310 drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(),
311 kDefaultBufferSize);
312 state_ = EMPTY;
313 buffer_used_ = 0;
314 if (!callback.is_null())
315 callback.Run();
316 }
317
318 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698