Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(53)

Side by Side Diff: google_apis/gcm/base/socket_stream.cc

Issue 23684017: [GCM] Initial work to set up directory structure and introduce socket integration (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Address comments Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "google_apis/gcm/base/socket_stream.h"
6
7 #include "base/callback.h"
8 #include "base/message_loop/message_loop.h"
akalin 2013/10/11 16:47:22 if you remove all task posting, you can remove thi
Nicolas Zea 2013/10/11 18:28:32 Done.
9 #include "net/base/io_buffer.h"
10 #include "net/socket/stream_socket.h"
11
12 namespace gcm {
13
14 namespace {
15
16 // TODO(zea): consider having dynamically-sized buffers if this becomes too
17 // expensive.
18 const uint32 kDefaultBufferSize = 8*1024;
19
20 } // namespace
21
22 SocketInputStream::SocketInputStream(net::StreamSocket* socket)
23 : socket_(socket),
24 io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
25 read_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
26 kDefaultBufferSize)),
27 next_pos_(0),
28 last_error_(net::OK),
29 state_(EMPTY),
30 weak_ptr_factory_(this) {
31 DCHECK(socket->IsConnected());
32 }
33
34 SocketInputStream::~SocketInputStream() {
35 }
36
37 bool SocketInputStream::Next(const void** data, int* size) {
38 DCHECK_NE(state_, CLOSED);
39 DCHECK_NE(state_, READING);
40
41 if (state_ == EMPTY) {
42 DVLOG(1) << "No unread data remaining, ending read.";
43 return false;
44 }
45
46 DCHECK_EQ(state_, READY)
47 << " Input stream must have pending data before reading.";
48 DCHECK_LT(next_pos_, read_buffer_->BytesConsumed());
49 *data = io_buffer_->data() + next_pos_;
50 *size = read_buffer_->BytesConsumed() - next_pos_;
51 next_pos_ = read_buffer_->BytesConsumed();
52 state_ = EMPTY;
53 DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
54 return true;
55 }
56
57 void SocketInputStream::BackUp(int count) {
58 DCHECK(state_ == READY || state_ == EMPTY);
59 DCHECK_GT(count, 0);
60 DCHECK_LE(count, next_pos_);
61
62 next_pos_ -= count;
63 state_ = READY;
64 DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
65 << "Current position now at " << next_pos_
66 << " of " << read_buffer_->BytesConsumed();
67 }
68
69 bool SocketInputStream::Skip(int count) {
70 NOTIMPLEMENTED();
71 return false;
72 }
73
74 int64 SocketInputStream::ByteCount() const {
75 DCHECK_NE(state_, CLOSED);
76 DCHECK_NE(state_, READING);
77 return read_buffer_->BytesConsumed() - next_pos_;
78 }
79
80 net::Error SocketInputStream::Refresh(const base::Closure& callback,
81 int byte_limit) {
82 DCHECK_NE(state_, CLOSED);
83 DCHECK_NE(state_, READING);
84 DCHECK_GT(byte_limit, 0);
85
86 if (byte_limit > read_buffer_->BytesRemaining()) {
87 NOTREACHED() << "Out of buffer space, closing input stream.";
88 CloseStream(net::ERR_UNEXPECTED, base::Closure());
89 return net::OK;
90 }
91
92 if (!socket_->IsConnected()) {
93 LOG(ERROR) << "Socket was disconnected, closing input stream";
94 CloseStream(net::ERR_CONNECTION_CLOSED, base::Closure());
95 return net::OK;
96 }
97
98 state_ = READING;
99
100 DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes.";
101 int result = socket_->Read(
102 read_buffer_,
103 byte_limit,
104 base::Bind(&SocketInputStream::RefreshCompletionCallback,
105 weak_ptr_factory_.GetWeakPtr(),
106 callback));
107 DVLOG(1) << "Read returned " << result;
108 if (result != net::ERR_IO_PENDING) {
109 RefreshCompletionCallback(base::Closure(), result);
110 return net::OK;
111 }
112 return net::ERR_IO_PENDING;
113 }
114
115 void SocketInputStream::RebuildBuffer() {
116 DVLOG(1) << "Rebuilding input stream, consumed "
117 << next_pos_ << " bytes.";
118 DCHECK_NE(state_, READING);
119 DCHECK_NE(state_, CLOSED);
120
121 int unread_data_size = 0;
122 const void* unread_data_ptr = NULL;
123 Next(&unread_data_ptr, &unread_data_size);
124 ResetInternal();
125
126 if (unread_data_ptr != io_buffer_->data()) {
127 DVLOG(1) << "Have " << unread_data_size
128 << " unread bytes remaining, shifting.";
129 // Move any remaining unread data to the start of the buffer;
130 std::memmove(io_buffer_->data(), unread_data_ptr, unread_data_size);
131 } else {
132 DVLOG(1) << "Have " << unread_data_size << " unread bytes remaining.";
133 }
134 read_buffer_->DidConsume(unread_data_size);
135 if (unread_data_size > 0)
136 state_ = READY;
137 }
138
139 net::Error SocketInputStream::last_error() const {
140 return last_error_;
141 }
142
143 SocketInputStream::State SocketInputStream::state() const {
144 return state_;
145 }
146
147 void SocketInputStream::RefreshCompletionCallback(
148 const base::Closure& callback, int result) {
149 DCHECK_EQ(state_, READING);
akalin 2013/10/11 18:47:09 is this DCHECK still valid? you immediately check
Nicolas Zea 2013/10/11 23:41:29 Done.
150 if (state_ == CLOSED) {
151 // An error occured before the completion callback could complete. Ignore
152 // the result.
153 return;
154 }
155
156 // Result == 0 implies EOF, which is treated as an error.
157 if (result < net::OK || result == 0) {
158 DVLOG(1) << "Failed to refresh socket: " << result;
159 CloseStream(static_cast<net::Error>(result), callback);
akalin 2013/10/11 16:47:22 if result == 0, then you'll call this with 'net::O
Nicolas Zea 2013/10/11 18:28:32 Done.
160 return;
161 }
162 DCHECK_GT(result, 0);
163
164 state_ = READY;
165 read_buffer_->DidConsume(result);
166 if (next_pos_ == read_buffer_->BytesConsumed())
167 state_ = EMPTY;
168
169 DVLOG(1) << "Refresh complete with " << result << " new bytes. "
170 << "Current position " << next_pos_
171 << " of " << read_buffer_->BytesConsumed() << ".";
172
173 if (!callback.is_null())
174 callback.Run();
175 }
176
177 void SocketInputStream::ResetInternal() {
178 read_buffer_->SetOffset(0);
179 next_pos_ = 0;
180 last_error_ = net::OK;
181 state_ = EMPTY;
182 weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks.
183 }
184
185 void SocketInputStream::CloseStream(net::Error error,
186 const base::Closure& callback) {
187 ResetInternal();
188 state_ = CLOSED;
189 last_error_ = error;
190 LOG(ERROR) << "Closing stream with result " << error;
191 if (!callback.is_null())
192 callback.Run();
193 }
194
195 SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
196 : socket_(socket),
197 io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
198 write_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
199 kDefaultBufferSize)),
200 buffer_used_(0),
201 state_(EMPTY),
202 weak_ptr_factory_(this) {
203 DCHECK(socket->IsConnected());
204 }
205
206 SocketOutputStream::~SocketOutputStream() {
207 }
208
209 bool SocketOutputStream::Next(void** data, int* size) {
210 DCHECK_NE(state_, CLOSED);
211 DCHECK_NE(state_, FLUSHING);
212 if (buffer_used_ == write_buffer_->size())
213 return false;
214
215 *data = write_buffer_->data() + buffer_used_;
216 *size = write_buffer_->size() - buffer_used_;
217 buffer_used_ = write_buffer_->size();
218 state_ = READY;
219 return true;
220 }
221
222 void SocketOutputStream::BackUp(int count) {
223 DCHECK_GE(count, 0);
224 if (count > buffer_used_)
225 buffer_used_ = 0;
226 buffer_used_ -= count;
227 DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
228 << buffer_used_ << " bytes used.";
229 }
230
231 int64 SocketOutputStream::ByteCount() const {
232 DCHECK_NE(state_, CLOSED);
233 DCHECK_NE(state_, FLUSHING);
234 return buffer_used_;
235 }
236
237 void SocketOutputStream::Flush(const base::Closure& callback) {
238 DCHECK_EQ(state_, READY);
239 state_ = FLUSHING;
240
241 if (!socket_->IsConnected()) {
242 LOG(ERROR) << "Socket was disconnected, closing output stream";
243 last_error_ = net::ERR_CONNECTION_CLOSED;
244 state_ = CLOSED;
245 if (!callback.is_null())
246 callback.Run();
247 return;
248 }
249
250 DVLOG(1) << "Flushing " << buffer_used_ << " bytes into socket.";
251 int result = socket_->Write(
252 write_buffer_,
253 buffer_used_,
254 base::Bind(&SocketOutputStream::FlushCompletionCallback,
255 weak_ptr_factory_.GetWeakPtr(),
256 callback));
257 DVLOG(1) << "Write returned " << result;
258 if (result != net::ERR_IO_PENDING) {
259 base::MessageLoop::current()->PostTask(
akalin 2013/10/11 16:47:22 avoid task-posting analogous to SocketInputStream?
Nicolas Zea 2013/10/11 18:28:32 Done.
260 FROM_HERE,
261 base::Bind(&SocketOutputStream::FlushCompletionCallback,
262 weak_ptr_factory_.GetWeakPtr(),
263 callback,
264 result));
265 }
266 }
267
268 SocketOutputStream::State SocketOutputStream::state() const{
269 return state_;
270 }
271
272 net::Error SocketOutputStream::last_error() const {
273 return last_error_;
274 }
275
276 void SocketOutputStream::FlushCompletionCallback(
277 const base::Closure& callback, int result) {
278 DCHECK_EQ(state_, FLUSHING);
279 if (result < net::OK) {
280 LOG(ERROR) << "Failed to flush socket.";
281 last_error_ = static_cast<net::Error>(result);
282 state_ = CLOSED;
283 if (!callback.is_null())
284 callback.Run();
285 return;
286 }
287
288 state_ = READY;
289 if (write_buffer_->BytesConsumed() + result < buffer_used_) {
290 DVLOG(1) << "Partial flush complete. Retrying.";
291 // Only a partial write was completed. Flush again to finish the write.
292 write_buffer_->DidConsume(result);
293 Flush(callback);
294 return;
295 }
296
297 DVLOG(1) << "Socket flush complete.";
298 write_buffer_->SetOffset(0);
299 state_ = EMPTY;
300 buffer_used_ = 0;
301 if (!callback.is_null())
302 callback.Run();
303 }
304
305 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698