OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "extensions/browser/api/cast_channel/cast_socket.h" | 5 #include "extensions/browser/api/cast_channel/cast_socket.h" |
6 | 6 |
7 #include <stdlib.h> | 7 #include <stdlib.h> |
8 #include <string.h> | 8 #include <string.h> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
92 net_log_source_.id = net_log_->NextID(); | 92 net_log_source_.id = net_log_->NextID(); |
93 | 93 |
94 // Reuse these buffers for each message. | 94 // Reuse these buffers for each message. |
95 header_read_buffer_ = new net::GrowableIOBuffer(); | 95 header_read_buffer_ = new net::GrowableIOBuffer(); |
96 header_read_buffer_->SetCapacity(MessageHeader::header_size()); | 96 header_read_buffer_->SetCapacity(MessageHeader::header_size()); |
97 body_read_buffer_ = new net::GrowableIOBuffer(); | 97 body_read_buffer_ = new net::GrowableIOBuffer(); |
98 body_read_buffer_->SetCapacity(MessageHeader::max_message_size()); | 98 body_read_buffer_->SetCapacity(MessageHeader::max_message_size()); |
99 current_read_buffer_ = header_read_buffer_; | 99 current_read_buffer_ = header_read_buffer_; |
100 } | 100 } |
101 | 101 |
102 CastSocket::~CastSocket() { } | 102 CastSocket::~CastSocket() { |
103 CloseInternal(); | |
104 } | |
103 | 105 |
104 ReadyState CastSocket::ready_state() const { | 106 ReadyState CastSocket::ready_state() const { |
105 return ready_state_; | 107 return ready_state_; |
106 } | 108 } |
107 | 109 |
108 ChannelError CastSocket::error_state() const { | 110 ChannelError CastSocket::error_state() const { |
109 return error_state_; | 111 return error_state_; |
110 } | 112 } |
111 | 113 |
112 scoped_ptr<net::TCPClientSocket> CastSocket::CreateTcpSocket() { | 114 scoped_ptr<net::TCPClientSocket> CastSocket::CreateTcpSocket() { |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
169 DCHECK(CalledOnValidThread()); | 171 DCHECK(CalledOnValidThread()); |
170 VLOG_WITH_CONNECTION(1) << "Connect readyState = " << ready_state_; | 172 VLOG_WITH_CONNECTION(1) << "Connect readyState = " << ready_state_; |
171 if (ready_state_ != READY_STATE_NONE) { | 173 if (ready_state_ != READY_STATE_NONE) { |
172 callback.Run(net::ERR_CONNECTION_FAILED); | 174 callback.Run(net::ERR_CONNECTION_FAILED); |
173 return; | 175 return; |
174 } | 176 } |
175 ready_state_ = READY_STATE_CONNECTING; | 177 ready_state_ = READY_STATE_CONNECTING; |
176 connect_callback_ = callback; | 178 connect_callback_ = callback; |
177 connect_state_ = CONN_STATE_TCP_CONNECT; | 179 connect_state_ = CONN_STATE_TCP_CONNECT; |
178 if (connect_timeout_.InMicroseconds() > 0) { | 180 if (connect_timeout_.InMicroseconds() > 0) { |
179 GetTimer()->Start( | 181 DCHECK(connect_timeout_callback_.IsCancelled()); |
180 FROM_HERE, | 182 connect_timeout_callback_.Reset(base::Bind(&CastSocket::CancelConnect, |
181 connect_timeout_, | 183 base::Unretained(this))); |
182 base::Bind(&CastSocket::CancelConnect, AsWeakPtr())); | 184 GetTimer()->Start(FROM_HERE, |
185 connect_timeout_, | |
186 connect_timeout_callback_.callback()); | |
183 } | 187 } |
184 DoConnectLoop(net::OK); | 188 DoConnectLoop(net::OK); |
185 } | 189 } |
186 | 190 |
187 void CastSocket::PostTaskToStartConnectLoop(int result) { | 191 void CastSocket::PostTaskToStartConnectLoop(int result) { |
188 DCHECK(CalledOnValidThread()); | 192 DCHECK(CalledOnValidThread()); |
189 base::MessageLoop::current()->PostTask( | 193 DCHECK(connect_loop_callback_.IsCancelled()); |
190 FROM_HERE, | 194 connect_loop_callback_.Reset(base::Bind(&CastSocket::DoConnectLoop, |
191 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr(), result)); | 195 base::Unretained(this), |
196 result)); | |
197 base::MessageLoop::current()->PostTask(FROM_HERE, | |
198 connect_loop_callback_.callback()); | |
192 } | 199 } |
193 | 200 |
194 void CastSocket::CancelConnect() { | 201 void CastSocket::CancelConnect() { |
195 DCHECK(CalledOnValidThread()); | 202 DCHECK(CalledOnValidThread()); |
196 // Stop all pending connection setup tasks and report back to the client. | 203 // Stop all pending connection setup tasks and report back to the client. |
197 is_canceled_ = true; | 204 is_canceled_ = true; |
198 VLOG_WITH_CONNECTION(1) << "Timeout while establishing a connection."; | 205 VLOG_WITH_CONNECTION(1) << "Timeout while establishing a connection."; |
199 DoConnectCallback(net::ERR_TIMED_OUT); | 206 DoConnectCallback(net::ERR_TIMED_OUT); |
200 } | 207 } |
201 | 208 |
202 // This method performs the state machine transitions for connection flow. | 209 // This method performs the state machine transitions for connection flow. |
203 // There are two entry points to this method: | 210 // There are two entry points to this method: |
204 // 1. Connect method: this starts the flow | 211 // 1. Connect method: this starts the flow |
205 // 2. Callback from network operations that finish asynchronously | 212 // 2. Callback from network operations that finish asynchronously |
206 void CastSocket::DoConnectLoop(int result) { | 213 void CastSocket::DoConnectLoop(int result) { |
214 connect_loop_callback_.Cancel(); | |
207 if (is_canceled_) { | 215 if (is_canceled_) { |
208 LOG(ERROR) << "CANCELLED - Aborting DoConnectLoop."; | 216 LOG(ERROR) << "CANCELLED - Aborting DoConnectLoop."; |
209 return; | 217 return; |
210 } | 218 } |
211 // Network operations can either finish synchronously or asynchronously. | 219 // Network operations can either finish synchronously or asynchronously. |
212 // This method executes the state machine transitions in a loop so that | 220 // This method executes the state machine transitions in a loop so that |
213 // correct state transitions happen even when network operations finish | 221 // correct state transitions happen even when network operations finish |
214 // synchronously. | 222 // synchronously. |
215 int rv = result; | 223 int rv = result; |
216 do { | 224 do { |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
251 // b. The Do* method called did not change state | 259 // b. The Do* method called did not change state |
252 | 260 |
253 // Connect loop is finished: if there is no pending IO invoke the callback. | 261 // Connect loop is finished: if there is no pending IO invoke the callback. |
254 if (rv != net::ERR_IO_PENDING) { | 262 if (rv != net::ERR_IO_PENDING) { |
255 GetTimer()->Stop(); | 263 GetTimer()->Stop(); |
256 DoConnectCallback(rv); | 264 DoConnectCallback(rv); |
257 } | 265 } |
258 } | 266 } |
259 | 267 |
260 int CastSocket::DoTcpConnect() { | 268 int CastSocket::DoTcpConnect() { |
269 DCHECK(connect_loop_callback_.IsCancelled()); | |
261 VLOG_WITH_CONNECTION(1) << "DoTcpConnect"; | 270 VLOG_WITH_CONNECTION(1) << "DoTcpConnect"; |
262 connect_state_ = CONN_STATE_TCP_CONNECT_COMPLETE; | 271 connect_state_ = CONN_STATE_TCP_CONNECT_COMPLETE; |
263 tcp_socket_ = CreateTcpSocket(); | 272 tcp_socket_ = CreateTcpSocket(); |
264 return tcp_socket_->Connect( | 273 return tcp_socket_->Connect( |
265 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr())); | 274 base::Bind(&CastSocket::DoConnectLoop, base::Unretained(this))); |
266 } | 275 } |
267 | 276 |
268 int CastSocket::DoTcpConnectComplete(int result) { | 277 int CastSocket::DoTcpConnectComplete(int result) { |
269 VLOG_WITH_CONNECTION(1) << "DoTcpConnectComplete: " << result; | 278 VLOG_WITH_CONNECTION(1) << "DoTcpConnectComplete: " << result; |
270 if (result == net::OK) { | 279 if (result == net::OK) { |
271 // Enable TCP protocol-level keep-alive. | 280 // Enable TCP protocol-level keep-alive. |
272 bool result = tcp_socket_->SetKeepAlive(true, kTcpKeepAliveDelaySecs); | 281 bool result = tcp_socket_->SetKeepAlive(true, kTcpKeepAliveDelaySecs); |
273 LOG_IF(WARNING, !result) << "Failed to SetKeepAlive."; | 282 LOG_IF(WARNING, !result) << "Failed to SetKeepAlive."; |
274 connect_state_ = CONN_STATE_SSL_CONNECT; | 283 connect_state_ = CONN_STATE_SSL_CONNECT; |
275 } | 284 } |
276 return result; | 285 return result; |
277 } | 286 } |
278 | 287 |
279 int CastSocket::DoSslConnect() { | 288 int CastSocket::DoSslConnect() { |
289 DCHECK(connect_loop_callback_.IsCancelled()); | |
280 VLOG_WITH_CONNECTION(1) << "DoSslConnect"; | 290 VLOG_WITH_CONNECTION(1) << "DoSslConnect"; |
281 connect_state_ = CONN_STATE_SSL_CONNECT_COMPLETE; | 291 connect_state_ = CONN_STATE_SSL_CONNECT_COMPLETE; |
282 socket_ = CreateSslSocket(tcp_socket_.PassAs<net::StreamSocket>()); | 292 socket_ = CreateSslSocket(tcp_socket_.PassAs<net::StreamSocket>()); |
283 return socket_->Connect( | 293 return socket_->Connect( |
284 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr())); | 294 base::Bind(&CastSocket::DoConnectLoop, base::Unretained(this))); |
285 } | 295 } |
286 | 296 |
287 int CastSocket::DoSslConnectComplete(int result) { | 297 int CastSocket::DoSslConnectComplete(int result) { |
288 VLOG_WITH_CONNECTION(1) << "DoSslConnectComplete: " << result; | 298 VLOG_WITH_CONNECTION(1) << "DoSslConnectComplete: " << result; |
289 if (result == net::ERR_CERT_AUTHORITY_INVALID && | 299 if (result == net::ERR_CERT_AUTHORITY_INVALID && |
290 peer_cert_.empty() && ExtractPeerCert(&peer_cert_)) { | 300 peer_cert_.empty() && ExtractPeerCert(&peer_cert_)) { |
291 connect_state_ = CONN_STATE_TCP_CONNECT; | 301 connect_state_ = CONN_STATE_TCP_CONNECT; |
292 } else if (result == net::OK && | 302 } else if (result == net::OK && |
293 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) { | 303 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) { |
294 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND; | 304 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND; |
295 } | 305 } |
296 return result; | 306 return result; |
297 } | 307 } |
298 | 308 |
299 int CastSocket::DoAuthChallengeSend() { | 309 int CastSocket::DoAuthChallengeSend() { |
300 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSend"; | 310 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSend"; |
301 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE; | 311 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE; |
302 CastMessage challenge_message; | 312 CastMessage challenge_message; |
303 CreateAuthChallengeMessage(&challenge_message); | 313 CreateAuthChallengeMessage(&challenge_message); |
304 VLOG_WITH_CONNECTION(1) << "Sending challenge: " | 314 VLOG_WITH_CONNECTION(1) << "Sending challenge: " |
305 << CastMessageToString(challenge_message); | 315 << CastMessageToString(challenge_message); |
306 // Post a task to send auth challenge so that DoWriteLoop is not nested inside | 316 // Post a task to send auth challenge so that DoWriteLoop is not nested inside |
307 // DoConnectLoop. This is not strictly necessary but keeps the write loop | 317 // DoConnectLoop. This is not strictly necessary but keeps the write loop |
308 // code decoupled from connect loop code. | 318 // code decoupled from connect loop code. |
319 DCHECK(send_auth_challenge_callback_.IsCancelled()); | |
320 send_auth_challenge_callback_.Reset( | |
321 base::Bind(&CastSocket::SendCastMessageInternal, | |
322 base::Unretained(this), | |
323 challenge_message, | |
324 base::Bind(&CastSocket::DoAuthChallengeSendWriteComplete, | |
325 base::Unretained(this)))); | |
309 base::MessageLoop::current()->PostTask( | 326 base::MessageLoop::current()->PostTask( |
310 FROM_HERE, | 327 FROM_HERE, |
311 base::Bind(&CastSocket::SendCastMessageInternal, | 328 send_auth_challenge_callback_.callback()); |
312 AsWeakPtr(), | |
313 challenge_message, | |
314 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr()))); | |
315 // Always return IO_PENDING since the result is always asynchronous. | 329 // Always return IO_PENDING since the result is always asynchronous. |
316 return net::ERR_IO_PENDING; | 330 return net::ERR_IO_PENDING; |
317 } | 331 } |
318 | 332 |
333 void CastSocket::DoAuthChallengeSendWriteComplete(int result) { | |
334 send_auth_challenge_callback_.Cancel(); | |
335 VLOG_WITH_CONNECTION(2) << "DoAuthChallengeSendWriteComplete: " << result; | |
336 DCHECK_GT(result, 0); | |
337 DCHECK_EQ(write_queue_.size(), 1UL); | |
338 PostTaskToStartConnectLoop(result); | |
339 } | |
340 | |
319 int CastSocket::DoAuthChallengeSendComplete(int result) { | 341 int CastSocket::DoAuthChallengeSendComplete(int result) { |
320 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result; | 342 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result; |
321 if (result < 0) | 343 if (result < 0) |
322 return result; | 344 return result; |
323 connect_state_ = CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE; | 345 connect_state_ = CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE; |
324 // Post a task to start read loop so that DoReadLoop is not nested inside | 346 // Post a task to start read loop so that DoReadLoop is not nested inside |
325 // DoConnectLoop. This is not strictly necessary but keeps the read loop | 347 // DoConnectLoop. This is not strictly necessary but keeps the read loop |
326 // code decoupled from connect loop code. | 348 // code decoupled from connect loop code. |
327 PostTaskToStartReadLoop(); | 349 PostTaskToStartReadLoop(); |
328 // Always return IO_PENDING since the result is always asynchronous. | 350 // Always return IO_PENDING since the result is always asynchronous. |
(...skipping 18 matching lines...) Expand all Loading... | |
347 } else if (result == net::ERR_TIMED_OUT) { | 369 } else if (result == net::ERR_TIMED_OUT) { |
348 error_state_ = CHANNEL_ERROR_CONNECT_TIMEOUT; | 370 error_state_ = CHANNEL_ERROR_CONNECT_TIMEOUT; |
349 } else { | 371 } else { |
350 error_state_ = CHANNEL_ERROR_CONNECT_ERROR; | 372 error_state_ = CHANNEL_ERROR_CONNECT_ERROR; |
351 } | 373 } |
352 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback"; | 374 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback"; |
353 base::ResetAndReturn(&connect_callback_).Run(result); | 375 base::ResetAndReturn(&connect_callback_).Run(result); |
354 } | 376 } |
355 | 377 |
356 void CastSocket::Close(const net::CompletionCallback& callback) { | 378 void CastSocket::Close(const net::CompletionCallback& callback) { |
357 DCHECK(CalledOnValidThread()); | 379 CloseInternal(); |
380 callback.Run(net::OK); | |
381 } | |
382 | |
383 void CastSocket::CloseInternal() { | |
384 // TODO(mfoltz): Enforce this when CastChannelAPITest is rewritten to create | |
385 // and free sockets on the same thread. crbug.com/398242 | |
386 // DCHECK(CalledOnValidThread()); | |
387 if (ready_state_ == READY_STATE_CLOSED) { | |
388 return; | |
389 } | |
358 VLOG_WITH_CONNECTION(1) << "Close ReadyState = " << ready_state_; | 390 VLOG_WITH_CONNECTION(1) << "Close ReadyState = " << ready_state_; |
359 tcp_socket_.reset(); | 391 tcp_socket_.reset(); |
360 socket_.reset(); | 392 socket_.reset(); |
361 cert_verifier_.reset(); | 393 cert_verifier_.reset(); |
362 transport_security_state_.reset(); | 394 transport_security_state_.reset(); |
395 GetTimer()->Stop(); | |
396 | |
397 // Fire any remaining callbacks passed into us. | |
398 if (!connect_callback_.is_null()) { | |
399 connect_callback_.Run(net::ERR_CONNECTION_FAILED); | |
Wez
2014/07/31 22:57:53
So there is no possibility of this callback deleti
mark a. foltz
2014/08/01 23:21:48
Yes this is possible and that would be bad. The o
| |
400 connect_callback_.Reset(); | |
401 } | |
402 for (; !write_queue_.empty(); write_queue_.pop()) { | |
403 net::CompletionCallback& callback = write_queue_.front().callback; | |
404 callback.Run(net::ERR_FAILED); | |
405 callback.Reset(); | |
406 } | |
407 | |
408 // Cancel callbacks that we queued ourselves to re-enter the connect or read | |
409 // loops. | |
410 connect_loop_callback_.Cancel(); | |
411 send_auth_challenge_callback_.Cancel(); | |
412 read_loop_callback_.Cancel(); | |
413 connect_timeout_callback_.Cancel(); | |
363 ready_state_ = READY_STATE_CLOSED; | 414 ready_state_ = READY_STATE_CLOSED; |
364 callback.Run(net::OK); | |
365 // |callback| can delete |this| | |
366 } | 415 } |
367 | 416 |
368 void CastSocket::SendMessage(const MessageInfo& message, | 417 void CastSocket::SendMessage(const MessageInfo& message, |
369 const net::CompletionCallback& callback) { | 418 const net::CompletionCallback& callback) { |
370 DCHECK(CalledOnValidThread()); | 419 DCHECK(CalledOnValidThread()); |
371 if (ready_state_ != READY_STATE_OPEN) { | 420 if (ready_state_ != READY_STATE_OPEN) { |
372 callback.Run(net::ERR_FAILED); | 421 callback.Run(net::ERR_FAILED); |
373 return; | 422 return; |
374 } | 423 } |
375 CastMessage message_proto; | 424 CastMessage message_proto; |
376 if (!MessageInfoToCastMessage(message, &message_proto)) { | 425 if (!MessageInfoToCastMessage(message, &message_proto)) { |
377 callback.Run(net::ERR_FAILED); | 426 callback.Run(net::ERR_FAILED); |
378 return; | 427 return; |
379 } | 428 } |
380 | |
381 SendCastMessageInternal(message_proto, callback); | 429 SendCastMessageInternal(message_proto, callback); |
382 } | 430 } |
383 | 431 |
384 void CastSocket::SendCastMessageInternal( | 432 void CastSocket::SendCastMessageInternal( |
385 const CastMessage& message, | 433 const CastMessage& message, |
386 const net::CompletionCallback& callback) { | 434 const net::CompletionCallback& callback) { |
387 WriteRequest write_request(callback); | 435 WriteRequest write_request(callback); |
388 if (!write_request.SetContent(message)) { | 436 if (!write_request.SetContent(message)) { |
389 callback.Run(net::ERR_FAILED); | 437 callback.Run(net::ERR_FAILED); |
390 return; | 438 return; |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
447 | 495 |
448 int CastSocket::DoWrite() { | 496 int CastSocket::DoWrite() { |
449 DCHECK(!write_queue_.empty()); | 497 DCHECK(!write_queue_.empty()); |
450 WriteRequest& request = write_queue_.front(); | 498 WriteRequest& request = write_queue_.front(); |
451 | 499 |
452 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = " | 500 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = " |
453 << request.io_buffer->size() << " bytes_written " | 501 << request.io_buffer->size() << " bytes_written " |
454 << request.io_buffer->BytesConsumed(); | 502 << request.io_buffer->BytesConsumed(); |
455 | 503 |
456 write_state_ = WRITE_STATE_WRITE_COMPLETE; | 504 write_state_ = WRITE_STATE_WRITE_COMPLETE; |
457 | |
458 return socket_->Write( | 505 return socket_->Write( |
459 request.io_buffer.get(), | 506 request.io_buffer.get(), |
460 request.io_buffer->BytesRemaining(), | 507 request.io_buffer->BytesRemaining(), |
461 base::Bind(&CastSocket::DoWriteLoop, AsWeakPtr())); | 508 base::Bind(&CastSocket::DoWriteLoop, base::Unretained(this))); |
462 } | 509 } |
463 | 510 |
464 int CastSocket::DoWriteComplete(int result) { | 511 int CastSocket::DoWriteComplete(int result) { |
465 DCHECK(!write_queue_.empty()); | 512 DCHECK(!write_queue_.empty()); |
466 if (result <= 0) { // NOTE that 0 also indicates an error | 513 if (result <= 0) { // NOTE that 0 also indicates an error |
467 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; | 514 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; |
468 write_state_ = WRITE_STATE_ERROR; | 515 write_state_ = WRITE_STATE_ERROR; |
469 return result == 0 ? net::ERR_FAILED : result; | 516 return result == 0 ? net::ERR_FAILED : result; |
470 } | 517 } |
471 | 518 |
472 // Some bytes were successfully written | 519 // Some bytes were successfully written |
473 WriteRequest& request = write_queue_.front(); | 520 WriteRequest& request = write_queue_.front(); |
474 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; | 521 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; |
475 io_buffer->DidConsume(result); | 522 io_buffer->DidConsume(result); |
476 if (io_buffer->BytesRemaining() == 0) // Message fully sent | 523 if (io_buffer->BytesRemaining() == 0) // Message fully sent |
477 write_state_ = WRITE_STATE_DO_CALLBACK; | 524 write_state_ = WRITE_STATE_DO_CALLBACK; |
478 else | 525 else |
479 write_state_ = WRITE_STATE_WRITE; | 526 write_state_ = WRITE_STATE_WRITE; |
480 | 527 |
481 return net::OK; | 528 return net::OK; |
482 } | 529 } |
483 | 530 |
484 int CastSocket::DoWriteCallback() { | 531 int CastSocket::DoWriteCallback() { |
485 DCHECK(!write_queue_.empty()); | 532 DCHECK(!write_queue_.empty()); |
533 write_state_ = WRITE_STATE_WRITE; | |
486 WriteRequest& request = write_queue_.front(); | 534 WriteRequest& request = write_queue_.front(); |
487 int bytes_consumed = request.io_buffer->BytesConsumed(); | 535 int bytes_consumed = request.io_buffer->BytesConsumed(); |
488 | 536 request.callback.Run(bytes_consumed); |
489 // If inside connection flow, then there should be exaclty one item in | 537 write_queue_.pop(); |
490 // the write queue. | |
491 if (ready_state_ == READY_STATE_CONNECTING) { | |
492 write_queue_.pop(); | |
493 DCHECK(write_queue_.empty()); | |
494 PostTaskToStartConnectLoop(bytes_consumed); | |
495 } else { | |
496 WriteRequest& request = write_queue_.front(); | |
497 request.callback.Run(bytes_consumed); | |
498 write_queue_.pop(); | |
499 } | |
500 write_state_ = WRITE_STATE_WRITE; | |
501 return net::OK; | 538 return net::OK; |
502 } | 539 } |
503 | 540 |
504 int CastSocket::DoWriteError(int result) { | 541 int CastSocket::DoWriteError(int result) { |
505 DCHECK(!write_queue_.empty()); | 542 DCHECK(!write_queue_.empty()); |
506 DCHECK_LT(result, 0); | 543 DCHECK_LT(result, 0); |
507 | 544 |
508 // If inside connection flow, then there should be exactly one item in | 545 // If inside connection flow, then there should be exactly one item in |
509 // the write queue. | 546 // the write queue. |
510 if (ready_state_ == READY_STATE_CONNECTING) { | 547 if (ready_state_ == READY_STATE_CONNECTING) { |
511 write_queue_.pop(); | 548 write_queue_.pop(); |
512 DCHECK(write_queue_.empty()); | 549 DCHECK(write_queue_.empty()); |
513 PostTaskToStartConnectLoop(result); | 550 PostTaskToStartConnectLoop(result); |
514 // Connect loop will handle the error. Return net::OK so that write flow | 551 // Connect loop will handle the error. Return net::OK so that write flow |
515 // does not try to report error also. | 552 // does not try to report error also. |
516 return net::OK; | 553 return net::OK; |
517 } | 554 } |
518 | 555 |
519 while (!write_queue_.empty()) { | 556 while (!write_queue_.empty()) { |
520 WriteRequest& request = write_queue_.front(); | 557 WriteRequest& request = write_queue_.front(); |
521 request.callback.Run(result); | 558 request.callback.Run(result); |
522 write_queue_.pop(); | 559 write_queue_.pop(); |
523 } | 560 } |
524 return net::ERR_FAILED; | 561 return net::ERR_FAILED; |
525 } | 562 } |
526 | 563 |
527 void CastSocket::PostTaskToStartReadLoop() { | 564 void CastSocket::PostTaskToStartReadLoop() { |
528 DCHECK(CalledOnValidThread()); | 565 DCHECK(CalledOnValidThread()); |
529 base::MessageLoop::current()->PostTask( | 566 DCHECK(read_loop_callback_.IsCancelled()); |
530 FROM_HERE, | 567 read_loop_callback_.Reset(base::Bind(&CastSocket::StartReadLoop, |
531 base::Bind(&CastSocket::StartReadLoop, AsWeakPtr())); | 568 base::Unretained(this))); |
569 base::MessageLoop::current()->PostTask(FROM_HERE, | |
570 read_loop_callback_.callback()); | |
532 } | 571 } |
533 | 572 |
534 void CastSocket::StartReadLoop() { | 573 void CastSocket::StartReadLoop() { |
574 read_loop_callback_.Cancel(); | |
535 // Read loop would have already been started if read state is not NONE | 575 // Read loop would have already been started if read state is not NONE |
536 if (read_state_ == READ_STATE_NONE) { | 576 if (read_state_ == READ_STATE_NONE) { |
537 read_state_ = READ_STATE_READ; | 577 read_state_ = READ_STATE_READ; |
538 DoReadLoop(net::OK); | 578 DoReadLoop(net::OK); |
539 } | 579 } |
540 } | 580 } |
541 | 581 |
542 void CastSocket::DoReadLoop(int result) { | 582 void CastSocket::DoReadLoop(int result) { |
543 DCHECK(CalledOnValidThread()); | 583 DCHECK(CalledOnValidThread()); |
544 // Network operations can either finish synchronously or asynchronously. | 584 // Network operations can either finish synchronously or asynchronously. |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
596 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset(); | 636 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset(); |
597 current_read_buffer_ = body_read_buffer_; | 637 current_read_buffer_ = body_read_buffer_; |
598 CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size()); | 638 CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size()); |
599 } | 639 } |
600 CHECK_GT(num_bytes_to_read, 0U); | 640 CHECK_GT(num_bytes_to_read, 0U); |
601 | 641 |
602 // Read up to num_bytes_to_read into |current_read_buffer_|. | 642 // Read up to num_bytes_to_read into |current_read_buffer_|. |
603 return socket_->Read( | 643 return socket_->Read( |
604 current_read_buffer_.get(), | 644 current_read_buffer_.get(), |
605 num_bytes_to_read, | 645 num_bytes_to_read, |
606 base::Bind(&CastSocket::DoReadLoop, AsWeakPtr())); | 646 base::Bind(&CastSocket::DoReadLoop, base::Unretained(this))); |
607 } | 647 } |
608 | 648 |
609 int CastSocket::DoReadComplete(int result) { | 649 int CastSocket::DoReadComplete(int result) { |
610 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result | 650 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result |
611 << " header offset = " | 651 << " header offset = " |
612 << header_read_buffer_->offset() | 652 << header_read_buffer_->offset() |
613 << " body offset = " << body_read_buffer_->offset(); | 653 << " body offset = " << body_read_buffer_->offset(); |
614 if (result <= 0) { // 0 means EOF: the peer closed the socket | 654 if (result <= 0) { // 0 means EOF: the peer closed the socket |
615 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket"; | 655 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket"; |
616 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; | 656 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; |
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
716 return false; | 756 return false; |
717 } | 757 } |
718 CastSocket::MessageHeader header; | 758 CastSocket::MessageHeader header; |
719 header.SetMessageSize(message_size); | 759 header.SetMessageSize(message_size); |
720 header.PrependToString(message_data); | 760 header.PrependToString(message_data); |
721 return true; | 761 return true; |
722 } | 762 } |
723 | 763 |
724 void CastSocket::CloseWithError(ChannelError error) { | 764 void CastSocket::CloseWithError(ChannelError error) { |
725 DCHECK(CalledOnValidThread()); | 765 DCHECK(CalledOnValidThread()); |
726 socket_.reset(NULL); | 766 CloseInternal(); |
727 ready_state_ = READY_STATE_CLOSED; | |
728 error_state_ = error; | 767 error_state_ = error; |
729 if (delegate_) | 768 if (delegate_) |
730 delegate_->OnError(this, error); | 769 delegate_->OnError(this, error); |
731 } | 770 } |
732 | 771 |
733 std::string CastSocket::CastUrl() const { | 772 std::string CastSocket::CastUrl() const { |
734 return ((channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) ? | 773 return ((channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) ? |
735 "casts://" : "cast://") + ip_endpoint_.ToString(); | 774 "casts://" : "cast://") + ip_endpoint_.ToString(); |
736 } | 775 } |
737 | 776 |
(...skipping 11 matching lines...) Expand all Loading... | |
749 DCHECK_LT(size, static_cast<size_t>(kuint32max)); | 788 DCHECK_LT(size, static_cast<size_t>(kuint32max)); |
750 DCHECK_GT(size, 0U); | 789 DCHECK_GT(size, 0U); |
751 message_size = size; | 790 message_size = size; |
752 } | 791 } |
753 | 792 |
754 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle, | 793 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle, |
755 // if bit-for-bit compatible. | 794 // if bit-for-bit compatible. |
756 void CastSocket::MessageHeader::PrependToString(std::string* str) { | 795 void CastSocket::MessageHeader::PrependToString(std::string* str) { |
757 MessageHeader output = *this; | 796 MessageHeader output = *this; |
758 output.message_size = base::HostToNet32(message_size); | 797 output.message_size = base::HostToNet32(message_size); |
759 size_t header_size = base::checked_cast<size_t,uint32>( | 798 size_t header_size = base::checked_cast<size_t, uint32>( |
760 MessageHeader::header_size()); | 799 MessageHeader::header_size()); |
761 scoped_ptr<char, base::FreeDeleter> char_array( | 800 scoped_ptr<char, base::FreeDeleter> char_array( |
762 static_cast<char*>(malloc(header_size))); | 801 static_cast<char*>(malloc(header_size))); |
763 memcpy(char_array.get(), &output, header_size); | 802 memcpy(char_array.get(), &output, header_size); |
764 str->insert(0, char_array.get(), header_size); | 803 str->insert(0, char_array.get(), header_size); |
765 } | 804 } |
766 | 805 |
767 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle, | 806 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle, |
768 // if bit-for-bit compatible. | 807 // if bit-for-bit compatible. |
769 void CastSocket::MessageHeader::ReadFromIOBuffer( | 808 void CastSocket::MessageHeader::ReadFromIOBuffer( |
770 net::GrowableIOBuffer* buffer, MessageHeader* header) { | 809 net::GrowableIOBuffer* buffer, MessageHeader* header) { |
771 uint32 message_size; | 810 uint32 message_size; |
772 size_t header_size = base::checked_cast<size_t,uint32>( | 811 size_t header_size = base::checked_cast<size_t, uint32>( |
773 MessageHeader::header_size()); | 812 MessageHeader::header_size()); |
774 memcpy(&message_size, buffer->StartOfBuffer(), header_size); | 813 memcpy(&message_size, buffer->StartOfBuffer(), header_size); |
775 header->message_size = base::NetToHost32(message_size); | 814 header->message_size = base::NetToHost32(message_size); |
776 } | 815 } |
777 | 816 |
778 std::string CastSocket::MessageHeader::ToString() { | 817 std::string CastSocket::MessageHeader::ToString() { |
779 return "{message_size: " + base::UintToString(message_size) + "}"; | 818 return "{message_size: " + base::UintToString(message_size) + "}"; |
780 } | 819 } |
781 | 820 |
782 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback) | 821 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback) |
783 : callback(callback) { } | 822 : callback(callback) { } |
784 | 823 |
785 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) { | 824 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) { |
786 DCHECK(!io_buffer.get()); | 825 DCHECK(!io_buffer.get()); |
787 std::string message_data; | 826 std::string message_data; |
788 if (!Serialize(message_proto, &message_data)) | 827 if (!Serialize(message_proto, &message_data)) |
789 return false; | 828 return false; |
790 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data), | 829 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data), |
791 message_data.size()); | 830 message_data.size()); |
792 return true; | 831 return true; |
793 } | 832 } |
794 | 833 |
795 CastSocket::WriteRequest::~WriteRequest() { } | 834 CastSocket::WriteRequest::~WriteRequest() { } |
796 | 835 |
797 } // namespace cast_channel | 836 } // namespace cast_channel |
798 } // namespace core_api | 837 } // namespace core_api |
799 } // namespace extensions | 838 } // namespace extensions |
800 | 839 |
801 #undef VLOG_WITH_CONNECTION | 840 #undef VLOG_WITH_CONNECTION |
OLD | NEW |