Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(95)

Side by Side Diff: extensions/browser/api/cast_channel/cast_socket.cc

Issue 505453002: Create dedicated class for handling wire message formatting and parsing. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Fixed include order Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "extensions/browser/api/cast_channel/cast_socket.h" 5 #include "extensions/browser/api/cast_channel/cast_socket.h"
6 6
7 #include <stdlib.h> 7 #include <stdlib.h>
8 #include <string.h> 8 #include <string.h>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/callback_helpers.h" 11 #include "base/callback_helpers.h"
12 #include "base/format_macros.h" 12 #include "base/format_macros.h"
13 #include "base/lazy_instance.h" 13 #include "base/lazy_instance.h"
14 #include "base/numerics/safe_conversions.h" 14 #include "base/numerics/safe_conversions.h"
15 #include "base/strings/string_number_conversions.h" 15 #include "base/strings/string_number_conversions.h"
16 #include "base/strings/stringprintf.h" 16 #include "base/strings/stringprintf.h"
17 #include "base/sys_byteorder.h" 17 #include "base/sys_byteorder.h"
18 #include "extensions/browser/api/cast_channel/cast_auth_util.h" 18 #include "extensions/browser/api/cast_channel/cast_auth_util.h"
19 #include "extensions/browser/api/cast_channel/cast_channel.pb.h" 19 #include "extensions/browser/api/cast_channel/cast_channel.pb.h"
20 #include "extensions/browser/api/cast_channel/cast_message_util.h" 20 #include "extensions/browser/api/cast_channel/cast_message_util.h"
21 #include "extensions/browser/api/cast_channel/cast_socket_framer.h"
21 #include "extensions/browser/api/cast_channel/logger.h" 22 #include "extensions/browser/api/cast_channel/logger.h"
22 #include "extensions/browser/api/cast_channel/logger_util.h" 23 #include "extensions/browser/api/cast_channel/logger_util.h"
23 #include "net/base/address_list.h" 24 #include "net/base/address_list.h"
24 #include "net/base/host_port_pair.h" 25 #include "net/base/host_port_pair.h"
25 #include "net/base/net_errors.h" 26 #include "net/base/net_errors.h"
26 #include "net/base/net_util.h" 27 #include "net/base/net_util.h"
27 #include "net/cert/cert_verifier.h" 28 #include "net/cert/cert_verifier.h"
28 #include "net/cert/x509_certificate.h" 29 #include "net/cert/x509_certificate.h"
29 #include "net/http/transport_security_state.h" 30 #include "net/http/transport_security_state.h"
30 #include "net/socket/client_socket_factory.h" 31 #include "net/socket/client_socket_factory.h"
31 #include "net/socket/client_socket_handle.h" 32 #include "net/socket/client_socket_handle.h"
32 #include "net/socket/ssl_client_socket.h" 33 #include "net/socket/ssl_client_socket.h"
33 #include "net/socket/stream_socket.h" 34 #include "net/socket/stream_socket.h"
34 #include "net/socket/tcp_client_socket.h" 35 #include "net/socket/tcp_client_socket.h"
35 #include "net/ssl/ssl_config_service.h" 36 #include "net/ssl/ssl_config_service.h"
36 #include "net/ssl/ssl_info.h" 37 #include "net/ssl/ssl_info.h"
37 38
38 // Assumes |ip_endpoint_| of type net::IPEndPoint and |channel_auth_| of enum 39 // Assumes |ip_endpoint_| of type net::IPEndPoint and |channel_auth_| of enum
39 // type ChannelAuthType are available in the current scope. 40 // type ChannelAuthType are available in the current scope.
40 #define VLOG_WITH_CONNECTION(level) VLOG(level) << "[" << \ 41 #define VLOG_WITH_CONNECTION(level) VLOG(level) << "[" << \
41 ip_endpoint_.ToString() << ", auth=" << channel_auth_ << "] " 42 ip_endpoint_.ToString() << ", auth=" << channel_auth_ << "] "
42 43
43 namespace { 44 namespace {
44 45
45 // The default keepalive delay. On Linux, keepalives probes will be sent after 46 // The default keepalive delay. On Linux, keepalives probes will be sent after
46 // the socket is idle for this length of time, and the socket will be closed 47 // the socket is idle for this length of time, and the socket will be closed
47 // after 9 failed probes. So the total idle time before close is 10 * 48 // after 9 failed probes. So the total idle time before close is 10 *
48 // kTcpKeepAliveDelaySecs. 49 // kTcpKeepAliveDelaySecs.
49 const int kTcpKeepAliveDelaySecs = 10; 50 const int kTcpKeepAliveDelaySecs = 10;
50
51 } // namespace 51 } // namespace
52 52
53 namespace extensions { 53 namespace extensions {
54 54
55 static base::LazyInstance<BrowserContextKeyedAPIFactory< 55 static base::LazyInstance<BrowserContextKeyedAPIFactory<
56 ApiResourceManager<core_api::cast_channel::CastSocket> > > g_factory = 56 ApiResourceManager<core_api::cast_channel::CastSocket> > > g_factory =
57 LAZY_INSTANCE_INITIALIZER; 57 LAZY_INSTANCE_INITIALIZER;
58 58
59 // static 59 // static
60 template <> 60 template <>
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
182 ChannelAuthType channel_auth, 182 ChannelAuthType channel_auth,
183 CastSocket::Delegate* delegate, 183 CastSocket::Delegate* delegate,
184 net::NetLog* net_log, 184 net::NetLog* net_log,
185 const base::TimeDelta& timeout, 185 const base::TimeDelta& timeout,
186 const scoped_refptr<Logger>& logger) 186 const scoped_refptr<Logger>& logger)
187 : ApiResource(owner_extension_id), 187 : ApiResource(owner_extension_id),
188 channel_id_(0), 188 channel_id_(0),
189 ip_endpoint_(ip_endpoint), 189 ip_endpoint_(ip_endpoint),
190 channel_auth_(channel_auth), 190 channel_auth_(channel_auth),
191 delegate_(delegate), 191 delegate_(delegate),
192 current_message_size_(0),
193 current_message_(new CastMessage()),
194 net_log_(net_log), 192 net_log_(net_log),
195 logger_(logger), 193 logger_(logger),
196 connect_timeout_(timeout), 194 connect_timeout_(timeout),
197 connect_timeout_timer_(new base::OneShotTimer<CastSocket>), 195 connect_timeout_timer_(new base::OneShotTimer<CastSocket>),
198 is_canceled_(false), 196 is_canceled_(false),
199 connect_state_(CONN_STATE_NONE), 197 connect_state_(CONN_STATE_NONE),
200 write_state_(WRITE_STATE_NONE), 198 write_state_(WRITE_STATE_NONE),
201 read_state_(READ_STATE_NONE), 199 read_state_(READ_STATE_NONE),
202 error_state_(CHANNEL_ERROR_NONE), 200 error_state_(CHANNEL_ERROR_NONE),
203 ready_state_(READY_STATE_NONE) { 201 ready_state_(READY_STATE_NONE) {
204 DCHECK(net_log_); 202 DCHECK(net_log_);
205 DCHECK(channel_auth_ == CHANNEL_AUTH_TYPE_SSL || 203 DCHECK(channel_auth_ == CHANNEL_AUTH_TYPE_SSL ||
206 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED); 204 channel_auth_ == CHANNEL_AUTH_TYPE_SSL_VERIFIED);
207 net_log_source_.type = net::NetLog::SOURCE_SOCKET; 205 net_log_source_.type = net::NetLog::SOURCE_SOCKET;
208 net_log_source_.id = net_log_->NextID(); 206 net_log_source_.id = net_log_->NextID();
209 207
210 // Reuse these buffers for each message. 208 // Buffer is reused across messages.
211 header_read_buffer_ = new net::GrowableIOBuffer(); 209 read_buffer_ = new net::GrowableIOBuffer();
212 header_read_buffer_->SetCapacity(MessageHeader::header_size()); 210 read_buffer_->SetCapacity(kMaxMessageSizeBytes);
213 body_read_buffer_ = new net::GrowableIOBuffer(); 211 framer_.reset(new MessageFramer(read_buffer_));
214 body_read_buffer_->SetCapacity(MessageHeader::max_message_size());
215 current_read_buffer_ = header_read_buffer_;
216 } 212 }
217 213
218 CastSocket::~CastSocket() { 214 CastSocket::~CastSocket() {
219 // Ensure that resources are freed but do not run pending callbacks to avoid 215 // Ensure that resources are freed but do not run pending callbacks to avoid
220 // any re-entrancy. 216 // any re-entrancy.
221 CloseInternal(); 217 CloseInternal();
222 } 218 }
223 219
224 ReadyState CastSocket::ready_state() const { 220 ReadyState CastSocket::ready_state() const {
225 return ready_state_; 221 return ready_state_;
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
265 ip_endpoint_); 261 ip_endpoint_);
266 262
267 return net::ClientSocketFactory::GetDefaultFactory()->CreateSSLClientSocket( 263 return net::ClientSocketFactory::GetDefaultFactory()->CreateSSLClientSocket(
268 connection.Pass(), host_and_port, ssl_config, context); 264 connection.Pass(), host_and_port, ssl_config, context);
269 } 265 }
270 266
271 bool CastSocket::ExtractPeerCert(std::string* cert) { 267 bool CastSocket::ExtractPeerCert(std::string* cert) {
272 DCHECK(cert); 268 DCHECK(cert);
273 DCHECK(peer_cert_.empty()); 269 DCHECK(peer_cert_.empty());
274 net::SSLInfo ssl_info; 270 net::SSLInfo ssl_info;
275 if (!socket_->GetSSLInfo(&ssl_info) || !ssl_info.cert.get()) 271 if (!socket_->GetSSLInfo(&ssl_info) || !ssl_info.cert.get()) {
276 return false; 272 return false;
273 }
277 274
278 logger_->LogSocketEvent(channel_id_, proto::SSL_INFO_OBTAINED); 275 logger_->LogSocketEvent(channel_id_, proto::SSL_INFO_OBTAINED);
279 276
280 bool result = net::X509Certificate::GetDEREncoded( 277 bool result = net::X509Certificate::GetDEREncoded(
281 ssl_info.cert->os_cert_handle(), cert); 278 ssl_info.cert->os_cert_handle(), cert);
282 if (result) 279 if (result) {
283 VLOG_WITH_CONNECTION(1) << "Successfully extracted peer certificate: " 280 VLOG_WITH_CONNECTION(1) << "Successfully extracted peer certificate: "
284 << *cert; 281 << *cert;
282 }
285 283
286 logger_->LogSocketEventWithRv( 284 logger_->LogSocketEventWithRv(
287 channel_id_, proto::DER_ENCODED_CERT_OBTAIN, result ? 1 : 0); 285 channel_id_, proto::DER_ENCODED_CERT_OBTAIN, result ? 1 : 0);
288 return result; 286 return result;
289 } 287 }
290 288
291 bool CastSocket::VerifyChallengeReply() { 289 bool CastSocket::VerifyChallengeReply() {
292 AuthResult result = AuthenticateChallengeReply(*challenge_reply_, peer_cert_); 290 AuthResult result = AuthenticateChallengeReply(*challenge_reply_, peer_cert_);
293 logger_->LogSocketChallengeReplyEvent(channel_id_, result); 291 logger_->LogSocketChallengeReplyEvent(channel_id_, result);
294 return result.success(); 292 return result.success();
(...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after
481 void CastSocket::DoAuthChallengeSendWriteComplete(int result) { 479 void CastSocket::DoAuthChallengeSendWriteComplete(int result) {
482 send_auth_challenge_callback_.Cancel(); 480 send_auth_challenge_callback_.Cancel();
483 VLOG_WITH_CONNECTION(2) << "DoAuthChallengeSendWriteComplete: " << result; 481 VLOG_WITH_CONNECTION(2) << "DoAuthChallengeSendWriteComplete: " << result;
484 DCHECK_GT(result, 0); 482 DCHECK_GT(result, 0);
485 DCHECK_EQ(write_queue_.size(), 1UL); 483 DCHECK_EQ(write_queue_.size(), 1UL);
486 PostTaskToStartConnectLoop(result); 484 PostTaskToStartConnectLoop(result);
487 } 485 }
488 486
489 int CastSocket::DoAuthChallengeSendComplete(int result) { 487 int CastSocket::DoAuthChallengeSendComplete(int result) {
490 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result; 488 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeSendComplete: " << result;
491 if (result < 0) 489 if (result < 0) {
492 return result; 490 return result;
491 }
493 SetConnectState(CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE); 492 SetConnectState(CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE);
494 493
495 // Post a task to start read loop so that DoReadLoop is not nested inside 494 // Post a task to start read loop so that DoReadLoop is not nested inside
496 // DoConnectLoop. This is not strictly necessary but keeps the read loop 495 // DoConnectLoop. This is not strictly necessary but keeps the read loop
497 // code decoupled from connect loop code. 496 // code decoupled from connect loop code.
498 PostTaskToStartReadLoop(); 497 PostTaskToStartReadLoop();
499 // Always return IO_PENDING since the result is always asynchronous. 498 // Always return IO_PENDING since the result is always asynchronous.
500 return net::ERR_IO_PENDING; 499 return net::ERR_IO_PENDING;
501 } 500 }
502 501
503 int CastSocket::DoAuthChallengeReplyComplete(int result) { 502 int CastSocket::DoAuthChallengeReplyComplete(int result) {
504 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeReplyComplete: " << result; 503 VLOG_WITH_CONNECTION(1) << "DoAuthChallengeReplyComplete: " << result;
505 if (result < 0) 504 if (result < 0) {
506 return result; 505 return result;
507 if (!VerifyChallengeReply()) 506 }
507 if (!VerifyChallengeReply()) {
508 return net::ERR_FAILED; 508 return net::ERR_FAILED;
509 }
509 VLOG_WITH_CONNECTION(1) << "Auth challenge verification succeeded"; 510 VLOG_WITH_CONNECTION(1) << "Auth challenge verification succeeded";
510 return net::OK; 511 return net::OK;
511 } 512 }
512 513
513 void CastSocket::DoConnectCallback(int result) { 514 void CastSocket::DoConnectCallback(int result) {
514 SetReadyState((result == net::OK) ? READY_STATE_OPEN : READY_STATE_CLOSED); 515 SetReadyState((result == net::OK) ? READY_STATE_OPEN : READY_STATE_CLOSED);
515 if (result == net::OK) { 516 if (result == net::OK) {
516 SetErrorState(CHANNEL_ERROR_NONE); 517 SetErrorState(CHANNEL_ERROR_NONE);
517 PostTaskToStartReadLoop(); 518 PostTaskToStartReadLoop();
518 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback"; 519 VLOG_WITH_CONNECTION(1) << "Calling Connect_Callback";
(...skipping 140 matching lines...) Expand 10 before | Expand all | Expand 10 after
659 write_state_ != WRITE_STATE_NONE); 660 write_state_ != WRITE_STATE_NONE);
660 661
661 // No state change occurred in do-while loop above. This means state has 662 // No state change occurred in do-while loop above. This means state has
662 // transitioned to NONE. 663 // transitioned to NONE.
663 if (write_state_ == WRITE_STATE_NONE) { 664 if (write_state_ == WRITE_STATE_NONE) {
664 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_)); 665 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_));
665 } 666 }
666 667
667 // If write loop is done because the queue is empty then set write 668 // If write loop is done because the queue is empty then set write
668 // state to NONE 669 // state to NONE
669 if (write_queue_.empty()) 670 if (write_queue_.empty()) {
670 SetWriteState(WRITE_STATE_NONE); 671 SetWriteState(WRITE_STATE_NONE);
672 }
671 673
672 // Write loop is done - if the result is ERR_FAILED then close with error. 674 // Write loop is done - if the result is ERR_FAILED then close with error.
673 if (rv == net::ERR_FAILED) 675 if (rv == net::ERR_FAILED) {
674 CloseWithError(); 676 CloseWithError();
677 }
675 } 678 }
676 679
677 int CastSocket::DoWrite() { 680 int CastSocket::DoWrite() {
678 DCHECK(!write_queue_.empty()); 681 DCHECK(!write_queue_.empty());
679 WriteRequest& request = write_queue_.front(); 682 WriteRequest& request = write_queue_.front();
680 683
681 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = " 684 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = "
682 << request.io_buffer->size() << " bytes_written " 685 << request.io_buffer->size() << " bytes_written "
683 << request.io_buffer->BytesConsumed(); 686 << request.io_buffer->BytesConsumed();
684 687
(...skipping 13 matching lines...) Expand all
698 if (result <= 0) { // NOTE that 0 also indicates an error 701 if (result <= 0) { // NOTE that 0 also indicates an error
699 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR); 702 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR);
700 SetWriteState(WRITE_STATE_ERROR); 703 SetWriteState(WRITE_STATE_ERROR);
701 return result == 0 ? net::ERR_FAILED : result; 704 return result == 0 ? net::ERR_FAILED : result;
702 } 705 }
703 706
704 // Some bytes were successfully written 707 // Some bytes were successfully written
705 WriteRequest& request = write_queue_.front(); 708 WriteRequest& request = write_queue_.front();
706 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; 709 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer;
707 io_buffer->DidConsume(result); 710 io_buffer->DidConsume(result);
708 if (io_buffer->BytesRemaining() == 0) // Message fully sent 711 if (io_buffer->BytesRemaining() == 0) { // Message fully sent
709 SetWriteState(WRITE_STATE_DO_CALLBACK); 712 SetWriteState(WRITE_STATE_DO_CALLBACK);
710 else 713 } else {
711 SetWriteState(WRITE_STATE_WRITE); 714 SetWriteState(WRITE_STATE_WRITE);
715 }
712 716
713 return net::OK; 717 return net::OK;
714 } 718 }
715 719
716 int CastSocket::DoWriteCallback() { 720 int CastSocket::DoWriteCallback() {
717 DCHECK(!write_queue_.empty()); 721 DCHECK(!write_queue_.empty());
718 722
719 SetWriteState(WRITE_STATE_WRITE); 723 SetWriteState(WRITE_STATE_WRITE);
720 724
721 WriteRequest& request = write_queue_.front(); 725 WriteRequest& request = write_queue_.front();
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
817 } else { 821 } else {
818 // Connection is already established. Close and send error status via the 822 // Connection is already established. Close and send error status via the
819 // OnError delegate. 823 // OnError delegate.
820 CloseWithError(); 824 CloseWithError();
821 } 825 }
822 } 826 }
823 } 827 }
824 828
825 int CastSocket::DoRead() { 829 int CastSocket::DoRead() {
826 SetReadState(READ_STATE_READ_COMPLETE); 830 SetReadState(READ_STATE_READ_COMPLETE);
827 // Figure out whether to read header or body, and the remaining bytes. 831
828 uint32 num_bytes_to_read = 0; 832 // Determine how many bytes need to be read.
829 if (header_read_buffer_->RemainingCapacity() > 0) { 833 size_t num_bytes_to_read = framer_->BytesRequested();
830 current_read_buffer_ = header_read_buffer_;
831 num_bytes_to_read = header_read_buffer_->RemainingCapacity();
832 CHECK_LE(num_bytes_to_read, MessageHeader::header_size());
833 } else {
834 DCHECK_GT(current_message_size_, 0U);
835 num_bytes_to_read = current_message_size_ - body_read_buffer_->offset();
836 current_read_buffer_ = body_read_buffer_;
837 CHECK_LE(num_bytes_to_read, MessageHeader::max_message_size());
838 }
839 CHECK_GT(num_bytes_to_read, 0U);
840 834
841 // Read up to num_bytes_to_read into |current_read_buffer_|. 835 // Read up to num_bytes_to_read into |current_read_buffer_|.
842 int rv = socket_->Read( 836 int rv = socket_->Read(
843 current_read_buffer_.get(), 837 read_buffer_.get(),
844 num_bytes_to_read, 838 base::checked_cast<uint32>(num_bytes_to_read),
845 base::Bind(&CastSocket::DoReadLoop, base::Unretained(this))); 839 base::Bind(&CastSocket::DoReadLoop, base::Unretained(this)));
846 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_READ, rv); 840 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_READ, rv);
847 841
848 return rv; 842 return rv;
849 } 843 }
850 844
851 int CastSocket::DoReadComplete(int result) { 845 int CastSocket::DoReadComplete(int result) {
852 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result 846 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result;
853 << " header offset = " 847
854 << header_read_buffer_->offset()
855 << " body offset = " << body_read_buffer_->offset();
856 if (result <= 0) { // 0 means EOF: the peer closed the socket 848 if (result <= 0) { // 0 means EOF: the peer closed the socket
857 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket"; 849 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket";
858 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR); 850 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR);
859 SetReadState(READ_STATE_ERROR); 851 SetReadState(READ_STATE_ERROR);
860 return result == 0 ? net::ERR_FAILED : result; 852 return result == 0 ? net::ERR_FAILED : result;
861 } 853 }
862 854
863 // Some data was read. Move the offset in the current buffer forward. 855 size_t message_size;
864 CHECK_LE(current_read_buffer_->offset() + result, 856 DCHECK(current_message_.get() == NULL);
865 current_read_buffer_->capacity()); 857 current_message_ = framer_->Ingest(result, &message_size, &error_state_);
866 current_read_buffer_->set_offset(current_read_buffer_->offset() + result); 858 if (current_message_.get()) {
867 859 DCHECK_EQ(error_state_, CHANNEL_ERROR_NONE);
868 if (current_read_buffer_.get() == header_read_buffer_.get() && 860 DCHECK_GT(message_size, static_cast<size_t>(0));
869 current_read_buffer_->RemainingCapacity() == 0) { 861 logger_->LogSocketEventForMessage(
870 // A full header is read, process the contents. 862 channel_id_,
871 if (!ProcessHeader()) { 863 proto::MESSAGE_READ,
872 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE); 864 current_message_->namespace_(),
873 SetReadState(READ_STATE_ERROR); 865 base::StringPrintf("Message size: %zu", message_size));
874 } else { 866 SetReadState(READ_STATE_DO_CALLBACK);
875 // Processed header, now read the body. 867 } else if (error_state_ != CHANNEL_ERROR_NONE) {
876 SetReadState(READ_STATE_READ); 868 DCHECK(current_message_.get() == NULL);
877 } 869 SetReadState(READ_STATE_ERROR);
878 } else if (current_read_buffer_.get() == body_read_buffer_.get() &&
879 static_cast<uint32>(current_read_buffer_->offset()) ==
880 current_message_size_) {
881 // Store a copy of current_message_size_ since it will be reset by
882 // ProcessBody().
883 uint32 message_size = current_message_size_;
884 // Full body is read, process the contents.
885 if (ProcessBody()) {
886 logger_->LogSocketEventForMessage(
887 channel_id_,
888 proto::MESSAGE_READ,
889 current_message_->namespace_(),
890 base::StringPrintf("Message size: %u", message_size));
891 SetReadState(READ_STATE_DO_CALLBACK);
892 } else {
893 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE);
894 SetReadState(READ_STATE_ERROR);
895 }
896 } else { 870 } else {
897 // Have not received full header or full body yet; keep reading. 871 DCHECK(current_message_.get() == NULL);
898 SetReadState(READ_STATE_READ); 872 SetReadState(READ_STATE_READ);
899 } 873 }
900
901 return net::OK; 874 return net::OK;
902 } 875 }
903 876
904 int CastSocket::DoReadCallback() { 877 int CastSocket::DoReadCallback() {
905 SetReadState(READ_STATE_READ); 878 SetReadState(READ_STATE_READ);
906 const CastMessage& message = *current_message_; 879 const CastMessage& message = *current_message_;
907 if (ready_state_ == READY_STATE_CONNECTING) { 880 if (ready_state_ == READY_STATE_CONNECTING) {
908 if (IsAuthMessage(message)) { 881 if (IsAuthMessage(message)) {
909 challenge_reply_.reset(new CastMessage(message)); 882 challenge_reply_.reset(new CastMessage(message));
910 logger_->LogSocketEvent(channel_id_, proto::RECEIVED_CHALLENGE_REPLY); 883 logger_->LogSocketEvent(channel_id_, proto::RECEIVED_CHALLENGE_REPLY);
911 PostTaskToStartConnectLoop(net::OK); 884 PostTaskToStartConnectLoop(net::OK);
885 current_message_.reset();
912 return net::OK; 886 return net::OK;
913 } else { 887 } else {
914 SetReadState(READ_STATE_ERROR); 888 SetReadState(READ_STATE_ERROR);
915 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE); 889 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE);
890 current_message_.reset();
916 return net::ERR_INVALID_RESPONSE; 891 return net::ERR_INVALID_RESPONSE;
917 } 892 }
918 } 893 }
919 894
920 MessageInfo message_info; 895 MessageInfo message_info;
921 if (!CastMessageToMessageInfo(message, &message_info)) { 896 if (!CastMessageToMessageInfo(message, &message_info)) {
922 current_message_->Clear(); 897 current_message_.reset();
923 SetReadState(READ_STATE_ERROR); 898 SetReadState(READ_STATE_ERROR);
924 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE); 899 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE);
925 return net::ERR_INVALID_RESPONSE; 900 return net::ERR_INVALID_RESPONSE;
926 } 901 }
927 902
928 logger_->LogSocketEventForMessage(channel_id_, 903 logger_->LogSocketEventForMessage(channel_id_,
929 proto::NOTIFY_ON_MESSAGE, 904 proto::NOTIFY_ON_MESSAGE,
930 message.namespace_(), 905 message.namespace_(),
931 std::string()); 906 std::string());
932 delegate_->OnMessage(this, message_info); 907 delegate_->OnMessage(this, message_info);
933 current_message_->Clear(); 908 current_message_.reset();
934 909
935 return net::OK; 910 return net::OK;
936 } 911 }
937 912
938 int CastSocket::DoReadError(int result) { 913 int CastSocket::DoReadError(int result) {
939 DCHECK_LE(result, 0); 914 DCHECK_LE(result, 0);
940 return net::ERR_FAILED; 915 return net::ERR_FAILED;
941 } 916 }
942 917
943 bool CastSocket::ProcessHeader() {
944 CHECK_EQ(static_cast<uint32>(header_read_buffer_->offset()),
945 MessageHeader::header_size());
946 MessageHeader header;
947 MessageHeader::ReadFromIOBuffer(header_read_buffer_.get(), &header);
948 if (header.message_size > MessageHeader::max_message_size())
949 return false;
950 918
951 VLOG_WITH_CONNECTION(2) << "Parsed header { message_size: "
952 << header.message_size << " }";
953 current_message_size_ = header.message_size;
954 return true;
955 }
956
957 bool CastSocket::ProcessBody() {
958 CHECK_EQ(static_cast<uint32>(body_read_buffer_->offset()),
959 current_message_size_);
960 if (!current_message_->ParseFromArray(
961 body_read_buffer_->StartOfBuffer(), current_message_size_)) {
962 return false;
963 }
964 current_message_size_ = 0;
965 header_read_buffer_->set_offset(0);
966 body_read_buffer_->set_offset(0);
967 current_read_buffer_ = header_read_buffer_;
968 return true;
969 }
970
971 // static
972 bool CastSocket::Serialize(const CastMessage& message_proto,
973 std::string* message_data) {
974 DCHECK(message_data);
975 message_proto.SerializeToString(message_data);
976 size_t message_size = message_data->size();
977 if (message_size > MessageHeader::max_message_size()) {
978 message_data->clear();
979 return false;
980 }
981 CastSocket::MessageHeader header;
982 header.SetMessageSize(message_size);
983 header.PrependToString(message_data);
984 return true;
985 }
986 919
987 void CastSocket::CloseWithError() { 920 void CastSocket::CloseWithError() {
988 DCHECK(CalledOnValidThread()); 921 DCHECK(CalledOnValidThread());
989 CloseInternal(); 922 CloseInternal();
990 RunPendingCallbacksOnClose(); 923 RunPendingCallbacksOnClose();
991 if (delegate_) { 924 if (delegate_) {
992 logger_->LogSocketEvent(channel_id_, proto::NOTIFY_ON_ERROR); 925 logger_->LogSocketEvent(channel_id_, proto::NOTIFY_ON_ERROR);
993 delegate_->OnError(this, error_state_, logger_->GetLastErrors(channel_id_)); 926 delegate_->OnError(this, error_state_, logger_->GetLastErrors(channel_id_));
994 } 927 }
995 } 928 }
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
1036 } 969 }
1037 } 970 }
1038 971
1039 void CastSocket::SetWriteState(WriteState write_state) { 972 void CastSocket::SetWriteState(WriteState write_state) {
1040 if (write_state_ != write_state) { 973 if (write_state_ != write_state) {
1041 write_state_ = write_state; 974 write_state_ = write_state;
1042 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_)); 975 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_));
1043 } 976 }
1044 } 977 }
1045 978
1046 CastSocket::MessageHeader::MessageHeader() : message_size(0) { }
1047
1048 void CastSocket::MessageHeader::SetMessageSize(size_t size) {
1049 DCHECK_LT(size, static_cast<size_t>(kuint32max));
1050 DCHECK_GT(size, 0U);
1051 message_size = size;
1052 }
1053
1054 // TODO(mfoltz): Investigate replacing header serialization with base::Pickle,
1055 // if bit-for-bit compatible.
1056 void CastSocket::MessageHeader::PrependToString(std::string* str) {
1057 MessageHeader output = *this;
1058 output.message_size = base::HostToNet32(message_size);
1059 size_t header_size = base::checked_cast<size_t, uint32>(
1060 MessageHeader::header_size());
1061 scoped_ptr<char, base::FreeDeleter> char_array(
1062 static_cast<char*>(malloc(header_size)));
1063 memcpy(char_array.get(), &output, header_size);
1064 str->insert(0, char_array.get(), header_size);
1065 }
1066
1067 // TODO(mfoltz): Investigate replacing header deserialization with base::Pickle,
1068 // if bit-for-bit compatible.
1069 void CastSocket::MessageHeader::ReadFromIOBuffer(
1070 net::GrowableIOBuffer* buffer, MessageHeader* header) {
1071 uint32 message_size;
1072 size_t header_size = base::checked_cast<size_t, uint32>(
1073 MessageHeader::header_size());
1074 memcpy(&message_size, buffer->StartOfBuffer(), header_size);
1075 header->message_size = base::NetToHost32(message_size);
1076 }
1077
1078 std::string CastSocket::MessageHeader::ToString() {
1079 return "{message_size: " + base::UintToString(message_size) + "}";
1080 }
1081
1082 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback) 979 CastSocket::WriteRequest::WriteRequest(const net::CompletionCallback& callback)
1083 : callback(callback) { } 980 : callback(callback) { }
1084 981
1085 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) { 982 bool CastSocket::WriteRequest::SetContent(const CastMessage& message_proto) {
1086 DCHECK(!io_buffer.get()); 983 DCHECK(!io_buffer.get());
1087 std::string message_data; 984 std::string message_data;
1088 if (!Serialize(message_proto, &message_data)) 985 if (!MessageFramer::Serialize(message_proto, &message_data)) {
1089 return false; 986 return false;
987 }
1090 message_namespace = message_proto.namespace_(); 988 message_namespace = message_proto.namespace_();
1091 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data), 989 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(message_data),
1092 message_data.size()); 990 message_data.size());
1093 return true; 991 return true;
1094 } 992 }
1095 993
1096 CastSocket::WriteRequest::~WriteRequest() { } 994 CastSocket::WriteRequest::~WriteRequest() { }
1097
1098 } // namespace cast_channel 995 } // namespace cast_channel
1099 } // namespace core_api 996 } // namespace core_api
1100 } // namespace extensions 997 } // namespace extensions
1101 998
1102 #undef VLOG_WITH_CONNECTION 999 #undef VLOG_WITH_CONNECTION
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698