Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(310)

Side by Side Diff: extensions/browser/api/cast_channel/cast_socket.cc

Issue 417403002: Remove weak pointers from CastSocket. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Update comments. Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « extensions/browser/api/cast_channel/cast_socket.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "extensions/browser/api/cast_channel/cast_socket.h" 5 #include "extensions/browser/api/cast_channel/cast_socket.h"
6 6
7 #include <stdlib.h> 7 #include <stdlib.h>
8 #include <string.h> 8 #include <string.h>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
92 net_log_source_.id = net_log_->NextID(); 92 net_log_source_.id = net_log_->NextID();
93 93
94 // Reuse these buffers for each message. 94 // Reuse these buffers for each message.
95 header_read_buffer_ = new net::GrowableIOBuffer(); 95 header_read_buffer_ = new net::GrowableIOBuffer();
96 header_read_buffer_->SetCapacity(MessageHeader::header_size()); 96 header_read_buffer_->SetCapacity(MessageHeader::header_size());
97 body_read_buffer_ = new net::GrowableIOBuffer(); 97 body_read_buffer_ = new net::GrowableIOBuffer();
98 body_read_buffer_->SetCapacity(MessageHeader::max_message_size()); 98 body_read_buffer_->SetCapacity(MessageHeader::max_message_size());
99 current_read_buffer_ = header_read_buffer_; 99 current_read_buffer_ = header_read_buffer_;
100 } 100 }
101 101
102 CastSocket::~CastSocket() { } 102 CastSocket::~CastSocket() {
103 // Ensure that resources are freed but do not run pending callbacks to avoid
104 // any re-entrancy.
105 CloseInternal();
106 }
103 107
104 ReadyState CastSocket::ready_state() const { 108 ReadyState CastSocket::ready_state() const {
105 return ready_state_; 109 return ready_state_;
106 } 110 }
107 111
108 ChannelError CastSocket::error_state() const { 112 ChannelError CastSocket::error_state() const {
109 return error_state_; 113 return error_state_;
110 } 114 }
111 115
112 scoped_ptr<net::TCPClientSocket> CastSocket::CreateTcpSocket() { 116 scoped_ptr<net::TCPClientSocket> CastSocket::CreateTcpSocket() {
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
169 DCHECK(CalledOnValidThread()); 173 DCHECK(CalledOnValidThread());
170 VLOG_WITH_CONNECTION(1) << "Connect readyState = " << ready_state_; 174 VLOG_WITH_CONNECTION(1) << "Connect readyState = " << ready_state_;
171 if (ready_state_ != READY_STATE_NONE) { 175 if (ready_state_ != READY_STATE_NONE) {
172 callback.Run(net::ERR_CONNECTION_FAILED); 176 callback.Run(net::ERR_CONNECTION_FAILED);
173 return; 177 return;
174 } 178 }
175 ready_state_ = READY_STATE_CONNECTING; 179 ready_state_ = READY_STATE_CONNECTING;
176 connect_callback_ = callback; 180 connect_callback_ = callback;
177 connect_state_ = CONN_STATE_TCP_CONNECT; 181 connect_state_ = CONN_STATE_TCP_CONNECT;
178 if (connect_timeout_.InMicroseconds() > 0) { 182 if (connect_timeout_.InMicroseconds() > 0) {
179 GetTimer()->Start( 183 DCHECK(connect_timeout_callback_.IsCancelled());
180 FROM_HERE, 184 connect_timeout_callback_.Reset(base::Bind(&CastSocket::CancelConnect,
181 connect_timeout_, 185 base::Unretained(this)));
182 base::Bind(&CastSocket::CancelConnect, AsWeakPtr())); 186 GetTimer()->Start(FROM_HERE,
187 connect_timeout_,
188 connect_timeout_callback_.callback());
183 } 189 }
184 DoConnectLoop(net::OK); 190 DoConnectLoop(net::OK);
185 } 191 }
186 192
187 void CastSocket::PostTaskToStartConnectLoop(int result) { 193 void CastSocket::PostTaskToStartConnectLoop(int result) {
188 DCHECK(CalledOnValidThread()); 194 DCHECK(CalledOnValidThread());
189 base::MessageLoop::current()->PostTask( 195 DCHECK(connect_loop_callback_.IsCancelled());
190 FROM_HERE, 196 connect_loop_callback_.Reset(base::Bind(&CastSocket::DoConnectLoop,
191 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr(), result)); 197 base::Unretained(this),
198 result));
199 base::MessageLoop::current()->PostTask(FROM_HERE,
200 connect_loop_callback_.callback());
192 } 201 }
193 202
194 void CastSocket::CancelConnect() { 203 void CastSocket::CancelConnect() {
195 DCHECK(CalledOnValidThread()); 204 DCHECK(CalledOnValidThread());
196 // Stop all pending connection setup tasks and report back to the client. 205 // Stop all pending connection setup tasks and report back to the client.
197 is_canceled_ = true; 206 is_canceled_ = true;
198 VLOG_WITH_CONNECTION(1) << "Timeout while establishing a connection."; 207 VLOG_WITH_CONNECTION(1) << "Timeout while establishing a connection.";
199 DoConnectCallback(net::ERR_TIMED_OUT); 208 DoConnectCallback(net::ERR_TIMED_OUT);
200 } 209 }
201 210
202 // This method performs the state machine transitions for connection flow. 211 // This method performs the state machine transitions for connection flow.
203 // There are two entry points to this method: 212 // There are two entry points to this method:
204 // 1. Connect method: this starts the flow 213 // 1. Connect method: this starts the flow
205 // 2. Callback from network operations that finish asynchronously 214 // 2. Callback from network operations that finish asynchronously
206 void CastSocket::DoConnectLoop(int result) { 215 void CastSocket::DoConnectLoop(int result) {
216 connect_loop_callback_.Cancel();
207 if (is_canceled_) { 217 if (is_canceled_) {
208 LOG(ERROR) << "CANCELLED - Aborting DoConnectLoop."; 218 LOG(ERROR) << "CANCELLED - Aborting DoConnectLoop.";
209 return; 219 return;
210 } 220 }
211 // Network operations can either finish synchronously or asynchronously. 221 // Network operations can either finish synchronously or asynchronously.
212 // This method executes the state machine transitions in a loop so that 222 // This method executes the state machine transitions in a loop so that
213 // correct state transitions happen even when network operations finish 223 // correct state transitions happen even when network operations finish
214 // synchronously. 224 // synchronously.
215 int rv = result; 225 int rv = result;
216 do { 226 do {
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
251 // b. The Do* method called did not change state 261 // b. The Do* method called did not change state
252 262
253 // Connect loop is finished: if there is no pending IO invoke the callback. 263 // Connect loop is finished: if there is no pending IO invoke the callback.
254 if (rv != net::ERR_IO_PENDING) { 264 if (rv != net::ERR_IO_PENDING) {
255 GetTimer()->Stop(); 265 GetTimer()->Stop();
256 DoConnectCallback(rv); 266 DoConnectCallback(rv);
257 } 267 }
258 } 268 }
259 269
260 int CastSocket::DoTcpConnect() { 270 int CastSocket::DoTcpConnect() {
271 DCHECK(connect_loop_callback_.IsCancelled());
261 VLOG_WITH_CONNECTION(1) << "DoTcpConnect"; 272 VLOG_WITH_CONNECTION(1) << "DoTcpConnect";
262 connect_state_ = CONN_STATE_TCP_CONNECT_COMPLETE; 273 connect_state_ = CONN_STATE_TCP_CONNECT_COMPLETE;
263 tcp_socket_ = CreateTcpSocket(); 274 tcp_socket_ = CreateTcpSocket();
264 return tcp_socket_->Connect( 275 return tcp_socket_->Connect(
265 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr())); 276 base::Bind(&CastSocket::DoConnectLoop, base::Unretained(this)));
266 } 277 }
267 278
268 int CastSocket::DoTcpConnectComplete(int result) { 279 int CastSocket::DoTcpConnectComplete(int result) {
269 VLOG_WITH_CONNECTION(1) << "DoTcpConnectComplete: " << result; 280 VLOG_WITH_CONNECTION(1) << "DoTcpConnectComplete: " << result;
270 if (result == net::OK) { 281 if (result == net::OK) {
271 // Enable TCP protocol-level keep-alive. 282 // Enable TCP protocol-level keep-alive.
272 bool result = tcp_socket_->SetKeepAlive(true, kTcpKeepAliveDelaySecs); 283 bool result = tcp_socket_->SetKeepAlive(true, kTcpKeepAliveDelaySecs);
273 LOG_IF(WARNING, !result) << "Failed to SetKeepAlive."; 284 LOG_IF(WARNING, !result) << "Failed to SetKeepAlive.";
274 connect_state_ = CONN_STATE_SSL_CONNECT; 285 connect_state_ = CONN_STATE_SSL_CONNECT;
275 } 286 }
276 return result; 287 return result;
277 } 288 }
278 289
279 int CastSocket::DoSslConnect() { 290 int CastSocket::DoSslConnect() {
291 DCHECK(connect_loop_callback_.IsCancelled());
280 VLOG_WITH_CONNECTION(1) << "DoSslConnect"; 292 VLOG_WITH_CONNECTION(1) << "DoSslConnect";
281 connect_state_ = CONN_STATE_SSL_CONNECT_COMPLETE; 293 connect_state_ = CONN_STATE_SSL_CONNECT_COMPLETE;
282 socket_ = CreateSslSocket(tcp_socket_.PassAs<net::StreamSocket>()); 294 socket_ = CreateSslSocket(tcp_socket_.PassAs<net::StreamSocket>());
283 return socket_->Connect( 295 return socket_->Connect(
284 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr())); 296 base::Bind(&CastSocket::DoConnectLoop, base::Unretained(this)));
285 } 297 }
286 298
287 int CastSocket::DoSslConnectComplete(int result) { 299 int CastSocket::DoSslConnectComplete(int result) {
288 VLOG_WITH_CONNECTION(1) << "DoSslConnectComplete: " << result; 300 VLOG_WITH_CONNECTION(1) << "DoSslConnectComplete: " << result;
289 if (result == net::ERR_CERT_AUTHORITY_INVALID && 301 if (result == net::ERR_CERT_AUTHORITY_INVALID &&
290 peer_cert_.empty() && ExtractPeerCert(&peer_cert_)) { 302 peer_cert_.empty() && ExtractPeerCert(&peer_cert_)) {
291 connect_state_ = CONN_STATE_TCP_CONNECT; 303 connect_state_ = CONN_STATE_TCP_CONNECT;
292 } else if (result == net::OK && 304 } else if (result == net::OK &&
293 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) { 305 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) {
294 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND; 306 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND;
295 } 307 }
296 return result; 308 return result;
297 } 309 }
298 310
299 int CastSocket::DoAuthChallengeSend() { 311 int CastSocket::DoAuthChallengeSend() {
300 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSend"; 312 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSend";
301 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE; 313 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE;
302 CastMessage challenge_message; 314 CastMessage challenge_message;
303 CreateAuthChallengeMessage(&challenge_message); 315 CreateAuthChallengeMessage(&challenge_message);
304 VLOG_WITH_CONNECTION(1) << "Sending challenge: " 316 VLOG_WITH_CONNECTION(1) << "Sending challenge: "
305 << CastMessageToString(challenge_message); 317 << CastMessageToString(challenge_message);
306 // Post a task to send auth challenge so that DoWriteLoop is not nested inside 318 // Post a task to send auth challenge so that DoWriteLoop is not nested inside
307 // DoConnectLoop. This is not strictly necessary but keeps the write loop 319 // DoConnectLoop. This is not strictly necessary but keeps the write loop
308 // code decoupled from connect loop code. 320 // code decoupled from connect loop code.
321 DCHECK(send_auth_challenge_callback_.IsCancelled());
322 send_auth_challenge_callback_.Reset(
323 base::Bind(&CastSocket::SendCastMessageInternal,
324 base::Unretained(this),
325 challenge_message,
326 base::Bind(&CastSocket::DoAuthChallengeSendWriteComplete,
327 base::Unretained(this))));
309 base::MessageLoop::current()->PostTask( 328 base::MessageLoop::current()->PostTask(
310 FROM_HERE, 329 FROM_HERE,
311 base::Bind(&CastSocket::SendCastMessageInternal, 330 send_auth_challenge_callback_.callback());
312 AsWeakPtr(),
313 challenge_message,
314 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr())));
315 // Always return IO_PENDING since the result is always asynchronous. 331 // Always return IO_PENDING since the result is always asynchronous.
316 return net::ERR_IO_PENDING; 332 return net::ERR_IO_PENDING;
317 } 333 }
318 334
335 void CastSocket::DoAuthChallengeSendWriteComplete(int result) {
336 send_auth_challenge_callback_.Cancel();
337 VLOG_WITH_CONNECTION(2) << "DoAuthChallengeSendWriteComplete: " << result;
338 DCHECK_GT(result, 0);
339 DCHECK_EQ(write_queue_.size(), 1UL);
340 PostTaskToStartConnectLoop(result);
341 }
342
319 int CastSocket::DoAuthChallengeSendComplete(int result) { 343 int CastSocket::DoAuthChallengeSendComplete(int result) {
320 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result; 344 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result;
321 if (result < 0) 345 if (result < 0)
322 return result; 346 return result;
323 connect_state_ = CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE; 347 connect_state_ = CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE;
324 // Post a task to start read loop so that DoReadLoop is not nested inside 348 // Post a task to start read loop so that DoReadLoop is not nested inside
325 // DoConnectLoop. This is not strictly necessary but keeps the read loop 349 // DoConnectLoop. This is not strictly necessary but keeps the read loop
326 // code decoupled from connect loop code. 350 // code decoupled from connect loop code.
327 PostTaskToStartReadLoop(); 351 PostTaskToStartReadLoop();
328 // Always return IO_PENDING since the result is always asynchronous. 352 // Always return IO_PENDING since the result is always asynchronous.
(...skipping 18 matching lines...) Expand all
347 } else if (result == net::ERR_TIMED_OUT) { 371 } else if (result == net::ERR_TIMED_OUT) {
348 error_state_ = CHANNEL_ERROR_CONNECT_TIMEOUT; 372 error_state_ = CHANNEL_ERROR_CONNECT_TIMEOUT;
349 } else { 373 } else {
350 error_state_ = CHANNEL_ERROR_CONNECT_ERROR; 374 error_state_ = CHANNEL_ERROR_CONNECT_ERROR;
351 } 375 }
352 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback"; 376 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback";
353 base::ResetAndReturn(&connect_callback_).Run(result); 377 base::ResetAndReturn(&connect_callback_).Run(result);
354 } 378 }
355 379
356 void CastSocket::Close(const net::CompletionCallback& callback) { 380 void CastSocket::Close(const net::CompletionCallback& callback) {
357 DCHECK(CalledOnValidThread()); 381 CloseInternal();
382 RunPendingCallbacksOnClose();
383 // Run this callback last. It may delete the socket.
384 callback.Run(net::OK);
385 }
386
387 void CastSocket::CloseInternal() {
388 // TODO(mfoltz): Enforce this when CastChannelAPITest is rewritten to create
389 // and free sockets on the same thread. crbug.com/398242
390 // DCHECK(CalledOnValidThread());
391 if (ready_state_ == READY_STATE_CLOSED) {
392 return;
393 }
358 VLOG_WITH_CONNECTION(1) << "Close ReadyState = " << ready_state_; 394 VLOG_WITH_CONNECTION(1) << "Close ReadyState = " << ready_state_;
359 tcp_socket_.reset(); 395 tcp_socket_.reset();
360 socket_.reset(); 396 socket_.reset();
361 cert_verifier_.reset(); 397 cert_verifier_.reset();
362 transport_security_state_.reset(); 398 transport_security_state_.reset();
399 GetTimer()->Stop();
400
401 // Cancel callbacks that we queued ourselves to re-enter the connect or read
402 // loops.
403 connect_loop_callback_.Cancel();
404 send_auth_challenge_callback_.Cancel();
405 read_loop_callback_.Cancel();
406 connect_timeout_callback_.Cancel();
363 ready_state_ = READY_STATE_CLOSED; 407 ready_state_ = READY_STATE_CLOSED;
364 callback.Run(net::OK); 408 }
365 // |callback| can delete |this| 409
410 void CastSocket::RunPendingCallbacksOnClose() {
411 DCHECK_EQ(ready_state_, READY_STATE_CLOSED);
412 if (!connect_callback_.is_null()) {
413 connect_callback_.Run(net::ERR_CONNECTION_FAILED);
414 connect_callback_.Reset();
415 }
416 for (; !write_queue_.empty(); write_queue_.pop()) {
417 net::CompletionCallback& callback = write_queue_.front().callback;
418 callback.Run(net::ERR_FAILED);
419 callback.Reset();
420 }
366 } 421 }
367 422
368 void CastSocket::SendMessage(const MessageInfo& message, 423 void CastSocket::SendMessage(const MessageInfo& message,
369 const net::CompletionCallback& callback) { 424 const net::CompletionCallback& callback) {
370 DCHECK(CalledOnValidThread()); 425 DCHECK(CalledOnValidThread());
371 if (ready_state_ != READY_STATE_OPEN) { 426 if (ready_state_ != READY_STATE_OPEN) {
372 callback.Run(net::ERR_FAILED); 427 callback.Run(net::ERR_FAILED);
373 return; 428 return;
374 } 429 }
375 CastMessage message_proto; 430 CastMessage message_proto;
376 if (!MessageInfoToCastMessage(message, &message_proto)) { 431 if (!MessageInfoToCastMessage(message, &message_proto)) {
377 callback.Run(net::ERR_FAILED); 432 callback.Run(net::ERR_FAILED);
378 return; 433 return;
379 } 434 }
380
381 SendCastMessageInternal(message_proto, callback); 435 SendCastMessageInternal(message_proto, callback);
382 } 436 }
383 437
384 void CastSocket::SendCastMessageInternal( 438 void CastSocket::SendCastMessageInternal(
385 const CastMessage& message, 439 const CastMessage& message,
386 const net::CompletionCallback& callback) { 440 const net::CompletionCallback& callback) {
387 WriteRequest write_request(callback); 441 WriteRequest write_request(callback);
388 if (!write_request.SetContent(message)) { 442 if (!write_request.SetContent(message)) {
389 callback.Run(net::ERR_FAILED); 443 callback.Run(net::ERR_FAILED);
390 return; 444 return;
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
447 501
448 int CastSocket::DoWrite() { 502 int CastSocket::DoWrite() {
449 DCHECK(!write_queue_.empty()); 503 DCHECK(!write_queue_.empty());
450 WriteRequest& request = write_queue_.front(); 504 WriteRequest& request = write_queue_.front();
451 505
452 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = " 506 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = "
453 << request.io_buffer->size() << " bytes_written " 507 << request.io_buffer->size() << " bytes_written "
454 << request.io_buffer->BytesConsumed(); 508 << request.io_buffer->BytesConsumed();
455 509
456 write_state_ = WRITE_STATE_WRITE_COMPLETE; 510 write_state_ = WRITE_STATE_WRITE_COMPLETE;
457
458 return socket_->Write( 511 return socket_->Write(
459 request.io_buffer.get(), 512 request.io_buffer.get(),
460 request.io_buffer->BytesRemaining(), 513 request.io_buffer->BytesRemaining(),
461 base::Bind(&CastSocket::DoWriteLoop, AsWeakPtr())); 514 base::Bind(&CastSocket::DoWriteLoop, base::Unretained(this)));
462 } 515 }
463 516
464 int CastSocket::DoWriteComplete(int result) { 517 int CastSocket::DoWriteComplete(int result) {
465 DCHECK(!write_queue_.empty()); 518 DCHECK(!write_queue_.empty());
466 if (result <= 0) { // NOTE that 0 also indicates an error 519 if (result <= 0) { // NOTE that 0 also indicates an error
467 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; 520 error_state_ = CHANNEL_ERROR_SOCKET_ERROR;
468 write_state_ = WRITE_STATE_ERROR; 521 write_state_ = WRITE_STATE_ERROR;
469 return result == 0 ? net::ERR_FAILED : result; 522 return result == 0 ? net::ERR_FAILED : result;
470 } 523 }
471 524
472 // Some bytes were successfully written 525 // Some bytes were successfully written
473 WriteRequest& request = write_queue_.front(); 526 WriteRequest& request = write_queue_.front();
474 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; 527 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer;
475 io_buffer->DidConsume(result); 528 io_buffer->DidConsume(result);
476 if (io_buffer->BytesRemaining() == 0) // Message fully sent 529 if (io_buffer->BytesRemaining() == 0) // Message fully sent
477 write_state_ = WRITE_STATE_DO_CALLBACK; 530 write_state_ = WRITE_STATE_DO_CALLBACK;
478 else 531 else
479 write_state_ = WRITE_STATE_WRITE; 532 write_state_ = WRITE_STATE_WRITE;
480 533
481 return net::OK; 534 return net::OK;
482 } 535 }
483 536
484 int CastSocket::DoWriteCallback() { 537 int CastSocket::DoWriteCallback() {
485 DCHECK(!write_queue_.empty()); 538 DCHECK(!write_queue_.empty());
539 write_state_ = WRITE_STATE_WRITE;
486 WriteRequest& request = write_queue_.front(); 540 WriteRequest& request = write_queue_.front();
487 int bytes_consumed = request.io_buffer->BytesConsumed(); 541 int bytes_consumed = request.io_buffer->BytesConsumed();
488 542 request.callback.Run(bytes_consumed);
489 // If inside connection flow, then there should be exaclty one item in 543 write_queue_.pop();
490 // the write queue.
491 if (ready_state_ == READY_STATE_CONNECTING) {
492 write_queue_.pop();
493 DCHECK(write_queue_.empty());
494 PostTaskToStartConnectLoop(bytes_consumed);
495 } else {
496 WriteRequest& request = write_queue_.front();
497 request.callback.Run(bytes_consumed);
498 write_queue_.pop();
499 }
500 write_state_ = WRITE_STATE_WRITE;
501 return net::OK; 544 return net::OK;
502 } 545 }
503 546
504 int CastSocket::DoWriteError(int result) { 547 int CastSocket::DoWriteError(int result) {
505 DCHECK(!write_queue_.empty()); 548 DCHECK(!write_queue_.empty());
506 DCHECK_LT(result, 0); 549 DCHECK_LT(result, 0);
507 550
508 // If inside connection flow, then there should be exactly one item in 551 // If inside connection flow, then there should be exactly one item in
509 // the write queue. 552 // the write queue.
510 if (ready_state_ == READY_STATE_CONNECTING) { 553 if (ready_state_ == READY_STATE_CONNECTING) {
511 write_queue_.pop(); 554 write_queue_.pop();
512 DCHECK(write_queue_.empty()); 555 DCHECK(write_queue_.empty());
513 PostTaskToStartConnectLoop(result); 556 PostTaskToStartConnectLoop(result);
514 // Connect loop will handle the error. Return net::OK so that write flow 557 // Connect loop will handle the error. Return net::OK so that write flow
515 // does not try to report error also. 558 // does not try to report error also.
516 return net::OK; 559 return net::OK;
517 } 560 }
518 561
519 while (!write_queue_.empty()) { 562 while (!write_queue_.empty()) {
520 WriteRequest& request = write_queue_.front(); 563 WriteRequest& request = write_queue_.front();
521 request.callback.Run(result); 564 request.callback.Run(result);
522 write_queue_.pop(); 565 write_queue_.pop();
523 } 566 }
524 return net::ERR_FAILED; 567 return net::ERR_FAILED;
525 } 568 }
526 569
527 void CastSocket::PostTaskToStartReadLoop() { 570 void CastSocket::PostTaskToStartReadLoop() {
528 DCHECK(CalledOnValidThread()); 571 DCHECK(CalledOnValidThread());
529 base::MessageLoop::current()->PostTask( 572 DCHECK(read_loop_callback_.IsCancelled());
530 FROM_HERE, 573 read_loop_callback_.Reset(base::Bind(&CastSocket::StartReadLoop,
531 base::Bind(&CastSocket::StartReadLoop, AsWeakPtr())); 574 base::Unretained(this)));
575 base::MessageLoop::current()->PostTask(FROM_HERE,
576 read_loop_callback_.callback());
532 } 577 }
533 578
534 void CastSocket::StartReadLoop() { 579 void CastSocket::StartReadLoop() {
580 read_loop_callback_.Cancel();
535 // Read loop would have already been started if read state is not NONE 581 // Read loop would have already been started if read state is not NONE
536 if (read_state_ == READ_STATE_NONE) { 582 if (read_state_ == READ_STATE_NONE) {
537 read_state_ = READ_STATE_READ; 583 read_state_ = READ_STATE_READ;
538 DoReadLoop(net::OK); 584 DoReadLoop(net::OK);
539 } 585 }
540 } 586 }
541 587
542 void CastSocket::DoReadLoop(int result) { 588 void CastSocket::DoReadLoop(int result) {
543 DCHECK(CalledOnValidThread()); 589 DCHECK(CalledOnValidThread());
544 // Network operations can either finish synchronously or asynchronously. 590 // Network operations can either finish synchronously or asynchronously.
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
596 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset(); 642 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset();
597 current_read_buffer_ = body_read_buffer_; 643 current_read_buffer_ = body_read_buffer_;
598 CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size()); 644 CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size());
599 } 645 }
600 CHECK_GT(num_bytes_to_read, 0U); 646 CHECK_GT(num_bytes_to_read, 0U);
601 647
602 // Read up to num_bytes_to_read into |current_read_buffer_|. 648 // Read up to num_bytes_to_read into |current_read_buffer_|.
603 return socket_->Read( 649 return socket_->Read(
604 current_read_buffer_.get(), 650 current_read_buffer_.get(),
605 num_bytes_to_read, 651 num_bytes_to_read,
606 base::Bind(&CastSocket::DoReadLoop, AsWeakPtr())); 652 base::Bind(&CastSocket::DoReadLoop, base::Unretained(this)));
607 } 653 }
608 654
609 int CastSocket::DoReadComplete(int result) { 655 int CastSocket::DoReadComplete(int result) {
610 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result 656 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result
611 << " header offset = " 657 << " header offset = "
612 << header_read_buffer_->offset() 658 << header_read_buffer_->offset()
613 << " body offset = " << body_read_buffer_->offset(); 659 << " body offset = " << body_read_buffer_->offset();
614 if (result <= 0) { // 0 means EOF: the peer closed the socket 660 if (result <= 0) { // 0 means EOF: the peer closed the socket
615 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket"; 661 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket";
616 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; 662 error_state_ = CHANNEL_ERROR_SOCKET_ERROR;
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after
716 return false; 762 return false;
717 } 763 }
718 CastSocket::MessageHeader header; 764 CastSocket::MessageHeader header;
719 header.SetMessageSize(message_size); 765 header.SetMessageSize(message_size);
720 header.PrependToString(message_data); 766 header.PrependToString(message_data);
721 return true; 767 return true;
722 } 768 }
723 769
724 void CastSocket::CloseWithError(ChannelError error) { 770 void CastSocket::CloseWithError(ChannelError error) {
725 DCHECK(CalledOnValidThread()); 771 DCHECK(CalledOnValidThread());
726 socket_.reset(NULL); 772 CloseInternal();
727 ready_state_ = READY_STATE_CLOSED;
728 error_state_ = error; 773 error_state_ = error;
774 RunPendingCallbacksOnClose();
729 if (delegate_) 775 if (delegate_)
730 delegate_->OnError(this, error); 776 delegate_->OnError(this, error);
731 } 777 }
732 778
733 std::string CastSocket::CastUrl() const { 779 std::string CastSocket::CastUrl() const {
734 return ((channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) ? 780 return ((channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) ?
735 "casts://" : "cast://") + ip_endpoint_.ToString(); 781 "casts://" : "cast://") + ip_endpoint_.ToString();
736 } 782 }
737 783
738 bool CastSocket::CalledOnValidThread() const { 784 bool CastSocket::CalledOnValidThread() const {
(...skipping 10 matching lines...) Expand all
749 DCHECK_LT(size, static_cast<size_t>(kuint32max)); 795 DCHECK_LT(size, static_cast<size_t>(kuint32max));
750 DCHECK_GT(size, 0U); 796 DCHECK_GT(size, 0U);
751 message_size = size; 797 message_size = size;
752 } 798 }
753 799
754 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle, 800 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle,
755 // if bit-for-bit compatible. 801 // if bit-for-bit compatible.
756 void CastSocket::MessageHeader::PrependToString(std::string* str) { 802 void CastSocket::MessageHeader::PrependToString(std::string* str) {
757 MessageHeader output = *this; 803 MessageHeader output = *this;
758 output.message_size = base::HostToNet32(message_size); 804 output.message_size = base::HostToNet32(message_size);
759 size_t header_size = base::checked_cast<size_t,uint32>( 805 size_t header_size = base::checked_cast<size_t, uint32>(
760 MessageHeader::header_size()); 806 MessageHeader::header_size());
761 scoped_ptr<char, base::FreeDeleter> char_array( 807 scoped_ptr<char, base::FreeDeleter> char_array(
762 static_cast<char*>(malloc(header_size))); 808 static_cast<char*>(malloc(header_size)));
763 memcpy(char_array.get(), &output, header_size); 809 memcpy(char_array.get(), &output, header_size);
764 str->insert(0, char_array.get(), header_size); 810 str->insert(0, char_array.get(), header_size);
765 } 811 }
766 812
767 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle, 813 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle,
768 // if bit-for-bit compatible. 814 // if bit-for-bit compatible.
769 void CastSocket::MessageHeader::ReadFromIOBuffer( 815 void CastSocket::MessageHeader::ReadFromIOBuffer(
770 net::GrowableIOBuffer* buffer, MessageHeader* header) { 816 net::GrowableIOBuffer* buffer, MessageHeader* header) {
771 uint32 message_size; 817 uint32 message_size;
772 size_t header_size = base::checked_cast<size_t,uint32>( 818 size_t header_size = base::checked_cast<size_t, uint32>(
773 MessageHeader::header_size()); 819 MessageHeader::header_size());
774 memcpy(&message_size, buffer->StartOfBuffer(), header_size); 820 memcpy(&message_size, buffer->StartOfBuffer(), header_size);
775 header->message_size = base::NetToHost32(message_size); 821 header->message_size = base::NetToHost32(message_size);
776 } 822 }
777 823
778 std::string CastSocket::MessageHeader::ToString() { 824 std::string CastSocket::MessageHeader::ToString() {
779 return "{message_size: " + base::UintToString(message_size) + "}"; 825 return "{message_size: " + base::UintToString(message_size) + "}";
780 } 826 }
781 827
782 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback) 828 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback)
783 : callback(callback) { } 829 : callback(callback) { }
784 830
785 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) { 831 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) {
786 DCHECK(!io_buffer.get()); 832 DCHECK(!io_buffer.get());
787 std::string message_data; 833 std::string message_data;
788 if (!Serialize(message_proto, &message_data)) 834 if (!Serialize(message_proto, &message_data))
789 return false; 835 return false;
790 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data), 836 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data),
791 message_data.size()); 837 message_data.size());
792 return true; 838 return true;
793 } 839 }
794 840
795 CastSocket::WriteRequest::~WriteRequest() { } 841 CastSocket::WriteRequest::~WriteRequest() { }
796 842
797 } // namespace cast_channel 843 } // namespace cast_channel
798 } // namespace core_api 844 } // namespace core_api
799 } // namespace extensions 845 } // namespace extensions
800 846
801 #undef VLOG_WITH_CONNECTION 847 #undef VLOG_WITH_CONNECTION
OLDNEW
« no previous file with comments | « extensions/browser/api/cast_channel/cast_socket.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698