OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "extensions/browser/api/cast_channel/cast_socket.h" | 5 #include "extensions/browser/api/cast_channel/cast_socket.h" |
6 | 6 |
7 #include <stdlib.h> | 7 #include <stdlib.h> |
8 #include <string.h> | 8 #include <string.h> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
92 net_log_source_.id = net_log_->NextID(); | 92 net_log_source_.id = net_log_->NextID(); |
93 | 93 |
94 // Reuse these buffers for each message. | 94 // Reuse these buffers for each message. |
95 header_read_buffer_ = new net::GrowableIOBuffer(); | 95 header_read_buffer_ = new net::GrowableIOBuffer(); |
96 header_read_buffer_->SetCapacity(MessageHeader::header_size()); | 96 header_read_buffer_->SetCapacity(MessageHeader::header_size()); |
97 body_read_buffer_ = new net::GrowableIOBuffer(); | 97 body_read_buffer_ = new net::GrowableIOBuffer(); |
98 body_read_buffer_->SetCapacity(MessageHeader::max_message_size()); | 98 body_read_buffer_->SetCapacity(MessageHeader::max_message_size()); |
99 current_read_buffer_ = header_read_buffer_; | 99 current_read_buffer_ = header_read_buffer_; |
100 } | 100 } |
101 | 101 |
102 CastSocket::~CastSocket() { } | 102 CastSocket::~CastSocket() { |
103 CloseInternal(); | |
104 } | |
103 | 105 |
104 ReadyState CastSocket::ready_state() const { | 106 ReadyState CastSocket::ready_state() const { |
105 return ready_state_; | 107 return ready_state_; |
106 } | 108 } |
107 | 109 |
108 ChannelError CastSocket::error_state() const { | 110 ChannelError CastSocket::error_state() const { |
109 return error_state_; | 111 return error_state_; |
110 } | 112 } |
111 | 113 |
112 scoped_ptr<net::TCPClientSocket> CastSocket::CreateTcpSocket() { | 114 scoped_ptr<net::TCPClientSocket> CastSocket::CreateTcpSocket() { |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
169 DCHECK(CalledOnValidThread()); | 171 DCHECK(CalledOnValidThread()); |
170 VLOG_WITH_CONNECTION(1) << "Connect readyState = " << ready_state_; | 172 VLOG_WITH_CONNECTION(1) << "Connect readyState = " << ready_state_; |
171 if (ready_state_ != READY_STATE_NONE) { | 173 if (ready_state_ != READY_STATE_NONE) { |
172 callback.Run(net::ERR_CONNECTION_FAILED); | 174 callback.Run(net::ERR_CONNECTION_FAILED); |
173 return; | 175 return; |
174 } | 176 } |
175 ready_state_ = READY_STATE_CONNECTING; | 177 ready_state_ = READY_STATE_CONNECTING; |
176 connect_callback_ = callback; | 178 connect_callback_ = callback; |
177 connect_state_ = CONN_STATE_TCP_CONNECT; | 179 connect_state_ = CONN_STATE_TCP_CONNECT; |
178 if (connect_timeout_.InMicroseconds() > 0) { | 180 if (connect_timeout_.InMicroseconds() > 0) { |
179 GetTimer()->Start( | 181 DCHECK(cancel_connect_callback_.IsCancelled()); |
180 FROM_HERE, | 182 cancel_connect_callback_.Reset(base::Bind(&CastSocket::CancelConnect, |
181 connect_timeout_, | 183 base::Unretained(this))); |
Wez
2014/07/30 19:14:53
nit: Style guide would prefer wrapping this after
mark a. foltz
2014/07/31 18:33:17
It prefers wrapping at parenthesis
| |
182 base::Bind(&CastSocket::CancelConnect, AsWeakPtr())); | 184 GetTimer()->Start(FROM_HERE, |
185 connect_timeout_, | |
186 cancel_connect_callback_.callback()); | |
183 } | 187 } |
184 DoConnectLoop(net::OK); | 188 DoConnectLoop(net::OK); |
185 } | 189 } |
186 | 190 |
187 void CastSocket::PostTaskToStartConnectLoop(int result) { | 191 void CastSocket::PostTaskToStartConnectLoop(int result) { |
188 DCHECK(CalledOnValidThread()); | 192 DCHECK(CalledOnValidThread()); |
189 base::MessageLoop::current()->PostTask( | 193 DCHECK(connect_loop_callback_.IsCancelled()); |
190 FROM_HERE, | 194 connect_loop_callback_.Reset(base::Bind(&CastSocket::DoConnectLoop, |
191 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr(), result)); | 195 base::Unretained(this), |
196 result)); | |
197 base::MessageLoop::current()->PostTask(FROM_HERE, | |
198 connect_loop_callback_.callback()); | |
192 } | 199 } |
193 | 200 |
194 void CastSocket::CancelConnect() { | 201 void CastSocket::CancelConnect() { |
195 DCHECK(CalledOnValidThread()); | 202 DCHECK(CalledOnValidThread()); |
196 // Stop all pending connection setup tasks and report back to the client. | 203 // Stop all pending connection setup tasks and report back to the client. |
197 is_canceled_ = true; | 204 is_canceled_ = true; |
198 VLOG_WITH_CONNECTION(1) << "Timeout while establishing a connection."; | 205 VLOG_WITH_CONNECTION(1) << "Timeout while establishing a connection."; |
199 DoConnectCallback(net::ERR_TIMED_OUT); | 206 DoConnectCallback(net::ERR_TIMED_OUT); |
200 } | 207 } |
201 | 208 |
202 // This method performs the state machine transitions for connection flow. | 209 // This method performs the state machine transitions for connection flow. |
203 // There are two entry points to this method: | 210 // There are two entry points to this method: |
204 // 1. Connect method: this starts the flow | 211 // 1. Connect method: this starts the flow |
205 // 2. Callback from network operations that finish asynchronously | 212 // 2. Callback from network operations that finish asynchronously |
206 void CastSocket::DoConnectLoop(int result) { | 213 void CastSocket::DoConnectLoop(int result) { |
214 connect_loop_callback_.Cancel(); | |
207 if (is_canceled_) { | 215 if (is_canceled_) { |
208 LOG(ERROR) << "CANCELLED - Aborting DoConnectLoop."; | 216 LOG(ERROR) << "CANCELLED - Aborting DoConnectLoop."; |
209 return; | 217 return; |
210 } | 218 } |
211 // Network operations can either finish synchronously or asynchronously. | 219 // Network operations can either finish synchronously or asynchronously. |
212 // This method executes the state machine transitions in a loop so that | 220 // This method executes the state machine transitions in a loop so that |
213 // correct state transitions happen even when network operations finish | 221 // correct state transitions happen even when network operations finish |
214 // synchronously. | 222 // synchronously. |
215 int rv = result; | 223 int rv = result; |
216 do { | 224 do { |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
254 if (rv != net::ERR_IO_PENDING) { | 262 if (rv != net::ERR_IO_PENDING) { |
255 GetTimer()->Stop(); | 263 GetTimer()->Stop(); |
256 DoConnectCallback(rv); | 264 DoConnectCallback(rv); |
257 } | 265 } |
258 } | 266 } |
259 | 267 |
260 int CastSocket::DoTcpConnect() { | 268 int CastSocket::DoTcpConnect() { |
261 VLOG_WITH_CONNECTION(1) << "DoTcpConnect"; | 269 VLOG_WITH_CONNECTION(1) << "DoTcpConnect"; |
262 connect_state_ = CONN_STATE_TCP_CONNECT_COMPLETE; | 270 connect_state_ = CONN_STATE_TCP_CONNECT_COMPLETE; |
263 tcp_socket_ = CreateTcpSocket(); | 271 tcp_socket_ = CreateTcpSocket(); |
272 DCHECK(connect_loop_callback_.IsCancelled()); | |
Wez
2014/07/30 19:14:53
nit: Check this on entry to DoTcpConnect()?
Wez
2014/07/30 19:14:53
IIUC this is basically checking that after a PostT
mark a. foltz
2014/07/31 18:33:17
Done.
mark a. foltz
2014/07/31 18:33:17
There should be at most one pending callback to re
| |
264 return tcp_socket_->Connect( | 273 return tcp_socket_->Connect( |
265 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr())); | 274 base::Bind(&CastSocket::DoConnectLoop, base::Unretained(this))); |
266 } | 275 } |
267 | 276 |
268 int CastSocket::DoTcpConnectComplete(int result) { | 277 int CastSocket::DoTcpConnectComplete(int result) { |
269 VLOG_WITH_CONNECTION(1) << "DoTcpConnectComplete: " << result; | 278 VLOG_WITH_CONNECTION(1) << "DoTcpConnectComplete: " << result; |
270 if (result == net::OK) { | 279 if (result == net::OK) { |
271 // Enable TCP protocol-level keep-alive. | 280 // Enable TCP protocol-level keep-alive. |
272 bool result = tcp_socket_->SetKeepAlive(true, kTcpKeepAliveDelaySecs); | 281 bool result = tcp_socket_->SetKeepAlive(true, kTcpKeepAliveDelaySecs); |
273 LOG_IF(WARNING, !result) << "Failed to SetKeepAlive."; | 282 LOG_IF(WARNING, !result) << "Failed to SetKeepAlive."; |
274 connect_state_ = CONN_STATE_SSL_CONNECT; | 283 connect_state_ = CONN_STATE_SSL_CONNECT; |
275 } | 284 } |
276 return result; | 285 return result; |
277 } | 286 } |
278 | 287 |
279 int CastSocket::DoSslConnect() { | 288 int CastSocket::DoSslConnect() { |
280 VLOG_WITH_CONNECTION(1) << "DoSslConnect"; | 289 VLOG_WITH_CONNECTION(1) << "DoSslConnect"; |
281 connect_state_ = CONN_STATE_SSL_CONNECT_COMPLETE; | 290 connect_state_ = CONN_STATE_SSL_CONNECT_COMPLETE; |
282 socket_ = CreateSslSocket(tcp_socket_.PassAs<net::StreamSocket>()); | 291 socket_ = CreateSslSocket(tcp_socket_.PassAs<net::StreamSocket>()); |
292 DCHECK(connect_loop_callback_.IsCancelled()); | |
Wez
2014/07/30 19:14:53
nit: Check this on entry to the method?
mark a. foltz
2014/07/31 18:33:16
Done.
| |
283 return socket_->Connect( | 293 return socket_->Connect( |
284 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr())); | 294 base::Bind(&CastSocket::DoConnectLoop, base::Unretained(this))); |
285 } | 295 } |
286 | 296 |
287 int CastSocket::DoSslConnectComplete(int result) { | 297 int CastSocket::DoSslConnectComplete(int result) { |
288 VLOG_WITH_CONNECTION(1) << "DoSslConnectComplete: " << result; | 298 VLOG_WITH_CONNECTION(1) << "DoSslConnectComplete: " << result; |
289 if (result == net::ERR_CERT_AUTHORITY_INVALID && | 299 if (result == net::ERR_CERT_AUTHORITY_INVALID && |
290 peer_cert_.empty() && ExtractPeerCert(&peer_cert_)) { | 300 peer_cert_.empty() && ExtractPeerCert(&peer_cert_)) { |
291 connect_state_ = CONN_STATE_TCP_CONNECT; | 301 connect_state_ = CONN_STATE_TCP_CONNECT; |
292 } else if (result == net::OK && | 302 } else if (result == net::OK && |
293 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) { | 303 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) { |
294 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND; | 304 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND; |
295 } | 305 } |
296 return result; | 306 return result; |
297 } | 307 } |
298 | 308 |
299 int CastSocket::DoAuthChallengeSend() { | 309 int CastSocket::DoAuthChallengeSend() { |
300 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSend"; | 310 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSend"; |
301 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE; | 311 connect_state_ = CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE; |
302 CastMessage challenge_message; | 312 CastMessage challenge_message; |
303 CreateAuthChallengeMessage(&challenge_message); | 313 CreateAuthChallengeMessage(&challenge_message); |
304 VLOG_WITH_CONNECTION(1) << "Sending challenge: " | 314 VLOG_WITH_CONNECTION(1) << "Sending challenge: " |
305 << CastMessageToString(challenge_message); | 315 << CastMessageToString(challenge_message); |
306 // Post a task to send auth challenge so that DoWriteLoop is not nested inside | 316 // Post a task to send auth challenge so that DoWriteLoop is not nested inside |
307 // DoConnectLoop. This is not strictly necessary but keeps the write loop | 317 // DoConnectLoop. This is not strictly necessary but keeps the write loop |
308 // code decoupled from connect loop code. | 318 // code decoupled from connect loop code. |
319 DCHECK(send_auth_challenge_callback_.IsCancelled()); | |
320 send_auth_challenge_callback_.Reset( | |
321 base::Bind(&CastSocket::SendCastMessageInternal, | |
322 base::Unretained(this), | |
323 challenge_message, | |
324 base::Bind(&CastSocket::DoAuthChallengeSendWriteComplete, | |
325 base::Unretained(this)))); | |
Wez
2014/07/30 19:14:53
Ick... do we really need a clean stack for SendCas
mark a. foltz
2014/07/31 18:33:17
It appears to be "not strictly necessary" but othe
| |
309 base::MessageLoop::current()->PostTask( | 326 base::MessageLoop::current()->PostTask( |
310 FROM_HERE, | 327 FROM_HERE, |
311 base::Bind(&CastSocket::SendCastMessageInternal, | 328 send_auth_challenge_callback_.callback()); |
312 AsWeakPtr(), | |
313 challenge_message, | |
314 base::Bind(&CastSocket::DoConnectLoop, AsWeakPtr()))); | |
315 // Always return IO_PENDING since the result is always asynchronous. | 329 // Always return IO_PENDING since the result is always asynchronous. |
316 return net::ERR_IO_PENDING; | 330 return net::ERR_IO_PENDING; |
317 } | 331 } |
318 | 332 |
333 void CastSocket::DoAuthChallengeSendWriteComplete(int result) { | |
334 send_auth_challenge_callback_.Cancel(); | |
335 VLOG_WITH_CONNECTION(2) << "DoAuthChallengeSendWriteComplete: " << result; | |
336 DCHECK_GT(result, 0); | |
337 DCHECK_EQ(write_queue_.size(), 1UL); | |
338 PostTaskToStartConnectLoop(result); | |
339 } | |
340 | |
319 int CastSocket::DoAuthChallengeSendComplete(int result) { | 341 int CastSocket::DoAuthChallengeSendComplete(int result) { |
320 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result; | 342 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result; |
321 if (result < 0) | 343 if (result < 0) |
322 return result; | 344 return result; |
323 connect_state_ = CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE; | 345 connect_state_ = CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE; |
324 // Post a task to start read loop so that DoReadLoop is not nested inside | 346 // Post a task to start read loop so that DoReadLoop is not nested inside |
325 // DoConnectLoop. This is not strictly necessary but keeps the read loop | 347 // DoConnectLoop. This is not strictly necessary but keeps the read loop |
326 // code decoupled from connect loop code. | 348 // code decoupled from connect loop code. |
327 PostTaskToStartReadLoop(); | 349 PostTaskToStartReadLoop(); |
328 // Always return IO_PENDING since the result is always asynchronous. | 350 // Always return IO_PENDING since the result is always asynchronous. |
(...skipping 18 matching lines...) Expand all Loading... | |
347 } else if (result == net::ERR_TIMED_OUT) { | 369 } else if (result == net::ERR_TIMED_OUT) { |
348 error_state_ = CHANNEL_ERROR_CONNECT_TIMEOUT; | 370 error_state_ = CHANNEL_ERROR_CONNECT_TIMEOUT; |
349 } else { | 371 } else { |
350 error_state_ = CHANNEL_ERROR_CONNECT_ERROR; | 372 error_state_ = CHANNEL_ERROR_CONNECT_ERROR; |
351 } | 373 } |
352 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback"; | 374 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback"; |
353 base::ResetAndReturn(&connect_callback_).Run(result); | 375 base::ResetAndReturn(&connect_callback_).Run(result); |
354 } | 376 } |
355 | 377 |
356 void CastSocket::Close(const net::CompletionCallback& callback) { | 378 void CastSocket::Close(const net::CompletionCallback& callback) { |
357 DCHECK(CalledOnValidThread()); | 379 CloseInternal(); |
380 callback.Run(net::OK); | |
381 // |callback| can delete |this| | |
Wez
2014/07/30 19:14:53
nit: Callers must inherently cope with asynchronou
mark a. foltz
2014/07/31 18:33:16
This comment is out of date and removed. Deletion
| |
382 } | |
383 | |
384 void CastSocket::CloseInternal() { | |
385 // TODO(mfoltz): Enforce this when CastChannelAPITest is rewritten to create | |
386 // and free sockets on the same thread. crbug.com/398242 | |
387 // DCHECK(CalledOnValidThread()); | |
388 if (ready_state_ == READY_STATE_CLOSED) { | |
389 return; | |
390 } | |
358 VLOG_WITH_CONNECTION(1) << "Close ReadyState = " << ready_state_; | 391 VLOG_WITH_CONNECTION(1) << "Close ReadyState = " << ready_state_; |
359 tcp_socket_.reset(); | 392 tcp_socket_.reset(); |
360 socket_.reset(); | 393 socket_.reset(); |
361 cert_verifier_.reset(); | 394 cert_verifier_.reset(); |
362 transport_security_state_.reset(); | 395 transport_security_state_.reset(); |
396 GetTimer()->Stop(); | |
397 // Reset callbacks passed into us. Or should we invoke them with an error? | |
Wez
2014/07/30 19:14:53
nit: Suggest blank line before this comment, and b
Wez
2014/07/30 19:14:53
If the socket is being closed then there shouldn't
mark a. foltz
2014/07/31 18:33:16
Done.
mark a. foltz
2014/07/31 18:33:17
Actually, there may be a Promise pending on the ca
| |
398 connect_callback_.Reset(); | |
399 for (; !write_queue_.empty(); write_queue_.pop()) { | |
400 write_queue_.front().callback.Reset(); | |
401 } | |
402 // Cancel callbacks that we queued ourselves to re-enter the connect or read | |
403 // loops. | |
404 connect_loop_callback_.Cancel(); | |
405 send_auth_challenge_callback_.Cancel(); | |
406 read_loop_callback_.Cancel(); | |
407 cancel_connect_callback_.Cancel(); | |
363 ready_state_ = READY_STATE_CLOSED; | 408 ready_state_ = READY_STATE_CLOSED; |
364 callback.Run(net::OK); | |
365 // |callback| can delete |this| | |
366 } | 409 } |
367 | 410 |
368 void CastSocket::SendMessage(const MessageInfo& message, | 411 void CastSocket::SendMessage(const MessageInfo& message, |
369 const net::CompletionCallback& callback) { | 412 const net::CompletionCallback& callback) { |
370 DCHECK(CalledOnValidThread()); | 413 DCHECK(CalledOnValidThread()); |
371 if (ready_state_ != READY_STATE_OPEN) { | 414 if (ready_state_ != READY_STATE_OPEN) { |
372 callback.Run(net::ERR_FAILED); | 415 callback.Run(net::ERR_FAILED); |
373 return; | 416 return; |
374 } | 417 } |
375 CastMessage message_proto; | 418 CastMessage message_proto; |
376 if (!MessageInfoToCastMessage(message, &message_proto)) { | 419 if (!MessageInfoToCastMessage(message, &message_proto)) { |
377 callback.Run(net::ERR_FAILED); | 420 callback.Run(net::ERR_FAILED); |
378 return; | 421 return; |
379 } | 422 } |
380 | |
381 SendCastMessageInternal(message_proto, callback); | 423 SendCastMessageInternal(message_proto, callback); |
382 } | 424 } |
383 | 425 |
384 void CastSocket::SendCastMessageInternal( | 426 void CastSocket::SendCastMessageInternal( |
385 const CastMessage& message, | 427 const CastMessage& message, |
386 const net::CompletionCallback& callback) { | 428 const net::CompletionCallback& callback) { |
387 WriteRequest write_request(callback); | 429 WriteRequest write_request(callback); |
388 if (!write_request.SetContent(message)) { | 430 if (!write_request.SetContent(message)) { |
389 callback.Run(net::ERR_FAILED); | 431 callback.Run(net::ERR_FAILED); |
390 return; | 432 return; |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
447 | 489 |
448 int CastSocket::DoWrite() { | 490 int CastSocket::DoWrite() { |
449 DCHECK(!write_queue_.empty()); | 491 DCHECK(!write_queue_.empty()); |
450 WriteRequest& request = write_queue_.front(); | 492 WriteRequest& request = write_queue_.front(); |
451 | 493 |
452 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = " | 494 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = " |
453 << request.io_buffer->size() << " bytes_written " | 495 << request.io_buffer->size() << " bytes_written " |
454 << request.io_buffer->BytesConsumed(); | 496 << request.io_buffer->BytesConsumed(); |
455 | 497 |
456 write_state_ = WRITE_STATE_WRITE_COMPLETE; | 498 write_state_ = WRITE_STATE_WRITE_COMPLETE; |
457 | |
458 return socket_->Write( | 499 return socket_->Write( |
459 request.io_buffer.get(), | 500 request.io_buffer.get(), |
460 request.io_buffer->BytesRemaining(), | 501 request.io_buffer->BytesRemaining(), |
461 base::Bind(&CastSocket::DoWriteLoop, AsWeakPtr())); | 502 base::Bind(&CastSocket::DoWriteLoop, base::Unretained(this))); |
462 } | 503 } |
463 | 504 |
464 int CastSocket::DoWriteComplete(int result) { | 505 int CastSocket::DoWriteComplete(int result) { |
465 DCHECK(!write_queue_.empty()); | 506 DCHECK(!write_queue_.empty()); |
466 if (result <= 0) { // NOTE that 0 also indicates an error | 507 if (result <= 0) { // NOTE that 0 also indicates an error |
467 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; | 508 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; |
468 write_state_ = WRITE_STATE_ERROR; | 509 write_state_ = WRITE_STATE_ERROR; |
469 return result == 0 ? net::ERR_FAILED : result; | 510 return result == 0 ? net::ERR_FAILED : result; |
470 } | 511 } |
471 | 512 |
472 // Some bytes were successfully written | 513 // Some bytes were successfully written |
473 WriteRequest& request = write_queue_.front(); | 514 WriteRequest& request = write_queue_.front(); |
474 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; | 515 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; |
475 io_buffer->DidConsume(result); | 516 io_buffer->DidConsume(result); |
476 if (io_buffer->BytesRemaining() == 0) // Message fully sent | 517 if (io_buffer->BytesRemaining() == 0) // Message fully sent |
477 write_state_ = WRITE_STATE_DO_CALLBACK; | 518 write_state_ = WRITE_STATE_DO_CALLBACK; |
478 else | 519 else |
479 write_state_ = WRITE_STATE_WRITE; | 520 write_state_ = WRITE_STATE_WRITE; |
480 | 521 |
481 return net::OK; | 522 return net::OK; |
482 } | 523 } |
483 | 524 |
484 int CastSocket::DoWriteCallback() { | 525 int CastSocket::DoWriteCallback() { |
485 DCHECK(!write_queue_.empty()); | 526 DCHECK(!write_queue_.empty()); |
527 write_state_ = WRITE_STATE_WRITE; | |
486 WriteRequest& request = write_queue_.front(); | 528 WriteRequest& request = write_queue_.front(); |
487 int bytes_consumed = request.io_buffer->BytesConsumed(); | 529 int bytes_consumed = request.io_buffer->BytesConsumed(); |
488 | 530 request.callback.Run(bytes_consumed); |
489 // If inside connection flow, then there should be exaclty one item in | 531 write_queue_.pop(); |
490 // the write queue. | |
491 if (ready_state_ == READY_STATE_CONNECTING) { | |
492 write_queue_.pop(); | |
493 DCHECK(write_queue_.empty()); | |
494 PostTaskToStartConnectLoop(bytes_consumed); | |
495 } else { | |
496 WriteRequest& request = write_queue_.front(); | |
497 request.callback.Run(bytes_consumed); | |
498 write_queue_.pop(); | |
499 } | |
500 write_state_ = WRITE_STATE_WRITE; | |
501 return net::OK; | 532 return net::OK; |
502 } | 533 } |
503 | 534 |
504 int CastSocket::DoWriteError(int result) { | 535 int CastSocket::DoWriteError(int result) { |
505 DCHECK(!write_queue_.empty()); | 536 DCHECK(!write_queue_.empty()); |
506 DCHECK_LT(result, 0); | 537 DCHECK_LT(result, 0); |
507 | 538 |
508 // If inside connection flow, then there should be exactly one item in | 539 // If inside connection flow, then there should be exactly one item in |
509 // the write queue. | 540 // the write queue. |
510 if (ready_state_ == READY_STATE_CONNECTING) { | 541 if (ready_state_ == READY_STATE_CONNECTING) { |
511 write_queue_.pop(); | 542 write_queue_.pop(); |
512 DCHECK(write_queue_.empty()); | 543 DCHECK(write_queue_.empty()); |
513 PostTaskToStartConnectLoop(result); | 544 PostTaskToStartConnectLoop(result); |
514 // Connect loop will handle the error. Return net::OK so that write flow | 545 // Connect loop will handle the error. Return net::OK so that write flow |
515 // does not try to report error also. | 546 // does not try to report error also. |
516 return net::OK; | 547 return net::OK; |
517 } | 548 } |
518 | 549 |
519 while (!write_queue_.empty()) { | 550 while (!write_queue_.empty()) { |
520 WriteRequest& request = write_queue_.front(); | 551 WriteRequest& request = write_queue_.front(); |
521 request.callback.Run(result); | 552 request.callback.Run(result); |
522 write_queue_.pop(); | 553 write_queue_.pop(); |
523 } | 554 } |
524 return net::ERR_FAILED; | 555 return net::ERR_FAILED; |
525 } | 556 } |
526 | 557 |
527 void CastSocket::PostTaskToStartReadLoop() { | 558 void CastSocket::PostTaskToStartReadLoop() { |
528 DCHECK(CalledOnValidThread()); | 559 DCHECK(CalledOnValidThread()); |
529 base::MessageLoop::current()->PostTask( | 560 DCHECK(read_loop_callback_.IsCancelled()); |
530 FROM_HERE, | 561 read_loop_callback_.Reset(base::Bind(&CastSocket::StartReadLoop, |
531 base::Bind(&CastSocket::StartReadLoop, AsWeakPtr())); | 562 base::Unretained(this))); |
563 base::MessageLoop::current()->PostTask(FROM_HERE, | |
564 read_loop_callback_.callback()); | |
532 } | 565 } |
533 | 566 |
534 void CastSocket::StartReadLoop() { | 567 void CastSocket::StartReadLoop() { |
568 read_loop_callback_.Cancel(); | |
535 // Read loop would have already been started if read state is not NONE | 569 // Read loop would have already been started if read state is not NONE |
536 if (read_state_ == READ_STATE_NONE) { | 570 if (read_state_ == READ_STATE_NONE) { |
537 read_state_ = READ_STATE_READ; | 571 read_state_ = READ_STATE_READ; |
538 DoReadLoop(net::OK); | 572 DoReadLoop(net::OK); |
539 } | 573 } |
540 } | 574 } |
541 | 575 |
542 void CastSocket::DoReadLoop(int result) { | 576 void CastSocket::DoReadLoop(int result) { |
543 DCHECK(CalledOnValidThread()); | 577 DCHECK(CalledOnValidThread()); |
544 // Network operations can either finish synchronously or asynchronously. | 578 // Network operations can either finish synchronously or asynchronously. |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
596 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset(); | 630 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset(); |
597 current_read_buffer_ = body_read_buffer_; | 631 current_read_buffer_ = body_read_buffer_; |
598 CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size()); | 632 CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size()); |
599 } | 633 } |
600 CHECK_GT(num_bytes_to_read, 0U); | 634 CHECK_GT(num_bytes_to_read, 0U); |
601 | 635 |
602 // Read up to num_bytes_to_read into |current_read_buffer_|. | 636 // Read up to num_bytes_to_read into |current_read_buffer_|. |
603 return socket_->Read( | 637 return socket_->Read( |
604 current_read_buffer_.get(), | 638 current_read_buffer_.get(), |
605 num_bytes_to_read, | 639 num_bytes_to_read, |
606 base::Bind(&CastSocket::DoReadLoop, AsWeakPtr())); | 640 base::Bind(&CastSocket::DoReadLoop, base::Unretained(this))); |
607 } | 641 } |
608 | 642 |
609 int CastSocket::DoReadComplete(int result) { | 643 int CastSocket::DoReadComplete(int result) { |
610 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result | 644 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result |
611 << " header offset = " | 645 << " header offset = " |
612 << header_read_buffer_->offset() | 646 << header_read_buffer_->offset() |
613 << " body offset = " << body_read_buffer_->offset(); | 647 << " body offset = " << body_read_buffer_->offset(); |
614 if (result <= 0) { // 0 means EOF: the peer closed the socket | 648 if (result <= 0) { // 0 means EOF: the peer closed the socket |
615 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket"; | 649 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket"; |
616 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; | 650 error_state_ = CHANNEL_ERROR_SOCKET_ERROR; |
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
716 return false; | 750 return false; |
717 } | 751 } |
718 CastSocket::MessageHeader header; | 752 CastSocket::MessageHeader header; |
719 header.SetMessageSize(message_size); | 753 header.SetMessageSize(message_size); |
720 header.PrependToString(message_data); | 754 header.PrependToString(message_data); |
721 return true; | 755 return true; |
722 } | 756 } |
723 | 757 |
724 void CastSocket::CloseWithError(ChannelError error) { | 758 void CastSocket::CloseWithError(ChannelError error) { |
725 DCHECK(CalledOnValidThread()); | 759 DCHECK(CalledOnValidThread()); |
726 socket_.reset(NULL); | 760 CloseInternal(); |
727 ready_state_ = READY_STATE_CLOSED; | |
728 error_state_ = error; | 761 error_state_ = error; |
729 if (delegate_) | 762 if (delegate_) |
730 delegate_->OnError(this, error); | 763 delegate_->OnError(this, error); |
731 } | 764 } |
732 | 765 |
733 std::string CastSocket::CastUrl() const { | 766 std::string CastSocket::CastUrl() const { |
734 return ((channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) ? | 767 return ((channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED) ? |
735 "casts://" : "cast://") + ip_endpoint_.ToString(); | 768 "casts://" : "cast://") + ip_endpoint_.ToString(); |
736 } | 769 } |
737 | 770 |
(...skipping 11 matching lines...) Expand all Loading... | |
749 DCHECK_LT(size, static_cast<size_t>(kuint32max)); | 782 DCHECK_LT(size, static_cast<size_t>(kuint32max)); |
750 DCHECK_GT(size, 0U); | 783 DCHECK_GT(size, 0U); |
751 message_size = size; | 784 message_size = size; |
752 } | 785 } |
753 | 786 |
754 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle, | 787 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle, |
755 // if bit-for-bit compatible. | 788 // if bit-for-bit compatible. |
756 void CastSocket::MessageHeader::PrependToString(std::string* str) { | 789 void CastSocket::MessageHeader::PrependToString(std::string* str) { |
757 MessageHeader output = *this; | 790 MessageHeader output = *this; |
758 output.message_size = base::HostToNet32(message_size); | 791 output.message_size = base::HostToNet32(message_size); |
759 size_t header_size = base::checked_cast<size_t,uint32>( | 792 size_t header_size = base::checked_cast<size_t, uint32>( |
760 MessageHeader::header_size()); | 793 MessageHeader::header_size()); |
761 scoped_ptr<char, base::FreeDeleter> char_array( | 794 scoped_ptr<char, base::FreeDeleter> char_array( |
762 static_cast<char*>(malloc(header_size))); | 795 static_cast<char*>(malloc(header_size))); |
763 memcpy(char_array.get(), &output, header_size); | 796 memcpy(char_array.get(), &output, header_size); |
764 str->insert(0, char_array.get(), header_size); | 797 str->insert(0, char_array.get(), header_size); |
765 } | 798 } |
766 | 799 |
767 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle, | 800 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle, |
768 // if bit-for-bit compatible. | 801 // if bit-for-bit compatible. |
769 void CastSocket::MessageHeader::ReadFromIOBuffer( | 802 void CastSocket::MessageHeader::ReadFromIOBuffer( |
770 net::GrowableIOBuffer* buffer, MessageHeader* header) { | 803 net::GrowableIOBuffer* buffer, MessageHeader* header) { |
771 uint32 message_size; | 804 uint32 message_size; |
772 size_t header_size = base::checked_cast<size_t,uint32>( | 805 size_t header_size = base::checked_cast<size_t, uint32>( |
773 MessageHeader::header_size()); | 806 MessageHeader::header_size()); |
774 memcpy(&message_size, buffer->StartOfBuffer(), header_size); | 807 memcpy(&message_size, buffer->StartOfBuffer(), header_size); |
775 header->message_size = base::NetToHost32(message_size); | 808 header->message_size = base::NetToHost32(message_size); |
776 } | 809 } |
777 | 810 |
778 std::string CastSocket::MessageHeader::ToString() { | 811 std::string CastSocket::MessageHeader::ToString() { |
779 return "{message_size: " + base::UintToString(message_size) + "}"; | 812 return "{message_size: " + base::UintToString(message_size) + "}"; |
780 } | 813 } |
781 | 814 |
782 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback) | 815 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback) |
783 : callback(callback) { } | 816 : callback(callback) { } |
784 | 817 |
785 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) { | 818 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) { |
786 DCHECK(!io_buffer.get()); | 819 DCHECK(!io_buffer.get()); |
787 std::string message_data; | 820 std::string message_data; |
788 if (!Serialize(message_proto, &message_data)) | 821 if (!Serialize(message_proto, &message_data)) |
789 return false; | 822 return false; |
790 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data), | 823 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data), |
791 message_data.size()); | 824 message_data.size()); |
792 return true; | 825 return true; |
793 } | 826 } |
794 | 827 |
795 CastSocket::WriteRequest::~WriteRequest() { } | 828 CastSocket::WriteRequest::~WriteRequest() { } |
796 | 829 |
797 } // namespace cast_channel | 830 } // namespace cast_channel |
798 } // namespace core_api | 831 } // namespace core_api |
799 } // namespace extensions | 832 } // namespace extensions |
800 | 833 |
801 #undef VLOG_WITH_CONNECTION | 834 #undef VLOG_WITH_CONNECTION |
OLD | NEW |