Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(424)

Side by Side Diff: google_apis/gcm/base/socket_stream.cc

Issue 23684017: [GCM] Initial work to set up directory structure and introduce socket integration (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Use SetOffset instead of recreating DrainableIOBuffer Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "google_apis/gcm/base/socket_stream.h"
6
7 #include "base/callback.h"
8 #include "base/message_loop/message_loop.h"
9 #include "net/base/io_buffer.h"
10 #include "net/socket/stream_socket.h"
11
12 namespace gcm {
13
14 namespace {
15
16 // TODO(zea): consider having dynamically-sized buffers if this becomes too
17 // expensive.
18 const uint32 kDefaultBufferSize = 8*1024;
19
20 } // namespace
21
22 SocketInputStream::SocketInputStream(net::StreamSocket* socket)
23 : socket_(socket),
24 io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
25 read_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
26 kDefaultBufferSize)),
27 next_pos_(0),
28 last_error_(net::OK),
29 state_(EMPTY),
30 weak_ptr_factory_(this) {
31 DCHECK(socket->IsConnected());
32 }
33
34 SocketInputStream::~SocketInputStream() {
35 }
36
37 bool SocketInputStream::Next(const void** data, int* size) {
38 DCHECK_NE(state_, CLOSED);
39 DCHECK_NE(state_, READING);
40
41 if (state_ == EMPTY) {
42 DVLOG(1) << "No unread data remaining, ending read.";
43 return false;
44 }
45
46 DCHECK_EQ(state_, READY)
47 << " Input stream must have pending data before reading.";
48 DCHECK_NE(read_buffer_->BytesConsumed(), next_pos_);
akalin 2013/10/10 08:47:40 don't you want something stronger, i.e. DCHECK_LT(
Nicolas Zea 2013/10/11 01:14:30 Done.
49 *data = io_buffer_->data() + next_pos_;
50 *size = read_buffer_->BytesConsumed() - next_pos_;
51 next_pos_ = read_buffer_->BytesConsumed();
52 state_ = EMPTY;
53 DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
54 return true;
55 }
56
57 void SocketInputStream::BackUp(int count) {
58 DCHECK(state_ == READY || state_ == EMPTY);
59 DCHECK_GE(count, 0);
akalin 2013/10/10 08:47:40 perhaps change this to _GT? if not, then READY mig
Nicolas Zea 2013/10/11 01:14:30 Done.
60 DCHECK_LE(count, next_pos_);
61
62 next_pos_ -= count;
63 state_ = READY;
64 DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
65 << "Current position now at " << next_pos_
66 << " of " << read_buffer_->BytesConsumed();
67 }
68
69 bool SocketInputStream::Skip(int count) {
70 NOTIMPLEMENTED();
71 return false;
72 }
73
74 int64 SocketInputStream::ByteCount() const {
75 DCHECK_NE(state_, CLOSED);
76 DCHECK_NE(state_, READING);
77 return read_buffer_->BytesConsumed() - next_pos_;
78 }
79
80 void SocketInputStream::Refresh(const base::Closure& callback,
81 int byte_limit) {
82 DCHECK_NE(state_, CLOSED);
83 DCHECK_NE(state_, READING);
84 DCHECK_GT(byte_limit, 0);
85
86 if (byte_limit > read_buffer_->BytesRemaining()) {
87 NOTREACHED();
akalin 2013/10/10 08:47:40 you want NOTREACHED() << ... since NOTREACHED() tu
Nicolas Zea 2013/10/11 01:14:30 Done.
88 LOG(ERROR) << "Out of buffer space, closing input stream.";
89 CloseStream(net::ERR_UNEXPECTED, callback);
90 return;
91 }
92
93 if (!socket_->IsConnected()) {
94 LOG(ERROR) << "Socket was disconnected, closing input stream";
95 CloseStream(net::ERR_CONNECTION_CLOSED, callback);
96 return;
97 }
98
99 state_ = READING;
100
101 DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes.";
102 int result = socket_->Read(
103 read_buffer_,
104 byte_limit,
105 base::Bind(&SocketInputStream::RefreshCompletionCallback,
106 weak_ptr_factory_.GetWeakPtr(),
107 callback));
108 DVLOG(1) << "Read returned " << result;
109 if (result != net::ERR_IO_PENDING) {
110 base::MessageLoop::current()->PostTask(
akalin 2013/10/10 08:47:40 forgot to mention this last time. This does simpli
Nicolas Zea 2013/10/11 01:14:30 Yeah, I guess I may as well keep it as consistent
111 FROM_HERE,
112 base::Bind(&SocketInputStream::RefreshCompletionCallback,
113 weak_ptr_factory_.GetWeakPtr(),
114 callback,
115 result));
116 }
117 }
118
119 void SocketInputStream::RebuildBuffer() {
120 DVLOG(1) << "Resetting input stream, consumed "
121 << next_pos_ << " bytes.";
122 DCHECK_NE(state_, READING);
123 DCHECK_NE(state_, CLOSED);
124
125 int last_read_pos = next_pos_;
126 char* unread_data_ptr = io_buffer_->data() + last_read_pos;
akalin 2013/10/10 08:47:40 use Next() instead of manually calculating?
Nicolas Zea 2013/10/11 01:14:30 Done.
127 int unread_buffer_size = read_buffer_->BytesConsumed() - last_read_pos;
128 ResetInternal();
129
130 if (unread_buffer_size > 0) {
131 read_buffer_->SetOffset(unread_buffer_size);
132 state_ = READY;
133
134 if (last_read_pos != 0) {
akalin 2013/10/10 08:47:40 you can do: if (unread_data_ptr != io_buffer_->da
Nicolas Zea 2013/10/11 01:14:30 Done.
135 DVLOG(1) << "Have " << unread_buffer_size
136 << " unread bytes remaining, shifting.";
137 // Move any remaining unread data to the start of the buffer;
138 std::memmove(io_buffer_->data(), unread_data_ptr, unread_buffer_size);
139 } else {
140 DVLOG(1) << "Have " << unread_buffer_size << " unread bytes remaining.";
141 }
142 }
143 }
144
145 net::Error SocketInputStream::last_error() const {
146 return last_error_;
147 }
148
149 SocketInputStream::State SocketInputStream::state() const {
150 return state_;
151 }
152
153 void SocketInputStream::RefreshCompletionCallback(
154 const base::Closure& callback, int result) {
155 DCHECK_EQ(state_, READING);
156 if (state_ == CLOSED) {
157 // An error occured before the completion callback could complete. Ignore
158 // the result.
159 return;
160 }
161
162 if (result < net::OK) {
163 DVLOG(1) << "Failed to refresh socket: " << result;
164 CloseStream(static_cast<net::Error>(result), callback);
165 return;
166 }
167 DCHECK_GT(result, 0);
akalin 2013/10/10 08:47:40 Isn't Read() allowed to return a result of 0? Doub
Nicolas Zea 2013/10/11 01:14:30 Done.
168
169 state_ = READY;
170 read_buffer_->DidConsume(result);
171 if (next_pos_ == read_buffer_->BytesConsumed())
172 state_ = EMPTY;
173
174 DVLOG(1) << "Refresh complete with " << result << " new bytes. "
175 << "Current position " << next_pos_
176 << " of " << read_buffer_->BytesConsumed() << ".";
177
178 if (!callback.is_null())
179 callback.Run();
180 }
181
182 void SocketInputStream::ResetInternal() {
183 weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks.
akalin 2013/10/10 08:47:40 follow declaration order to be parallel to the con
Nicolas Zea 2013/10/11 01:14:30 Done.
184 next_pos_ = 0;
185 state_ = EMPTY;
186 read_buffer_->SetOffset(0);
187
188 last_error_ = net::OK;
189 }
190
191 void SocketInputStream::CloseStream(net::Error error,
192 const base::Closure& callback) {
193 ResetInternal();
194 state_ = CLOSED;
195 last_error_ = error;
196 LOG(ERROR) << "Closing stream with result " << error;
197 if (!callback.is_null())
198 callback.Run();
199 }
200
201 SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
202 : socket_(socket),
203 io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
204 write_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
205 kDefaultBufferSize)),
206 buffer_used_(0),
207 state_(EMPTY),
208 weak_ptr_factory_(this) {
209 DCHECK(socket->IsConnected());
210 }
211
212 SocketOutputStream::~SocketOutputStream() {
213 }
214
215 bool SocketOutputStream::Next(void** data, int* size) {
216 DCHECK_NE(state_, CLOSED);
217 DCHECK_NE(state_, FLUSHING);
218 if (buffer_used_ == write_buffer_->size())
219 return false;
220
221 *data = write_buffer_->data() + buffer_used_;
222 *size = write_buffer_->size() - buffer_used_;
223 buffer_used_ = write_buffer_->size();
224 state_ = READY;
225 return true;
226 }
227
228 void SocketOutputStream::BackUp(int count) {
229 DCHECK_GE(count, 0);
230 if (count > buffer_used_)
231 buffer_used_ = 0;
232 buffer_used_ -= count;
233 DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
234 << buffer_used_ << " bytes used.";
235 }
236
237 int64 SocketOutputStream::ByteCount() const {
238 DCHECK_NE(state_, CLOSED);
239 DCHECK_NE(state_, FLUSHING);
240 return buffer_used_;
241 }
242
243 void SocketOutputStream::Flush(const base::Closure& callback) {
244 DCHECK_EQ(state_, READY);
245 state_ = FLUSHING;
246
247 if (!socket_->IsConnected()) {
248 LOG(ERROR) << "Socket was disconnected, closing output stream";
249 last_error_ = net::ERR_CONNECTION_CLOSED;
250 state_ = CLOSED;
251 if (!callback.is_null())
252 callback.Run();
253 return;
254 }
255
256 DVLOG(1) << "Flushing " << buffer_used_ << " bytes into socket.";
257 int result = socket_->Write(
258 write_buffer_,
259 buffer_used_,
260 base::Bind(&SocketOutputStream::FlushCompletionCallback,
261 weak_ptr_factory_.GetWeakPtr(),
262 callback));
263 DVLOG(1) << "Write returned " << result;
264 if (result != net::ERR_IO_PENDING) {
265 base::MessageLoop::current()->PostTask(
266 FROM_HERE,
267 base::Bind(&SocketOutputStream::FlushCompletionCallback,
268 weak_ptr_factory_.GetWeakPtr(),
269 callback,
270 result));
271 }
272 }
273
274 SocketOutputStream::State SocketOutputStream::state() const{
275 return state_;
276 }
277
278 net::Error SocketOutputStream::last_error() const {
279 return last_error_;
280 }
281
282 void SocketOutputStream::FlushCompletionCallback(
283 const base::Closure& callback, int result) {
284 DCHECK_EQ(state_, FLUSHING);
285 if (result < net::OK) {
286 LOG(ERROR) << "Failed to flush socket.";
287 last_error_ = static_cast<net::Error>(result);
288 state_ = CLOSED;
289 if (!callback.is_null())
290 callback.Run();
291 return;
292 }
293
294 state_ = READY;
295 if (write_buffer_->BytesConsumed() + result < buffer_used_) {
296 DVLOG(1) << "Partial flush complete. Retrying.";
297 // Only a partial write was completed. Flush again to finish the write.
298 write_buffer_->DidConsume(result);
299 Flush(callback);
300 return;
301 }
302
303 DVLOG(1) << "Socket flush complete.";
304 write_buffer_->SetOffset(0);
305 state_ = EMPTY;
306 buffer_used_ = 0;
307 if (!callback.is_null())
308 callback.Run();
309 }
310
311 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698