OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "extensions/browser/api/cast_channel/cast_socket.h" | 5 #include "extensions/browser/api/cast_channel/cast_socket.h" |
6 | 6 |
7 #include <stdlib.h> | 7 #include <stdlib.h> |
8 #include <string.h> | 8 #include <string.h> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
92 net_log_source_.id = net_log_->NextID(); | 92 net_log_source_.id = net_log_->NextID(); |
93 | 93 |
94 // Reuse these buffers for each message. | 94 // Reuse these buffers for each message. |
95 header_read_buffer_ = new net::GrowableIOBuffer(); | 95 header_read_buffer_ = new net::GrowableIOBuffer(); |
96 header_read_buffer_->SetCapacity(MessageHeader::header_size()); | 96 header_read_buffer_->SetCapacity(MessageHeader::header_size()); |
97 body_read_buffer_ = new net::GrowableIOBuffer(); | 97 body_read_buffer_ = new net::GrowableIOBuffer(); |
98 body_read_buffer_->SetCapacity(MessageHeader::max_message_size()); | 98 body_read_buffer_->SetCapacity(MessageHeader::max_message_size()); |
99 current_read_buffer_ = header_read_buffer_; | 99 current_read_buffer_ = header_read_buffer_; |
100 } | 100 } |
101 | 101 |
102 CastSocket::~CastSocket() { } | 102 CastSocket::~CastSocket() { |
103 // Ensure that resources are freed but do not run pending callbacks to avoid | |
104 // any re-entrancy. | |
105 CloseInternal(); | |
Wez
2014/08/04 21:32:32
If the caller never deletes CastSocket, only calls
mark a. foltz
2014/08/04 22:20:43
On extension unload/browser close, the ApiResource
| |
106 } | |
103 | 107 |
104 ReadyState CastSocket::ready_state() const { | 108 ReadyState CastSocket::ready_state() const { |
105 return ready_state_; | 109 return ready_state_; |
106 } | 110 } |
107 | 111 |
108 ChannelError CastSocket::error_state() const { | 112 ChannelError CastSocket::error_state() const { |
109 return error_state_; | 113 return error_state_; |
110 } | 114 } |
111 | 115 |
112 scoped_ptr<net::TCPClientSocket> CastSocket::CreateTcpSocket() { | 116 scoped_ptr<net::TCPClientSocket> CastSocket::CreateTcpSocket() { |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
169 DCHECK(CalledOnValidThread()); | 173 DCHECK(CalledOnValidThread()); |
170 VLOG_WITH_CONNECTION(1) << "Connect readyState = " << ready_state_; | 174 VLOG_WITH_CONNECTION(1) << "Connect readyState = " << ready_state_; |
171 if (ready_state_ != READY_STATE_NONE) { | 175 if (ready_state_ != READY_STATE_NONE) { |
172 callback.Run(net::ERR_CONNECTION_FAILED); | 176 callback.Run(net::ERR_CONNECTION_FAILED); |
173 return; | 177 return; |
174 } | 178 } |
175 ready_state_ = READY_STATE_CONNECTING; | 179 ready_state_ = READY_STATE_CONNECTING; |
176 connect_callback_ = callback; | 180 connect_callback_ = callback; |
177 connect_state_ = CONN_STATE_TCP_CONNECT; | 181 connect_state_ = CONN_STATE_TCP_CONNECT; |
178 if (connect_timeout_.InMicroseconds() > 0) { | 182 if (connect_timeout_.InMicroseconds() > 0) { |
179 GetTimer()->Start( | 183 DCHECK(connect_timeout_callback_.IsCancelled()); |
180 FROM_HERE, | 184 connect_timeout_callback_.Reset(base::Bind(&CastSocket::CancelConnect, |
181 connect_timeout_, | 185 base::Unretained(this))); |
182 base::Bind(&CastSocket::CancelConnect, AsWeakPtr())); | 186 GetTimer()->Start(FROM_HERE, |
187 connect_timeout_, | |
188 connect_timeout_callback_.callback()); | |
183 } | 189 } |
184 DoConnectLoop(net::OK); | 190 DoConnectLoop(net::OK); |
185 } | 191 } |
186 | 192 |
187 void CastSocket::PostTaskToStartConnectLoop(int result) { | 193 void CastSocket::PostTaskToStartConnectLoop(int result) { |
188 DCHECK(CalledOnValidThread()); | 194 DCHECK(CalledOnValidThread()); |
189 base::MessageLoop::current()->PostTask( | 195 DCHECK(connect_loop_callback_.IsCancelled()); |
190 FROM_HERE, | 196 connect_loop_callback_.Reset(base::Bind(&CastSocket::DoConnectLoop, |
191 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr(), result)); | 197 base::Unretained(this), |
198 result)); | |
199 base::MessageLoop::current()->PostTask(FROM_HERE, | |
200 connect_loop_callback_.callback()); | |
192 } | 201 } |
193 | 202 |
194 void CastSocket::CancelConnect() { | 203 void CastSocket::CancelConnect() { |
195 DCHECK(CalledOnValidThread()); | 204 DCHECK(CalledOnValidThread()); |
196 // Stop all pending connection setup tasks and report back to the client. | 205 // Stop all pending connection setup tasks and report back to the client. |
197 is_canceled_ = true; | 206 is_canceled_ = true; |
198 VLOG_WITH_CONNECTION(1) << "Timeout while establishing a connection."; | 207 VLOG_WITH_CONNECTION(1) << "Timeout while establishing a connection."; |
199 DoConnectCallback(net::ERR_TIMED_OUT); | 208 DoConnectCallback(net::ERR_TIMED_OUT); |
200 } | 209 } |
201 | 210 |
202 // This method performs the state machine transitions for connection flow. | 211 // This method performs the state machine transitions for connection flow. |
203 // There are two entry points to this method: | 212 // There are two entry points to this method: |
204 // 1. Connect method: this starts the flow | 213 // 1. Connect method: this starts the flow |
205 // 2. Callback from network operations that finish asynchronously | 214 // 2. Callback from network operations that finish asynchronously |
206 void CastSocket::DoConnectLoop(int result) { | 215 void CastSocket::DoConnectLoop(int result) { |
216 connect_loop_callback_.Cancel(); | |
207 if (is_canceled_) { | 217 if (is_canceled_) { |
208 LOG(ERROR) << "CANCELLED - Aborting DoConnectLoop."; | 218 LOG(ERROR) << "CANCELLED - Aborting DoConnectLoop."; |
209 return; | 219 return; |
210 } | 220 } |
211 // Network operations can either finish synchronously or asynchronously. | 221 // Network operations can either finish synchronously or asynchronously. |
212 // This method executes the state machine transitions in a loop so that | 222 // This method executes the state machine transitions in a loop so that |
213 // correct state transitions happen even when network operations finish | 223 // correct state transitions happen even when network operations finish |
214 // synchronously. | 224 // synchronously. |
215 int rv = result; | 225 int rv = result; |
216 do { | 226 do { |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
251 // b. The Do* method called did not change state | 261 // b. The Do* method called did not change state |
252 | 262 |
253 // Connect loop is finished: if there is no pending IO invoke the callback. | 263 // Connect loop is finished: if there is no pending IO invoke the callback. |
254 if (rv != net::ERR_IO_PENDING) { | 264 if (rv != net::ERR_IO_PENDING) { |
255 GetTimer()->Stop(); | 265 GetTimer()->Stop(); |
256 DoConnectCallback(rv); | 266 DoConnectCallback(rv); |
257 } | 267 } |
258 } | 268 } |
259 | 269 |
260 int CastSocket::DoTcpConnect() { | 270 int CastSocket::DoTcpConnect() { |
271 DCHECK(connect_loop_callback_.IsCancelled()); | |
261 VLOG_WITH_CONNECTION(1) << "DoTcpConnect"; | 272 VLOG_WITH_CONNECTION(1) << "DoTcpConnect"; |
262 connect_state_ = CONN_STATE_TCP_CONNECT_COMPLETE; | 273 connect_state_ = CONN_STATE_TCP_CONNECT_COMPLETE; |
263 tcp_socket_ = CreateTcpSocket(); | 274 tcp_socket_ = CreateTcpSocket(); |
264 return tcp_socket_->Connect( | 275 return tcp_socket_->Connect( |
265 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr())); | 276 base::Bind(&CastSocket::DoConnectLoop, base::Unretained(this))); |
266 } | 277 } |
267 | 278 |
268 int CastSocket::DoTcpConnectComplete(int result) { | 279 int CastSocket::DoTcpConnectComplete(int result) { |
269 VLOG_WITH_CONNECTION(1) << "DoTcpConnectComplete: " << result; | 280 VLOG_WITH_CONNECTION(1) << "DoTcpConnectComplete: " << result; |
270 if (result == net::OK) { | 281 if (result == net::OK) { |
271 // Enable TCP protocol-level keep-alive. | 282 // Enable TCP protocol-level keep-alive. |
272 bool result = tcp_socket_->SetKeepAlive(true, kTcpKeepAliveDelaySecs); | 283 bool result = tcp_socket_->SetKeepAlive(true, kTcpKeepAliveDelaySecs); |
273 LOG_IF(WARNING, !result) << "Failed to SetKeepAlive."; | 284 LOG_IF(WARNING, !result) << "Failed to SetKeepAlive."; |
274 connect_state_ = CONN_STATE_SSL_CONNECT; | 285 connect_state_ = CONN_STATE_SSL_CONNECT; |
275 } | 286 } |
276 return result; | 287 return result; |
277 } | 288 } |
278 | 289 |
279 int CastSocket::DoSslConnect() { | 290 int CastSocket::DoSslConnect() { |
291 DCHECK(connect_loop_callback_.IsCancelled()); | |
280 VLOG_WITH_CONNECTION(1) << "DoSslConnect"; | 292 VLOG_WITH_CONNECTION(1) << "DoSslConnect"; |
281 connect_state_ = CONN_STATE_SSL_CONNECT_COMPLETE; | 293 connect_state_ = CONN_STATE_SSL_CONNECT_COMPLETE; |
282 socket_ = CreateSslSocket(tcp_socket_.PassAs<net::StreamSocket>()); | 294 socket_ = CreateSslSocket(tcp_socket_.PassAs<net::StreamSocket>()); |
283 return socket_->Connect( | 295 return socket_->Connect( |
284 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr())); | 296 base::Bind(&CastSocket::DoConnectLoop, base::Unretained(this))); |
285 } | 297 } |
286 | 298 |
287 int CastSocket::DoSslConnectComplete(int result) { | 299 int CastSocket::DoSslConnectComplete(int result) { |
288 VLOG_WITH_CONNECTION(1) << "DoSslConnectComplete: " << result; | 300 VLOG_WITH_CONNECTION(1) << "DoSslConnectComplete: " << result; |
289 if (result == net::ERR_CERT_AUTHORITY_INVALID && | 301 if (result == net::ERR_CERT_AUTHORITY_INVALID && |
290 peer_cert_.empty() && ExtractPeerCert(&peer_cert_)) { | 302 peer_cert_.empty() && ExtractPeerCert(&peer_cert_)) { |
291 connect_state_ = CONN_STATE_TCP_CONNECT; | 303 connect_state_ = CONN_STATE_TCP_CONNECT; |
292 } else if (result == net::OK && | 304 } else if (result == net::OK && |
293 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) { | 305 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) { |
294 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND; | 306 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND; |
295 } | 307 } |
296 return result; | 308 return result; |
297 } | 309 } |
298 | 310 |
299 int CastSocket::DoAuthChallengeSend() { | 311 int CastSocket::DoAuthChallengeSend() { |
300 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSend"; | 312 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSend"; |
301 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE; | 313 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE; |
302 CastMessage challenge_message; | 314 CastMessage challenge_message; |
303 CreateAuthChallengeMessage(&challenge_message); | 315 CreateAuthChallengeMessage(&challenge_message); |
304 VLOG_WITH_CONNECTION(1) << "Sending challenge: " | 316 VLOG_WITH_CONNECTION(1) << "Sending challenge: " |
305 << CastMessageToString(challenge_message); | 317 << CastMessageToString(challenge_message); |
306 // Post a task to send auth challenge so that DoWriteLoop is not nested inside | 318 // Post a task to send auth challenge so that DoWriteLoop is not nested inside |
307 // DoConnectLoop. This is not strictly necessary but keeps the write loop | 319 // DoConnectLoop. This is not strictly necessary but keeps the write loop |
308 // code decoupled from connect loop code. | 320 // code decoupled from connect loop code. |
321 DCHECK(send_auth_challenge_callback_.IsCancelled()); | |
322 send_auth_challenge_callback_.Reset( | |
323 base::Bind(&CastSocket::SendCastMessageInternal, | |
324 base::Unretained(this), | |
325 challenge_message, | |
326 base::Bind(&CastSocket::DoAuthChallengeSendWriteComplete, | |
327 base::Unretained(this)))); | |
309 base::MessageLoop::current()->PostTask( | 328 base::MessageLoop::current()->PostTask( |
310 FROM_HERE, | 329 FROM_HERE, |
311 base::Bind(&CastSocket::SendCastMessageInternal, | 330 send_auth_challenge_callback_.callback()); |
312 AsWeakPtr(), | |
313 challenge_message, | |
314 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr()))); | |
315 // Always return IO_PENDING since the result is always asynchronous. | 331 // Always return IO_PENDING since the result is always asynchronous. |
316 return net::ERR_IO_PENDING; | 332 return net::ERR_IO_PENDING; |
317 } | 333 } |
318 | 334 |
335 void CastSocket::DoAuthChallengeSendWriteComplete(int result) { | |
336 send_auth_challenge_callback_.Cancel(); | |
337 VLOG_WITH_CONNECTION(2) << "DoAuthChallengeSendWriteComplete: " << result; | |
338 DCHECK_GT(result, 0); | |
339 DCHECK_EQ(write_queue_.size(), 1UL); | |
340 PostTaskToStartConnectLoop(result); | |
341 } | |
342 | |
319 int CastSocket::DoAuthChallengeSendComplete(int result) { | 343 int CastSocket::DoAuthChallengeSendComplete(int result) { |
320 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result; | 344 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result; |
321 if (result < 0) | 345 if (result < 0) |
322 return result; | 346 return result; |
323 connect_state_ = CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE; | 347 connect_state_ = CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE; |
324 // Post a task to start read loop so that DoReadLoop is not nested inside | 348 // Post a task to start read loop so that DoReadLoop is not nested inside |
325 // DoConnectLoop. This is not strictly necessary but keeps the read loop | 349 // DoConnectLoop. This is not strictly necessary but keeps the read loop |
326 // code decoupled from connect loop code. | 350 // code decoupled from connect loop code. |
327 PostTaskToStartReadLoop(); | 351 PostTaskToStartReadLoop(); |
328 // Always return IO_PENDING since the result is always asynchronous. | 352 // Always return IO_PENDING since the result is always asynchronous. |
(...skipping 18 matching lines...) Expand all Loading... | |
347 } else if (result == net::ERR_TIMED_OUT) { | 371 } else if (result == net::ERR_TIMED_OUT) { |
348 error_state_ = CHANNEL_ERROR_CONNECT_TIMEOUT; | 372 error_state_ = CHANNEL_ERROR_CONNECT_TIMEOUT; |
349 } else { | 373 } else { |
350 error_state_ = CHANNEL_ERROR_CONNECT_ERROR; | 374 error_state_ = CHANNEL_ERROR_CONNECT_ERROR; |
351 } | 375 } |
352 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback"; | 376 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback"; |
353 base::ResetAndReturn(&connect_callback_).Run(result); | 377 base::ResetAndReturn(&connect_callback_).Run(result); |
354 } | 378 } |
355 | 379 |
356 void CastSocket::Close(const net::CompletionCallback& callback) { | 380 void CastSocket::Close(const net::CompletionCallback& callback) { |
357 DCHECK(CalledOnValidThread()); | 381 CloseInternal(); |
382 RunPendingCallbacksOnClose(); | |
383 // Run this callback last. It may delete the socket. | |
384 callback.Run(net::OK); | |
385 } | |
386 | |
387 void CastSocket::CloseInternal() { | |
388 // TODO(mfoltz): Enforce this when CastChannelAPITest is rewritten to create | |
389 // and free sockets on the same thread. crbug.com/398242 | |
390 // DCHECK(CalledOnValidThread()); | |
391 if (ready_state_ == READY_STATE_CLOSED) { | |
392 return; | |
393 } | |
358 VLOG_WITH_CONNECTION(1) << "Close ReadyState = " << ready_state_; | 394 VLOG_WITH_CONNECTION(1) << "Close ReadyState = " << ready_state_; |
359 tcp_socket_.reset(); | 395 tcp_socket_.reset(); |
360 socket_.reset(); | 396 socket_.reset(); |
361 cert_verifier_.reset(); | 397 cert_verifier_.reset(); |
362 transport_security_state_.reset(); | 398 transport_security_state_.reset(); |
399 GetTimer()->Stop(); | |
400 | |
401 // Cancel callbacks that we queued ourselves to re-enter the connect or read | |
402 // loops. | |
403 connect_loop_callback_.Cancel(); | |
404 send_auth_challenge_callback_.Cancel(); | |
405 read_loop_callback_.Cancel(); | |
406 connect_timeout_callback_.Cancel(); | |
363 ready_state_ = READY_STATE_CLOSED; | 407 ready_state_ = READY_STATE_CLOSED; |
364 callback.Run(net::OK); | 408 } |
365 // |callback| can delete |this| | 409 |
410 void CastSocket::RunPendingCallbacksOnClose() { | |
411 DCHECK_EQ(ready_state_, READY_STATE_CLOSED); | |
412 if (!connect_callback_.is_null()) { | |
413 connect_callback_.Run(net::ERR_CONNECTION_FAILED); | |
414 connect_callback_.Reset(); | |
415 } | |
416 for (; !write_queue_.empty(); write_queue_.pop()) { | |
417 net::CompletionCallback& callback = write_queue_.front().callback; | |
418 callback.Run(net::ERR_FAILED); | |
419 callback.Reset(); | |
420 } | |
366 } | 421 } |
367 | 422 |
368 void CastSocket::SendMessage(const MessageInfo& message, | 423 void CastSocket::SendMessage(const MessageInfo& message, |
369 const net::CompletionCallback& callback) { | 424 const net::CompletionCallback& callback) { |
370 DCHECK(CalledOnValidThread()); | 425 DCHECK(CalledOnValidThread()); |
371 if (ready_state_ != READY_STATE_OPEN) { | 426 if (ready_state_ != READY_STATE_OPEN) { |
372 callback.Run(net::ERR_FAILED); | 427 callback.Run(net::ERR_FAILED); |
373 return; | 428 return; |
374 } | 429 } |
375 CastMessage message_proto; | 430 CastMessage message_proto; |
376 if (!MessageInfoToCastMessage(message, &message_proto)) { | 431 if (!MessageInfoToCastMessage(message, &message_proto)) { |
377 callback.Run(net::ERR_FAILED); | 432 callback.Run(net::ERR_FAILED); |
378 return; | 433 return; |
379 } | 434 } |
380 | |
381 SendCastMessageInternal(message_proto, callback); | 435 SendCastMessageInternal(message_proto, callback); |
382 } | 436 } |
383 | 437 |
384 void CastSocket::SendCastMessageInternal( | 438 void CastSocket::SendCastMessageInternal( |
385 const CastMessage& message, | 439 const CastMessage& message, |
386 const net::CompletionCallback& callback) { | 440 const net::CompletionCallback& callback) { |
387 WriteRequest write_request(callback); | 441 WriteRequest write_request(callback); |
388 if (!write_request.SetContent(message)) { | 442 if (!write_request.SetContent(message)) { |
389 callback.Run(net::ERR_FAILED); | 443 callback.Run(net::ERR_FAILED); |
390 return; | 444 return; |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
447 | 501 |
448 int CastSocket::DoWrite() { | 502 int CastSocket::DoWrite() { |
449 DCHECK(!write_queue_.empty()); | 503 DCHECK(!write_queue_.empty()); |
450 WriteRequest& request = write_queue_.front(); | 504 WriteRequest& request = write_queue_.front(); |
451 | 505 |
452 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = " | 506 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = " |
453 << request.io_buffer->size() << " bytes_written " | 507 << request.io_buffer->size() << " bytes_written " |
454 << request.io_buffer->BytesConsumed(); | 508 << request.io_buffer->BytesConsumed(); |
455 | 509 |
456 write_state_ = WRITE_STATE_WRITE_COMPLETE; | 510 write_state_ = WRITE_STATE_WRITE_COMPLETE; |
457 | |
458 return socket_->Write( | 511 return socket_->Write( |
459 request.io_buffer.get(), | 512 request.io_buffer.get(), |
460 request.io_buffer->BytesRemaining(), | 513 request.io_buffer->BytesRemaining(), |
461 base::Bind(&CastSocket::DoWriteLoop, AsWeakPtr())); | 514 base::Bind(&CastSocket::DoWriteLoop, base::Unretained(this))); |
462 } | 515 } |
463 | 516 |
464 int CastSocket::DoWriteComplete(int result) { | 517 int CastSocket::DoWriteComplete(int result) { |
465 DCHECK(!write_queue_.empty()); | 518 DCHECK(!write_queue_.empty()); |
466 if (result <= 0) { // NOTE that 0 also indicates an error | 519 if (result <= 0) { // NOTE that 0 also indicates an error |
467 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; | 520 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; |
468 write_state_ = WRITE_STATE_ERROR; | 521 write_state_ = WRITE_STATE_ERROR; |
469 return result == 0 ? net::ERR_FAILED : result; | 522 return result == 0 ? net::ERR_FAILED : result; |
470 } | 523 } |
471 | 524 |
472 // Some bytes were successfully written | 525 // Some bytes were successfully written |
473 WriteRequest& request = write_queue_.front(); | 526 WriteRequest& request = write_queue_.front(); |
474 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; | 527 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; |
475 io_buffer->DidConsume(result); | 528 io_buffer->DidConsume(result); |
476 if (io_buffer->BytesRemaining() == 0) // Message fully sent | 529 if (io_buffer->BytesRemaining() == 0) // Message fully sent |
477 write_state_ = WRITE_STATE_DO_CALLBACK; | 530 write_state_ = WRITE_STATE_DO_CALLBACK; |
478 else | 531 else |
479 write_state_ = WRITE_STATE_WRITE; | 532 write_state_ = WRITE_STATE_WRITE; |
480 | 533 |
481 return net::OK; | 534 return net::OK; |
482 } | 535 } |
483 | 536 |
484 int CastSocket::DoWriteCallback() { | 537 int CastSocket::DoWriteCallback() { |
485 DCHECK(!write_queue_.empty()); | 538 DCHECK(!write_queue_.empty()); |
539 write_state_ = WRITE_STATE_WRITE; | |
486 WriteRequest& request = write_queue_.front(); | 540 WriteRequest& request = write_queue_.front(); |
487 int bytes_consumed = request.io_buffer->BytesConsumed(); | 541 int bytes_consumed = request.io_buffer->BytesConsumed(); |
488 | 542 request.callback.Run(bytes_consumed); |
489 // If inside connection flow, then there should be exaclty one item in | 543 write_queue_.pop(); |
490 // the write queue. | |
491 if (ready_state_ == READY_STATE_CONNECTING) { | |
492 write_queue_.pop(); | |
493 DCHECK(write_queue_.empty()); | |
494 PostTaskToStartConnectLoop(bytes_consumed); | |
495 } else { | |
496 WriteRequest& request = write_queue_.front(); | |
497 request.callback.Run(bytes_consumed); | |
498 write_queue_.pop(); | |
499 } | |
500 write_state_ = WRITE_STATE_WRITE; | |
501 return net::OK; | 544 return net::OK; |
502 } | 545 } |
503 | 546 |
504 int CastSocket::DoWriteError(int result) { | 547 int CastSocket::DoWriteError(int result) { |
505 DCHECK(!write_queue_.empty()); | 548 DCHECK(!write_queue_.empty()); |
506 DCHECK_LT(result, 0); | 549 DCHECK_LT(result, 0); |
507 | 550 |
508 // If inside connection flow, then there should be exactly one item in | 551 // If inside connection flow, then there should be exactly one item in |
509 // the write queue. | 552 // the write queue. |
510 if (ready_state_ == READY_STATE_CONNECTING) { | 553 if (ready_state_ == READY_STATE_CONNECTING) { |
511 write_queue_.pop(); | 554 write_queue_.pop(); |
512 DCHECK(write_queue_.empty()); | 555 DCHECK(write_queue_.empty()); |
513 PostTaskToStartConnectLoop(result); | 556 PostTaskToStartConnectLoop(result); |
514 // Connect loop will handle the error. Return net::OK so that write flow | 557 // Connect loop will handle the error. Return net::OK so that write flow |
515 // does not try to report error also. | 558 // does not try to report error also. |
516 return net::OK; | 559 return net::OK; |
517 } | 560 } |
518 | 561 |
519 while (!write_queue_.empty()) { | 562 while (!write_queue_.empty()) { |
520 WriteRequest& request = write_queue_.front(); | 563 WriteRequest& request = write_queue_.front(); |
521 request.callback.Run(result); | 564 request.callback.Run(result); |
522 write_queue_.pop(); | 565 write_queue_.pop(); |
523 } | 566 } |
524 return net::ERR_FAILED; | 567 return net::ERR_FAILED; |
525 } | 568 } |
526 | 569 |
527 void CastSocket::PostTaskToStartReadLoop() { | 570 void CastSocket::PostTaskToStartReadLoop() { |
528 DCHECK(CalledOnValidThread()); | 571 DCHECK(CalledOnValidThread()); |
529 base::MessageLoop::current()->PostTask( | 572 DCHECK(read_loop_callback_.IsCancelled()); |
530 FROM_HERE, | 573 read_loop_callback_.Reset(base::Bind(&CastSocket::StartReadLoop, |
531 base::Bind(&CastSocket::StartReadLoop, AsWeakPtr())); | 574 base::Unretained(this))); |
575 base::MessageLoop::current()->PostTask(FROM_HERE, | |
576 read_loop_callback_.callback()); | |
532 } | 577 } |
533 | 578 |
534 void CastSocket::StartReadLoop() { | 579 void CastSocket::StartReadLoop() { |
580 read_loop_callback_.Cancel(); | |
535 // Read loop would have already been started if read state is not NONE | 581 // Read loop would have already been started if read state is not NONE |
536 if (read_state_ == READ_STATE_NONE) { | 582 if (read_state_ == READ_STATE_NONE) { |
537 read_state_ = READ_STATE_READ; | 583 read_state_ = READ_STATE_READ; |
538 DoReadLoop(net::OK); | 584 DoReadLoop(net::OK); |
539 } | 585 } |
540 } | 586 } |
541 | 587 |
542 void CastSocket::DoReadLoop(int result) { | 588 void CastSocket::DoReadLoop(int result) { |
543 DCHECK(CalledOnValidThread()); | 589 DCHECK(CalledOnValidThread()); |
544 // Network operations can either finish synchronously or asynchronously. | 590 // Network operations can either finish synchronously or asynchronously. |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
596 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset(); | 642 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset(); |
597 current_read_buffer_ = body_read_buffer_; | 643 current_read_buffer_ = body_read_buffer_; |
598 CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size()); | 644 CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size()); |
599 } | 645 } |
600 CHECK_GT(num_bytes_to_read, 0U); | 646 CHECK_GT(num_bytes_to_read, 0U); |
601 | 647 |
602 // Read up to num_bytes_to_read into |current_read_buffer_|. | 648 // Read up to num_bytes_to_read into |current_read_buffer_|. |
603 return socket_->Read( | 649 return socket_->Read( |
604 current_read_buffer_.get(), | 650 current_read_buffer_.get(), |
605 num_bytes_to_read, | 651 num_bytes_to_read, |
606 base::Bind(&CastSocket::DoReadLoop, AsWeakPtr())); | 652 base::Bind(&CastSocket::DoReadLoop, base::Unretained(this))); |
607 } | 653 } |
608 | 654 |
609 int CastSocket::DoReadComplete(int result) { | 655 int CastSocket::DoReadComplete(int result) { |
610 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result | 656 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result |
611 << " header offset = " | 657 << " header offset = " |
612 << header_read_buffer_->offset() | 658 << header_read_buffer_->offset() |
613 << " body offset = " << body_read_buffer_->offset(); | 659 << " body offset = " << body_read_buffer_->offset(); |
614 if (result <= 0) { // 0 means EOF: the peer closed the socket | 660 if (result <= 0) { // 0 means EOF: the peer closed the socket |
615 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket"; | 661 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket"; |
616 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; | 662 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; |
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
716 return false; | 762 return false; |
717 } | 763 } |
718 CastSocket::MessageHeader header; | 764 CastSocket::MessageHeader header; |
719 header.SetMessageSize(message_size); | 765 header.SetMessageSize(message_size); |
720 header.PrependToString(message_data); | 766 header.PrependToString(message_data); |
721 return true; | 767 return true; |
722 } | 768 } |
723 | 769 |
724 void CastSocket::CloseWithError(ChannelError error) { | 770 void CastSocket::CloseWithError(ChannelError error) { |
725 DCHECK(CalledOnValidThread()); | 771 DCHECK(CalledOnValidThread()); |
726 socket_.reset(NULL); | 772 CloseInternal(); |
727 ready_state_ = READY_STATE_CLOSED; | |
728 error_state_ = error; | 773 error_state_ = error; |
774 RunPendingCallbacksOnClose(); | |
729 if (delegate_) | 775 if (delegate_) |
730 delegate_->OnError(this, error); | 776 delegate_->OnError(this, error); |
731 } | 777 } |
732 | 778 |
733 std::string CastSocket::CastUrl() const { | 779 std::string CastSocket::CastUrl() const { |
734 return ((channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) ? | 780 return ((channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) ? |
735 "casts://" : "cast://") + ip_endpoint_.ToString(); | 781 "casts://" : "cast://") + ip_endpoint_.ToString(); |
736 } | 782 } |
737 | 783 |
738 bool CastSocket::CalledOnValidThread() const { | 784 bool CastSocket::CalledOnValidThread() const { |
(...skipping 10 matching lines...) Expand all Loading... | |
749 DCHECK_LT(size, static_cast<size_t>(kuint32max)); | 795 DCHECK_LT(size, static_cast<size_t>(kuint32max)); |
750 DCHECK_GT(size, 0U); | 796 DCHECK_GT(size, 0U); |
751 message_size = size; | 797 message_size = size; |
752 } | 798 } |
753 | 799 |
754 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle, | 800 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle, |
755 // if bit-for-bit compatible. | 801 // if bit-for-bit compatible. |
756 void CastSocket::MessageHeader::PrependToString(std::string* str) { | 802 void CastSocket::MessageHeader::PrependToString(std::string* str) { |
757 MessageHeader output = *this; | 803 MessageHeader output = *this; |
758 output.message_size = base::HostToNet32(message_size); | 804 output.message_size = base::HostToNet32(message_size); |
759 size_t header_size = base::checked_cast<size_t,uint32>( | 805 size_t header_size = base::checked_cast<size_t, uint32>( |
760 MessageHeader::header_size()); | 806 MessageHeader::header_size()); |
761 scoped_ptr<char, base::FreeDeleter> char_array( | 807 scoped_ptr<char, base::FreeDeleter> char_array( |
762 static_cast<char*>(malloc(header_size))); | 808 static_cast<char*>(malloc(header_size))); |
763 memcpy(char_array.get(), &output, header_size); | 809 memcpy(char_array.get(), &output, header_size); |
764 str->insert(0, char_array.get(), header_size); | 810 str->insert(0, char_array.get(), header_size); |
765 } | 811 } |
766 | 812 |
767 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle, | 813 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle, |
768 // if bit-for-bit compatible. | 814 // if bit-for-bit compatible. |
769 void CastSocket::MessageHeader::ReadFromIOBuffer( | 815 void CastSocket::MessageHeader::ReadFromIOBuffer( |
770 net::GrowableIOBuffer* buffer, MessageHeader* header) { | 816 net::GrowableIOBuffer* buffer, MessageHeader* header) { |
771 uint32 message_size; | 817 uint32 message_size; |
772 size_t header_size = base::checked_cast<size_t,uint32>( | 818 size_t header_size = base::checked_cast<size_t, uint32>( |
773 MessageHeader::header_size()); | 819 MessageHeader::header_size()); |
774 memcpy(&message_size, buffer->StartOfBuffer(), header_size); | 820 memcpy(&message_size, buffer->StartOfBuffer(), header_size); |
775 header->message_size = base::NetToHost32(message_size); | 821 header->message_size = base::NetToHost32(message_size); |
776 } | 822 } |
777 | 823 |
778 std::string CastSocket::MessageHeader::ToString() { | 824 std::string CastSocket::MessageHeader::ToString() { |
779 return "{message_size: " + base::UintToString(message_size) + "}"; | 825 return "{message_size: " + base::UintToString(message_size) + "}"; |
780 } | 826 } |
781 | 827 |
782 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback) | 828 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback) |
783 : callback(callback) { } | 829 : callback(callback) { } |
784 | 830 |
785 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) { | 831 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) { |
786 DCHECK(!io_buffer.get()); | 832 DCHECK(!io_buffer.get()); |
787 std::string message_data; | 833 std::string message_data; |
788 if (!Serialize(message_proto, &message_data)) | 834 if (!Serialize(message_proto, &message_data)) |
789 return false; | 835 return false; |
790 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data), | 836 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data), |
791 message_data.size()); | 837 message_data.size()); |
792 return true; | 838 return true; |
793 } | 839 } |
794 | 840 |
795 CastSocket::WriteRequest::~WriteRequest() { } | 841 CastSocket::WriteRequest::~WriteRequest() { } |
796 | 842 |
797 } // namespace cast_channel | 843 } // namespace cast_channel |
798 } // namespace core_api | 844 } // namespace core_api |
799 } // namespace extensions | 845 } // namespace extensions |
800 | 846 |
801 #undef VLOG_WITH_CONNECTION | 847 #undef VLOG_WITH_CONNECTION |
OLD | NEW |