Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(206)

Side by Side Diff: extensions/browser/api/cast_channel/cast_socket.cc

Issue 417403002: Remove weak pointers from CastSocket. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Refactor to use CancelableCallback Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "extensions/browser/api/cast_channel/cast_socket.h" 5 #include "extensions/browser/api/cast_channel/cast_socket.h"
6 6
7 #include <stdlib.h> 7 #include <stdlib.h>
8 #include <string.h> 8 #include <string.h>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
92 net_log_source_.id = net_log_->NextID(); 92 net_log_source_.id = net_log_->NextID();
93 93
94 // Reuse these buffers for each message. 94 // Reuse these buffers for each message.
95 header_read_buffer_ = new net::GrowableIOBuffer(); 95 header_read_buffer_ = new net::GrowableIOBuffer();
96 header_read_buffer_->SetCapacity(MessageHeader::header_size()); 96 header_read_buffer_->SetCapacity(MessageHeader::header_size());
97 body_read_buffer_ = new net::GrowableIOBuffer(); 97 body_read_buffer_ = new net::GrowableIOBuffer();
98 body_read_buffer_->SetCapacity(MessageHeader::max_message_size()); 98 body_read_buffer_->SetCapacity(MessageHeader::max_message_size());
99 current_read_buffer_ = header_read_buffer_; 99 current_read_buffer_ = header_read_buffer_;
100 } 100 }
101 101
102 CastSocket::~CastSocket() { } 102 CastSocket::~CastSocket() {
103 CloseInternal();
104 }
103 105
104 ReadyState CastSocket::ready_state() const { 106 ReadyState CastSocket::ready_state() const {
105 return ready_state_; 107 return ready_state_;
106 } 108 }
107 109
108 ChannelError CastSocket::error_state() const { 110 ChannelError CastSocket::error_state() const {
109 return error_state_; 111 return error_state_;
110 } 112 }
111 113
112 scoped_ptr<net::TCPClientSocket> CastSocket::CreateTcpSocket() { 114 scoped_ptr<net::TCPClientSocket> CastSocket::CreateTcpSocket() {
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
169 DCHECK(CalledOnValidThread()); 171 DCHECK(CalledOnValidThread());
170 VLOG_WITH_CONNECTION(1) << "Connect readyState = " << ready_state_; 172 VLOG_WITH_CONNECTION(1) << "Connect readyState = " << ready_state_;
171 if (ready_state_ != READY_STATE_NONE) { 173 if (ready_state_ != READY_STATE_NONE) {
172 callback.Run(net::ERR_CONNECTION_FAILED); 174 callback.Run(net::ERR_CONNECTION_FAILED);
173 return; 175 return;
174 } 176 }
175 ready_state_ = READY_STATE_CONNECTING; 177 ready_state_ = READY_STATE_CONNECTING;
176 connect_callback_ = callback; 178 connect_callback_ = callback;
177 connect_state_ = CONN_STATE_TCP_CONNECT; 179 connect_state_ = CONN_STATE_TCP_CONNECT;
178 if (connect_timeout_.InMicroseconds() > 0) { 180 if (connect_timeout_.InMicroseconds() > 0) {
179 GetTimer()->Start( 181 DCHECK(cancel_connect_callback_.IsCancelled());
180 FROM_HERE, 182 cancel_connect_callback_.Reset(base::Bind(&CastSocket::CancelConnect,
181 connect_timeout_, 183 base::Unretained(this)));
Wez 2014/07/30 19:14:53 nit: Style guide would prefer wrapping this after
mark a. foltz 2014/07/31 18:33:17 It prefers wrapping at parenthesis
182 base::Bind(&CastSocket::CancelConnect, AsWeakPtr())); 184 GetTimer()->Start(FROM_HERE,
185 connect_timeout_,
186 cancel_connect_callback_.callback());
183 } 187 }
184 DoConnectLoop(net::OK); 188 DoConnectLoop(net::OK);
185 } 189 }
186 190
187 void CastSocket::PostTaskToStartConnectLoop(int result) { 191 void CastSocket::PostTaskToStartConnectLoop(int result) {
188 DCHECK(CalledOnValidThread()); 192 DCHECK(CalledOnValidThread());
189 base::MessageLoop::current()->PostTask( 193 DCHECK(connect_loop_callback_.IsCancelled());
190 FROM_HERE, 194 connect_loop_callback_.Reset(base::Bind(&CastSocket::DoConnectLoop,
191 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr(), result)); 195 base::Unretained(this),
196 result));
197 base::MessageLoop::current()->PostTask(FROM_HERE,
198 connect_loop_callback_.callback());
192 } 199 }
193 200
194 void CastSocket::CancelConnect() { 201 void CastSocket::CancelConnect() {
195 DCHECK(CalledOnValidThread()); 202 DCHECK(CalledOnValidThread());
196 // Stop all pending connection setup tasks and report back to the client. 203 // Stop all pending connection setup tasks and report back to the client.
197 is_canceled_ = true; 204 is_canceled_ = true;
198 VLOG_WITH_CONNECTION(1) << "Timeout while establishing a connection."; 205 VLOG_WITH_CONNECTION(1) << "Timeout while establishing a connection.";
199 DoConnectCallback(net::ERR_TIMED_OUT); 206 DoConnectCallback(net::ERR_TIMED_OUT);
200 } 207 }
201 208
202 // This method performs the state machine transitions for connection flow. 209 // This method performs the state machine transitions for connection flow.
203 // There are two entry points to this method: 210 // There are two entry points to this method:
204 // 1. Connect method: this starts the flow 211 // 1. Connect method: this starts the flow
205 // 2. Callback from network operations that finish asynchronously 212 // 2. Callback from network operations that finish asynchronously
206 void CastSocket::DoConnectLoop(int result) { 213 void CastSocket::DoConnectLoop(int result) {
214 connect_loop_callback_.Cancel();
207 if (is_canceled_) { 215 if (is_canceled_) {
208 LOG(ERROR) << "CANCELLED - Aborting DoConnectLoop."; 216 LOG(ERROR) << "CANCELLED - Aborting DoConnectLoop.";
209 return; 217 return;
210 } 218 }
211 // Network operations can either finish synchronously or asynchronously. 219 // Network operations can either finish synchronously or asynchronously.
212 // This method executes the state machine transitions in a loop so that 220 // This method executes the state machine transitions in a loop so that
213 // correct state transitions happen even when network operations finish 221 // correct state transitions happen even when network operations finish
214 // synchronously. 222 // synchronously.
215 int rv = result; 223 int rv = result;
216 do { 224 do {
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
254 if (rv != net::ERR_IO_PENDING) { 262 if (rv != net::ERR_IO_PENDING) {
255 GetTimer()->Stop(); 263 GetTimer()->Stop();
256 DoConnectCallback(rv); 264 DoConnectCallback(rv);
257 } 265 }
258 } 266 }
259 267
260 int CastSocket::DoTcpConnect() { 268 int CastSocket::DoTcpConnect() {
261 VLOG_WITH_CONNECTION(1) << "DoTcpConnect"; 269 VLOG_WITH_CONNECTION(1) << "DoTcpConnect";
262 connect_state_ = CONN_STATE_TCP_CONNECT_COMPLETE; 270 connect_state_ = CONN_STATE_TCP_CONNECT_COMPLETE;
263 tcp_socket_ = CreateTcpSocket(); 271 tcp_socket_ = CreateTcpSocket();
272 DCHECK(connect_loop_callback_.IsCancelled());
Wez 2014/07/30 19:14:53 nit: Check this on entry to DoTcpConnect()?
Wez 2014/07/30 19:14:53 IIUC this is basically checking that after a PostT
mark a. foltz 2014/07/31 18:33:17 Done.
mark a. foltz 2014/07/31 18:33:17 There should be at most one pending callback to re
264 return tcp_socket_->Connect( 273 return tcp_socket_->Connect(
265 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr())); 274 base::Bind(&CastSocket::DoConnectLoop, base::Unretained(this)));
266 } 275 }
267 276
268 int CastSocket::DoTcpConnectComplete(int result) { 277 int CastSocket::DoTcpConnectComplete(int result) {
269 VLOG_WITH_CONNECTION(1) << "DoTcpConnectComplete: " << result; 278 VLOG_WITH_CONNECTION(1) << "DoTcpConnectComplete: " << result;
270 if (result == net::OK) { 279 if (result == net::OK) {
271 // Enable TCP protocol-level keep-alive. 280 // Enable TCP protocol-level keep-alive.
272 bool result = tcp_socket_->SetKeepAlive(true, kTcpKeepAliveDelaySecs); 281 bool result = tcp_socket_->SetKeepAlive(true, kTcpKeepAliveDelaySecs);
273 LOG_IF(WARNING, !result) << "Failed to SetKeepAlive."; 282 LOG_IF(WARNING, !result) << "Failed to SetKeepAlive.";
274 connect_state_ = CONN_STATE_SSL_CONNECT; 283 connect_state_ = CONN_STATE_SSL_CONNECT;
275 } 284 }
276 return result; 285 return result;
277 } 286 }
278 287
279 int CastSocket::DoSslConnect() { 288 int CastSocket::DoSslConnect() {
280 VLOG_WITH_CONNECTION(1) << "DoSslConnect"; 289 VLOG_WITH_CONNECTION(1) << "DoSslConnect";
281 connect_state_ = CONN_STATE_SSL_CONNECT_COMPLETE; 290 connect_state_ = CONN_STATE_SSL_CONNECT_COMPLETE;
282 socket_ = CreateSslSocket(tcp_socket_.PassAs<net::StreamSocket>()); 291 socket_ = CreateSslSocket(tcp_socket_.PassAs<net::StreamSocket>());
292 DCHECK(connect_loop_callback_.IsCancelled());
Wez 2014/07/30 19:14:53 nit: Check this on entry to the method?
mark a. foltz 2014/07/31 18:33:16 Done.
283 return socket_->Connect( 293 return socket_->Connect(
284 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr())); 294 base::Bind(&CastSocket::DoConnectLoop, base::Unretained(this)));
285 } 295 }
286 296
287 int CastSocket::DoSslConnectComplete(int result) { 297 int CastSocket::DoSslConnectComplete(int result) {
288 VLOG_WITH_CONNECTION(1) << "DoSslConnectComplete: " << result; 298 VLOG_WITH_CONNECTION(1) << "DoSslConnectComplete: " << result;
289 if (result == net::ERR_CERT_AUTHORITY_INVALID && 299 if (result == net::ERR_CERT_AUTHORITY_INVALID &&
290 peer_cert_.empty() && ExtractPeerCert(&peer_cert_)) { 300 peer_cert_.empty() && ExtractPeerCert(&peer_cert_)) {
291 connect_state_ = CONN_STATE_TCP_CONNECT; 301 connect_state_ = CONN_STATE_TCP_CONNECT;
292 } else if (result == net::OK && 302 } else if (result == net::OK &&
293 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) { 303 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) {
294 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND; 304 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND;
295 } 305 }
296 return result; 306 return result;
297 } 307 }
298 308
299 int CastSocket::DoAuthChallengeSend() { 309 int CastSocket::DoAuthChallengeSend() {
300 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSend"; 310 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSend";
301 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE; 311 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE;
302 CastMessage challenge_message; 312 CastMessage challenge_message;
303 CreateAuthChallengeMessage(&challenge_message); 313 CreateAuthChallengeMessage(&challenge_message);
304 VLOG_WITH_CONNECTION(1) << "Sending challenge: " 314 VLOG_WITH_CONNECTION(1) << "Sending challenge: "
305 << CastMessageToString(challenge_message); 315 << CastMessageToString(challenge_message);
306 // Post a task to send auth challenge so that DoWriteLoop is not nested inside 316 // Post a task to send auth challenge so that DoWriteLoop is not nested inside
307 // DoConnectLoop. This is not strictly necessary but keeps the write loop 317 // DoConnectLoop. This is not strictly necessary but keeps the write loop
308 // code decoupled from connect loop code. 318 // code decoupled from connect loop code.
319 DCHECK(send_auth_challenge_callback_.IsCancelled());
320 send_auth_challenge_callback_.Reset(
321 base::Bind(&CastSocket::SendCastMessageInternal,
322 base::Unretained(this),
323 challenge_message,
324 base::Bind(&CastSocket::DoAuthChallengeSendWriteComplete,
325 base::Unretained(this))));
Wez 2014/07/30 19:14:53 Ick... do we really need a clean stack for SendCas
mark a. foltz 2014/07/31 18:33:17 It appears to be "not strictly necessary" but othe
309 base::MessageLoop::current()->PostTask( 326 base::MessageLoop::current()->PostTask(
310 FROM_HERE, 327 FROM_HERE,
311 base::Bind(&CastSocket::SendCastMessageInternal, 328 send_auth_challenge_callback_.callback());
312 AsWeakPtr(),
313 challenge_message,
314 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr())));
315 // Always return IO_PENDING since the result is always asynchronous. 329 // Always return IO_PENDING since the result is always asynchronous.
316 return net::ERR_IO_PENDING; 330 return net::ERR_IO_PENDING;
317 } 331 }
318 332
333 void CastSocket::DoAuthChallengeSendWriteComplete(int result) {
334 send_auth_challenge_callback_.Cancel();
335 VLOG_WITH_CONNECTION(2) << "DoAuthChallengeSendWriteComplete: " << result;
336 DCHECK_GT(result, 0);
337 DCHECK_EQ(write_queue_.size(), 1UL);
338 PostTaskToStartConnectLoop(result);
339 }
340
319 int CastSocket::DoAuthChallengeSendComplete(int result) { 341 int CastSocket::DoAuthChallengeSendComplete(int result) {
320 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result; 342 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result;
321 if (result < 0) 343 if (result < 0)
322 return result; 344 return result;
323 connect_state_ = CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE; 345 connect_state_ = CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE;
324 // Post a task to start read loop so that DoReadLoop is not nested inside 346 // Post a task to start read loop so that DoReadLoop is not nested inside
325 // DoConnectLoop. This is not strictly necessary but keeps the read loop 347 // DoConnectLoop. This is not strictly necessary but keeps the read loop
326 // code decoupled from connect loop code. 348 // code decoupled from connect loop code.
327 PostTaskToStartReadLoop(); 349 PostTaskToStartReadLoop();
328 // Always return IO_PENDING since the result is always asynchronous. 350 // Always return IO_PENDING since the result is always asynchronous.
(...skipping 18 matching lines...) Expand all
347 } else if (result == net::ERR_TIMED_OUT) { 369 } else if (result == net::ERR_TIMED_OUT) {
348 error_state_ = CHANNEL_ERROR_CONNECT_TIMEOUT; 370 error_state_ = CHANNEL_ERROR_CONNECT_TIMEOUT;
349 } else { 371 } else {
350 error_state_ = CHANNEL_ERROR_CONNECT_ERROR; 372 error_state_ = CHANNEL_ERROR_CONNECT_ERROR;
351 } 373 }
352 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback"; 374 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback";
353 base::ResetAndReturn(&connect_callback_).Run(result); 375 base::ResetAndReturn(&connect_callback_).Run(result);
354 } 376 }
355 377
356 void CastSocket::Close(const net::CompletionCallback& callback) { 378 void CastSocket::Close(const net::CompletionCallback& callback) {
357 DCHECK(CalledOnValidThread()); 379 CloseInternal();
380 callback.Run(net::OK);
381 // |callback| can delete |this|
Wez 2014/07/30 19:14:53 nit: Callers must inherently cope with asynchronou
mark a. foltz 2014/07/31 18:33:16 This comment is out of date and removed. Deletion
382 }
383
384 void CastSocket::CloseInternal() {
385 // TODO(mfoltz): Enforce this when CastChannelAPITest is rewritten to create
386 // and free sockets on the same thread. crbug.com/398242
387 // DCHECK(CalledOnValidThread());
388 if (ready_state_ == READY_STATE_CLOSED) {
389 return;
390 }
358 VLOG_WITH_CONNECTION(1) << "Close ReadyState = " << ready_state_; 391 VLOG_WITH_CONNECTION(1) << "Close ReadyState = " << ready_state_;
359 tcp_socket_.reset(); 392 tcp_socket_.reset();
360 socket_.reset(); 393 socket_.reset();
361 cert_verifier_.reset(); 394 cert_verifier_.reset();
362 transport_security_state_.reset(); 395 transport_security_state_.reset();
396 GetTimer()->Stop();
397 // Reset callbacks passed into us. Or should we invoke them with an error?
Wez 2014/07/30 19:14:53 nit: Suggest blank line before this comment, and b
Wez 2014/07/30 19:14:53 If the socket is being closed then there shouldn't
mark a. foltz 2014/07/31 18:33:16 Done.
mark a. foltz 2014/07/31 18:33:17 Actually, there may be a Promise pending on the ca
398 connect_callback_.Reset();
399 for (; !write_queue_.empty(); write_queue_.pop()) {
400 write_queue_.front().callback.Reset();
401 }
402 // Cancel callbacks that we queued ourselves to re-enter the connect or read
403 // loops.
404 connect_loop_callback_.Cancel();
405 send_auth_challenge_callback_.Cancel();
406 read_loop_callback_.Cancel();
407 cancel_connect_callback_.Cancel();
363 ready_state_ = READY_STATE_CLOSED; 408 ready_state_ = READY_STATE_CLOSED;
364 callback.Run(net::OK);
365 // |callback| can delete |this|
366 } 409 }
367 410
368 void CastSocket::SendMessage(const MessageInfo& message, 411 void CastSocket::SendMessage(const MessageInfo& message,
369 const net::CompletionCallback& callback) { 412 const net::CompletionCallback& callback) {
370 DCHECK(CalledOnValidThread()); 413 DCHECK(CalledOnValidThread());
371 if (ready_state_ != READY_STATE_OPEN) { 414 if (ready_state_ != READY_STATE_OPEN) {
372 callback.Run(net::ERR_FAILED); 415 callback.Run(net::ERR_FAILED);
373 return; 416 return;
374 } 417 }
375 CastMessage message_proto; 418 CastMessage message_proto;
376 if (!MessageInfoToCastMessage(message, &message_proto)) { 419 if (!MessageInfoToCastMessage(message, &message_proto)) {
377 callback.Run(net::ERR_FAILED); 420 callback.Run(net::ERR_FAILED);
378 return; 421 return;
379 } 422 }
380
381 SendCastMessageInternal(message_proto, callback); 423 SendCastMessageInternal(message_proto, callback);
382 } 424 }
383 425
384 void CastSocket::SendCastMessageInternal( 426 void CastSocket::SendCastMessageInternal(
385 const CastMessage& message, 427 const CastMessage& message,
386 const net::CompletionCallback& callback) { 428 const net::CompletionCallback& callback) {
387 WriteRequest write_request(callback); 429 WriteRequest write_request(callback);
388 if (!write_request.SetContent(message)) { 430 if (!write_request.SetContent(message)) {
389 callback.Run(net::ERR_FAILED); 431 callback.Run(net::ERR_FAILED);
390 return; 432 return;
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
447 489
448 int CastSocket::DoWrite() { 490 int CastSocket::DoWrite() {
449 DCHECK(!write_queue_.empty()); 491 DCHECK(!write_queue_.empty());
450 WriteRequest& request = write_queue_.front(); 492 WriteRequest& request = write_queue_.front();
451 493
452 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = " 494 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = "
453 << request.io_buffer->size() << " bytes_written " 495 << request.io_buffer->size() << " bytes_written "
454 << request.io_buffer->BytesConsumed(); 496 << request.io_buffer->BytesConsumed();
455 497
456 write_state_ = WRITE_STATE_WRITE_COMPLETE; 498 write_state_ = WRITE_STATE_WRITE_COMPLETE;
457
458 return socket_->Write( 499 return socket_->Write(
459 request.io_buffer.get(), 500 request.io_buffer.get(),
460 request.io_buffer->BytesRemaining(), 501 request.io_buffer->BytesRemaining(),
461 base::Bind(&CastSocket::DoWriteLoop, AsWeakPtr())); 502 base::Bind(&CastSocket::DoWriteLoop, base::Unretained(this)));
462 } 503 }
463 504
464 int CastSocket::DoWriteComplete(int result) { 505 int CastSocket::DoWriteComplete(int result) {
465 DCHECK(!write_queue_.empty()); 506 DCHECK(!write_queue_.empty());
466 if (result <= 0) { // NOTE that 0 also indicates an error 507 if (result <= 0) { // NOTE that 0 also indicates an error
467 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; 508 error_state_ = CHANNEL_ERROR_SOCKET_ERROR;
468 write_state_ = WRITE_STATE_ERROR; 509 write_state_ = WRITE_STATE_ERROR;
469 return result == 0 ? net::ERR_FAILED : result; 510 return result == 0 ? net::ERR_FAILED : result;
470 } 511 }
471 512
472 // Some bytes were successfully written 513 // Some bytes were successfully written
473 WriteRequest& request = write_queue_.front(); 514 WriteRequest& request = write_queue_.front();
474 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; 515 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer;
475 io_buffer->DidConsume(result); 516 io_buffer->DidConsume(result);
476 if (io_buffer->BytesRemaining() == 0) // Message fully sent 517 if (io_buffer->BytesRemaining() == 0) // Message fully sent
477 write_state_ = WRITE_STATE_DO_CALLBACK; 518 write_state_ = WRITE_STATE_DO_CALLBACK;
478 else 519 else
479 write_state_ = WRITE_STATE_WRITE; 520 write_state_ = WRITE_STATE_WRITE;
480 521
481 return net::OK; 522 return net::OK;
482 } 523 }
483 524
484 int CastSocket::DoWriteCallback() { 525 int CastSocket::DoWriteCallback() {
485 DCHECK(!write_queue_.empty()); 526 DCHECK(!write_queue_.empty());
527 write_state_ = WRITE_STATE_WRITE;
486 WriteRequest& request = write_queue_.front(); 528 WriteRequest& request = write_queue_.front();
487 int bytes_consumed = request.io_buffer->BytesConsumed(); 529 int bytes_consumed = request.io_buffer->BytesConsumed();
488 530 request.callback.Run(bytes_consumed);
489 // If inside connection flow, then there should be exaclty one item in 531 write_queue_.pop();
490 // the write queue.
491 if (ready_state_ == READY_STATE_CONNECTING) {
492 write_queue_.pop();
493 DCHECK(write_queue_.empty());
494 PostTaskToStartConnectLoop(bytes_consumed);
495 } else {
496 WriteRequest& request = write_queue_.front();
497 request.callback.Run(bytes_consumed);
498 write_queue_.pop();
499 }
500 write_state_ = WRITE_STATE_WRITE;
501 return net::OK; 532 return net::OK;
502 } 533 }
503 534
504 int CastSocket::DoWriteError(int result) { 535 int CastSocket::DoWriteError(int result) {
505 DCHECK(!write_queue_.empty()); 536 DCHECK(!write_queue_.empty());
506 DCHECK_LT(result, 0); 537 DCHECK_LT(result, 0);
507 538
508 // If inside connection flow, then there should be exactly one item in 539 // If inside connection flow, then there should be exactly one item in
509 // the write queue. 540 // the write queue.
510 if (ready_state_ == READY_STATE_CONNECTING) { 541 if (ready_state_ == READY_STATE_CONNECTING) {
511 write_queue_.pop(); 542 write_queue_.pop();
512 DCHECK(write_queue_.empty()); 543 DCHECK(write_queue_.empty());
513 PostTaskToStartConnectLoop(result); 544 PostTaskToStartConnectLoop(result);
514 // Connect loop will handle the error. Return net::OK so that write flow 545 // Connect loop will handle the error. Return net::OK so that write flow
515 // does not try to report error also. 546 // does not try to report error also.
516 return net::OK; 547 return net::OK;
517 } 548 }
518 549
519 while (!write_queue_.empty()) { 550 while (!write_queue_.empty()) {
520 WriteRequest& request = write_queue_.front(); 551 WriteRequest& request = write_queue_.front();
521 request.callback.Run(result); 552 request.callback.Run(result);
522 write_queue_.pop(); 553 write_queue_.pop();
523 } 554 }
524 return net::ERR_FAILED; 555 return net::ERR_FAILED;
525 } 556 }
526 557
527 void CastSocket::PostTaskToStartReadLoop() { 558 void CastSocket::PostTaskToStartReadLoop() {
528 DCHECK(CalledOnValidThread()); 559 DCHECK(CalledOnValidThread());
529 base::MessageLoop::current()->PostTask( 560 DCHECK(read_loop_callback_.IsCancelled());
530 FROM_HERE, 561 read_loop_callback_.Reset(base::Bind(&CastSocket::StartReadLoop,
531 base::Bind(&CastSocket::StartReadLoop, AsWeakPtr())); 562 base::Unretained(this)));
563 base::MessageLoop::current()->PostTask(FROM_HERE,
564 read_loop_callback_.callback());
532 } 565 }
533 566
534 void CastSocket::StartReadLoop() { 567 void CastSocket::StartReadLoop() {
568 read_loop_callback_.Cancel();
535 // Read loop would have already been started if read state is not NONE 569 // Read loop would have already been started if read state is not NONE
536 if (read_state_ == READ_STATE_NONE) { 570 if (read_state_ == READ_STATE_NONE) {
537 read_state_ = READ_STATE_READ; 571 read_state_ = READ_STATE_READ;
538 DoReadLoop(net::OK); 572 DoReadLoop(net::OK);
539 } 573 }
540 } 574 }
541 575
542 void CastSocket::DoReadLoop(int result) { 576 void CastSocket::DoReadLoop(int result) {
543 DCHECK(CalledOnValidThread()); 577 DCHECK(CalledOnValidThread());
544 // Network operations can either finish synchronously or asynchronously. 578 // Network operations can either finish synchronously or asynchronously.
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
596 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset(); 630 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset();
597 current_read_buffer_ = body_read_buffer_; 631 current_read_buffer_ = body_read_buffer_;
598 CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size()); 632 CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size());
599 } 633 }
600 CHECK_GT(num_bytes_to_read, 0U); 634 CHECK_GT(num_bytes_to_read, 0U);
601 635
602 // Read up to num_bytes_to_read into |current_read_buffer_|. 636 // Read up to num_bytes_to_read into |current_read_buffer_|.
603 return socket_->Read( 637 return socket_->Read(
604 current_read_buffer_.get(), 638 current_read_buffer_.get(),
605 num_bytes_to_read, 639 num_bytes_to_read,
606 base::Bind(&CastSocket::DoReadLoop, AsWeakPtr())); 640 base::Bind(&CastSocket::DoReadLoop, base::Unretained(this)));
607 } 641 }
608 642
609 int CastSocket::DoReadComplete(int result) { 643 int CastSocket::DoReadComplete(int result) {
610 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result 644 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result
611 << " header offset = " 645 << " header offset = "
612 << header_read_buffer_->offset() 646 << header_read_buffer_->offset()
613 << " body offset = " << body_read_buffer_->offset(); 647 << " body offset = " << body_read_buffer_->offset();
614 if (result <= 0) { // 0 means EOF: the peer closed the socket 648 if (result <= 0) { // 0 means EOF: the peer closed the socket
615 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket"; 649 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket";
616 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; 650 error_state_ = CHANNEL_ERROR_SOCKET_ERROR;
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after
716 return false; 750 return false;
717 } 751 }
718 CastSocket::MessageHeader header; 752 CastSocket::MessageHeader header;
719 header.SetMessageSize(message_size); 753 header.SetMessageSize(message_size);
720 header.PrependToString(message_data); 754 header.PrependToString(message_data);
721 return true; 755 return true;
722 } 756 }
723 757
724 void CastSocket::CloseWithError(ChannelError error) { 758 void CastSocket::CloseWithError(ChannelError error) {
725 DCHECK(CalledOnValidThread()); 759 DCHECK(CalledOnValidThread());
726 socket_.reset(NULL); 760 CloseInternal();
727 ready_state_ = READY_STATE_CLOSED;
728 error_state_ = error; 761 error_state_ = error;
729 if (delegate_) 762 if (delegate_)
730 delegate_->OnError(this, error); 763 delegate_->OnError(this, error);
731 } 764 }
732 765
733 std::string CastSocket::CastUrl() const { 766 std::string CastSocket::CastUrl() const {
734 return ((channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) ? 767 return ((channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) ?
735 "casts://" : "cast://") + ip_endpoint_.ToString(); 768 "casts://" : "cast://") + ip_endpoint_.ToString();
736 } 769 }
737 770
(...skipping 11 matching lines...) Expand all
749 DCHECK_LT(size, static_cast<size_t>(kuint32max)); 782 DCHECK_LT(size, static_cast<size_t>(kuint32max));
750 DCHECK_GT(size, 0U); 783 DCHECK_GT(size, 0U);
751 message_size = size; 784 message_size = size;
752 } 785 }
753 786
754 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle, 787 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle,
755 // if bit-for-bit compatible. 788 // if bit-for-bit compatible.
756 void CastSocket::MessageHeader::PrependToString(std::string* str) { 789 void CastSocket::MessageHeader::PrependToString(std::string* str) {
757 MessageHeader output = *this; 790 MessageHeader output = *this;
758 output.message_size = base::HostToNet32(message_size); 791 output.message_size = base::HostToNet32(message_size);
759 size_t header_size = base::checked_cast<size_t,uint32>( 792 size_t header_size = base::checked_cast<size_t, uint32>(
760 MessageHeader::header_size()); 793 MessageHeader::header_size());
761 scoped_ptr<char, base::FreeDeleter> char_array( 794 scoped_ptr<char, base::FreeDeleter> char_array(
762 static_cast<char*>(malloc(header_size))); 795 static_cast<char*>(malloc(header_size)));
763 memcpy(char_array.get(), &output, header_size); 796 memcpy(char_array.get(), &output, header_size);
764 str->insert(0, char_array.get(), header_size); 797 str->insert(0, char_array.get(), header_size);
765 } 798 }
766 799
767 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle, 800 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle,
768 // if bit-for-bit compatible. 801 // if bit-for-bit compatible.
769 void CastSocket::MessageHeader::ReadFromIOBuffer( 802 void CastSocket::MessageHeader::ReadFromIOBuffer(
770 net::GrowableIOBuffer* buffer, MessageHeader* header) { 803 net::GrowableIOBuffer* buffer, MessageHeader* header) {
771 uint32 message_size; 804 uint32 message_size;
772 size_t header_size = base::checked_cast<size_t,uint32>( 805 size_t header_size = base::checked_cast<size_t, uint32>(
773 MessageHeader::header_size()); 806 MessageHeader::header_size());
774 memcpy(&message_size, buffer->StartOfBuffer(), header_size); 807 memcpy(&message_size, buffer->StartOfBuffer(), header_size);
775 header->message_size = base::NetToHost32(message_size); 808 header->message_size = base::NetToHost32(message_size);
776 } 809 }
777 810
778 std::string CastSocket::MessageHeader::ToString() { 811 std::string CastSocket::MessageHeader::ToString() {
779 return "{message_size: " + base::UintToString(message_size) + "}"; 812 return "{message_size: " + base::UintToString(message_size) + "}";
780 } 813 }
781 814
782 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback) 815 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback)
783 : callback(callback) { } 816 : callback(callback) { }
784 817
785 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) { 818 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) {
786 DCHECK(!io_buffer.get()); 819 DCHECK(!io_buffer.get());
787 std::string message_data; 820 std::string message_data;
788 if (!Serialize(message_proto, &message_data)) 821 if (!Serialize(message_proto, &message_data))
789 return false; 822 return false;
790 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data), 823 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data),
791 message_data.size()); 824 message_data.size());
792 return true; 825 return true;
793 } 826 }
794 827
795 CastSocket::WriteRequest::~WriteRequest() { } 828 CastSocket::WriteRequest::~WriteRequest() { }
796 829
797 } // namespace cast_channel 830 } // namespace cast_channel
798 } // namespace core_api 831 } // namespace core_api
799 } // namespace extensions 832 } // namespace extensions
800 833
801 #undef VLOG_WITH_CONNECTION 834 #undef VLOG_WITH_CONNECTION
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698