Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(452)

Side by Side Diff: google_apis/gcm/base/socket_stream.cc

Issue 23684017: [GCM] Initial work to set up directory structure and introduce socket integration (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Address comments Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "google_apis/gcm/base/socket_stream.h"
6
7 #include "base/callback.h"
8 #include "net/base/io_buffer.h"
9 #include "net/socket/stream_socket.h"
10
11 namespace gcm {
12
13 namespace {
14
15 // TODO(zea): consider having dynamically-sized buffers if this becomes too
16 // expensive.
17 const uint32 kDefaultBufferSize = 8*1024;
18
19 } // namespace
20
21 SocketInputStream::SocketInputStream(net::StreamSocket* socket)
22 : socket_(socket),
23 io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
24 read_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
25 kDefaultBufferSize)),
26 next_pos_(0),
27 last_error_(net::OK),
28 weak_ptr_factory_(this) {
29 DCHECK(socket->IsConnected());
30 }
31
32 SocketInputStream::~SocketInputStream() {
33 }
34
35 bool SocketInputStream::Next(const void** data, int* size) {
36 if (GetState() != EMPTY && GetState() != READY) {
37 NOTREACHED() << "Invalid input stream read attempt.";
38 return false;
39 }
40
41 if (GetState() == EMPTY) {
42 DVLOG(1) << "No unread data remaining, ending read.";
43 return false;
44 }
45
46 DCHECK_EQ(GetState(), READY)
47 << " Input stream must have pending data before reading.";
48 DCHECK_LT(next_pos_, read_buffer_->BytesConsumed());
49 *data = io_buffer_->data() + next_pos_;
50 *size = read_buffer_->BytesConsumed() - next_pos_;
51 next_pos_ = read_buffer_->BytesConsumed();
52 DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
53 return true;
54 }
55
56 void SocketInputStream::BackUp(int count) {
57 DCHECK(GetState() == READY || GetState() == EMPTY);
58 DCHECK_GT(count, 0);
59 DCHECK_LE(count, next_pos_);
60
61 next_pos_ -= count;
62 DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
63 << "Current position now at " << next_pos_
64 << " of " << read_buffer_->BytesConsumed();
65 }
66
67 bool SocketInputStream::Skip(int count) {
68 NOTIMPLEMENTED();
69 return false;
70 }
71
72 int64 SocketInputStream::ByteCount() const {
73 DCHECK_NE(GetState(), CLOSED);
74 DCHECK_NE(GetState(), READING);
75 return read_buffer_->BytesConsumed() - next_pos_;
76 }
77
78 net::Error SocketInputStream::Refresh(const base::Closure& callback,
79 int byte_limit) {
80 DCHECK_NE(GetState(), CLOSED);
81 DCHECK_NE(GetState(), READING);
82 DCHECK_GT(byte_limit, 0);
83
84 if (byte_limit > read_buffer_->BytesRemaining()) {
85 NOTREACHED() << "Out of buffer space, closing input stream.";
86 CloseStream(net::ERR_UNEXPECTED, base::Closure());
87 return net::OK;
88 }
89
90 if (!socket_->IsConnected()) {
91 LOG(ERROR) << "Socket was disconnected, closing input stream";
92 CloseStream(net::ERR_CONNECTION_CLOSED, base::Closure());
93 return net::OK;
94 }
95
96 DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes.";
97 int result = socket_->Read(
98 read_buffer_,
99 byte_limit,
100 base::Bind(&SocketInputStream::RefreshCompletionCallback,
101 weak_ptr_factory_.GetWeakPtr(),
102 callback));
103 DVLOG(1) << "Read returned " << result;
104 if (result == net::ERR_IO_PENDING) {
105 last_error_ = net::ERR_IO_PENDING;
106 return net::ERR_IO_PENDING;
107 }
108
109 RefreshCompletionCallback(base::Closure(), result);
110 return net::OK;
111 }
112
113 void SocketInputStream::RebuildBuffer() {
114 DVLOG(1) << "Rebuilding input stream, consumed "
115 << next_pos_ << " bytes.";
116 DCHECK_NE(GetState(), READING);
117 DCHECK_NE(GetState(), CLOSED);
118
119 int unread_data_size = 0;
120 const void* unread_data_ptr = NULL;
121 Next(&unread_data_ptr, &unread_data_size);
122 ResetInternal();
123
124 if (unread_data_ptr != io_buffer_->data()) {
125 DVLOG(1) << "Have " << unread_data_size
126 << " unread bytes remaining, shifting.";
127 // Move any remaining unread data to the start of the buffer;
128 std::memmove(io_buffer_->data(), unread_data_ptr, unread_data_size);
129 } else {
130 DVLOG(1) << "Have " << unread_data_size << " unread bytes remaining.";
131 }
132 read_buffer_->DidConsume(unread_data_size);
133 }
134
135 net::Error SocketInputStream::last_error() const {
136 return last_error_;
137 }
138
139 SocketInputStream::State SocketInputStream::GetState() const {
140 if (last_error_ < net::ERR_IO_PENDING)
141 return CLOSED;
142
143 if (last_error_ == net::ERR_IO_PENDING)
144 return READING;
145
146 DCHECK_EQ(last_error_, net::OK);
147 if (read_buffer_->BytesConsumed() == next_pos_)
148 return EMPTY;
149
150 return READY;
151 }
152
153 void SocketInputStream::RefreshCompletionCallback(
154 const base::Closure& callback, int result) {
155 // If an error occurred before the completion callback could complete, ignore
156 // the result.
157 if (GetState() == CLOSED)
158 return;
159
160 // Result == 0 implies EOF, which is treated as an error.
161 if (result == 0)
162 result = net::ERR_UNEXPECTED;
akalin 2013/10/15 00:56:07 ERR_UNEXPECTED is the error code equivalent to DCH
Nicolas Zea 2013/10/15 20:07:38 Done.
163
164 DCHECK_NE(result, net::ERR_IO_PENDING);
165
166 if (result < net::OK) {
167 DVLOG(1) << "Failed to refresh socket: " << result;
168 CloseStream(static_cast<net::Error>(result), callback);
169 return;
170 }
171
172 DCHECK_GT(result, 0);
173 last_error_ = net::OK;
174 read_buffer_->DidConsume(result);
175
176 DVLOG(1) << "Refresh complete with " << result << " new bytes. "
177 << "Current position " << next_pos_
178 << " of " << read_buffer_->BytesConsumed() << ".";
179
180 if (!callback.is_null())
181 callback.Run();
182 }
183
184 void SocketInputStream::ResetInternal() {
185 read_buffer_->SetOffset(0);
186 next_pos_ = 0;
187 last_error_ = net::OK;
188 weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks.
189 }
190
191 void SocketInputStream::CloseStream(net::Error error,
192 const base::Closure& callback) {
193 DCHECK_LT(error, net::ERR_IO_PENDING);
194 ResetInternal();
195 last_error_ = error;
196 LOG(ERROR) << "Closing stream with result " << error;
197 if (!callback.is_null())
198 callback.Run();
199 }
200
201 SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
202 : socket_(socket),
203 io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
204 write_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
205 kDefaultBufferSize)),
206 next_pos_(0),
207 last_error_(net::OK),
208 weak_ptr_factory_(this) {
209 DCHECK(socket->IsConnected());
210 }
211
212 SocketOutputStream::~SocketOutputStream() {
213 }
214
215 bool SocketOutputStream::Next(void** data, int* size) {
216 DCHECK_NE(GetState(), CLOSED);
217 DCHECK_NE(GetState(), FLUSHING);
218 if (next_pos_ == write_buffer_->size())
219 return false;
220
221 *data = write_buffer_->data() + next_pos_;
222 *size = write_buffer_->size() - next_pos_;
223 next_pos_ = write_buffer_->size();
224 return true;
225 }
226
227 void SocketOutputStream::BackUp(int count) {
228 DCHECK_GE(count, 0);
229 if (count > next_pos_)
230 next_pos_ = 0;
231 next_pos_ -= count;
232 DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
233 << next_pos_ << " bytes used.";
234 }
235
236 int64 SocketOutputStream::ByteCount() const {
237 DCHECK_NE(GetState(), CLOSED);
238 DCHECK_NE(GetState(), FLUSHING);
239 return next_pos_;
240 }
241
242 net::Error SocketOutputStream::Flush(const base::Closure& callback) {
243 DCHECK_EQ(GetState(), READY);
244
245 if (!socket_->IsConnected()) {
246 LOG(ERROR) << "Socket was disconnected, closing output stream";
247 last_error_ = net::ERR_CONNECTION_CLOSED;
248 return net::OK;
249 }
250
251 DVLOG(1) << "Flushing " << next_pos_ << " bytes into socket.";
252 int result = socket_->Write(
253 write_buffer_,
254 next_pos_,
255 base::Bind(&SocketOutputStream::FlushCompletionCallback,
256 weak_ptr_factory_.GetWeakPtr(),
257 callback));
258 DVLOG(1) << "Write returned " << result;
259 if (result == net::ERR_IO_PENDING) {
260 last_error_ = net::ERR_IO_PENDING;
261 return net::ERR_IO_PENDING;
262 }
263
264 FlushCompletionCallback(base::Closure(), result);
265 return net::OK;
266 }
267
268 SocketOutputStream::State SocketOutputStream::GetState() const{
269 if (last_error_ < net::ERR_IO_PENDING)
270 return CLOSED;
271
272 if (last_error_ == net::ERR_IO_PENDING)
273 return FLUSHING;
274
275 DCHECK_EQ(last_error_, net::OK);
276 if (next_pos_ == 0)
277 return EMPTY;
278
279 return READY;
280 }
281
282 net::Error SocketOutputStream::last_error() const {
283 return last_error_;
284 }
285
286 void SocketOutputStream::FlushCompletionCallback(
287 const base::Closure& callback, int result) {
288 // If an error occurred before the completion callback could complete, ignore
289 // the result.
290 if (GetState() == CLOSED)
291 return;
292
293 if (result < net::OK) {
akalin 2013/10/15 00:56:07 from chromium-dev discussion, sounds like a 0 resu
Nicolas Zea 2013/10/15 20:07:38 Done.
294 LOG(ERROR) << "Failed to flush socket.";
295 last_error_ = static_cast<net::Error>(result);
296 if (!callback.is_null())
297 callback.Run();
298 return;
299 }
300
301 DCHECK_GT(result, net::OK);
302 last_error_ = net::OK;
303
304 if (write_buffer_->BytesConsumed() + result < next_pos_) {
305 DVLOG(1) << "Partial flush complete. Retrying.";
306 // Only a partial write was completed. Flush again to finish the write.
307 write_buffer_->DidConsume(result);
308 Flush(callback);
309 return;
310 }
311
312 DVLOG(1) << "Socket flush complete.";
313 write_buffer_->SetOffset(0);
314 next_pos_ = 0;
315 if (!callback.is_null())
316 callback.Run();
317 }
318
319 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698