Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(180)

Side by Side Diff: google_apis/gcm/base/socket_stream.cc

Issue 23684017: [GCM] Initial work to set up directory structure and introduce socket integration (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: StringPiece + UnreadByteCount Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "google_apis/gcm/base/socket_stream.h"
6
akalin 2013/10/16 07:16:31 include bind.h
Nicolas Zea 2013/10/16 19:56:18 Done.
7 #include "base/callback.h"
8 #include "net/base/io_buffer.h"
9 #include "net/socket/stream_socket.h"
10
11 namespace gcm {
12
13 namespace {
14
15 // TODO(zea): consider having dynamically-sized buffers if this becomes too
16 // expensive.
17 const uint32 kDefaultBufferSize = 8*1024;
18
19 } // namespace
20
21 SocketInputStream::SocketInputStream(net::StreamSocket* socket)
22 : socket_(socket),
23 io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
24 read_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
25 kDefaultBufferSize)),
26 next_pos_(0),
27 last_error_(net::OK),
28 weak_ptr_factory_(this) {
29 DCHECK(socket->IsConnected());
30 }
31
32 SocketInputStream::~SocketInputStream() {
33 }
34
35 bool SocketInputStream::Next(const void** data, int* size) {
36 if (GetState() != EMPTY && GetState() != READY) {
37 NOTREACHED() << "Invalid input stream read attempt.";
38 return false;
39 }
40
41 if (GetState() == EMPTY) {
42 DVLOG(1) << "No unread data remaining, ending read.";
43 return false;
44 }
45
46 DCHECK_EQ(GetState(), READY)
47 << " Input stream must have pending data before reading.";
48 DCHECK_LT(next_pos_, read_buffer_->BytesConsumed());
49 *data = io_buffer_->data() + next_pos_;
50 *size = UnreadByteCount();
51 next_pos_ = read_buffer_->BytesConsumed();
52 DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
53 return true;
54 }
55
56 void SocketInputStream::BackUp(int count) {
57 DCHECK(GetState() == READY || GetState() == EMPTY);
58 DCHECK_GT(count, 0);
59 DCHECK_LE(count, next_pos_);
60
61 next_pos_ -= count;
62 DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
63 << "Current position now at " << next_pos_
64 << " of " << read_buffer_->BytesConsumed();
65 }
66
67 bool SocketInputStream::Skip(int count) {
68 NOTIMPLEMENTED();
69 return false;
70 }
71
72 int64 SocketInputStream::ByteCount() const {
73 DCHECK_NE(GetState(), CLOSED);
74 DCHECK_NE(GetState(), READING);
75 return next_pos_;
76 }
77
78 size_t SocketInputStream::UnreadByteCount() const {
79 DCHECK_NE(GetState(), CLOSED);
80 DCHECK_NE(GetState(), READING);
81 return read_buffer_->BytesConsumed() - next_pos_;
82 }
83
84 net::Error SocketInputStream::Refresh(const base::Closure& callback,
85 int byte_limit) {
86 DCHECK_NE(GetState(), CLOSED);
87 DCHECK_NE(GetState(), READING);
88 DCHECK_GT(byte_limit, 0);
89
90 if (byte_limit > read_buffer_->BytesRemaining()) {
91 NOTREACHED() << "Out of buffer space, closing input stream.";
92 CloseStream(net::ERR_UNEXPECTED, base::Closure());
93 return net::OK;
94 }
95
96 if (!socket_->IsConnected()) {
97 LOG(ERROR) << "Socket was disconnected, closing input stream";
98 CloseStream(net::ERR_CONNECTION_CLOSED, base::Closure());
99 return net::OK;
100 }
101
102 DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes.";
103 int result = socket_->Read(
104 read_buffer_,
105 byte_limit,
106 base::Bind(&SocketInputStream::RefreshCompletionCallback,
107 weak_ptr_factory_.GetWeakPtr(),
108 callback));
109 DVLOG(1) << "Read returned " << result;
110 if (result == net::ERR_IO_PENDING) {
111 last_error_ = net::ERR_IO_PENDING;
112 return net::ERR_IO_PENDING;
113 }
114
115 RefreshCompletionCallback(base::Closure(), result);
116 return net::OK;
117 }
118
119 void SocketInputStream::RebuildBuffer() {
120 DVLOG(1) << "Rebuilding input stream, consumed "
121 << next_pos_ << " bytes.";
122 DCHECK_NE(GetState(), READING);
123 DCHECK_NE(GetState(), CLOSED);
124
125 int unread_data_size = 0;
126 const void* unread_data_ptr = NULL;
127 Next(&unread_data_ptr, &unread_data_size);
128 ResetInternal();
129
130 if (unread_data_ptr != io_buffer_->data()) {
131 DVLOG(1) << "Have " << unread_data_size
132 << " unread bytes remaining, shifting.";
133 // Move any remaining unread data to the start of the buffer;
134 std::memmove(io_buffer_->data(), unread_data_ptr, unread_data_size);
135 } else {
136 DVLOG(1) << "Have " << unread_data_size << " unread bytes remaining.";
137 }
138 read_buffer_->DidConsume(unread_data_size);
139 }
140
141 net::Error SocketInputStream::last_error() const {
142 return last_error_;
143 }
144
145 SocketInputStream::State SocketInputStream::GetState() const {
146 if (last_error_ < net::ERR_IO_PENDING)
147 return CLOSED;
148
149 if (last_error_ == net::ERR_IO_PENDING)
150 return READING;
151
152 DCHECK_EQ(last_error_, net::OK);
153 if (read_buffer_->BytesConsumed() == next_pos_)
154 return EMPTY;
155
156 return READY;
157 }
158
159 void SocketInputStream::RefreshCompletionCallback(
160 const base::Closure& callback, int result) {
161 // If an error occurred before the completion callback could complete, ignore
162 // the result.
163 if (GetState() == CLOSED)
164 return;
165
166 // Result == 0 implies EOF, which is treated as an error.
167 if (result == 0)
168 result = net::ERR_CONNECTION_CLOSED;
169
170 DCHECK_NE(result, net::ERR_IO_PENDING);
171
172 if (result < net::OK) {
173 DVLOG(1) << "Failed to refresh socket: " << result;
174 CloseStream(static_cast<net::Error>(result), callback);
175 return;
176 }
177
178 DCHECK_GT(result, 0);
179 last_error_ = net::OK;
180 read_buffer_->DidConsume(result);
181
182 DVLOG(1) << "Refresh complete with " << result << " new bytes. "
183 << "Current position " << next_pos_
184 << " of " << read_buffer_->BytesConsumed() << ".";
185
186 if (!callback.is_null())
187 callback.Run();
188 }
189
190 void SocketInputStream::ResetInternal() {
191 read_buffer_->SetOffset(0);
192 next_pos_ = 0;
193 last_error_ = net::OK;
194 weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks.
195 }
196
197 void SocketInputStream::CloseStream(net::Error error,
198 const base::Closure& callback) {
199 DCHECK_LT(error, net::ERR_IO_PENDING);
200 ResetInternal();
201 last_error_ = error;
202 LOG(ERROR) << "Closing stream with result " << error;
203 if (!callback.is_null())
204 callback.Run();
205 }
206
207 SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
208 : socket_(socket),
209 io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
210 write_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
211 kDefaultBufferSize)),
212 next_pos_(0),
213 last_error_(net::OK),
214 weak_ptr_factory_(this) {
215 DCHECK(socket->IsConnected());
216 }
217
218 SocketOutputStream::~SocketOutputStream() {
219 }
220
221 bool SocketOutputStream::Next(void** data, int* size) {
222 DCHECK_NE(GetState(), CLOSED);
223 DCHECK_NE(GetState(), FLUSHING);
224 if (next_pos_ == write_buffer_->size())
225 return false;
226
227 *data = write_buffer_->data() + next_pos_;
228 *size = write_buffer_->size() - next_pos_;
229 next_pos_ = write_buffer_->size();
230 return true;
231 }
232
233 void SocketOutputStream::BackUp(int count) {
234 DCHECK_GE(count, 0);
235 if (count > next_pos_)
236 next_pos_ = 0;
237 next_pos_ -= count;
238 DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
239 << next_pos_ << " bytes used.";
240 }
241
242 int64 SocketOutputStream::ByteCount() const {
243 DCHECK_NE(GetState(), CLOSED);
244 DCHECK_NE(GetState(), FLUSHING);
245 return next_pos_;
246 }
247
248 net::Error SocketOutputStream::Flush(const base::Closure& callback) {
249 DCHECK_EQ(GetState(), READY);
250
251 if (!socket_->IsConnected()) {
252 LOG(ERROR) << "Socket was disconnected, closing output stream";
253 last_error_ = net::ERR_CONNECTION_CLOSED;
254 return net::OK;
255 }
256
257 DVLOG(1) << "Flushing " << next_pos_ << " bytes into socket.";
258 int result = socket_->Write(
259 write_buffer_,
260 next_pos_,
261 base::Bind(&SocketOutputStream::FlushCompletionCallback,
262 weak_ptr_factory_.GetWeakPtr(),
263 callback));
264 DVLOG(1) << "Write returned " << result;
265 if (result == net::ERR_IO_PENDING) {
266 last_error_ = net::ERR_IO_PENDING;
267 return net::ERR_IO_PENDING;
268 }
269
270 FlushCompletionCallback(base::Closure(), result);
271 return net::OK;
272 }
273
274 SocketOutputStream::State SocketOutputStream::GetState() const{
275 if (last_error_ < net::ERR_IO_PENDING)
276 return CLOSED;
277
278 if (last_error_ == net::ERR_IO_PENDING)
279 return FLUSHING;
280
281 DCHECK_EQ(last_error_, net::OK);
282 if (next_pos_ == 0)
283 return EMPTY;
284
285 return READY;
286 }
287
288 net::Error SocketOutputStream::last_error() const {
289 return last_error_;
290 }
291
292 void SocketOutputStream::FlushCompletionCallback(
293 const base::Closure& callback, int result) {
294 // If an error occurred before the completion callback could complete, ignore
295 // the result.
296 if (GetState() == CLOSED)
297 return;
298
299 // Result == 0 implies EOF, which is treated as an error.
300 if (result == 0)
301 result = net::ERR_CONNECTION_CLOSED;
302
303 DCHECK_NE(result, net::ERR_IO_PENDING);
304
305 if (result < net::OK) {
306 LOG(ERROR) << "Failed to flush socket.";
307 last_error_ = static_cast<net::Error>(result);
308 if (!callback.is_null())
309 callback.Run();
310 return;
311 }
312
313 DCHECK_GT(result, net::OK);
314 last_error_ = net::OK;
315
316 if (write_buffer_->BytesConsumed() + result < next_pos_) {
317 DVLOG(1) << "Partial flush complete. Retrying.";
318 // Only a partial write was completed. Flush again to finish the write.
319 write_buffer_->DidConsume(result);
320 Flush(callback);
321 return;
322 }
323
324 DVLOG(1) << "Socket flush complete.";
325 write_buffer_->SetOffset(0);
326 next_pos_ = 0;
327 if (!callback.is_null())
328 callback.Run();
329 }
330
331 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698