Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "extensions/browser/api/cast_channel/cast_socket.h" | 5 #include "extensions/browser/api/cast_channel/cast_socket.h" |
| 6 | 6 |
| 7 #include <stdlib.h> | 7 #include <stdlib.h> |
| 8 #include <string.h> | 8 #include <string.h> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 29 matching lines...) Expand all Loading... | |
| 40 #define VLOG_WITH_CONNECTION(level) VLOG(level) << "[" << \ | 40 #define VLOG_WITH_CONNECTION(level) VLOG(level) << "[" << \ |
| 41 ip_endpoint_.ToString() << ", auth=" << channel_auth_ << "] " | 41 ip_endpoint_.ToString() << ", auth=" << channel_auth_ << "] " |
| 42 | 42 |
| 43 namespace { | 43 namespace { |
| 44 | 44 |
| 45 // The default keepalive delay. On Linux, keepalives probes will be sent after | 45 // The default keepalive delay. On Linux, keepalives probes will be sent after |
| 46 // the socket is idle for this length of time, and the socket will be closed | 46 // the socket is idle for this length of time, and the socket will be closed |
| 47 // after 9 failed probes. So the total idle time before close is 10 * | 47 // after 9 failed probes. So the total idle time before close is 10 * |
| 48 // kTcpKeepAliveDelaySecs. | 48 // kTcpKeepAliveDelaySecs. |
| 49 const int kTcpKeepAliveDelaySecs = 10; | 49 const int kTcpKeepAliveDelaySecs = 10; |
| 50 | 50 // Size of a CastSocket header payload. |
| 51 const size_t kHeaderSizeBytes = sizeof(int32); | |
| 52 // Maximum byte count for a CastSocket message. | |
| 53 const size_t kMaxMessageSizeBytes = 65536; | |
| 51 } // namespace | 54 } // namespace |
| 52 | 55 |
| 53 namespace extensions { | 56 namespace extensions { |
| 54 | 57 |
| 55 static base::LazyInstance<BrowserContextKeyedAPIFactory< | 58 static base::LazyInstance<BrowserContextKeyedAPIFactory< |
| 56 ApiResourceManager<core_api::cast_channel::CastSocket> > > g_factory = | 59 ApiResourceManager<core_api::cast_channel::CastSocket> > > g_factory = |
| 57 LAZY_INSTANCE_INITIALIZER; | 60 LAZY_INSTANCE_INITIALIZER; |
| 58 | 61 |
| 59 // static | 62 // static |
| 60 template <> | 63 template <> |
| (...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 182 ChannelAuthType channel_auth, | 185 ChannelAuthType channel_auth, |
| 183 CastSocket::Delegate* delegate, | 186 CastSocket::Delegate* delegate, |
| 184 net::NetLog* net_log, | 187 net::NetLog* net_log, |
| 185 const base::TimeDelta& timeout, | 188 const base::TimeDelta& timeout, |
| 186 const scoped_refptr<Logger>& logger) | 189 const scoped_refptr<Logger>& logger) |
| 187 : ApiResource(owner_extension_id), | 190 : ApiResource(owner_extension_id), |
| 188 channel_id_(0), | 191 channel_id_(0), |
| 189 ip_endpoint_(ip_endpoint), | 192 ip_endpoint_(ip_endpoint), |
| 190 channel_auth_(channel_auth), | 193 channel_auth_(channel_auth), |
| 191 delegate_(delegate), | 194 delegate_(delegate), |
| 192 current_message_size_(0), | |
| 193 current_message_(new CastMessage()), | |
| 194 net_log_(net_log), | 195 net_log_(net_log), |
| 195 logger_(logger), | 196 logger_(logger), |
| 196 connect_timeout_(timeout), | 197 connect_timeout_(timeout), |
| 197 connect_timeout_timer_(new base::OneShotTimer<CastSocket>), | 198 connect_timeout_timer_(new base::OneShotTimer<CastSocket>), |
| 198 is_canceled_(false), | 199 is_canceled_(false), |
| 200 current_message_(new CastMessage), | |
| 199 connect_state_(CONN_STATE_NONE), | 201 connect_state_(CONN_STATE_NONE), |
| 200 write_state_(WRITE_STATE_NONE), | 202 write_state_(WRITE_STATE_NONE), |
| 201 read_state_(READ_STATE_NONE), | 203 read_state_(READ_STATE_NONE), |
| 202 error_state_(CHANNEL_ERROR_NONE), | 204 error_state_(CHANNEL_ERROR_NONE), |
| 203 ready_state_(READY_STATE_NONE) { | 205 ready_state_(READY_STATE_NONE) { |
| 204 DCHECK(net_log_); | 206 DCHECK(net_log_); |
| 205 DCHECK(channel_auth_ == CHANNEL_AUTH_TYPE_SSL || | 207 DCHECK(channel_auth_ == CHANNEL_AUTH_TYPE_SSL || |
| 206 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED); | 208 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED); |
| 207 net_log_source_.type = net::NetLog::SOURCE_SOCKET; | 209 net_log_source_.type = net::NetLog::SOURCE_SOCKET; |
| 208 net_log_source_.id = net_log_->NextID(); | 210 net_log_source_.id = net_log_->NextID(); |
| 209 | 211 |
| 210 // Reuse these buffers for each message. | 212 // Buffer is reused across messages. |
| 211 header_read_buffer_ = new net::GrowableIOBuffer(); | 213 read_buffer_ = new net::GrowableIOBuffer(); |
| 212 header_read_buffer_->SetCapacity(MessageHeader::header_size()); | 214 read_buffer_->SetCapacity(kMaxMessageSizeBytes); |
| 213 body_read_buffer_ = new net::GrowableIOBuffer(); | 215 framer_.reset(new PacketFramer(read_buffer_)); |
| 214 body_read_buffer_->SetCapacity(MessageHeader::max_message_size()); | |
| 215 current_read_buffer_ = header_read_buffer_; | |
| 216 } | 216 } |
| 217 | 217 |
| 218 CastSocket::~CastSocket() { | 218 CastSocket::~CastSocket() { |
| 219 // Ensure that resources are freed but do not run pending callbacks to avoid | 219 // Ensure that resources are freed but do not run pending callbacks to avoid |
| 220 // any re-entrancy. | 220 // any re-entrancy. |
| 221 CloseInternal(); | 221 CloseInternal(); |
| 222 } | 222 } |
| 223 | 223 |
| 224 ReadyState CastSocket::ready_state() const { | 224 ReadyState CastSocket::ready_state() const { |
| 225 return ready_state_; | 225 return ready_state_; |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 265 ip_endpoint_); | 265 ip_endpoint_); |
| 266 | 266 |
| 267 return net::ClientSocketFactory::GetDefaultFactory()->CreateSSLClientSocket( | 267 return net::ClientSocketFactory::GetDefaultFactory()->CreateSSLClientSocket( |
| 268 connection.Pass(), host_and_port, ssl_config, context); | 268 connection.Pass(), host_and_port, ssl_config, context); |
| 269 } | 269 } |
| 270 | 270 |
| 271 bool CastSocket::ExtractPeerCert(std::string* cert) { | 271 bool CastSocket::ExtractPeerCert(std::string* cert) { |
| 272 DCHECK(cert); | 272 DCHECK(cert); |
| 273 DCHECK(peer_cert_.empty()); | 273 DCHECK(peer_cert_.empty()); |
| 274 net::SSLInfo ssl_info; | 274 net::SSLInfo ssl_info; |
| 275 if (!socket_->GetSSLInfo(&ssl_info) || !ssl_info.cert.get()) | 275 if (!socket_->GetSSLInfo(&ssl_info) || !ssl_info.cert.get()) { |
| 276 return false; | 276 return false; |
| 277 } | |
| 277 | 278 |
| 278 logger_->LogSocketEvent(channel_id_, proto::SSL_INFO_OBTAINED); | 279 logger_->LogSocketEvent(channel_id_, proto::SSL_INFO_OBTAINED); |
| 279 | 280 |
| 280 bool result = net::X509Certificate::GetDEREncoded( | 281 bool result = net::X509Certificate::GetDEREncoded( |
| 281 ssl_info.cert->os_cert_handle(), cert); | 282 ssl_info.cert->os_cert_handle(), cert); |
| 282 if (result) | 283 if (result) { |
| 283 VLOG_WITH_CONNECTION(1) << "Successfully extracted peer certificate: " | 284 VLOG_WITH_CONNECTION(1) << "Successfully extracted peer certificate: " |
| 284 << *cert; | 285 << *cert; |
| 286 } | |
| 285 | 287 |
| 286 logger_->LogSocketEventWithRv( | 288 logger_->LogSocketEventWithRv( |
| 287 channel_id_, proto::DER_ENCODED_CERT_OBTAIN, result ? 1 : 0); | 289 channel_id_, proto::DER_ENCODED_CERT_OBTAIN, result ? 1 : 0); |
| 288 return result; | 290 return result; |
| 289 } | 291 } |
| 290 | 292 |
| 291 bool CastSocket::VerifyChallengeReply() { | 293 bool CastSocket::VerifyChallengeReply() { |
| 292 AuthResult result = AuthenticateChallengeReply(*challenge_reply_, peer_cert_); | 294 AuthResult result = AuthenticateChallengeReply(*challenge_reply_, peer_cert_); |
| 293 logger_->LogSocketChallengeReplyEvent(channel_id_, result); | 295 logger_->LogSocketChallengeReplyEvent(channel_id_, result); |
| 294 return result.success(); | 296 return result.success(); |
| (...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 481 void CastSocket::DoAuthChallengeSendWriteComplete(int result) { | 483 void CastSocket::DoAuthChallengeSendWriteComplete(int result) { |
| 482 send_auth_challenge_callback_.Cancel(); | 484 send_auth_challenge_callback_.Cancel(); |
| 483 VLOG_WITH_CONNECTION(2) << "DoAuthChallengeSendWriteComplete: " << result; | 485 VLOG_WITH_CONNECTION(2) << "DoAuthChallengeSendWriteComplete: " << result; |
| 484 DCHECK_GT(result, 0); | 486 DCHECK_GT(result, 0); |
| 485 DCHECK_EQ(write_queue_.size(), 1UL); | 487 DCHECK_EQ(write_queue_.size(), 1UL); |
| 486 PostTaskToStartConnectLoop(result); | 488 PostTaskToStartConnectLoop(result); |
| 487 } | 489 } |
| 488 | 490 |
| 489 int CastSocket::DoAuthChallengeSendComplete(int result) { | 491 int CastSocket::DoAuthChallengeSendComplete(int result) { |
| 490 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result; | 492 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result; |
| 491 if (result < 0) | 493 if (result < 0) { |
| 492 return result; | 494 return result; |
| 495 } | |
| 493 SetConnectState(CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE); | 496 SetConnectState(CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE); |
| 494 | 497 |
| 495 // Post a task to start read loop so that DoReadLoop is not nested inside | 498 // Post a task to start read loop so that DoReadLoop is not nested inside |
| 496 // DoConnectLoop. This is not strictly necessary but keeps the read loop | 499 // DoConnectLoop. This is not strictly necessary but keeps the read loop |
| 497 // code decoupled from connect loop code. | 500 // code decoupled from connect loop code. |
| 498 PostTaskToStartReadLoop(); | 501 PostTaskToStartReadLoop(); |
| 499 // Always return IO_PENDING since the result is always asynchronous. | 502 // Always return IO_PENDING since the result is always asynchronous. |
| 500 return net::ERR_IO_PENDING; | 503 return net::ERR_IO_PENDING; |
| 501 } | 504 } |
| 502 | 505 |
| 503 int CastSocket::DoAuthChallengeReplyComplete(int result) { | 506 int CastSocket::DoAuthChallengeReplyComplete(int result) { |
| 504 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeReplyComplete: " << result; | 507 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeReplyComplete: " << result; |
| 505 if (result < 0) | 508 if (result < 0) { |
| 506 return result; | 509 return result; |
| 507 if (!VerifyChallengeReply()) | 510 } |
| 511 if (!VerifyChallengeReply()) { | |
| 508 return net::ERR_FAILED; | 512 return net::ERR_FAILED; |
| 513 } | |
| 509 VLOG_WITH_CONNECTION(1) << "Auth challenge verification succeeded"; | 514 VLOG_WITH_CONNECTION(1) << "Auth challenge verification succeeded"; |
| 510 return net::OK; | 515 return net::OK; |
| 511 } | 516 } |
| 512 | 517 |
| 513 void CastSocket::DoConnectCallback(int result) { | 518 void CastSocket::DoConnectCallback(int result) { |
| 514 SetReadyState((result == net::OK) ? READY_STATE_OPEN : READY_STATE_CLOSED); | 519 SetReadyState((result == net::OK) ? READY_STATE_OPEN : READY_STATE_CLOSED); |
| 515 if (result == net::OK) { | 520 if (result == net::OK) { |
| 516 SetErrorState(CHANNEL_ERROR_NONE); | 521 SetErrorState(CHANNEL_ERROR_NONE); |
| 517 PostTaskToStartReadLoop(); | 522 PostTaskToStartReadLoop(); |
| 518 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback"; | 523 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback"; |
| (...skipping 140 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 659 write_state_ != WRITE_STATE_NONE); | 664 write_state_ != WRITE_STATE_NONE); |
| 660 | 665 |
| 661 // No state change occurred in do-while loop above. This means state has | 666 // No state change occurred in do-while loop above. This means state has |
| 662 // transitioned to NONE. | 667 // transitioned to NONE. |
| 663 if (write_state_ == WRITE_STATE_NONE) { | 668 if (write_state_ == WRITE_STATE_NONE) { |
| 664 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_)); | 669 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_)); |
| 665 } | 670 } |
| 666 | 671 |
| 667 // If write loop is done because the queue is empty then set write | 672 // If write loop is done because the queue is empty then set write |
| 668 // state to NONE | 673 // state to NONE |
| 669 if (write_queue_.empty()) | 674 if (write_queue_.empty()) { |
| 670 SetWriteState(WRITE_STATE_NONE); | 675 SetWriteState(WRITE_STATE_NONE); |
| 676 } | |
| 671 | 677 |
| 672 // Write loop is done - if the result is ERR_FAILED then close with error. | 678 // Write loop is done - if the result is ERR_FAILED then close with error. |
| 673 if (rv == net::ERR_FAILED) | 679 if (rv == net::ERR_FAILED) { |
| 674 CloseWithError(); | 680 CloseWithError(); |
| 681 } | |
| 675 } | 682 } |
| 676 | 683 |
| 677 int CastSocket::DoWrite() { | 684 int CastSocket::DoWrite() { |
| 678 DCHECK(!write_queue_.empty()); | 685 DCHECK(!write_queue_.empty()); |
| 679 WriteRequest& request = write_queue_.front(); | 686 WriteRequest& request = write_queue_.front(); |
| 680 | 687 |
| 681 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = " | 688 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = " |
| 682 << request.io_buffer->size() << " bytes_written " | 689 << request.io_buffer->size() << " bytes_written " |
| 683 << request.io_buffer->BytesConsumed(); | 690 << request.io_buffer->BytesConsumed(); |
| 684 | 691 |
| (...skipping 13 matching lines...) Expand all Loading... | |
| 698 if (result <= 0) { // NOTE that 0 also indicates an error | 705 if (result <= 0) { // NOTE that 0 also indicates an error |
| 699 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR); | 706 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR); |
| 700 SetWriteState(WRITE_STATE_ERROR); | 707 SetWriteState(WRITE_STATE_ERROR); |
| 701 return result == 0 ? net::ERR_FAILED : result; | 708 return result == 0 ? net::ERR_FAILED : result; |
| 702 } | 709 } |
| 703 | 710 |
| 704 // Some bytes were successfully written | 711 // Some bytes were successfully written |
| 705 WriteRequest& request = write_queue_.front(); | 712 WriteRequest& request = write_queue_.front(); |
| 706 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; | 713 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; |
| 707 io_buffer->DidConsume(result); | 714 io_buffer->DidConsume(result); |
| 708 if (io_buffer->BytesRemaining() == 0) // Message fully sent | 715 if (io_buffer->BytesRemaining() == 0) { // Message fully sent |
| 709 SetWriteState(WRITE_STATE_DO_CALLBACK); | 716 SetWriteState(WRITE_STATE_DO_CALLBACK); |
| 710 else | 717 } else { |
| 711 SetWriteState(WRITE_STATE_WRITE); | 718 SetWriteState(WRITE_STATE_WRITE); |
| 719 } | |
| 712 | 720 |
| 713 return net::OK; | 721 return net::OK; |
| 714 } | 722 } |
| 715 | 723 |
| 716 int CastSocket::DoWriteCallback() { | 724 int CastSocket::DoWriteCallback() { |
| 717 DCHECK(!write_queue_.empty()); | 725 DCHECK(!write_queue_.empty()); |
| 718 | 726 |
| 719 SetWriteState(WRITE_STATE_WRITE); | 727 SetWriteState(WRITE_STATE_WRITE); |
| 720 | 728 |
| 721 WriteRequest& request = write_queue_.front(); | 729 WriteRequest& request = write_queue_.front(); |
| (...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 817 } else { | 825 } else { |
| 818 // Connection is already established. Close and send error status via the | 826 // Connection is already established. Close and send error status via the |
| 819 // OnError delegate. | 827 // OnError delegate. |
| 820 CloseWithError(); | 828 CloseWithError(); |
| 821 } | 829 } |
| 822 } | 830 } |
| 823 } | 831 } |
| 824 | 832 |
| 825 int CastSocket::DoRead() { | 833 int CastSocket::DoRead() { |
| 826 SetReadState(READ_STATE_READ_COMPLETE); | 834 SetReadState(READ_STATE_READ_COMPLETE); |
| 827 // Figure out whether to read header or body, and the remaining bytes. | 835 |
| 828 uint32 num_bytes_to_read = 0; | 836 // Determine how many bytes need to be read. |
| 829 if (header_read_buffer_->RemainingCapacity() > 0) { | 837 uint32 num_bytes_to_read = framer_->BytesRequested(); |
|
mark a. foltz
2014/08/26 20:37:02
Slightly prefer that this be a size_t and then is
Kevin M
2014/08/27 01:14:03
Done.
| |
| 830 current_read_buffer_ = header_read_buffer_; | |
| 831 num_bytes_to_read = header_read_buffer_->RemainingCapacity(); | |
| 832 CHECK_LE(num_bytes_to_read, MessageHeader::header_size()); | |
| 833 } else { | |
| 834 DCHECK_GT(current_message_size_, 0U); | |
| 835 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset(); | |
| 836 current_read_buffer_ = body_read_buffer_; | |
| 837 CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size()); | |
| 838 } | |
| 839 CHECK_GT(num_bytes_to_read, 0U); | |
| 840 | 838 |
| 841 // Read up to num_bytes_to_read into |current_read_buffer_|. | 839 // Read up to num_bytes_to_read into |current_read_buffer_|. |
| 842 int rv = socket_->Read( | 840 int rv = socket_->Read( |
| 843 current_read_buffer_.get(), | 841 read_buffer_.get(), |
| 844 num_bytes_to_read, | 842 num_bytes_to_read, |
| 845 base::Bind(&CastSocket::DoReadLoop, base::Unretained(this))); | 843 base::Bind(&CastSocket::DoReadLoop, base::Unretained(this))); |
| 846 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_READ, rv); | 844 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_READ, rv); |
| 847 | 845 |
| 848 return rv; | 846 return rv; |
| 849 } | 847 } |
| 850 | 848 |
| 851 int CastSocket::DoReadComplete(int result) { | 849 int CastSocket::DoReadComplete(int result) { |
| 852 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result | 850 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result; |
| 853 << " header offset = " | 851 |
| 854 << header_read_buffer_->offset() | |
| 855 << " body offset = " << body_read_buffer_->offset(); | |
| 856 if (result <= 0) { // 0 means EOF: the peer closed the socket | 852 if (result <= 0) { // 0 means EOF: the peer closed the socket |
| 857 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket"; | 853 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket"; |
| 858 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR); | 854 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR); |
| 859 SetReadState(READ_STATE_ERROR); | 855 SetReadState(READ_STATE_ERROR); |
| 860 return result == 0 ? net::ERR_FAILED : result; | 856 return result == 0 ? net::ERR_FAILED : result; |
| 861 } | 857 } |
| 862 | 858 |
| 863 // Some data was read. Move the offset in the current buffer forward. | 859 size_t message_size; |
| 864 CHECK_LE(current_read_buffer_->offset() + result, | 860 if (framer_->Ingest( |
| 865 current_read_buffer_->capacity()); | 861 result, current_message_.get(), &message_size, &error_state_)) { |
| 866 current_read_buffer_->set_offset(current_read_buffer_->offset() + result); | 862 logger_->LogSocketEventForMessage( |
| 867 | 863 channel_id_, |
| 868 if (current_read_buffer_.get() == header_read_buffer_.get() && | 864 proto::MESSAGE_READ, |
| 869 current_read_buffer_->RemainingCapacity() == 0) { | 865 current_message_->namespace_(), |
| 870 // A full header is read, process the contents. | 866 base::StringPrintf("Message size: %zu", message_size)); |
| 871 if (!ProcessHeader()) { | 867 SetReadState(READ_STATE_DO_CALLBACK); |
| 872 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE); | 868 } else if (error_state_ != CHANNEL_ERROR_NONE) { |
| 873 SetReadState(READ_STATE_ERROR); | 869 SetReadState(READ_STATE_ERROR); |
| 874 } else { | |
| 875 // Processed header, now read the body. | |
| 876 SetReadState(READ_STATE_READ); | |
| 877 } | |
| 878 } else if (current_read_buffer_.get() == body_read_buffer_.get() && | |
| 879 static_cast<uint32>(current_read_buffer_->offset()) == | |
| 880 current_message_size_) { | |
| 881 // Store a copy of current_message_size_ since it will be reset by | |
| 882 // ProcessBody(). | |
| 883 uint32 message_size = current_message_size_; | |
| 884 // Full body is read, process the contents. | |
| 885 if (ProcessBody()) { | |
| 886 logger_->LogSocketEventForMessage( | |
| 887 channel_id_, | |
| 888 proto::MESSAGE_READ, | |
| 889 current_message_->namespace_(), | |
| 890 base::StringPrintf("Message size: %u", message_size)); | |
| 891 SetReadState(READ_STATE_DO_CALLBACK); | |
| 892 } else { | |
| 893 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE); | |
| 894 SetReadState(READ_STATE_ERROR); | |
| 895 } | |
| 896 } else { | 870 } else { |
| 897 // Have not received full header or full body yet; keep reading. | |
| 898 SetReadState(READ_STATE_READ); | 871 SetReadState(READ_STATE_READ); |
| 899 } | 872 } |
| 900 | |
| 901 return net::OK; | 873 return net::OK; |
| 902 } | 874 } |
| 903 | 875 |
| 904 int CastSocket::DoReadCallback() { | 876 int CastSocket::DoReadCallback() { |
| 905 SetReadState(READ_STATE_READ); | 877 SetReadState(READ_STATE_READ); |
| 906 const CastMessage& message = *current_message_; | 878 const CastMessage& message = *current_message_; |
| 907 if (ready_state_ == READY_STATE_CONNECTING) { | 879 if (ready_state_ == READY_STATE_CONNECTING) { |
| 908 if (IsAuthMessage(message)) { | 880 if (IsAuthMessage(message)) { |
| 909 challenge_reply_.reset(new CastMessage(message)); | 881 challenge_reply_.reset(new CastMessage(message)); |
| 910 logger_->LogSocketEvent(channel_id_, proto::RECEIVED_CHALLENGE_REPLY); | 882 logger_->LogSocketEvent(channel_id_, proto::RECEIVED_CHALLENGE_REPLY); |
| (...skipping 22 matching lines...) Expand all Loading... | |
| 933 current_message_->Clear(); | 905 current_message_->Clear(); |
| 934 | 906 |
| 935 return net::OK; | 907 return net::OK; |
| 936 } | 908 } |
| 937 | 909 |
| 938 int CastSocket::DoReadError(int result) { | 910 int CastSocket::DoReadError(int result) { |
| 939 DCHECK_LE(result, 0); | 911 DCHECK_LE(result, 0); |
| 940 return net::ERR_FAILED; | 912 return net::ERR_FAILED; |
| 941 } | 913 } |
| 942 | 914 |
| 943 bool CastSocket::ProcessHeader() { | |
| 944 CHECK_EQ(static_cast<uint32>(header_read_buffer_->offset()), | |
| 945 MessageHeader::header_size()); | |
| 946 MessageHeader header; | |
| 947 MessageHeader::ReadFromIOBuffer(header_read_buffer_.get(), &header); | |
| 948 if (header.message_size > MessageHeader::max_message_size()) | |
| 949 return false; | |
| 950 | 915 |
| 951 VLOG_WITH_CONNECTION(2) << "Parsed header { message_size: " | |
| 952 << header.message_size << " }"; | |
| 953 current_message_size_ = header.message_size; | |
| 954 return true; | |
| 955 } | |
| 956 | |
| 957 bool CastSocket::ProcessBody() { | |
| 958 CHECK_EQ(static_cast<uint32>(body_read_buffer_->offset()), | |
| 959 current_message_size_); | |
| 960 if (!current_message_->ParseFromArray( | |
| 961 body_read_buffer_->StartOfBuffer(), current_message_size_)) { | |
| 962 return false; | |
| 963 } | |
| 964 current_message_size_ = 0; | |
| 965 header_read_buffer_->set_offset(0); | |
| 966 body_read_buffer_->set_offset(0); | |
| 967 current_read_buffer_ = header_read_buffer_; | |
| 968 return true; | |
| 969 } | |
| 970 | |
| 971 // static | |
| 972 bool CastSocket::Serialize(const CastMessage& message_proto, | |
| 973 std::string* message_data) { | |
| 974 DCHECK(message_data); | |
| 975 message_proto.SerializeToString(message_data); | |
| 976 size_t message_size = message_data->size(); | |
| 977 if (message_size > MessageHeader::max_message_size()) { | |
| 978 message_data->clear(); | |
| 979 return false; | |
| 980 } | |
| 981 CastSocket::MessageHeader header; | |
| 982 header.SetMessageSize(message_size); | |
| 983 header.PrependToString(message_data); | |
| 984 return true; | |
| 985 } | |
| 986 | 916 |
| 987 void CastSocket::CloseWithError() { | 917 void CastSocket::CloseWithError() { |
| 988 DCHECK(CalledOnValidThread()); | 918 DCHECK(CalledOnValidThread()); |
| 989 CloseInternal(); | 919 CloseInternal(); |
| 990 RunPendingCallbacksOnClose(); | 920 RunPendingCallbacksOnClose(); |
| 991 if (delegate_) { | 921 if (delegate_) { |
| 992 logger_->LogSocketEvent(channel_id_, proto::NOTIFY_ON_ERROR); | 922 logger_->LogSocketEvent(channel_id_, proto::NOTIFY_ON_ERROR); |
| 993 delegate_->OnError(this, error_state_, logger_->GetLastErrors(channel_id_)); | 923 delegate_->OnError(this, error_state_, logger_->GetLastErrors(channel_id_)); |
| 994 } | 924 } |
| 995 } | 925 } |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1036 } | 966 } |
| 1037 } | 967 } |
| 1038 | 968 |
| 1039 void CastSocket::SetWriteState(WriteState write_state) { | 969 void CastSocket::SetWriteState(WriteState write_state) { |
| 1040 if (write_state_ != write_state) { | 970 if (write_state_ != write_state) { |
| 1041 write_state_ = write_state; | 971 write_state_ = write_state; |
| 1042 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_)); | 972 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_)); |
| 1043 } | 973 } |
| 1044 } | 974 } |
| 1045 | 975 |
| 1046 CastSocket::MessageHeader::MessageHeader() : message_size(0) { } | 976 PacketFramer::PacketFramer(scoped_refptr<net::GrowableIOBuffer> buffer) |
| 977 : buffer_(buffer) { | |
| 978 Reset(); | |
| 979 } | |
| 1047 | 980 |
| 1048 void CastSocket::MessageHeader::SetMessageSize(size_t size) { | 981 PacketFramer::~PacketFramer() { |
| 982 } | |
| 983 | |
| 984 PacketFramer::MessageHeader::MessageHeader() : message_size(0) { | |
| 985 } | |
| 986 | |
| 987 void PacketFramer::MessageHeader::SetMessageSize(size_t size) { | |
| 1049 DCHECK_LT(size, static_cast<size_t>(kuint32max)); | 988 DCHECK_LT(size, static_cast<size_t>(kuint32max)); |
| 1050 DCHECK_GT(size, 0U); | 989 DCHECK_GT(size, 0U); |
| 1051 message_size = size; | 990 message_size = size; |
| 1052 } | 991 } |
| 1053 | 992 |
| 1054 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle, | 993 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle, |
| 1055 // if bit-for-bit compatible. | 994 // if bit-for-bit compatible. |
| 1056 void CastSocket::MessageHeader::PrependToString(std::string* str) { | 995 void PacketFramer::MessageHeader::PrependToString(std::string* str) { |
| 1057 MessageHeader output = *this; | 996 MessageHeader output = *this; |
| 1058 output.message_size = base::HostToNet32(message_size); | 997 output.message_size = base::HostToNet32(message_size); |
| 1059 size_t header_size = base::checked_cast<size_t, uint32>( | 998 size_t header_size = base::checked_cast<size_t, uint32>( |
| 1060 MessageHeader::header_size()); | 999 MessageHeader::header_size()); |
| 1061 scoped_ptr<char, base::FreeDeleter> char_array( | 1000 scoped_ptr<char, base::FreeDeleter> char_array( |
| 1062 static_cast<char*>(malloc(header_size))); | 1001 static_cast<char*>(malloc(header_size))); |
| 1063 memcpy(char_array.get(), &output, header_size); | 1002 memcpy(char_array.get(), &output, header_size); |
| 1064 str->insert(0, char_array.get(), header_size); | 1003 str->insert(0, char_array.get(), header_size); |
| 1065 } | 1004 } |
| 1066 | 1005 |
| 1067 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle, | 1006 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle, |
| 1068 // if bit-for-bit compatible. | 1007 // if bit-for-bit compatible. |
| 1069 void CastSocket::MessageHeader::ReadFromIOBuffer( | 1008 void PacketFramer::MessageHeader::Deserialize(char* data, |
| 1070 net::GrowableIOBuffer* buffer, MessageHeader* header) { | 1009 MessageHeader* header) { |
| 1071 uint32 message_size; | 1010 uint32 message_size; |
| 1072 size_t header_size = base::checked_cast<size_t, uint32>( | 1011 memcpy(&message_size, data, kHeaderSizeBytes); |
| 1073 MessageHeader::header_size()); | |
| 1074 memcpy(&message_size, buffer->StartOfBuffer(), header_size); | |
| 1075 header->message_size = base::NetToHost32(message_size); | 1012 header->message_size = base::NetToHost32(message_size); |
| 1076 } | 1013 } |
| 1077 | 1014 |
| 1078 std::string CastSocket::MessageHeader::ToString() { | 1015 // static |
| 1016 uint32 PacketFramer::MessageHeader::header_size() { | |
| 1017 return kHeaderSizeBytes; | |
| 1018 } | |
| 1019 | |
| 1020 // static | |
| 1021 uint32 PacketFramer::MessageHeader::max_message_size() { | |
| 1022 return kMaxMessageSizeBytes; | |
| 1023 } | |
| 1024 | |
| 1025 std::string PacketFramer::MessageHeader::ToString() { | |
| 1079 return "{message_size: " + base::UintToString(message_size) + "}"; | 1026 return "{message_size: " + base::UintToString(message_size) + "}"; |
| 1080 } | 1027 } |
| 1081 | 1028 |
| 1082 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback) | 1029 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback) |
| 1083 : callback(callback) { } | 1030 : callback(callback) { } |
| 1084 | 1031 |
| 1085 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) { | 1032 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) { |
| 1086 DCHECK(!io_buffer.get()); | 1033 DCHECK(!io_buffer.get()); |
| 1087 std::string message_data; | 1034 std::string message_data; |
| 1088 if (!Serialize(message_proto, &message_data)) | 1035 if (!PacketFramer::Serialize(message_proto, &message_data)) { |
| 1089 return false; | 1036 return false; |
| 1037 } | |
| 1090 message_namespace = message_proto.namespace_(); | 1038 message_namespace = message_proto.namespace_(); |
| 1091 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data), | 1039 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data), |
| 1092 message_data.size()); | 1040 message_data.size()); |
| 1093 return true; | 1041 return true; |
| 1094 } | 1042 } |
| 1095 | 1043 |
| 1096 CastSocket::WriteRequest::~WriteRequest() { } | 1044 CastSocket::WriteRequest::~WriteRequest() { } |
| 1097 | 1045 |
| 1046 // static | |
| 1047 bool PacketFramer::Serialize(const CastMessage& message_proto, | |
| 1048 std::string* message_data) { | |
| 1049 DCHECK(message_data); | |
| 1050 message_proto.SerializeToString(message_data); | |
| 1051 size_t message_size = message_data->size(); | |
| 1052 if (message_size > kMaxMessageSizeBytes) { | |
| 1053 message_data->clear(); | |
| 1054 return false; | |
| 1055 } | |
| 1056 MessageHeader header; | |
| 1057 header.SetMessageSize(message_size); | |
| 1058 header.PrependToString(message_data); | |
| 1059 return true; | |
| 1060 } | |
| 1061 | |
| 1062 size_t PacketFramer::BytesRequested() { | |
| 1063 if (current_element_ == HEADER) { | |
| 1064 size_t bytes_left = kHeaderSizeBytes - packet_bytes_read_; | |
| 1065 VLOG(2) << "Bytes needed for header: " << bytes_left; | |
| 1066 return bytes_left; | |
| 1067 } else if (current_element_ == BODY) { | |
| 1068 size_t bytes_left = (message_size_ + kHeaderSizeBytes) - packet_bytes_read_; | |
| 1069 VLOG(2) << "Bytes needed for body: " << bytes_left; | |
| 1070 return bytes_left; | |
| 1071 } else { | |
| 1072 NOTREACHED() << "Unhandled packet element type."; | |
| 1073 return 0; | |
| 1074 } | |
| 1075 } | |
| 1076 | |
| 1077 bool PacketFramer::Ingest(uint32 num_bytes, | |
| 1078 CastMessage* message, | |
|
mark a. foltz
2014/08/26 20:37:02
I feel like the message being read should be owned
Kevin M
2014/08/27 01:14:03
Done.
| |
| 1079 size_t* message_length, | |
| 1080 ChannelError* error) { | |
| 1081 DCHECK_EQ(base::checked_cast<int32>(packet_bytes_read_), buffer_->offset()); | |
| 1082 DCHECK(message); | |
| 1083 DCHECK(error); | |
| 1084 CHECK_LE(num_bytes, BytesRequested()); | |
| 1085 | |
| 1086 bool was_message_parsed = false; | |
| 1087 packet_bytes_read_ += num_bytes; | |
| 1088 *error = CHANNEL_ERROR_NONE; | |
| 1089 if (current_element_ == HEADER) { | |
| 1090 if (BytesRequested() == 0) { | |
| 1091 MessageHeader header; | |
| 1092 MessageHeader::Deserialize(buffer_.get()->StartOfBuffer(), &header); | |
| 1093 if (header.message_size > MessageHeader::max_message_size()) { | |
| 1094 VLOG(1) << "Error parsing header (message size too large)."; | |
| 1095 *error = CHANNEL_ERROR_INVALID_MESSAGE; | |
| 1096 return false; | |
| 1097 } | |
| 1098 current_element_ = BODY; | |
| 1099 message_size_ = header.message_size; | |
| 1100 } | |
| 1101 } else if (current_element_ == BODY) { | |
| 1102 if (BytesRequested() == 0) { | |
| 1103 CastMessage parsed_message; | |
| 1104 if (!parsed_message.ParseFromArray( | |
| 1105 buffer_->StartOfBuffer() + kHeaderSizeBytes, message_size_)) { | |
| 1106 VLOG(1) << "Error parsing packet body."; | |
| 1107 *error = CHANNEL_ERROR_INVALID_MESSAGE; | |
| 1108 return false; | |
| 1109 } | |
| 1110 parsed_message.Swap(message); | |
| 1111 *message_length = message_size_; | |
| 1112 was_message_parsed = true; | |
| 1113 Reset(); | |
| 1114 } | |
| 1115 } | |
| 1116 | |
| 1117 buffer_->set_offset(packet_bytes_read_); | |
| 1118 return was_message_parsed; | |
| 1119 } | |
| 1120 | |
| 1121 void PacketFramer::Reset() { | |
| 1122 current_element_ = HEADER; | |
| 1123 packet_bytes_read_ = 0; | |
| 1124 message_size_ = 0; | |
| 1125 buffer_->set_offset(0); | |
| 1126 } | |
| 1098 } // namespace cast_channel | 1127 } // namespace cast_channel |
| 1099 } // namespace core_api | 1128 } // namespace core_api |
| 1100 } // namespace extensions | 1129 } // namespace extensions |
| 1101 | 1130 |
| 1102 #undef VLOG_WITH_CONNECTION | 1131 #undef VLOG_WITH_CONNECTION |
| OLD | NEW |