Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(393)

Side by Side Diff: chrome/browser/extensions/api/cast_channel/cast_socket.cc

Issue 79673003: Refactor CastSocket code for the following: (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/
Patch Set: Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/extensions/api/cast_channel/cast_socket.h" 5 #include "chrome/browser/extensions/api/cast_channel/cast_socket.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/callback_helpers.h" 10 #include "base/callback_helpers.h"
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
67 67
68 CastSocket::CastSocket(const std::string& owner_extension_id, 68 CastSocket::CastSocket(const std::string& owner_extension_id,
69 const GURL& url, 69 const GURL& url,
70 CastSocket::Delegate* delegate, 70 CastSocket::Delegate* delegate,
71 net::NetLog* net_log) : 71 net::NetLog* net_log) :
72 ApiResource(owner_extension_id), 72 ApiResource(owner_extension_id),
73 channel_id_(0), 73 channel_id_(0),
74 url_(url), 74 url_(url),
75 delegate_(delegate), 75 delegate_(delegate),
76 auth_required_(false), 76 auth_required_(false),
77 error_state_(CHANNEL_ERROR_NONE),
78 ready_state_(READY_STATE_NONE),
79 write_callback_pending_(false),
80 read_callback_pending_(false),
81 current_message_size_(0), 77 current_message_size_(0),
82 net_log_(net_log), 78 net_log_(net_log),
83 next_state_(CONN_STATE_NONE), 79 connect_state_(CONN_STATE_NONE),
84 in_connect_loop_(false) { 80 write_state_(WRITE_STATE_NONE),
81 read_state_(READ_STATE_NONE),
82 error_state_(CHANNEL_ERROR_NONE),
83 ready_state_(READY_STATE_NONE) {
85 DCHECK(net_log_); 84 DCHECK(net_log_);
86 net_log_source_.type = net::NetLog::SOURCE_SOCKET; 85 net_log_source_.type = net::NetLog::SOURCE_SOCKET;
87 net_log_source_.id = net_log_->NextID(); 86 net_log_source_.id = net_log_->NextID();
88 87
89 // We reuse these buffers for each message. 88 // We reuse these buffers for each message.
90 header_read_buffer_ = new net::GrowableIOBuffer(); 89 header_read_buffer_ = new net::GrowableIOBuffer();
91 header_read_buffer_->SetCapacity(kMessageHeaderSize); 90 header_read_buffer_->SetCapacity(kMessageHeaderSize);
92 body_read_buffer_ = new net::GrowableIOBuffer(); 91 body_read_buffer_ = new net::GrowableIOBuffer();
93 body_read_buffer_->SetCapacity(kMaxMessageSize); 92 body_read_buffer_->SetCapacity(kMaxMessageSize);
94 current_read_buffer_ = header_read_buffer_; 93 current_read_buffer_ = header_read_buffer_;
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
144 net::SSLInfo ssl_info; 143 net::SSLInfo ssl_info;
145 if (!socket_->GetSSLInfo(&ssl_info) || !ssl_info.cert.get()) 144 if (!socket_->GetSSLInfo(&ssl_info) || !ssl_info.cert.get())
146 return false; 145 return false;
147 bool result = net::X509Certificate::GetDEREncoded( 146 bool result = net::X509Certificate::GetDEREncoded(
148 ssl_info.cert->os_cert_handle(), cert); 147 ssl_info.cert->os_cert_handle(), cert);
149 if (result) 148 if (result)
150 VLOG(1) << "Successfully extracted peer certificate: " << *cert; 149 VLOG(1) << "Successfully extracted peer certificate: " << *cert;
151 return result; 150 return result;
152 } 151 }
153 152
154 int CastSocket::SendAuthChallenge() { 153 bool CastSocket::VerifyChallengeReply() {
155 CastMessage challenge_message; 154 return AuthenticateChallengeReply(*challenge_reply_.get(), peer_cert_);
156 CreateAuthChallengeMessage(&challenge_message);
157 VLOG(1) << "Sending challenge: " << CastMessageToString(challenge_message);
158 int result = SendMessageInternal(
159 challenge_message,
160 base::Bind(&CastSocket::OnChallengeEvent, AsWeakPtr()));
161 return (result < 0) ? result : net::OK;
162 }
163
164 int CastSocket::ReadAuthChallengeReply() {
165 int result = ReadData();
166 return (result < 0) ? result : net::OK;
167 }
168
169 void CastSocket::OnConnectComplete(int result) {
170 int rv = DoConnectLoop(result);
171 if (rv != net::ERR_IO_PENDING)
172 DoConnectCallback(rv);
173 }
174
175 void CastSocket::OnChallengeEvent(int result) {
176 // result >= 0 means read or write succeeded synchronously.
177 int rv = DoConnectLoop(result >= 0 ? net::OK : result);
178 if (rv != net::ERR_IO_PENDING)
179 DoConnectCallback(rv);
180 } 155 }
181 156
182 void CastSocket::Connect(const net::CompletionCallback& callback) { 157 void CastSocket::Connect(const net::CompletionCallback& callback) {
183 DCHECK(CalledOnValidThread()); 158 DCHECK(CalledOnValidThread());
184 int result = net::ERR_CONNECTION_FAILED; 159 connect_callback_ = callback;
160 // Post a task so that we can safely run the callback
Wez 2013/11/26 02:45:56 You mean so that if the Connect() completes synchr
Munjal (Google) 2013/11/26 20:06:14 Actually I removed this PostTask. Upon thinking ab
Wez 2013/11/27 04:15:19 That makes the code super-fragile to code changes
Munjal (Google) 2013/12/02 19:22:09 Several things to keep in mind: - DoConnectLoop do
161 base::MessageLoop::current()->PostTask(
162 FROM_HERE,
163 base::Bind(&CastSocket::ConnectInternal, AsWeakPtr()));
164 }
165
166 void CastSocket::ConnectInternal() {
167 DCHECK(CalledOnValidThread());
185 VLOG(1) << "Connect readyState = " << ready_state_; 168 VLOG(1) << "Connect readyState = " << ready_state_;
186 if (ready_state_ != READY_STATE_NONE) { 169 if (ready_state_ != READY_STATE_NONE) {
Wez 2013/11/26 02:45:56 Under what circumstances could |ready_state_| not
Munjal (Google) 2013/11/26 20:06:14 When the caller calls Connect twice. In that case
Wez 2013/11/27 04:15:19 You mean that JS calls a connect method twice? Or
Munjal (Google) 2013/12/02 19:22:09 Yes, if Js calls connect method twice. Of course D
187 callback.Run(result); 170 DoConnectCallback(net::ERR_CONNECTION_FAILED);
188 return; 171 return;
189 } 172 }
190 if (!ParseChannelUrl(url_)) { 173 if (!ParseChannelUrl(url_)) {
191 CloseWithError(cast_channel::CHANNEL_ERROR_CONNECT_ERROR); 174 CloseWithError(cast_channel::CHANNEL_ERROR_CONNECT_ERROR);
192 callback.Run(result); 175 DoConnectCallback(net::ERR_CONNECTION_FAILED);
193 return; 176 return;
194 } 177 }
195 connect_callback_ = callback; 178 connect_state_ = CONN_STATE_TCP_CONNECT;
196 next_state_ = CONN_STATE_TCP_CONNECT; 179 DoConnectLoop(net::OK);
197 int rv = DoConnectLoop(net::OK);
198 if (rv != net::ERR_IO_PENDING)
199 DoConnectCallback(rv);
200 } 180 }
201 181
202 // This method performs the state machine transitions for connection flow. 182 // This method performs the state machine transitions for connection flow.
203 // There are two entry points to this method: 183 // There are two entry points to this method:
204 // 1. public Connect method: this starts the flow 184 // 1. ConnectInternal method: this starts the flow
205 // 2. OnConnectComplete: callback method called when an async operation 185 // 2. Callback from network operations that finish asynchronously
206 // is done. OnConnectComplete calls this method to continue the state 186 void CastSocket::DoConnectLoop(int result) {
207 // machine transitions.
208 int CastSocket::DoConnectLoop(int result) {
209 // Avoid re-entrancy as a result of synchronous completion.
210 if (in_connect_loop_)
211 return net::ERR_IO_PENDING;
212 in_connect_loop_ = true;
213
214 // Network operations can either finish synchronously or asynchronously. 187 // Network operations can either finish synchronously or asynchronously.
215 // This method executes the state machine transitions in a loop so that 188 // This method executes the state machine transitions in a loop so that
216 // correct state transitions happen even when network operations finish 189 // correct state transitions happen even when network operations finish
217 // synchronously. 190 // synchronously.
218 int rv = result; 191 int rv = result;
219 do { 192 do {
220 ConnectionState state = next_state_; 193 ConnectionState state = connect_state_;
221 // All the Do* methods do not set next_state_ in case of an 194 // All the Do* methods do not set connect_state_ in case of an
222 // error. So set next_state_ to NONE to figure out if the Do* 195 // error. So set connect_state_ to NONE to figure out if the Do*
223 // method changed state or not. 196 // method changed state or not.
224 next_state_ = CONN_STATE_NONE; 197 connect_state_ = CONN_STATE_NONE;
225 switch (state) { 198 switch (state) {
226 case CONN_STATE_TCP_CONNECT: 199 case CONN_STATE_TCP_CONNECT:
227 rv = DoTcpConnect(); 200 rv = DoTcpConnect();
228 break; 201 break;
229 case CONN_STATE_TCP_CONNECT_COMPLETE: 202 case CONN_STATE_TCP_CONNECT_COMPLETE:
230 rv = DoTcpConnectComplete(rv); 203 rv = DoTcpConnectComplete(rv);
231 break; 204 break;
232 case CONN_STATE_SSL_CONNECT: 205 case CONN_STATE_SSL_CONNECT:
233 DCHECK_EQ(net::OK, rv); 206 DCHECK_EQ(net::OK, rv);
234 rv = DoSslConnect(); 207 rv = DoSslConnect();
235 break; 208 break;
236 case CONN_STATE_SSL_CONNECT_COMPLETE: 209 case CONN_STATE_SSL_CONNECT_COMPLETE:
237 rv = DoSslConnectComplete(rv); 210 rv = DoSslConnectComplete(rv);
238 break; 211 break;
239 case CONN_STATE_AUTH_CHALLENGE_SEND: 212 case CONN_STATE_AUTH_CHALLENGE_SEND:
240 rv = DoAuthChallengeSend(); 213 rv = DoAuthChallengeSend();
241 break; 214 break;
242 case CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE: 215 case CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE:
243 rv = DoAuthChallengeSendComplete(rv); 216 rv = DoAuthChallengeSendComplete(rv);
244 break; 217 break;
245 case CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE: 218 case CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE:
246 rv = DoAuthChallengeReplyComplete(rv); 219 rv = DoAuthChallengeReplyComplete(rv);
247 break; 220 break;
248
249 default: 221 default:
250 NOTREACHED() << "BUG in CastSocket state machine code"; 222 NOTREACHED() << "BUG in CastSocket connection state machine code";
251 break; 223 break;
252 } 224 }
253 } while (rv != net::ERR_IO_PENDING && next_state_ != CONN_STATE_NONE); 225 } while (rv != net::ERR_IO_PENDING && connect_state_ != CONN_STATE_NONE);
254 // Get out of the loop either when: 226 // Get out of the loop either when:
255 // a. A network operation is pending, OR 227 // a. A network operation is pending, OR
256 // b. The Do* method called did not change state 228 // b. The Do* method called did not change state
257 229
258 in_connect_loop_ = false; 230 // If there is no pending IO and if we still got out of the loop then
259 231 // either we are done successfully or there was an error; invoke the
260 return rv; 232 // callback in either case.
233 if (rv != net::ERR_IO_PENDING)
234 DoConnectCallback(rv);
261 } 235 }
262 236
263 int CastSocket::DoTcpConnect() { 237 int CastSocket::DoTcpConnect() {
264 VLOG(1) << "DoTcpConnect"; 238 VLOG(1) << "DoTcpConnect";
265 next_state_ = CONN_STATE_TCP_CONNECT_COMPLETE; 239 connect_state_ = CONN_STATE_TCP_CONNECT_COMPLETE;
266 tcp_socket_ = CreateTcpSocket(); 240 tcp_socket_ = CreateTcpSocket();
267 return tcp_socket_->Connect( 241 return tcp_socket_->Connect(
268 base::Bind(&CastSocket::OnConnectComplete, AsWeakPtr())); 242 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr()));
269 } 243 }
270 244
271 int CastSocket::DoTcpConnectComplete(int result) { 245 int CastSocket::DoTcpConnectComplete(int result) {
272 VLOG(1) << "DoTcpConnectComplete: " << result; 246 VLOG(1) << "DoTcpConnectComplete: " << result;
273 if (result == net::OK) { 247 if (result == net::OK) {
274 // Enable TCP protocol-level keep-alive. 248 // Enable TCP protocol-level keep-alive.
275 bool result = tcp_socket_->SetKeepAlive(true, kTcpKeepAliveDelaySecs); 249 bool success = tcp_socket_->SetKeepAlive(true, kTcpKeepAliveDelaySecs);
276 LOG_IF(WARNING, !result) << "Failed to SetKeepAlive."; 250 LOG_IF(WARNING, !success) << "Failed to SetKeepAlive.";
277 next_state_ = CONN_STATE_SSL_CONNECT; 251 connect_state_ = CONN_STATE_SSL_CONNECT;
278 } 252 }
279 return result; 253 return result;
280 } 254 }
281 255
282 int CastSocket::DoSslConnect() { 256 int CastSocket::DoSslConnect() {
283 VLOG(1) << "DoSslConnect"; 257 VLOG(1) << "DoSslConnect";
284 next_state_ = CONN_STATE_SSL_CONNECT_COMPLETE; 258 connect_state_ = CONN_STATE_SSL_CONNECT_COMPLETE;
285 socket_ = CreateSslSocket(); 259 socket_ = CreateSslSocket();
286 return socket_->Connect( 260 return socket_->Connect(
287 base::Bind(&CastSocket::OnConnectComplete, AsWeakPtr())); 261 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr()));
288 } 262 }
289 263
290 int CastSocket::DoSslConnectComplete(int result) { 264 int CastSocket::DoSslConnectComplete(int result) {
291 VLOG(1) << "DoSslConnectComplete: " << result; 265 VLOG(1) << "DoSslConnectComplete: " << result;
292 if (result == net::ERR_CERT_AUTHORITY_INVALID && 266 if (result == net::ERR_CERT_AUTHORITY_INVALID &&
293 peer_cert_.empty() && 267 peer_cert_.empty() &&
294 ExtractPeerCert(&peer_cert_)) { 268 ExtractPeerCert(&peer_cert_)) {
295 next_state_ = CONN_STATE_TCP_CONNECT; 269 connect_state_ = CONN_STATE_TCP_CONNECT;
296 } else if (result == net::OK && auth_required_) { 270 } else if (result == net::OK && auth_required_) {
297 next_state_ = CONN_STATE_AUTH_CHALLENGE_SEND; 271 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND;
298 } 272 }
299 return result; 273 return result;
300 } 274 }
301 275
302 int CastSocket::DoAuthChallengeSend() { 276 int CastSocket::DoAuthChallengeSend() {
303 VLOG(1) << "DoAuthChallengeSend"; 277 VLOG(1) << "DoAuthChallengeSend";
304 next_state_ = CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE; 278 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE;
305 return SendAuthChallenge(); 279 CastMessage challenge_message;
280 CreateAuthChallengeMessage(&challenge_message);
281 VLOG(1) << "Sending challenge: " << CastMessageToString(challenge_message);
282 // Post a task to send a challenge message to avoid re-entrancy
283 PostTaskToSendCastMessage(
284 challenge_message,
285 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr()));
286 // Always return IO_PENDING since we always get the result asynchronously
287 return net::ERR_IO_PENDING;
306 } 288 }
307 289
308 int CastSocket::DoAuthChallengeSendComplete(int result) { 290 int CastSocket::DoAuthChallengeSendComplete(int result) {
309 VLOG(1) << "DoAuthChallengeSendComplete: " << result; 291 VLOG(1) << "DoAuthChallengeSendComplete: " << result;
310 if (result != net::OK) 292 if (result < 0)
Wez 2013/11/26 02:45:56 Didn't we already fix these?
Munjal (Google) 2013/11/26 20:06:14 Yes, we did but I refactored that method a bit and
311 return result; 293 return result;
312 next_state_ = CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE; 294 connect_state_ = CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE;
313 return ReadAuthChallengeReply(); 295 PostTaskToStartReadLoop();
296 // Always return IO_PENDING since we always get the result asynchronously
297 return net::ERR_IO_PENDING;
314 } 298 }
315 299
316 int CastSocket::DoAuthChallengeReplyComplete(int result) { 300 int CastSocket::DoAuthChallengeReplyComplete(int result) {
317 VLOG(1) << "DoAuthChallengeReplyComplete: " << result; 301 VLOG(1) << "DoAuthChallengeReplyComplete: " << result;
318 if (result != net::OK) 302 if (result < 0)
319 return result; 303 return result;
320 if (!VerifyChallengeReply()) 304 if (!VerifyChallengeReply())
321 return net::ERR_FAILED; 305 return net::ERR_FAILED;
322 VLOG(1) << "Auth challenge verification succeeded"; 306 VLOG(1) << "Auth challenge verification succeeded";
323 return net::OK; 307 return net::OK;
324 } 308 }
325 309
326 bool CastSocket::VerifyChallengeReply() {
327 return AuthenticateChallengeReply(*challenge_reply_.get(), peer_cert_);
328 }
329
330 void CastSocket::DoConnectCallback(int result) { 310 void CastSocket::DoConnectCallback(int result) {
331 ready_state_ = (result == net::OK) ? READY_STATE_OPEN : READY_STATE_CLOSED; 311 ready_state_ = (result == net::OK) ? READY_STATE_OPEN : READY_STATE_CLOSED;
332 error_state_ = (result == net::OK) ? 312 error_state_ = (result == net::OK) ?
333 CHANNEL_ERROR_NONE : CHANNEL_ERROR_CONNECT_ERROR; 313 CHANNEL_ERROR_NONE : CHANNEL_ERROR_CONNECT_ERROR;
314 // Start the ReadData loop if not already started.
315 if (result == net::OK)
316 PostTaskToStartReadLoop();
334 base::ResetAndReturn(&connect_callback_).Run(result); 317 base::ResetAndReturn(&connect_callback_).Run(result);
335 // Start the ReadData loop if not already started.
336 // If auth_required_ is true we would've started a ReadData loop already.
337 // TODO(munjal): This is a bit ugly. Refactor read and write code.
338 if (result == net::OK && !auth_required_)
339 ReadData();
340 } 318 }
341 319
342 void CastSocket::Close(const net::CompletionCallback& callback) { 320 void CastSocket::Close(const net::CompletionCallback& callback) {
343 DCHECK(CalledOnValidThread()); 321 DCHECK(CalledOnValidThread());
344 VLOG(1) << "Close ReadyState = " << ready_state_; 322 VLOG(1) << "Close ReadyState = " << ready_state_;
345 tcp_socket_.reset(NULL); 323 tcp_socket_.reset(NULL);
346 socket_.reset(NULL); 324 socket_.reset(NULL);
347 cert_verifier_.reset(NULL); 325 cert_verifier_.reset(NULL);
348 transport_security_state_.reset(NULL); 326 transport_security_state_.reset(NULL);
349 ready_state_ = READY_STATE_CLOSED; 327 ready_state_ = READY_STATE_CLOSED;
350 callback.Run(net::OK); 328 callback.Run(net::OK);
351 } 329 }
352 330
353 void CastSocket::SendMessage(const MessageInfo& message, 331 void CastSocket::SendMessage(const MessageInfo& message,
354 const net::CompletionCallback& callback) { 332 const net::CompletionCallback& callback) {
355 DCHECK(CalledOnValidThread()); 333 DCHECK(CalledOnValidThread());
356 VLOG(1) << "Send ReadyState " << ready_state_;
357 int result = net::ERR_FAILED;
358 if (ready_state_ != READY_STATE_OPEN) { 334 if (ready_state_ != READY_STATE_OPEN) {
359 callback.Run(result); 335 callback.Run(net::ERR_FAILED);
360 return; 336 return;
361 } 337 }
362 CastMessage message_proto; 338 CastMessage message_proto;
363 if (!MessageInfoToCastMessage(message, &message_proto)) { 339 if (!MessageInfoToCastMessage(message, &message_proto)) {
364 CloseWithError(cast_channel::CHANNEL_ERROR_INVALID_MESSAGE); 340 callback.Run(net::ERR_FAILED);
365 // TODO(mfoltz): Do a better job of signaling cast_channel errors to the
366 // caller.
367 callback.Run(net::OK);
368 return; 341 return;
369 } 342 }
370 SendMessageInternal(message_proto, callback); 343
344 SendCastMessageInternal(message_proto, callback);
371 } 345 }
372 346
373 int CastSocket::SendMessageInternal(const CastMessage& message_proto, 347 void CastSocket::PostTaskToSendCastMessage(
374 const net::CompletionCallback& callback) { 348 const CastMessage& message, const net::CompletionCallback& callback) {
375 WriteRequest write_request(callback); 349 base::MessageLoop::current()->PostTask(
376 if (!write_request.SetContent(message_proto)) 350 FROM_HERE,
377 return net::ERR_FAILED; 351 base::Bind(&CastSocket::SendCastMessageInternal, AsWeakPtr(),
378 write_queue_.push(write_request); 352 message, callback));
Wez 2013/11/26 02:45:56 This means you're posting a task on every send; wh
Munjal (Google) 2013/11/26 20:06:14 We don't post a task for every SendMessage. We onl
Wez 2013/11/27 04:15:19 In that case can't SendAuthChallenge do the post d
Munjal (Google) 2013/12/02 19:22:09 Done. I thought it might be handy to have this hel
379 return WriteData();
380 } 353 }
381 354
382 int CastSocket::WriteData() { 355 void CastSocket::SendCastMessageInternal(
356 const CastMessage& message,
357 const net::CompletionCallback& callback) {
358 WriteRequest write_request(callback);
359 if (!write_request.SetContent(message)) {
360 callback.Run(net::ERR_FAILED);
361 return;
362 }
363
364 write_queue_.push(write_request);
365 if (write_state_ == WRITE_STATE_NONE) {
366 write_state_ = WRITE_STATE_WRITE;
367 DoWriteLoop(net::OK);
368 }
369 }
370
371 void CastSocket::ClearWriteQueue() {
372 while (!write_queue_.empty())
373 write_queue_.pop();
374 }
375
376 void CastSocket::DoWriteLoop(int result) {
383 DCHECK(CalledOnValidThread()); 377 DCHECK(CalledOnValidThread());
384 VLOG(1) << "WriteData q = " << write_queue_.size(); 378 VLOG(1) << "WriteData q = " << write_queue_.size();
385 if (write_queue_.empty() || write_callback_pending_)
386 return net::ERR_FAILED;
387 379
380 if (write_queue_.empty())
381 return;
382
383 int rv = result;
384 do {
385 WriteState state = write_state_;
386 write_state_ = WRITE_STATE_NONE;
387 switch (state) {
388 case WRITE_STATE_WRITE:
389 rv = DoWrite();
390 break;
391 case WRITE_STATE_WRITE_COMPLETE:
392 rv = DoWriteComplete(rv);
393 break;
394 case WRITE_STATE_ERROR:
395 rv = DoWriteError(rv);
396 break;
397 default:
398 NOTREACHED() << "BUG in CastSocket write state machine code";
399 break;
400 }
401 } while (!write_queue_.empty() &&
402 rv != net::ERR_IO_PENDING &&
403 write_state_ != WRITE_STATE_NONE);
404 }
405
406 int CastSocket::DoWrite() {
407 DCHECK(!write_queue_.empty());
388 WriteRequest& request = write_queue_.front(); 408 WriteRequest& request = write_queue_.front();
389 409
390 VLOG(1) << "WriteData byte_count = " << request.io_buffer->size() 410 VLOG(1) << "WriteData byte_count = " << request.io_buffer->size()
391 << " bytes_written " << request.io_buffer->BytesConsumed(); 411 << " bytes_written " << request.io_buffer->BytesConsumed();
392 412
393 write_callback_pending_ = true; 413 write_state_ = WRITE_STATE_WRITE_COMPLETE;
Wez 2013/11/26 02:45:56 Surely at this point the write is pending?
Munjal (Google) 2013/11/26 20:06:14 Yes. But the caller - DoWriteLoop - will get an IO
Wez 2013/11/27 04:15:19 Right - so the write has _not_ completed - we're i
Munjal (Google) 2013/12/02 19:22:09 I think the confusion is in the name - this is the
394 int result = socket_->Write( 414
415 return socket_->Write(
395 request.io_buffer.get(), 416 request.io_buffer.get(),
396 request.io_buffer->BytesRemaining(), 417 request.io_buffer->BytesRemaining(),
397 base::Bind(&CastSocket::OnWriteData, AsWeakPtr())); 418 base::Bind(&CastSocket::DoWriteLoop, AsWeakPtr()));
398
399 if (result != net::ERR_IO_PENDING)
400 OnWriteData(result);
401
402 return result;
403 } 419 }
404 420
405 void CastSocket::OnWriteData(int result) { 421 int CastSocket::DoWriteComplete(int result) {
406 DCHECK(CalledOnValidThread());
407 VLOG(1) << "OnWriteComplete result = " << result;
408 DCHECK(write_callback_pending_);
409 DCHECK(!write_queue_.empty()); 422 DCHECK(!write_queue_.empty());
410 write_callback_pending_ = false;
411 WriteRequest& request = write_queue_.front(); 423 WriteRequest& request = write_queue_.front();
412 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer;
413 424
414 if (result >= 0) { 425 if (result < 0) {
415 io_buffer->DidConsume(result); 426 // ERROR: do the write completion callback and get into error state
416 if (io_buffer->BytesRemaining() > 0) { 427 // to close the session
417 VLOG(1) << "OnWriteComplete size = " << io_buffer->size() 428 request.callback.Run(result);
Wez 2013/11/26 02:45:56 If the write was triggered by connect-time activit
Munjal (Google) 2013/11/26 20:06:14 If the Write was triggered by connect-time activit
Wez 2013/11/27 04:15:19 The case I'm asking about is DoWriteLoop -> DoWrit
Munjal (Google) 2013/12/02 19:22:09 You are right. Fixed now. We should not access any
418 << " consumed " << io_buffer->BytesConsumed() 429 write_queue_.pop();
419 << " remaining " << io_buffer->BytesRemaining() 430 write_state_ = WRITE_STATE_ERROR;
420 << " # requests " << write_queue_.size(); 431 return result;
421 WriteData();
422 return;
423 }
424 DCHECK_EQ(io_buffer->BytesConsumed(), io_buffer->size());
425 DCHECK_EQ(io_buffer->BytesRemaining(), 0);
426 result = io_buffer->BytesConsumed();
427 } 432 }
428 433
429 request.callback.Run(result); 434 if (result > 0) {
430 write_queue_.pop(); 435 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer;
431 436 // Some bytes were successfully written
432 VLOG(1) << "OnWriteComplete size = " << io_buffer->size() 437 io_buffer->DidConsume(result);
433 << " consumed " << io_buffer->BytesConsumed() 438 if (io_buffer->BytesRemaining() == 0) {
434 << " remaining " << io_buffer->BytesRemaining() 439 // A message is sent fully, do the write completion callback
435 << " # requests " << write_queue_.size(); 440 request.callback.Run(io_buffer->BytesConsumed());
436 441 write_queue_.pop();
437 if (result < 0) { 442 }
438 CloseWithError(CHANNEL_ERROR_SOCKET_ERROR);
439 return;
440 } 443 }
441 444
442 if (!write_queue_.empty()) 445 write_state_ = WRITE_STATE_WRITE;
Wez 2013/11/26 02:45:56 Under what circumstances do we ever return to WRIT
Munjal (Google) 2013/11/26 20:06:14 We never return to WRITE_STATE_NONE. We only start
443 WriteData(); 446 return net::OK;
444 } 447 }
445 448
446 int CastSocket::ReadData() { 449 int CastSocket::DoWriteError(int result) {
450 DCHECK(!write_queue_.empty());
451 DCHECK_LT(result, 0);
452 // TODO(munjal): Consider reporting error to all pending writes
453 ClearWriteQueue();
454 CloseWithError(CHANNEL_ERROR_SOCKET_ERROR);
455 return net::OK;
456 }
457
458 void CastSocket::PostTaskToStartReadLoop() {
447 DCHECK(CalledOnValidThread()); 459 DCHECK(CalledOnValidThread());
448 if (!socket_.get()) 460 // Post a task to avoid re-entrancy into DoConnectLoop
449 return net::ERR_FAILED; 461 base::MessageLoop::current()->PostTask(
450 DCHECK(!read_callback_pending_); 462 FROM_HERE,
451 read_callback_pending_ = true; 463 base::Bind(&CastSocket::StartReadLoop, AsWeakPtr()));
Wez 2013/11/26 02:45:56 As for read, why do this? Why not start the read d
Munjal (Google) 2013/11/26 20:06:14 This felt cleaner and more robust since otherwise
Wez 2013/11/27 04:15:19 The amount of logic should be the same; what I'm s
Munjal (Google) 2013/12/02 19:22:09 If it finishes synchronously then it will invoke t
464 }
465
466 void CastSocket::StartReadLoop() {
467 // If we are in READ_STATE_NONE then get into appropriate
468 // starting state and start the read loop
469 if (read_state_ == READ_STATE_NONE) {
470 read_state_ = READ_STATE_READ;
471 DoReadLoop(net::OK);
472 }
473 }
474
475 void CastSocket::DoReadLoop(int result) {
476 DCHECK(CalledOnValidThread());
477 int rv = result;
478 do {
479 ReadState state = read_state_;
480 read_state_ = READ_STATE_NONE;
481
482 switch (state) {
483 case READ_STATE_READ:
484 rv = DoRead();
485 break;
486 case READ_STATE_READ_COMPLETE:
487 rv = DoReadComplete(rv);
488 break;
489 case READ_STATE_ERROR:
490 rv = DoReadError(rv);
491 break;
492 default:
493 NOTREACHED() << "BUG in read state machine";
494 break;
495 }
496 } while (rv != net::ERR_IO_PENDING && read_state_ != READ_STATE_NONE);
497 }
498
499 int CastSocket::DoRead() {
500 read_state_ = READ_STATE_READ_COMPLETE;
452 // Figure out if we are reading the header or body, and the remaining bytes. 501 // Figure out if we are reading the header or body, and the remaining bytes.
453 uint32 num_bytes_to_read = 0; 502 uint32 num_bytes_to_read = 0;
454 if (header_read_buffer_->RemainingCapacity() > 0) { 503 if (header_read_buffer_->RemainingCapacity() > 0) {
455 current_read_buffer_ = header_read_buffer_; 504 current_read_buffer_ = header_read_buffer_;
456 num_bytes_to_read = header_read_buffer_->RemainingCapacity(); 505 num_bytes_to_read = header_read_buffer_->RemainingCapacity();
457 DCHECK_LE(num_bytes_to_read, kMessageHeaderSize); 506 DCHECK_LE(num_bytes_to_read, kMessageHeaderSize);
458 } else { 507 } else {
459 DCHECK_GT(current_message_size_, 0U); 508 DCHECK_GT(current_message_size_, 0U);
460 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset(); 509 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset();
461 current_read_buffer_ = body_read_buffer_; 510 current_read_buffer_ = body_read_buffer_;
462 DCHECK_LE(num_bytes_to_read, kMaxMessageSize); 511 DCHECK_LE(num_bytes_to_read, kMaxMessageSize);
463 } 512 }
464 DCHECK_GT(num_bytes_to_read, 0U); 513 DCHECK_GT(num_bytes_to_read, 0U);
514
465 // We read up to num_bytes_to_read into |current_read_buffer_|. 515 // We read up to num_bytes_to_read into |current_read_buffer_|.
466 int result = socket_->Read( 516 return socket_->Read(
467 current_read_buffer_.get(), 517 current_read_buffer_.get(),
468 num_bytes_to_read, 518 num_bytes_to_read,
469 base::Bind(&CastSocket::OnReadData, AsWeakPtr())); 519 base::Bind(&CastSocket::DoReadLoop, AsWeakPtr()));
470 VLOG(1) << "ReadData result = " << result;
471 if (result > 0) {
472 OnReadData(result);
473 } else if (result != net::ERR_IO_PENDING) {
474 CloseWithError(CHANNEL_ERROR_SOCKET_ERROR);
475 }
476 return result;
477 } 520 }
478 521
479 void CastSocket::OnReadData(int result) { 522 int CastSocket::DoReadComplete(int result) {
480 DCHECK(CalledOnValidThread()); 523 DCHECK(CalledOnValidThread());
481 VLOG(1) << "OnReadData result = " << result 524 VLOG(1) << "DoReadDataComplete result = " << result
482 << " header offset = " << header_read_buffer_->offset() 525 << " header offset = " << header_read_buffer_->offset()
483 << " body offset = " << body_read_buffer_->offset(); 526 << " body offset = " << body_read_buffer_->offset();
484 read_callback_pending_ = false;
485 if (result <= 0) { 527 if (result <= 0) {
486 CloseWithError(CHANNEL_ERROR_SOCKET_ERROR); 528 error_state_ = CHANNEL_ERROR_SOCKET_ERROR;
487 return; 529 read_state_ = READ_STATE_ERROR;
530 return result;
488 } 531 }
532
489 // We read some data. Move the offset in the current buffer forward. 533 // We read some data. Move the offset in the current buffer forward.
490 DCHECK_LE(current_read_buffer_->offset() + result, 534 DCHECK_LE(current_read_buffer_->offset() + result,
491 current_read_buffer_->capacity()); 535 current_read_buffer_->capacity());
492 current_read_buffer_->set_offset(current_read_buffer_->offset() + result); 536 current_read_buffer_->set_offset(current_read_buffer_->offset() + result);
493 537
494 bool should_continue = true; 538 bool success = true;
495 if (current_read_buffer_.get() == header_read_buffer_.get() && 539 if (current_read_buffer_.get() == header_read_buffer_.get() &&
496 current_read_buffer_->RemainingCapacity() == 0) { 540 current_read_buffer_->RemainingCapacity() == 0) {
497 // If we have read a full header, process the contents. 541 // If we have read a full header, process the contents.
498 should_continue = ProcessHeader(); 542 success = ProcessHeader();
499 } else if (current_read_buffer_.get() == body_read_buffer_.get() && 543 } else if (current_read_buffer_.get() == body_read_buffer_.get() &&
500 static_cast<uint32>(current_read_buffer_->offset()) == 544 static_cast<uint32>(current_read_buffer_->offset()) ==
501 current_message_size_) { 545 current_message_size_) {
502 // If we have read a full body, process the contents. 546 // If we have read a full body, process the contents.
503 should_continue = ProcessBody(); 547 success = ProcessBody();
504 } 548 }
505 if (should_continue) 549
506 ReadData(); 550 if (success)
551 read_state_ = READ_STATE_READ;
552 return net::OK;
553 }
554
555 int CastSocket::DoReadError(int result) {
556 DCHECK_LT(result, 0);
557 CloseWithError(error_state_);
558 return net::OK;
507 } 559 }
508 560
509 bool CastSocket::ProcessHeader() { 561 bool CastSocket::ProcessHeader() {
510 DCHECK_EQ(static_cast<uint32>(header_read_buffer_->offset()), 562 DCHECK_EQ(static_cast<uint32>(header_read_buffer_->offset()),
511 kMessageHeaderSize); 563 kMessageHeaderSize);
512 MessageHeader header; 564 MessageHeader header;
513 MessageHeader::ReadFromIOBuffer(header_read_buffer_.get(), &header); 565 MessageHeader::ReadFromIOBuffer(header_read_buffer_.get(), &header);
514 if (header.message_size > kMaxMessageSize) { 566 if (header.message_size > kMaxMessageSize) {
515 CloseWithError(cast_channel::CHANNEL_ERROR_INVALID_MESSAGE); 567 error_state_ = cast_channel::CHANNEL_ERROR_INVALID_MESSAGE;
516 return false; 568 return false;
517 } 569 }
518 VLOG(1) << "Parsed header { message_size: " << header.message_size << " }"; 570 VLOG(1) << "Parsed header { message_size: " << header.message_size << " }";
519 current_message_size_ = header.message_size; 571 current_message_size_ = header.message_size;
520 return true; 572 return true;
521 } 573 }
522 574
523 bool CastSocket::ProcessBody() { 575 bool CastSocket::ProcessBody() {
524 DCHECK_EQ(static_cast<uint32>(body_read_buffer_->offset()), 576 DCHECK_EQ(static_cast<uint32>(body_read_buffer_->offset()),
525 current_message_size_); 577 current_message_size_);
526 if (!ParseMessageFromBody()) { 578 if (!ParseMessageFromBody()) {
527 CloseWithError(cast_channel::CHANNEL_ERROR_INVALID_MESSAGE); 579 error_state_ = cast_channel::CHANNEL_ERROR_INVALID_MESSAGE;
528 return false; 580 return false;
529 } 581 }
530 current_message_size_ = 0; 582 current_message_size_ = 0;
531 header_read_buffer_->set_offset(0); 583 header_read_buffer_->set_offset(0);
532 body_read_buffer_->set_offset(0); 584 body_read_buffer_->set_offset(0);
533 current_read_buffer_ = header_read_buffer_; 585 current_read_buffer_ = header_read_buffer_;
534 return true; 586 return true;
535 } 587 }
536 588
537 bool CastSocket::ParseMessageFromBody() { 589 bool CastSocket::ParseMessageFromBody() {
538 DCHECK(CalledOnValidThread()); 590 DCHECK(CalledOnValidThread());
539 DCHECK_EQ(static_cast<uint32>(body_read_buffer_->offset()), 591 DCHECK_EQ(static_cast<uint32>(body_read_buffer_->offset()),
540 current_message_size_); 592 current_message_size_);
541 CastMessage message_proto; 593 CastMessage message_proto;
542 if (!message_proto.ParseFromArray( 594 if (!message_proto.ParseFromArray(
543 body_read_buffer_->StartOfBuffer(), 595 body_read_buffer_->StartOfBuffer(),
544 current_message_size_)) 596 current_message_size_)) {
545 return false; 597 return false;
598 }
546 VLOG(1) << "Parsed message " << CastMessageToString(message_proto); 599 VLOG(1) << "Parsed message " << CastMessageToString(message_proto);
547 // If the message is an auth message then we handle it internally. 600 // If the message is an auth message then we handle it internally.
548 if (IsAuthMessage(message_proto)) { 601 if (IsAuthMessage(message_proto)) {
549 challenge_reply_.reset(new CastMessage(message_proto)); 602 challenge_reply_.reset(new CastMessage(message_proto));
550 OnChallengeEvent(net::OK); 603 DoConnectLoop(net::OK);
551 } else if (delegate_) { 604 } else if (delegate_) {
552 MessageInfo message; 605 MessageInfo message;
553 if (!CastMessageToMessageInfo(message_proto, &message)) 606 if (!CastMessageToMessageInfo(message_proto, &message))
554 return false; 607 return false;
555 delegate_->OnMessage(this, message); 608 delegate_->OnMessage(this, message);
556 } 609 }
557 return true; 610 return true;
558 } 611 }
559 612
560 // static 613 // static
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after
670 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data), 723 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data),
671 message_data.size()); 724 message_data.size());
672 return true; 725 return true;
673 } 726 }
674 727
675 CastSocket::WriteRequest::~WriteRequest() { } 728 CastSocket::WriteRequest::~WriteRequest() { }
676 729
677 } // namespace cast_channel 730 } // namespace cast_channel
678 } // namespace api 731 } // namespace api
679 } // namespace extensions 732 } // namespace extensions
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698