Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(90)

Side by Side Diff: extensions/browser/api/cast_channel/cast_socket.cc

Issue 417403002: Remove weak pointers from CastSocket. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Respond to Wez comments. Fire callbacks in CloseInternal. Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "extensions/browser/api/cast_channel/cast_socket.h" 5 #include "extensions/browser/api/cast_channel/cast_socket.h"
6 6
7 #include <stdlib.h> 7 #include <stdlib.h>
8 #include <string.h> 8 #include <string.h>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
92 net_log_source_.id = net_log_->NextID(); 92 net_log_source_.id = net_log_->NextID();
93 93
94 // Reuse these buffers for each message. 94 // Reuse these buffers for each message.
95 header_read_buffer_ = new net::GrowableIOBuffer(); 95 header_read_buffer_ = new net::GrowableIOBuffer();
96 header_read_buffer_->SetCapacity(MessageHeader::header_size()); 96 header_read_buffer_->SetCapacity(MessageHeader::header_size());
97 body_read_buffer_ = new net::GrowableIOBuffer(); 97 body_read_buffer_ = new net::GrowableIOBuffer();
98 body_read_buffer_->SetCapacity(MessageHeader::max_message_size()); 98 body_read_buffer_->SetCapacity(MessageHeader::max_message_size());
99 current_read_buffer_ = header_read_buffer_; 99 current_read_buffer_ = header_read_buffer_;
100 } 100 }
101 101
102 CastSocket::~CastSocket() { } 102 CastSocket::~CastSocket() {
103 CloseInternal();
104 }
103 105
104 ReadyState CastSocket::ready_state() const { 106 ReadyState CastSocket::ready_state() const {
105 return ready_state_; 107 return ready_state_;
106 } 108 }
107 109
108 ChannelError CastSocket::error_state() const { 110 ChannelError CastSocket::error_state() const {
109 return error_state_; 111 return error_state_;
110 } 112 }
111 113
112 scoped_ptr<net::TCPClientSocket> CastSocket::CreateTcpSocket() { 114 scoped_ptr<net::TCPClientSocket> CastSocket::CreateTcpSocket() {
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
169 DCHECK(CalledOnValidThread()); 171 DCHECK(CalledOnValidThread());
170 VLOG_WITH_CONNECTION(1) << "Connect readyState = " << ready_state_; 172 VLOG_WITH_CONNECTION(1) << "Connect readyState = " << ready_state_;
171 if (ready_state_ != READY_STATE_NONE) { 173 if (ready_state_ != READY_STATE_NONE) {
172 callback.Run(net::ERR_CONNECTION_FAILED); 174 callback.Run(net::ERR_CONNECTION_FAILED);
173 return; 175 return;
174 } 176 }
175 ready_state_ = READY_STATE_CONNECTING; 177 ready_state_ = READY_STATE_CONNECTING;
176 connect_callback_ = callback; 178 connect_callback_ = callback;
177 connect_state_ = CONN_STATE_TCP_CONNECT; 179 connect_state_ = CONN_STATE_TCP_CONNECT;
178 if (connect_timeout_.InMicroseconds() > 0) { 180 if (connect_timeout_.InMicroseconds() > 0) {
179 GetTimer()->Start( 181 DCHECK(connect_timeout_callback_.IsCancelled());
180 FROM_HERE, 182 connect_timeout_callback_.Reset(base::Bind(&CastSocket::CancelConnect,
181 connect_timeout_, 183 base::Unretained(this)));
182 base::Bind(&CastSocket::CancelConnect, AsWeakPtr())); 184 GetTimer()->Start(FROM_HERE,
185 connect_timeout_,
186 connect_timeout_callback_.callback());
183 } 187 }
184 DoConnectLoop(net::OK); 188 DoConnectLoop(net::OK);
185 } 189 }
186 190
187 void CastSocket::PostTaskToStartConnectLoop(int result) { 191 void CastSocket::PostTaskToStartConnectLoop(int result) {
188 DCHECK(CalledOnValidThread()); 192 DCHECK(CalledOnValidThread());
189 base::MessageLoop::current()->PostTask( 193 DCHECK(connect_loop_callback_.IsCancelled());
190 FROM_HERE, 194 connect_loop_callback_.Reset(base::Bind(&CastSocket::DoConnectLoop,
191 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr(), result)); 195 base::Unretained(this),
196 result));
197 base::MessageLoop::current()->PostTask(FROM_HERE,
198 connect_loop_callback_.callback());
192 } 199 }
193 200
194 void CastSocket::CancelConnect() { 201 void CastSocket::CancelConnect() {
195 DCHECK(CalledOnValidThread()); 202 DCHECK(CalledOnValidThread());
196 // Stop all pending connection setup tasks and report back to the client. 203 // Stop all pending connection setup tasks and report back to the client.
197 is_canceled_ = true; 204 is_canceled_ = true;
198 VLOG_WITH_CONNECTION(1) << "Timeout while establishing a connection."; 205 VLOG_WITH_CONNECTION(1) << "Timeout while establishing a connection.";
199 DoConnectCallback(net::ERR_TIMED_OUT); 206 DoConnectCallback(net::ERR_TIMED_OUT);
200 } 207 }
201 208
202 // This method performs the state machine transitions for connection flow. 209 // This method performs the state machine transitions for connection flow.
203 // There are two entry points to this method: 210 // There are two entry points to this method:
204 // 1. Connect method: this starts the flow 211 // 1. Connect method: this starts the flow
205 // 2. Callback from network operations that finish asynchronously 212 // 2. Callback from network operations that finish asynchronously
206 void CastSocket::DoConnectLoop(int result) { 213 void CastSocket::DoConnectLoop(int result) {
214 connect_loop_callback_.Cancel();
207 if (is_canceled_) { 215 if (is_canceled_) {
208 LOG(ERROR) << "CANCELLED - Aborting DoConnectLoop."; 216 LOG(ERROR) << "CANCELLED - Aborting DoConnectLoop.";
209 return; 217 return;
210 } 218 }
211 // Network operations can either finish synchronously or asynchronously. 219 // Network operations can either finish synchronously or asynchronously.
212 // This method executes the state machine transitions in a loop so that 220 // This method executes the state machine transitions in a loop so that
213 // correct state transitions happen even when network operations finish 221 // correct state transitions happen even when network operations finish
214 // synchronously. 222 // synchronously.
215 int rv = result; 223 int rv = result;
216 do { 224 do {
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
251 // b. The Do* method called did not change state 259 // b. The Do* method called did not change state
252 260
253 // Connect loop is finished: if there is no pending IO invoke the callback. 261 // Connect loop is finished: if there is no pending IO invoke the callback.
254 if (rv != net::ERR_IO_PENDING) { 262 if (rv != net::ERR_IO_PENDING) {
255 GetTimer()->Stop(); 263 GetTimer()->Stop();
256 DoConnectCallback(rv); 264 DoConnectCallback(rv);
257 } 265 }
258 } 266 }
259 267
260 int CastSocket::DoTcpConnect() { 268 int CastSocket::DoTcpConnect() {
269 DCHECK(connect_loop_callback_.IsCancelled());
261 VLOG_WITH_CONNECTION(1) << "DoTcpConnect"; 270 VLOG_WITH_CONNECTION(1) << "DoTcpConnect";
262 connect_state_ = CONN_STATE_TCP_CONNECT_COMPLETE; 271 connect_state_ = CONN_STATE_TCP_CONNECT_COMPLETE;
263 tcp_socket_ = CreateTcpSocket(); 272 tcp_socket_ = CreateTcpSocket();
264 return tcp_socket_->Connect( 273 return tcp_socket_->Connect(
265 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr())); 274 base::Bind(&CastSocket::DoConnectLoop, base::Unretained(this)));
266 } 275 }
267 276
268 int CastSocket::DoTcpConnectComplete(int result) { 277 int CastSocket::DoTcpConnectComplete(int result) {
269 VLOG_WITH_CONNECTION(1) << "DoTcpConnectComplete: " << result; 278 VLOG_WITH_CONNECTION(1) << "DoTcpConnectComplete: " << result;
270 if (result == net::OK) { 279 if (result == net::OK) {
271 // Enable TCP protocol-level keep-alive. 280 // Enable TCP protocol-level keep-alive.
272 bool result = tcp_socket_->SetKeepAlive(true, kTcpKeepAliveDelaySecs); 281 bool result = tcp_socket_->SetKeepAlive(true, kTcpKeepAliveDelaySecs);
273 LOG_IF(WARNING, !result) << "Failed to SetKeepAlive."; 282 LOG_IF(WARNING, !result) << "Failed to SetKeepAlive.";
274 connect_state_ = CONN_STATE_SSL_CONNECT; 283 connect_state_ = CONN_STATE_SSL_CONNECT;
275 } 284 }
276 return result; 285 return result;
277 } 286 }
278 287
279 int CastSocket::DoSslConnect() { 288 int CastSocket::DoSslConnect() {
289 DCHECK(connect_loop_callback_.IsCancelled());
280 VLOG_WITH_CONNECTION(1) << "DoSslConnect"; 290 VLOG_WITH_CONNECTION(1) << "DoSslConnect";
281 connect_state_ = CONN_STATE_SSL_CONNECT_COMPLETE; 291 connect_state_ = CONN_STATE_SSL_CONNECT_COMPLETE;
282 socket_ = CreateSslSocket(tcp_socket_.PassAs<net::StreamSocket>()); 292 socket_ = CreateSslSocket(tcp_socket_.PassAs<net::StreamSocket>());
283 return socket_->Connect( 293 return socket_->Connect(
284 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr())); 294 base::Bind(&CastSocket::DoConnectLoop, base::Unretained(this)));
285 } 295 }
286 296
287 int CastSocket::DoSslConnectComplete(int result) { 297 int CastSocket::DoSslConnectComplete(int result) {
288 VLOG_WITH_CONNECTION(1) << "DoSslConnectComplete: " << result; 298 VLOG_WITH_CONNECTION(1) << "DoSslConnectComplete: " << result;
289 if (result == net::ERR_CERT_AUTHORITY_INVALID && 299 if (result == net::ERR_CERT_AUTHORITY_INVALID &&
290 peer_cert_.empty() && ExtractPeerCert(&peer_cert_)) { 300 peer_cert_.empty() && ExtractPeerCert(&peer_cert_)) {
291 connect_state_ = CONN_STATE_TCP_CONNECT; 301 connect_state_ = CONN_STATE_TCP_CONNECT;
292 } else if (result == net::OK && 302 } else if (result == net::OK &&
293 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) { 303 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) {
294 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND; 304 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND;
295 } 305 }
296 return result; 306 return result;
297 } 307 }
298 308
299 int CastSocket::DoAuthChallengeSend() { 309 int CastSocket::DoAuthChallengeSend() {
300 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSend"; 310 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSend";
301 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE; 311 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE;
302 CastMessage challenge_message; 312 CastMessage challenge_message;
303 CreateAuthChallengeMessage(&challenge_message); 313 CreateAuthChallengeMessage(&challenge_message);
304 VLOG_WITH_CONNECTION(1) << "Sending challenge: " 314 VLOG_WITH_CONNECTION(1) << "Sending challenge: "
305 << CastMessageToString(challenge_message); 315 << CastMessageToString(challenge_message);
306 // Post a task to send auth challenge so that DoWriteLoop is not nested inside 316 // Post a task to send auth challenge so that DoWriteLoop is not nested inside
307 // DoConnectLoop. This is not strictly necessary but keeps the write loop 317 // DoConnectLoop. This is not strictly necessary but keeps the write loop
308 // code decoupled from connect loop code. 318 // code decoupled from connect loop code.
319 DCHECK(send_auth_challenge_callback_.IsCancelled());
320 send_auth_challenge_callback_.Reset(
321 base::Bind(&CastSocket::SendCastMessageInternal,
322 base::Unretained(this),
323 challenge_message,
324 base::Bind(&CastSocket::DoAuthChallengeSendWriteComplete,
325 base::Unretained(this))));
309 base::MessageLoop::current()->PostTask( 326 base::MessageLoop::current()->PostTask(
310 FROM_HERE, 327 FROM_HERE,
311 base::Bind(&CastSocket::SendCastMessageInternal, 328 send_auth_challenge_callback_.callback());
312 AsWeakPtr(),
313 challenge_message,
314 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr())));
315 // Always return IO_PENDING since the result is always asynchronous. 329 // Always return IO_PENDING since the result is always asynchronous.
316 return net::ERR_IO_PENDING; 330 return net::ERR_IO_PENDING;
317 } 331 }
318 332
333 void CastSocket::DoAuthChallengeSendWriteComplete(int result) {
334 send_auth_challenge_callback_.Cancel();
335 VLOG_WITH_CONNECTION(2) << "DoAuthChallengeSendWriteComplete: " << result;
336 DCHECK_GT(result, 0);
337 DCHECK_EQ(write_queue_.size(), 1UL);
338 PostTaskToStartConnectLoop(result);
339 }
340
319 int CastSocket::DoAuthChallengeSendComplete(int result) { 341 int CastSocket::DoAuthChallengeSendComplete(int result) {
320 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result; 342 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result;
321 if (result < 0) 343 if (result < 0)
322 return result; 344 return result;
323 connect_state_ = CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE; 345 connect_state_ = CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE;
324 // Post a task to start read loop so that DoReadLoop is not nested inside 346 // Post a task to start read loop so that DoReadLoop is not nested inside
325 // DoConnectLoop. This is not strictly necessary but keeps the read loop 347 // DoConnectLoop. This is not strictly necessary but keeps the read loop
326 // code decoupled from connect loop code. 348 // code decoupled from connect loop code.
327 PostTaskToStartReadLoop(); 349 PostTaskToStartReadLoop();
328 // Always return IO_PENDING since the result is always asynchronous. 350 // Always return IO_PENDING since the result is always asynchronous.
(...skipping 18 matching lines...) Expand all
347 } else if (result == net::ERR_TIMED_OUT) { 369 } else if (result == net::ERR_TIMED_OUT) {
348 error_state_ = CHANNEL_ERROR_CONNECT_TIMEOUT; 370 error_state_ = CHANNEL_ERROR_CONNECT_TIMEOUT;
349 } else { 371 } else {
350 error_state_ = CHANNEL_ERROR_CONNECT_ERROR; 372 error_state_ = CHANNEL_ERROR_CONNECT_ERROR;
351 } 373 }
352 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback"; 374 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback";
353 base::ResetAndReturn(&connect_callback_).Run(result); 375 base::ResetAndReturn(&connect_callback_).Run(result);
354 } 376 }
355 377
356 void CastSocket::Close(const net::CompletionCallback& callback) { 378 void CastSocket::Close(const net::CompletionCallback& callback) {
357 DCHECK(CalledOnValidThread()); 379 CloseInternal();
380 callback.Run(net::OK);
381 }
382
383 void CastSocket::CloseInternal() {
384 // TODO(mfoltz): Enforce this when CastChannelAPITest is rewritten to create
385 // and free sockets on the same thread. crbug.com/398242
386 // DCHECK(CalledOnValidThread());
387 if (ready_state_ == READY_STATE_CLOSED) {
388 return;
389 }
358 VLOG_WITH_CONNECTION(1) << "Close ReadyState = " << ready_state_; 390 VLOG_WITH_CONNECTION(1) << "Close ReadyState = " << ready_state_;
359 tcp_socket_.reset(); 391 tcp_socket_.reset();
360 socket_.reset(); 392 socket_.reset();
361 cert_verifier_.reset(); 393 cert_verifier_.reset();
362 transport_security_state_.reset(); 394 transport_security_state_.reset();
395 GetTimer()->Stop();
396
397 // Fire any remaining callbacks passed into us.
398 if (!connect_callback_.is_null()) {
399 connect_callback_.Run(net::ERR_CONNECTION_FAILED);
Wez 2014/07/31 22:57:53 So there is no possibility of this callback deleti
mark a. foltz 2014/08/01 23:21:48 Yes this is possible and that would be bad. The o
400 connect_callback_.Reset();
401 }
402 for (; !write_queue_.empty(); write_queue_.pop()) {
403 net::CompletionCallback& callback = write_queue_.front().callback;
404 callback.Run(net::ERR_FAILED);
405 callback.Reset();
406 }
407
408 // Cancel callbacks that we queued ourselves to re-enter the connect or read
409 // loops.
410 connect_loop_callback_.Cancel();
411 send_auth_challenge_callback_.Cancel();
412 read_loop_callback_.Cancel();
413 connect_timeout_callback_.Cancel();
363 ready_state_ = READY_STATE_CLOSED; 414 ready_state_ = READY_STATE_CLOSED;
364 callback.Run(net::OK);
365 // |callback| can delete |this|
366 } 415 }
367 416
368 void CastSocket::SendMessage(const MessageInfo& message, 417 void CastSocket::SendMessage(const MessageInfo& message,
369 const net::CompletionCallback& callback) { 418 const net::CompletionCallback& callback) {
370 DCHECK(CalledOnValidThread()); 419 DCHECK(CalledOnValidThread());
371 if (ready_state_ != READY_STATE_OPEN) { 420 if (ready_state_ != READY_STATE_OPEN) {
372 callback.Run(net::ERR_FAILED); 421 callback.Run(net::ERR_FAILED);
373 return; 422 return;
374 } 423 }
375 CastMessage message_proto; 424 CastMessage message_proto;
376 if (!MessageInfoToCastMessage(message, &message_proto)) { 425 if (!MessageInfoToCastMessage(message, &message_proto)) {
377 callback.Run(net::ERR_FAILED); 426 callback.Run(net::ERR_FAILED);
378 return; 427 return;
379 } 428 }
380
381 SendCastMessageInternal(message_proto, callback); 429 SendCastMessageInternal(message_proto, callback);
382 } 430 }
383 431
384 void CastSocket::SendCastMessageInternal( 432 void CastSocket::SendCastMessageInternal(
385 const CastMessage& message, 433 const CastMessage& message,
386 const net::CompletionCallback& callback) { 434 const net::CompletionCallback& callback) {
387 WriteRequest write_request(callback); 435 WriteRequest write_request(callback);
388 if (!write_request.SetContent(message)) { 436 if (!write_request.SetContent(message)) {
389 callback.Run(net::ERR_FAILED); 437 callback.Run(net::ERR_FAILED);
390 return; 438 return;
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
447 495
448 int CastSocket::DoWrite() { 496 int CastSocket::DoWrite() {
449 DCHECK(!write_queue_.empty()); 497 DCHECK(!write_queue_.empty());
450 WriteRequest& request = write_queue_.front(); 498 WriteRequest& request = write_queue_.front();
451 499
452 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = " 500 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = "
453 << request.io_buffer->size() << " bytes_written " 501 << request.io_buffer->size() << " bytes_written "
454 << request.io_buffer->BytesConsumed(); 502 << request.io_buffer->BytesConsumed();
455 503
456 write_state_ = WRITE_STATE_WRITE_COMPLETE; 504 write_state_ = WRITE_STATE_WRITE_COMPLETE;
457
458 return socket_->Write( 505 return socket_->Write(
459 request.io_buffer.get(), 506 request.io_buffer.get(),
460 request.io_buffer->BytesRemaining(), 507 request.io_buffer->BytesRemaining(),
461 base::Bind(&CastSocket::DoWriteLoop, AsWeakPtr())); 508 base::Bind(&CastSocket::DoWriteLoop, base::Unretained(this)));
462 } 509 }
463 510
464 int CastSocket::DoWriteComplete(int result) { 511 int CastSocket::DoWriteComplete(int result) {
465 DCHECK(!write_queue_.empty()); 512 DCHECK(!write_queue_.empty());
466 if (result <= 0) { // NOTE that 0 also indicates an error 513 if (result <= 0) { // NOTE that 0 also indicates an error
467 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; 514 error_state_ = CHANNEL_ERROR_SOCKET_ERROR;
468 write_state_ = WRITE_STATE_ERROR; 515 write_state_ = WRITE_STATE_ERROR;
469 return result == 0 ? net::ERR_FAILED : result; 516 return result == 0 ? net::ERR_FAILED : result;
470 } 517 }
471 518
472 // Some bytes were successfully written 519 // Some bytes were successfully written
473 WriteRequest& request = write_queue_.front(); 520 WriteRequest& request = write_queue_.front();
474 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; 521 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer;
475 io_buffer->DidConsume(result); 522 io_buffer->DidConsume(result);
476 if (io_buffer->BytesRemaining() == 0) // Message fully sent 523 if (io_buffer->BytesRemaining() == 0) // Message fully sent
477 write_state_ = WRITE_STATE_DO_CALLBACK; 524 write_state_ = WRITE_STATE_DO_CALLBACK;
478 else 525 else
479 write_state_ = WRITE_STATE_WRITE; 526 write_state_ = WRITE_STATE_WRITE;
480 527
481 return net::OK; 528 return net::OK;
482 } 529 }
483 530
484 int CastSocket::DoWriteCallback() { 531 int CastSocket::DoWriteCallback() {
485 DCHECK(!write_queue_.empty()); 532 DCHECK(!write_queue_.empty());
533 write_state_ = WRITE_STATE_WRITE;
486 WriteRequest& request = write_queue_.front(); 534 WriteRequest& request = write_queue_.front();
487 int bytes_consumed = request.io_buffer->BytesConsumed(); 535 int bytes_consumed = request.io_buffer->BytesConsumed();
488 536 request.callback.Run(bytes_consumed);
489 // If inside connection flow, then there should be exaclty one item in 537 write_queue_.pop();
490 // the write queue.
491 if (ready_state_ == READY_STATE_CONNECTING) {
492 write_queue_.pop();
493 DCHECK(write_queue_.empty());
494 PostTaskToStartConnectLoop(bytes_consumed);
495 } else {
496 WriteRequest& request = write_queue_.front();
497 request.callback.Run(bytes_consumed);
498 write_queue_.pop();
499 }
500 write_state_ = WRITE_STATE_WRITE;
501 return net::OK; 538 return net::OK;
502 } 539 }
503 540
504 int CastSocket::DoWriteError(int result) { 541 int CastSocket::DoWriteError(int result) {
505 DCHECK(!write_queue_.empty()); 542 DCHECK(!write_queue_.empty());
506 DCHECK_LT(result, 0); 543 DCHECK_LT(result, 0);
507 544
508 // If inside connection flow, then there should be exactly one item in 545 // If inside connection flow, then there should be exactly one item in
509 // the write queue. 546 // the write queue.
510 if (ready_state_ == READY_STATE_CONNECTING) { 547 if (ready_state_ == READY_STATE_CONNECTING) {
511 write_queue_.pop(); 548 write_queue_.pop();
512 DCHECK(write_queue_.empty()); 549 DCHECK(write_queue_.empty());
513 PostTaskToStartConnectLoop(result); 550 PostTaskToStartConnectLoop(result);
514 // Connect loop will handle the error. Return net::OK so that write flow 551 // Connect loop will handle the error. Return net::OK so that write flow
515 // does not try to report error also. 552 // does not try to report error also.
516 return net::OK; 553 return net::OK;
517 } 554 }
518 555
519 while (!write_queue_.empty()) { 556 while (!write_queue_.empty()) {
520 WriteRequest& request = write_queue_.front(); 557 WriteRequest& request = write_queue_.front();
521 request.callback.Run(result); 558 request.callback.Run(result);
522 write_queue_.pop(); 559 write_queue_.pop();
523 } 560 }
524 return net::ERR_FAILED; 561 return net::ERR_FAILED;
525 } 562 }
526 563
527 void CastSocket::PostTaskToStartReadLoop() { 564 void CastSocket::PostTaskToStartReadLoop() {
528 DCHECK(CalledOnValidThread()); 565 DCHECK(CalledOnValidThread());
529 base::MessageLoop::current()->PostTask( 566 DCHECK(read_loop_callback_.IsCancelled());
530 FROM_HERE, 567 read_loop_callback_.Reset(base::Bind(&CastSocket::StartReadLoop,
531 base::Bind(&CastSocket::StartReadLoop, AsWeakPtr())); 568 base::Unretained(this)));
569 base::MessageLoop::current()->PostTask(FROM_HERE,
570 read_loop_callback_.callback());
532 } 571 }
533 572
534 void CastSocket::StartReadLoop() { 573 void CastSocket::StartReadLoop() {
574 read_loop_callback_.Cancel();
535 // Read loop would have already been started if read state is not NONE 575 // Read loop would have already been started if read state is not NONE
536 if (read_state_ == READ_STATE_NONE) { 576 if (read_state_ == READ_STATE_NONE) {
537 read_state_ = READ_STATE_READ; 577 read_state_ = READ_STATE_READ;
538 DoReadLoop(net::OK); 578 DoReadLoop(net::OK);
539 } 579 }
540 } 580 }
541 581
542 void CastSocket::DoReadLoop(int result) { 582 void CastSocket::DoReadLoop(int result) {
543 DCHECK(CalledOnValidThread()); 583 DCHECK(CalledOnValidThread());
544 // Network operations can either finish synchronously or asynchronously. 584 // Network operations can either finish synchronously or asynchronously.
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
596 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset(); 636 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset();
597 current_read_buffer_ = body_read_buffer_; 637 current_read_buffer_ = body_read_buffer_;
598 CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size()); 638 CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size());
599 } 639 }
600 CHECK_GT(num_bytes_to_read, 0U); 640 CHECK_GT(num_bytes_to_read, 0U);
601 641
602 // Read up to num_bytes_to_read into |current_read_buffer_|. 642 // Read up to num_bytes_to_read into |current_read_buffer_|.
603 return socket_->Read( 643 return socket_->Read(
604 current_read_buffer_.get(), 644 current_read_buffer_.get(),
605 num_bytes_to_read, 645 num_bytes_to_read,
606 base::Bind(&CastSocket::DoReadLoop, AsWeakPtr())); 646 base::Bind(&CastSocket::DoReadLoop, base::Unretained(this)));
607 } 647 }
608 648
609 int CastSocket::DoReadComplete(int result) { 649 int CastSocket::DoReadComplete(int result) {
610 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result 650 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result
611 << " header offset = " 651 << " header offset = "
612 << header_read_buffer_->offset() 652 << header_read_buffer_->offset()
613 << " body offset = " << body_read_buffer_->offset(); 653 << " body offset = " << body_read_buffer_->offset();
614 if (result <= 0) { // 0 means EOF: the peer closed the socket 654 if (result <= 0) { // 0 means EOF: the peer closed the socket
615 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket"; 655 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket";
616 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; 656 error_state_ = CHANNEL_ERROR_SOCKET_ERROR;
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after
716 return false; 756 return false;
717 } 757 }
718 CastSocket::MessageHeader header; 758 CastSocket::MessageHeader header;
719 header.SetMessageSize(message_size); 759 header.SetMessageSize(message_size);
720 header.PrependToString(message_data); 760 header.PrependToString(message_data);
721 return true; 761 return true;
722 } 762 }
723 763
724 void CastSocket::CloseWithError(ChannelError error) { 764 void CastSocket::CloseWithError(ChannelError error) {
725 DCHECK(CalledOnValidThread()); 765 DCHECK(CalledOnValidThread());
726 socket_.reset(NULL); 766 CloseInternal();
727 ready_state_ = READY_STATE_CLOSED;
728 error_state_ = error; 767 error_state_ = error;
729 if (delegate_) 768 if (delegate_)
730 delegate_->OnError(this, error); 769 delegate_->OnError(this, error);
731 } 770 }
732 771
733 std::string CastSocket::CastUrl() const { 772 std::string CastSocket::CastUrl() const {
734 return ((channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) ? 773 return ((channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) ?
735 "casts://" : "cast://") + ip_endpoint_.ToString(); 774 "casts://" : "cast://") + ip_endpoint_.ToString();
736 } 775 }
737 776
(...skipping 11 matching lines...) Expand all
749 DCHECK_LT(size, static_cast<size_t>(kuint32max)); 788 DCHECK_LT(size, static_cast<size_t>(kuint32max));
750 DCHECK_GT(size, 0U); 789 DCHECK_GT(size, 0U);
751 message_size = size; 790 message_size = size;
752 } 791 }
753 792
754 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle, 793 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle,
755 // if bit-for-bit compatible. 794 // if bit-for-bit compatible.
756 void CastSocket::MessageHeader::PrependToString(std::string* str) { 795 void CastSocket::MessageHeader::PrependToString(std::string* str) {
757 MessageHeader output = *this; 796 MessageHeader output = *this;
758 output.message_size = base::HostToNet32(message_size); 797 output.message_size = base::HostToNet32(message_size);
759 size_t header_size = base::checked_cast<size_t,uint32>( 798 size_t header_size = base::checked_cast<size_t, uint32>(
760 MessageHeader::header_size()); 799 MessageHeader::header_size());
761 scoped_ptr<char, base::FreeDeleter> char_array( 800 scoped_ptr<char, base::FreeDeleter> char_array(
762 static_cast<char*>(malloc(header_size))); 801 static_cast<char*>(malloc(header_size)));
763 memcpy(char_array.get(), &output, header_size); 802 memcpy(char_array.get(), &output, header_size);
764 str->insert(0, char_array.get(), header_size); 803 str->insert(0, char_array.get(), header_size);
765 } 804 }
766 805
767 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle, 806 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle,
768 // if bit-for-bit compatible. 807 // if bit-for-bit compatible.
769 void CastSocket::MessageHeader::ReadFromIOBuffer( 808 void CastSocket::MessageHeader::ReadFromIOBuffer(
770 net::GrowableIOBuffer* buffer, MessageHeader* header) { 809 net::GrowableIOBuffer* buffer, MessageHeader* header) {
771 uint32 message_size; 810 uint32 message_size;
772 size_t header_size = base::checked_cast<size_t,uint32>( 811 size_t header_size = base::checked_cast<size_t, uint32>(
773 MessageHeader::header_size()); 812 MessageHeader::header_size());
774 memcpy(&message_size, buffer->StartOfBuffer(), header_size); 813 memcpy(&message_size, buffer->StartOfBuffer(), header_size);
775 header->message_size = base::NetToHost32(message_size); 814 header->message_size = base::NetToHost32(message_size);
776 } 815 }
777 816
778 std::string CastSocket::MessageHeader::ToString() { 817 std::string CastSocket::MessageHeader::ToString() {
779 return "{message_size: " + base::UintToString(message_size) + "}"; 818 return "{message_size: " + base::UintToString(message_size) + "}";
780 } 819 }
781 820
782 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback) 821 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback)
783 : callback(callback) { } 822 : callback(callback) { }
784 823
785 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) { 824 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) {
786 DCHECK(!io_buffer.get()); 825 DCHECK(!io_buffer.get());
787 std::string message_data; 826 std::string message_data;
788 if (!Serialize(message_proto, &message_data)) 827 if (!Serialize(message_proto, &message_data))
789 return false; 828 return false;
790 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data), 829 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data),
791 message_data.size()); 830 message_data.size());
792 return true; 831 return true;
793 } 832 }
794 833
795 CastSocket::WriteRequest::~WriteRequest() { } 834 CastSocket::WriteRequest::~WriteRequest() { }
796 835
797 } // namespace cast_channel 836 } // namespace cast_channel
798 } // namespace core_api 837 } // namespace core_api
799 } // namespace extensions 838 } // namespace extensions
800 839
801 #undef VLOG_WITH_CONNECTION 840 #undef VLOG_WITH_CONNECTION
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698